From f63960af0a89eae21de18540e9ec4ba9be1bbe3d Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Mon, 1 Feb 2016 18:44:37 +0100 Subject: [PATCH] Add client Request/Response impl. for RxNetty This commit adds the `ClientHttpRequest` and `ClientHttpResponse` implementations for the RxNetty HTTP client. This client library is based on the `Single` and `Observable` composition API, so this has to be converted to the `Flux`/`Mono` variants. --- .../reactive/RxNettyClientHttpRequest.java | 124 ++++++++++++++++++ .../reactive/RxNettyClientHttpResponse.java | 67 ++++++++++ .../RxNettyHttpClientRequestFactory.java | 47 +++++++ 3 files changed, 238 insertions(+) create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpRequest.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpResponse.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyHttpClientRequestFactory.java diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpRequest.java new file mode 100644 index 00000000000..74c894e7fa8 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpRequest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.ByteBuf; +import io.reactivex.netty.protocol.http.client.HttpClient; +import io.reactivex.netty.protocol.http.client.HttpClientRequest; +import org.reactivestreams.Publisher; +import reactor.core.converter.RxJava1ObservableConverter; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import rx.Observable; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferAllocator; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; + +/** + * {@link ClientHttpRequest} implementation for the RxNetty HTTP client + * + * @author Brian Clozel + */ +public class RxNettyClientHttpRequest extends AbstractClientHttpRequest { + + private final NettyDataBufferAllocator allocator; + + private final HttpMethod httpMethod; + + private final URI uri; + + private Observable body; + + + public RxNettyClientHttpRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers, NettyDataBufferAllocator allocator) { + super(headers); + this.httpMethod = httpMethod; + this.uri = uri; + this.allocator = allocator; + } + + /** + * Set the body of the message to the given {@link Publisher}. + * + *

Since the HTTP channel is not yet created when this method + * is called, the {@code Mono} return value completes immediately. + * For an event that signals that we're done writing the request, check the + * {@link #execute()} method. + * + * @return a publisher that completes immediately. + * @see #execute() + */ + @Override + public Mono setBody(Publisher body) { + + this.body = RxJava1ObservableConverter.from(Flux.from(body) + .map(b -> allocator.wrap(b.asByteBuffer()).getNativeBuffer())); + + return Mono.empty(); + } + + @Override + public HttpMethod getMethod() { + return this.httpMethod; + } + + @Override + public URI getURI() { + return this.uri; + } + + @Override + public Mono execute() { + try { + HttpClientRequest request = HttpClient + .newClient(this.uri.getHost(), this.uri.getPort()) + .createRequest(io.netty.handler.codec.http.HttpMethod.valueOf(this.httpMethod.name()), uri.getRawPath()); + + return applyBeforeCommit() + .after(() -> Mono.just(request)) + .map(req -> { + for (Map.Entry> entry : getHeaders().entrySet()) { + for (String value : entry.getValue()) { + req = req.addHeader(entry.getKey(), value); + } + } + return req; + }) + .map(req -> { + if (this.body != null) { + return RxJava1ObservableConverter.from(req.writeContent(this.body)); + } + else { + return RxJava1ObservableConverter.from(req); + } + }) + .flatMap(resp -> resp) + .next() + .map(response -> new RxNettyClientHttpResponse(response, this.allocator)); + } + catch (IllegalArgumentException exc) { + return Mono.error(exc); + } + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpResponse.java new file mode 100644 index 00000000000..a6a2efe3b2f --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpResponse.java @@ -0,0 +1,67 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import io.netty.buffer.ByteBuf; +import io.reactivex.netty.protocol.http.client.HttpClientResponse; +import reactor.core.converter.RxJava1ObservableConverter; +import reactor.core.publisher.Flux; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferAllocator; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.util.Assert; + +/** + * {@link ClientHttpResponse} implementation for the RxNetty HTTP client + * + * @author Brian Clozel + */ +public class RxNettyClientHttpResponse implements ClientHttpResponse { + + private final HttpClientResponse response; + + private final HttpHeaders headers; + + private final NettyDataBufferAllocator allocator; + + public RxNettyClientHttpResponse(HttpClientResponse response, + NettyDataBufferAllocator allocator) { + Assert.notNull("'request', request must not be null"); + Assert.notNull(allocator, "'allocator' must not be null"); + this.allocator = allocator; + this.response = response; + this.headers = new HttpHeaders(); + this.response.headerIterator().forEachRemaining(e -> this.headers.set(e.getKey(), e.getValue())); + } + + @Override + public HttpStatus getStatusCode() { + return HttpStatus.valueOf(this.response.getStatus().code()); + } + + @Override + public Flux getBody() { + return RxJava1ObservableConverter.from(this.response.getContent().map(allocator::wrap)); + } + + @Override + public HttpHeaders getHeaders() { + return this.headers; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyHttpClientRequestFactory.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyHttpClientRequestFactory.java new file mode 100644 index 00000000000..f96e97a0c2f --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyHttpClientRequestFactory.java @@ -0,0 +1,47 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.net.URI; + +import org.springframework.core.io.buffer.NettyDataBufferAllocator; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.util.Assert; + +/** + * Create a {@link ClientHttpRequestFactory} for the RxNetty HTTP client + * + * @author Brian Clozel + */ +public class RxNettyHttpClientRequestFactory implements ClientHttpRequestFactory { + + private final NettyDataBufferAllocator allocator; + + public RxNettyHttpClientRequestFactory(NettyDataBufferAllocator allocator) { + this.allocator = allocator; + } + + @Override + public ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers) { + Assert.notNull(httpMethod, "HTTP method is required"); + Assert.notNull(uri, "request URI is required"); + Assert.notNull(headers, "request headers are required"); + + return new RxNettyClientHttpRequest(httpMethod, uri, headers, this.allocator); + } +}