diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
index cda8fd58eda..1c6a4af0510 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
@@ -62,4 +62,15 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
*/
DataBufferFactory bufferFactory();
+ /**
+ * Indicate that message handling is complete, allowing for any cleanup or
+ * end-of-processing tasks to be performed such as applying header changes
+ * made via {@link #getHeaders()} to the underlying HTTP message (if not
+ * applied already).
+ *
This method should be automatically invoked at the end of message
+ * processing so typically applications should not have to invoke it.
+ * If invoked multiple times it should have no side effects.
+ */
+ Mono setComplete();
+
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java
index 6ecb0caea86..e448cb149f3 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java
@@ -46,13 +46,13 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
private final List>> beforeCommitActions = new ArrayList<>(4);
- public AbstractClientHttpRequest(HttpHeaders httpHeaders) {
- if (httpHeaders == null) {
- this.headers = new HttpHeaders();
- }
- else {
- this.headers = httpHeaders;
- }
+ public AbstractClientHttpRequest() {
+ this(new HttpHeaders());
+ }
+
+ public AbstractClientHttpRequest(HttpHeaders headers) {
+ Assert.notNull(headers);
+ this.headers = headers;
this.cookies = new LinkedMultiValueMap<>();
}
@@ -85,8 +85,8 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
})
.then(() -> {
this.state.set(State.COMITTED);
- //writeHeaders();
- //writeCookies();
+ writeHeaders();
+ writeCookies();
return Mono.empty();
});
}
@@ -99,5 +99,9 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
this.beforeCommitActions.add(action);
}
+ protected abstract void writeHeaders();
+
+ protected abstract void writeCookies();
+
private enum State {NEW, COMMITTING, COMITTED}
-}
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java
new file mode 100644
index 00000000000..bea20e29e0c
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpConnector.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.http.client.reactive;
+
+import java.net.URI;
+import java.util.function.Function;
+
+import reactor.core.publisher.Mono;
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Client abstraction for HTTP client runtimes.
+ * {@link ClientHttpConnector} drives the underlying HTTP client implementation
+ * so as to connect to the origin server and provide all the necessary infrastructure
+ * to send the actual {@link ClientHttpRequest} and receive the {@link ClientHttpResponse}
+ *
+ * @author Brian Clozel
+ */
+public interface ClientHttpConnector {
+
+ /**
+ * Connect to the origin server using the given {@code HttpMethod} and {@code URI},
+ * then apply the given {@code requestCallback} on the {@link ClientHttpRequest}
+ * once the connection has been established.
+ * Return a publisher of the {@link ClientHttpResponse}.
+ *
+ * @param method the HTTP request method
+ * @param uri the HTTP request URI
+ * @param requestCallback a function that prepares and writes the request,
+ * returning a publisher that signals when it's done interacting with the request.
+ * Implementations should return a {@code Mono} by calling
+ * {@link ClientHttpRequest#writeWith} or {@link ClientHttpRequest#setComplete}.
+ * @return a publisher of the {@link ClientHttpResponse}
+ */
+ Mono connect(HttpMethod method, URI uri,
+ Function super ClientHttpRequest, Mono> requestCallback);
+
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpRequest.java
index 3e7098a0434..9819a372e37 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpRequest.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpRequest.java
@@ -18,8 +18,6 @@ package org.springframework.http.client.reactive;
import java.net.URI;
-import reactor.core.publisher.Mono;
-
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpMethod;
import org.springframework.http.ReactiveHttpOutputMessage;
@@ -48,14 +46,4 @@ public interface ClientHttpRequest extends ReactiveHttpOutputMessage {
*/
MultiValueMap getCookies();
- /**
- * Execute this request, resulting in a reactive stream of a single
- * {@link org.springframework.http.client.ClientHttpResponse}.
- *
- * @return a {@code Mono} that signals when the the response
- * status and headers have been received. The response body is made available with
- * a separate Publisher within the {@code ClientHttpResponse}.
- */
- Mono execute();
-
-}
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpRequestFactory.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java
similarity index 52%
rename from spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpRequestFactory.java
rename to spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java
index ce8b5b7e17f..a5f38eb162e 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ClientHttpRequestFactory.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java
@@ -13,31 +13,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.springframework.http.client.reactive;
import java.net.URI;
+import java.util.function.Function;
-import org.reactivestreams.Publisher;
-
-import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
+import reactor.core.publisher.Mono;
+
/**
- * Factory for {@link ClientHttpRequest} objects.
+ * Reactor-Netty implementation of {@link ClientHttpConnector}
*
* @author Brian Clozel
*/
-public interface ClientHttpRequestFactory {
+public class ReactorClientHttpConnector implements ClientHttpConnector {
- /**
- * Create a new {@link ClientHttpRequest} for the specified HTTP method, URI and headers
- * The returned request can be {@link ClientHttpRequest#writeWith(Publisher) written to},
- * and then executed by calling {@link ClientHttpRequest#execute()}
- *
- * @param httpMethod the HTTP method to execute
- * @param uri the URI to create a request for
- * @param headers the HTTP request headers
- */
- ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers);
+ @Override
+ public Mono connect(HttpMethod method, URI uri,
+ Function super ClientHttpRequest, Mono> requestCallback) {
-}
+ return reactor.io.netty.http.HttpClient.create(uri.getHost(), uri.getPort())
+ .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()),
+ uri.toString(),
+ httpOutbound -> requestCallback.apply(new ReactorClientHttpRequest(method, uri, httpOutbound)))
+ .map(httpInbound -> new ReactorClientHttpResponse(httpInbound));
+ }
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java
index 4bec8778a1b..f9573f7ea4b 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java
@@ -17,7 +17,6 @@
package org.springframework.http.client.reactive;
import java.net.URI;
-import java.util.Collection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -26,45 +25,40 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.netty.http.HttpClient;
+import reactor.io.netty.http.HttpClientRequest;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
-import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBuffer;
-import org.springframework.http.HttpHeaders;
+import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpMethod;
/**
- * {@link ClientHttpRequest} implementation for the Reactor Net HTTP client
+ * {@link ClientHttpRequest} implementation for the Reactor-Netty HTTP client
*
* @author Brian Clozel
- * @see HttpClient
+ * @see reactor.io.netty.http.HttpClient
*/
public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
- private final DataBufferFactory dataBufferFactory;
-
private final HttpMethod httpMethod;
private final URI uri;
- private final HttpClient httpClient;
-
- private Flux body;
+ private final HttpClientRequest httpRequest;
+ private final NettyDataBufferFactory bufferFactory;
- public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers) {
- super(headers);
- //FIXME use Netty factory
- this.dataBufferFactory = new DefaultDataBufferFactory();
+ public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClientRequest httpRequest) {
this.httpMethod = httpMethod;
this.uri = uri;
- this.httpClient = httpClient;
+ this.httpRequest = httpRequest;
+ this.bufferFactory = new NettyDataBufferFactory(httpRequest.delegate().alloc());
}
@Override
public DataBufferFactory bufferFactory() {
- return this.dataBufferFactory;
+ return this.bufferFactory;
}
@Override
@@ -77,51 +71,15 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
return this.uri;
}
- /**
- * Set the body of the message to the given {@link Publisher}.
- *
- * Since the HTTP channel is not yet created when this method
- * is called, the {@code Mono} return value completes immediately.
- * For an event that signals that we're done writing the request, check the
- * {@link #execute()} method.
- *
- * @return a publisher that completes immediately.
- * @see #execute()
- */
@Override
public Mono writeWith(Publisher body) {
-
- this.body = Flux.from(body).map(this::toByteBuf);
- return Mono.empty();
+ return applyBeforeCommit()
+ .then(httpRequest.send(Flux.from(body).map(this::toByteBuf)));
}
@Override
- public Mono execute() {
-
- return this.httpClient.request(new io.netty.handler.codec.http.HttpMethod(httpMethod.toString()), uri.toString(),
- channel -> {
- // see https://github.com/reactor/reactor-io/pull/8
- if (body == null) {
- channel.removeTransferEncodingChunked();
- }
- return applyBeforeCommit()
- .then(() -> {
- getHeaders().entrySet().stream().forEach(e ->
- channel.headers().set(e.getKey(), e.getValue()));
- getCookies().values().stream().flatMap(Collection::stream).forEach(cookie ->
- channel.addCookie(new DefaultCookie(cookie.getName(), cookie.getValue())));
- return Mono.empty();
- })
- .then(() -> {
- if (body != null) {
- return channel.send(body);
- }
- else {
- return channel.sendHeaders();
- }
- });
- }).map(httpChannel -> new ReactorClientHttpResponse(httpChannel,
- dataBufferFactory));
+ public Mono setComplete() {
+ return applyBeforeCommit().then(httpRequest.sendHeaders());
}
private ByteBuf toByteBuf(DataBuffer buffer) {
@@ -133,5 +91,18 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
}
}
-}
+ @Override
+ protected void writeHeaders() {
+ getHeaders().entrySet().stream()
+ .forEach(e -> this.httpRequest.headers().set(e.getKey(), e.getValue()));
+ }
+
+ @Override
+ protected void writeCookies() {
+ getCookies().values()
+ .stream().flatMap(cookies -> cookies.stream())
+ .map(cookie -> new DefaultCookie(cookie.getName(), cookie.getValue()))
+ .forEach(cookie -> this.httpRequest.addCookie(cookie));
+ }
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
index 0a6e4f6d735..049f20fd637 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
@@ -17,12 +17,15 @@
package org.springframework.http.client.reactive;
import java.util.Collection;
+import java.util.function.Function;
+import io.netty.buffer.ByteBuf;
import reactor.core.publisher.Flux;
-import reactor.io.netty.http.HttpInbound;
+import reactor.io.netty.http.HttpClientResponse;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
@@ -31,44 +34,47 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
- * {@link ClientHttpResponse} implementation for the Reactor Net HTTP client
+ * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client
*
* @author Brian Clozel
* @see reactor.io.netty.http.HttpClient
*/
public class ReactorClientHttpResponse implements ClientHttpResponse {
- private final DataBufferFactory dataBufferFactory;
+ private final NettyDataBufferFactory dataBufferFactory;
- private final HttpInbound channel;
+ private final HttpClientResponse response;
- public ReactorClientHttpResponse(HttpInbound channel,
- DataBufferFactory dataBufferFactory) {
- this.dataBufferFactory = dataBufferFactory;
- this.channel = channel;
+ public ReactorClientHttpResponse(HttpClientResponse response) {
+ this.response = response;
+ this.dataBufferFactory = new NettyDataBufferFactory(response.delegate().alloc());
}
@Override
public Flux getBody() {
- return channel.receiveByteBuffer().map(dataBufferFactory::wrap);
+ return response.receive()
+ .map(buf -> {
+ buf.retain();
+ return dataBufferFactory.wrap(buf);
+ });
}
@Override
public HttpHeaders getHeaders() {
HttpHeaders headers = new HttpHeaders();
- this.channel.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue()));
+ this.response.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue()));
return headers;
}
@Override
public HttpStatus getStatusCode() {
- return HttpStatus.valueOf(this.channel.status().code());
+ return HttpStatus.valueOf(this.response.status().code());
}
@Override
public MultiValueMap getCookies() {
MultiValueMap result = new LinkedMultiValueMap<>();
- this.channel.cookies().values().stream().flatMap(Collection::stream)
+ this.response.cookies().values().stream().flatMap(Collection::stream)
.forEach(cookie -> {
ResponseCookie responseCookie = ResponseCookie.from(cookie.name(), cookie.value())
.domain(cookie.domain())
@@ -85,8 +91,8 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
@Override
public String toString() {
return "ReactorClientHttpResponse{" +
- "request=" + this.channel.method().name() + " " + this.channel.uri() + "," +
+ "request=" + this.response.method().name() + " " + this.response.uri() + "," +
"status=" + getStatusCode() +
'}';
}
-}
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java
deleted file mode 100644
index 70becdd9993..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/ReactorHttpClientRequestFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.http.client.reactive;
-
-import java.net.URI;
-
-import reactor.io.netty.http.HttpClient;
-
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.util.Assert;
-
-/**
- * Create a {@link ClientHttpRequest} for the Reactor Net HTTP client
- *
- * @author Brian Clozel
- */
-public class ReactorHttpClientRequestFactory implements ClientHttpRequestFactory {
-
- private final HttpClient httpClient;
-
- public ReactorHttpClientRequestFactory() {
- this(reactor.io.netty.http.HttpClient.create());
- }
-
- protected ReactorHttpClientRequestFactory(HttpClient httpClient) {
- this.httpClient = httpClient;
- }
-
- @Override
- public ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers) {
- Assert.notNull(httpMethod, "HTTP method is required");
- Assert.notNull(uri, "request URI is required");
- Assert.notNull(headers, "request headers are required");
-
- return new ReactorClientHttpRequest(httpMethod, uri, this.httpClient, headers);
- }
-
-}
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpRequest.java
deleted file mode 100644
index 8d3dbcd8ace..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpRequest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.http.client.reactive;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.handler.codec.http.cookie.DefaultCookie;
-import io.reactivex.netty.protocol.http.client.HttpClient;
-import io.reactivex.netty.protocol.http.client.HttpClientRequest;
-import org.reactivestreams.Publisher;
-import reactor.core.converter.RxJava1ObservableConverter;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import rx.Observable;
-
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferFactory;
-import org.springframework.core.io.buffer.NettyDataBufferFactory;
-import org.springframework.http.HttpCookie;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-
-/**
- * {@link ClientHttpRequest} implementation for the RxNetty HTTP client
- *
- * @author Brian Clozel
- */
-public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
-
- private final NettyDataBufferFactory dataBufferFactory;
-
- private final HttpMethod httpMethod;
-
- private final URI uri;
-
- private Observable body;
-
- public RxNettyClientHttpRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers,
- NettyDataBufferFactory dataBufferFactory) {
- super(headers);
- this.httpMethod = httpMethod;
- this.uri = uri;
- this.dataBufferFactory = dataBufferFactory;
- }
-
- @Override
- public DataBufferFactory bufferFactory() {
- return this.dataBufferFactory;
- }
-
- /**
- * Set the body of the message to the given {@link Publisher}.
- *
- * Since the HTTP channel is not yet created when this method
- * is called, the {@code Mono} return value completes immediately.
- * For an event that signals that we're done writing the request, check the
- * {@link #execute()} method.
- *
- * @return a publisher that completes immediately.
- * @see #execute()
- */
- @Override
- public Mono writeWith(Publisher body) {
-
- this.body = RxJava1ObservableConverter.fromPublisher(Flux.from(body)
- .map(b -> dataBufferFactory.wrap(b.asByteBuffer()).getNativeBuffer()));
-
- return Mono.empty();
- }
-
- @Override
- public HttpMethod getMethod() {
- return this.httpMethod;
- }
-
- @Override
- public URI getURI() {
- return this.uri;
- }
-
- @Override
- public Mono execute() {
- try {
- HttpClientRequest request = HttpClient
- .newClient(this.uri.getHost(), this.uri.getPort())
- .createRequest(io.netty.handler.codec.http.HttpMethod.valueOf(this.httpMethod.name()), uri.getRawPath());
-
- return applyBeforeCommit()
- .then(() -> Mono.just(request))
- .map(req -> {
- for (Map.Entry> entry : getHeaders().entrySet()) {
- for (String value : entry.getValue()) {
- req = req.addHeader(entry.getKey(), value);
- }
- }
- for (Map.Entry> entry : getCookies().entrySet()) {
- for (HttpCookie cookie : entry.getValue()) {
- req.addCookie(new DefaultCookie(cookie.getName(), cookie.getValue()));
- }
- }
- return req;
- })
- .map(req -> {
- if (this.body != null) {
- return RxJava1ObservableConverter.toPublisher(req.writeContent(this.body));
- }
- else {
- return RxJava1ObservableConverter.toPublisher(req);
- }
- })
- .flatMap(resp -> resp)
- .next().map(response -> new RxNettyClientHttpResponse(response,
- this.dataBufferFactory));
- }
- catch (IllegalArgumentException exc) {
- return Mono.error(exc);
- }
- }
-
-}
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpResponse.java
deleted file mode 100644
index af25af009b3..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyClientHttpResponse.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.http.client.reactive;
-
-import java.util.Collection;
-
-import io.netty.buffer.ByteBuf;
-import io.reactivex.netty.protocol.http.client.HttpClientResponse;
-import reactor.core.converter.RxJava1ObservableConverter;
-import reactor.core.publisher.Flux;
-
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.NettyDataBufferFactory;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseCookie;
-import org.springframework.util.Assert;
-import org.springframework.util.CollectionUtils;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-/**
- * {@link ClientHttpResponse} implementation for the RxNetty HTTP client
- *
- * @author Brian Clozel
- */
-public class RxNettyClientHttpResponse implements ClientHttpResponse {
-
- private final HttpClientResponse response;
-
- private final HttpHeaders headers;
-
- private final MultiValueMap cookies;
-
- private final NettyDataBufferFactory dataBufferFactory;
-
-
- public RxNettyClientHttpResponse(HttpClientResponse response,
- NettyDataBufferFactory dataBufferFactory) {
- Assert.notNull("'request', request must not be null");
- Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
- this.dataBufferFactory = dataBufferFactory;
- this.response = response;
- this.headers = new HttpHeaders();
- this.response.headerIterator().forEachRemaining(e -> this.headers.set(e.getKey().toString(), e.getValue().toString()));
- this.cookies = initCookies(response);
- }
-
- private static MultiValueMap initCookies(HttpClientResponse response) {
- MultiValueMap result = new LinkedMultiValueMap<>();
- response.getCookies().values().stream().flatMap(Collection::stream)
- .forEach(cookie -> {
- ResponseCookie responseCookie = ResponseCookie.from(cookie.name(), cookie.value())
- .domain(cookie.domain())
- .path(cookie.path())
- .maxAge(cookie.maxAge())
- .secure(cookie.isSecure())
- .httpOnly(cookie.isHttpOnly())
- .build();
- result.add(cookie.name(), responseCookie);
- });
- return CollectionUtils.unmodifiableMultiValueMap(result);
- }
-
-
- @Override
- public HttpStatus getStatusCode() {
- return HttpStatus.valueOf(this.response.getStatus().code());
- }
-
- @Override
- public Flux getBody() {
- return RxJava1ObservableConverter
- .toPublisher(this.response.getContent().map(dataBufferFactory::wrap));
- }
-
- @Override
- public HttpHeaders getHeaders() {
- return this.headers;
- }
-
- @Override
- public MultiValueMap getCookies() {
- return this.cookies;
- }
-
-}
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyHttpClientRequestFactory.java b/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyHttpClientRequestFactory.java
deleted file mode 100644
index 0b935af8128..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/http/client/reactive/RxNettyHttpClientRequestFactory.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.http.client.reactive;
-
-import java.net.URI;
-
-import org.springframework.core.io.buffer.NettyDataBufferFactory;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.util.Assert;
-
-/**
- * Create a {@link ClientHttpRequestFactory} for the RxNetty HTTP client
- *
- * @author Brian Clozel
- */
-public class RxNettyHttpClientRequestFactory implements ClientHttpRequestFactory {
-
- private final NettyDataBufferFactory dataBufferFactory;
-
- public RxNettyHttpClientRequestFactory(NettyDataBufferFactory dataBufferFactory) {
- this.dataBufferFactory = dataBufferFactory;
- }
-
- @Override
- public ClientHttpRequest createRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers) {
- Assert.notNull(httpMethod, "HTTP method is required");
- Assert.notNull(uri, "request URI is required");
- Assert.notNull(headers, "request headers are required");
-
- return new RxNettyClientHttpRequest(httpMethod, uri, headers,
- this.dataBufferFactory);
- }
-}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequest.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequest.java
new file mode 100644
index 00000000000..c16950f87cb
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequest.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.client.reactive;
+
+import java.net.URI;
+
+import org.reactivestreams.Publisher;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.http.HttpCookie;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.util.MultiValueMap;
+
+/**
+ * Holds all the application information required to build an actual HTTP client request.
+ * The request body is materialized by a {@code Publisher} of Objects and their type
+ * by a {@code ResolvableType} instance; it should be later converted to a
+ * {@code Publisher} to be written to the actual HTTP client request.
+ *
+ * @author Brian Clozel
+ */
+public class ClientWebRequest {
+
+ protected final HttpMethod httpMethod;
+
+ protected final URI url;
+
+ protected HttpHeaders httpHeaders;
+
+ private MultiValueMap cookies;
+
+ protected Publisher body;
+
+ protected ResolvableType elementType;
+
+
+ public ClientWebRequest(HttpMethod httpMethod, URI url) {
+ this.httpMethod = httpMethod;
+ this.url = url;
+ }
+
+ public HttpMethod getMethod() {
+ return httpMethod;
+ }
+
+ public URI getUrl() {
+ return url;
+ }
+
+ public HttpHeaders getHttpHeaders() {
+ return httpHeaders;
+ }
+
+ public void setHttpHeaders(HttpHeaders httpHeaders) {
+ this.httpHeaders = httpHeaders;
+ }
+
+ public MultiValueMap getCookies() {
+ return cookies;
+ }
+
+ public void setCookies(MultiValueMap cookies) {
+ this.cookies = cookies;
+ }
+
+ public Publisher getBody() {
+ return body;
+ }
+
+ public void setBody(Publisher body) {
+ this.body = body;
+ }
+
+ public ResolvableType getElementType() {
+ return elementType;
+ }
+
+ public void setElementType(ResolvableType elementType) {
+ this.elementType = elementType;
+ }
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseExtractor.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequestBuilder.java
similarity index 65%
rename from spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseExtractor.java
rename to spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequestBuilder.java
index 2bb1592447e..1baf43a2e9d 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseExtractor.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequestBuilder.java
@@ -17,15 +17,12 @@
package org.springframework.web.client.reactive;
/**
- * A {@code WebResponseExtractor} extracts the relevant part of a
- * raw {@link org.springframework.http.client.reactive.ClientHttpResponse},
- * optionally decoding the response body and using a target composition API.
- *
- * See static factory methods in {@link WebResponseExtractors}.
+ * Build {@link ClientWebRequest}s
*
* @author Brian Clozel
*/
-public interface WebResponseExtractor {
+public interface ClientWebRequestBuilder {
+
+ ClientWebRequest build();
- T extract(WebResponse webResponse);
-}
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequestBuilders.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequestBuilders.java
new file mode 100644
index 00000000000..277dd570392
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequestBuilders.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.client.reactive;
+
+
+import org.springframework.http.HttpMethod;
+
+/**
+ * Static factory methods for {@link DefaultClientWebRequestBuilder ClientWebRequestBuilders}
+ *
+ * @author Brian Clozel
+ */
+public abstract class ClientWebRequestBuilders {
+
+ /**
+ * Create a {@link DefaultClientWebRequestBuilder} for a GET request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static DefaultClientWebRequestBuilder get(String urlTemplate, Object... urlVariables) {
+ return new DefaultClientWebRequestBuilder(HttpMethod.GET, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link DefaultClientWebRequestBuilder} for a POST request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static DefaultClientWebRequestBuilder post(String urlTemplate, Object... urlVariables) {
+ return new DefaultClientWebRequestBuilder(HttpMethod.POST, urlTemplate, urlVariables);
+ }
+
+
+ /**
+ * Create a {@link DefaultClientWebRequestBuilder} for a PUT request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static DefaultClientWebRequestBuilder put(String urlTemplate, Object... urlVariables) {
+ return new DefaultClientWebRequestBuilder(HttpMethod.PUT, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link DefaultClientWebRequestBuilder} for a PATCH request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static DefaultClientWebRequestBuilder patch(String urlTemplate, Object... urlVariables) {
+ return new DefaultClientWebRequestBuilder(HttpMethod.PATCH, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link DefaultClientWebRequestBuilder} for a DELETE request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static DefaultClientWebRequestBuilder delete(String urlTemplate, Object... urlVariables) {
+ return new DefaultClientWebRequestBuilder(HttpMethod.DELETE, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link DefaultClientWebRequestBuilder} for an OPTIONS request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static DefaultClientWebRequestBuilder options(String urlTemplate, Object... urlVariables) {
+ return new DefaultClientWebRequestBuilder(HttpMethod.OPTIONS, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link DefaultClientWebRequestBuilder} for a HEAD request.
+ *
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static DefaultClientWebRequestBuilder head(String urlTemplate, Object... urlVariables) {
+ return new DefaultClientWebRequestBuilder(HttpMethod.HEAD, urlTemplate, urlVariables);
+ }
+
+ /**
+ * Create a {@link DefaultClientWebRequestBuilder} for a request with the given HTTP method.
+ *
+ * @param httpMethod the HTTP method
+ * @param urlTemplate a URL template; the resulting URL will be encoded
+ * @param urlVariables zero or more URL variables
+ */
+ public static DefaultClientWebRequestBuilder request(HttpMethod httpMethod, String urlTemplate, Object... urlVariables) {
+ return new DefaultClientWebRequestBuilder(httpMethod, urlTemplate, urlVariables);
+ }
+
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/RequestPostProcessor.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequestPostProcessor.java
similarity index 59%
rename from spring-web-reactive/src/main/java/org/springframework/web/client/reactive/RequestPostProcessor.java
rename to spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequestPostProcessor.java
index ac8ec9783ec..40df3495316 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/RequestPostProcessor.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ClientWebRequestPostProcessor.java
@@ -17,20 +17,20 @@
package org.springframework.web.client.reactive;
/**
- * Allows post processing the {@link DefaultHttpRequestBuilder} for strategy for
- * performing more complex operations.
+ * Allow post processing and/or wrapping the {@link ClientWebRequest} before
+ * it's sent to the origin server.
*
* @author Rob Winch
- * @see DefaultHttpRequestBuilder#apply(RequestPostProcessor)
+ * @author Brian Clozel
+ * @see DefaultClientWebRequestBuilder#apply(ClientWebRequestPostProcessor)
*/
-public interface RequestPostProcessor {
+public interface ClientWebRequestPostProcessor {
/**
- * Implementations can modify the {@link DefaultHttpRequestBuilder} passed
- * in.
+ * Implementations can modify and/or wrap the {@link ClientWebRequest} passed in
+ * and return it
*
- * @param toPostProcess
- * the {@link DefaultHttpRequestBuilder} to be modified.
+ * @param request the {@link ClientWebRequest} to be modified and/or wrapped.
*/
- void postProcess(DefaultHttpRequestBuilder toPostProcess);
+ ClientWebRequest postProcess(ClientWebRequest request);
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultClientWebRequestBuilder.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultClientWebRequestBuilder.java
new file mode 100644
index 00000000000..4e399f65f1d
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultClientWebRequestBuilder.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.client.reactive;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.http.HttpCookie;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.reactive.ClientHttpRequest;
+import org.springframework.util.Assert;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.util.DefaultUriTemplateHandler;
+import org.springframework.web.util.UriTemplateHandler;
+
+/**
+ * Builds a {@link ClientHttpRequest} using a {@link Publisher}
+ * as request body.
+ *
+ * See static factory methods in {@link ClientWebRequestBuilders}
+ *
+ * @author Brian Clozel
+ * @see ClientWebRequestBuilders
+ */
+public class DefaultClientWebRequestBuilder implements ClientWebRequestBuilder {
+
+
+ private final UriTemplateHandler uriTemplateHandler = new DefaultUriTemplateHandler();
+
+ private HttpMethod httpMethod;
+
+ private HttpHeaders httpHeaders;
+
+ private URI url;
+
+ private final MultiValueMap cookies = new LinkedMultiValueMap<>();
+
+ private Publisher body;
+
+ private ResolvableType elementType;
+
+ private List postProcessors = new ArrayList<>();
+
+ protected DefaultClientWebRequestBuilder() {
+ }
+
+ public DefaultClientWebRequestBuilder(HttpMethod httpMethod, String urlTemplate,
+ Object... urlVariables) {
+ this.httpMethod = httpMethod;
+ this.httpHeaders = new HttpHeaders();
+ this.url = this.uriTemplateHandler.expand(urlTemplate, urlVariables);
+ }
+
+ public DefaultClientWebRequestBuilder(HttpMethod httpMethod, URI url) {
+ this.httpMethod = httpMethod;
+ this.httpHeaders = new HttpHeaders();
+ this.url = url;
+ }
+
+ /**
+ * Add an HTTP request header
+ */
+ public DefaultClientWebRequestBuilder header(String name, String... values) {
+ Arrays.stream(values).forEach(value -> this.httpHeaders.add(name, value));
+ return this;
+ }
+
+ /**
+ * Add all provided HTTP request headers
+ */
+ public DefaultClientWebRequestBuilder headers(HttpHeaders httpHeaders) {
+ this.httpHeaders = httpHeaders;
+ return this;
+ }
+
+ /**
+ * Set the Content-Type request header to the given {@link MediaType}
+ */
+ public DefaultClientWebRequestBuilder contentType(MediaType contentType) {
+ this.httpHeaders.setContentType(contentType);
+ return this;
+ }
+
+ /**
+ * Set the Content-Type request header to the given media type
+ */
+ public DefaultClientWebRequestBuilder contentType(String contentType) {
+ this.httpHeaders.setContentType(MediaType.parseMediaType(contentType));
+ return this;
+ }
+
+ /**
+ * Set the Accept request header to the given {@link MediaType}s
+ */
+ public DefaultClientWebRequestBuilder accept(MediaType... mediaTypes) {
+ this.httpHeaders.setAccept(Arrays.asList(mediaTypes));
+ return this;
+ }
+
+ /**
+ * Set the Accept request header to the given media types
+ */
+ public DefaultClientWebRequestBuilder accept(String... mediaTypes) {
+ this.httpHeaders.setAccept(
+ Arrays.stream(mediaTypes).map(type -> MediaType.parseMediaType(type))
+ .collect(Collectors.toList()));
+ return this;
+ }
+
+ /**
+ * Add a Cookie to the HTTP request
+ */
+ public DefaultClientWebRequestBuilder cookie(String name, String value) {
+ return cookie(new HttpCookie(name, value));
+ }
+
+ /**
+ * Add a Cookie to the HTTP request
+ */
+ public DefaultClientWebRequestBuilder cookie(HttpCookie cookie) {
+ this.cookies.add(cookie.getName(), cookie);
+ return this;
+ }
+
+ /**
+ * Allows performing more complex operations with a strategy. For example, a
+ * {@link ClientWebRequestPostProcessor} implementation might accept the arguments of username
+ * and password and set an HTTP Basic authentication header.
+ *
+ * @param postProcessor the {@link ClientWebRequestPostProcessor} to use. Cannot be null.
+ *
+ * @return this instance for further modifications.
+ */
+ public DefaultClientWebRequestBuilder apply(ClientWebRequestPostProcessor postProcessor) {
+ Assert.notNull(postProcessor, "`postProcessor` is required");
+ this.postProcessors.add(postProcessor);
+ return this;
+ }
+
+ /**
+ * Use the given object as the request body
+ */
+ public DefaultClientWebRequestBuilder body(Object content) {
+ this.body = Mono.just(content);
+ this.elementType = ResolvableType.forInstance(content);
+ return this;
+ }
+
+ /**
+ * Use the given {@link Publisher} as the request body and use its {@link ResolvableType}
+ * as type information for the element published by this reactive stream
+ */
+ public DefaultClientWebRequestBuilder body(Publisher> content, ResolvableType publisherType) {
+ this.body = content;
+ this.elementType = publisherType;
+ return this;
+ }
+
+ @Override
+ public ClientWebRequest build() {
+ ClientWebRequest clientWebRequest = new ClientWebRequest(this.httpMethod, this.url);
+ clientWebRequest.setHttpHeaders(this.httpHeaders);
+ clientWebRequest.setCookies(this.cookies);
+ clientWebRequest.setBody(this.body);
+ clientWebRequest.setElementType(this.elementType);
+ for (ClientWebRequestPostProcessor postProcessor : this.postProcessors) {
+ clientWebRequest = postProcessor.postProcess(clientWebRequest);
+ }
+ return clientWebRequest;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilder.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilder.java
deleted file mode 100644
index d76b53f50a7..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilder.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.web.client.reactive;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import org.springframework.core.ResolvableType;
-import org.springframework.core.codec.Encoder;
-import org.springframework.http.HttpCookie;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.http.client.reactive.ClientHttpRequest;
-import org.springframework.http.client.reactive.ClientHttpRequestFactory;
-import org.springframework.util.Assert;
-import org.springframework.web.client.RestClientException;
-import org.springframework.web.util.DefaultUriTemplateHandler;
-import org.springframework.web.util.UriTemplateHandler;
-
-/**
- * Builds a {@link ClientHttpRequest}
- *
- * See static factory methods in {@link HttpRequestBuilders}
- *
- * @author Brian Clozel
- * @see HttpRequestBuilders
- */
-public class DefaultHttpRequestBuilder implements HttpRequestBuilder {
-
- private final UriTemplateHandler uriTemplateHandler = new DefaultUriTemplateHandler();
-
- protected HttpMethod httpMethod;
-
- protected HttpHeaders httpHeaders;
-
- protected URI url;
-
- protected Publisher contentPublisher;
-
- protected ResolvableType contentType;
-
- protected final List cookies = new ArrayList();
-
- protected DefaultHttpRequestBuilder() {
- }
-
- public DefaultHttpRequestBuilder(HttpMethod httpMethod, String urlTemplate, Object... urlVariables) throws RestClientException {
- this.httpMethod = httpMethod;
- this.httpHeaders = new HttpHeaders();
- this.url = this.uriTemplateHandler.expand(urlTemplate, urlVariables);
- }
-
- public DefaultHttpRequestBuilder(HttpMethod httpMethod, URI url) {
- this.httpMethod = httpMethod;
- this.httpHeaders = new HttpHeaders();
- this.url = url;
- }
-
- public DefaultHttpRequestBuilder header(String name, String... values) {
- Arrays.stream(values).forEach(value -> this.httpHeaders.add(name, value));
- return this;
- }
-
- public DefaultHttpRequestBuilder headers(HttpHeaders httpHeaders) {
- this.httpHeaders = httpHeaders;
- return this;
- }
-
- public DefaultHttpRequestBuilder contentType(MediaType contentType) {
- this.httpHeaders.setContentType(contentType);
- return this;
- }
-
- public DefaultHttpRequestBuilder contentType(String contentType) {
- this.httpHeaders.setContentType(MediaType.parseMediaType(contentType));
- return this;
- }
-
- public DefaultHttpRequestBuilder accept(MediaType... mediaTypes) {
- this.httpHeaders.setAccept(Arrays.asList(mediaTypes));
- return this;
- }
-
- public DefaultHttpRequestBuilder accept(String... mediaTypes) {
- this.httpHeaders.setAccept(Arrays.stream(mediaTypes)
- .map(type -> MediaType.parseMediaType(type))
- .collect(Collectors.toList()));
- return this;
- }
-
- public DefaultHttpRequestBuilder content(Object content) {
- this.contentPublisher = Mono.just(content);
- this.contentType = ResolvableType.forInstance(content);
- return this;
- }
-
- public DefaultHttpRequestBuilder contentStream(Publisher> content, ResolvableType type) {
- this.contentPublisher = Flux.from(content);
- this.contentType = type;
- return this;
- }
-
- /**
- * Allows performing more complex operations with a strategy. For example, a
- * {@link RequestPostProcessor} implementation might accept the arguments of
- * username and password and set an HTTP Basic authentication header.
- *
- * @param postProcessor the {@link RequestPostProcessor} to use. Cannot be null.
- *
- * @return this instance for further modifications.
- */
- public DefaultHttpRequestBuilder apply(RequestPostProcessor postProcessor) {
- Assert.notNull(postProcessor, "`postProcessor` is required");
- postProcessor.postProcess(this);
- return this;
- }
-
- public ClientHttpRequest build(ClientHttpRequestFactory factory, List> messageEncoders) {
- ClientHttpRequest request = factory.createRequest(this.httpMethod, this.url, this.httpHeaders);
- request.getHeaders().putAll(this.httpHeaders);
-
- if (this.contentPublisher != null) {
- MediaType mediaType = request.getHeaders().getContentType();
-
- Optional> messageEncoder = messageEncoders
- .stream()
- .filter(e -> e.canEncode(this.contentType, mediaType))
- .findFirst();
-
- if (messageEncoder.isPresent()) {
- request.writeWith(messageEncoder.get()
- .encode(this.contentPublisher, request.bufferFactory(),
- this.contentType, mediaType));
- }
- else {
- throw new WebClientException("Can't write request body " +
- "of type '" + this.contentType.toString() +
- "' for content-type '" + mediaType.toString() + "'");
- }
- }
- return request;
- }
-
-}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultWebResponse.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultWebResponse.java
deleted file mode 100644
index 328d1198e2e..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/DefaultWebResponse.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.web.client.reactive;
-
-import java.util.List;
-
-import reactor.core.publisher.Mono;
-
-import org.springframework.core.codec.Decoder;
-import org.springframework.http.client.reactive.ClientHttpResponse;
-
-/**
- * Default implementation of the {@link WebResponse} interface
- *
- * @author Brian Clozel
- */
-public class DefaultWebResponse implements WebResponse {
-
- private final Mono clientResponse;
-
- private final List> messageDecoders;
-
-
- public DefaultWebResponse(Mono clientResponse, List> messageDecoders) {
- this.clientResponse = clientResponse;
- this.messageDecoders = messageDecoders;
- }
-
- @Override
- public Mono getClientResponse() {
- return this.clientResponse;
- }
-
- @Override
- public List> getMessageDecoders() {
- return this.messageDecoders;
- }
-}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/HttpRequestBuilder.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/HttpRequestBuilder.java
deleted file mode 100644
index 32e2527a5ea..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/HttpRequestBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.web.client.reactive;
-
-import java.util.List;
-
-import org.springframework.core.codec.Encoder;
-import org.springframework.http.client.reactive.ClientHttpRequest;
-import org.springframework.http.client.reactive.ClientHttpRequestFactory;
-
-/**
- * Build {@link ClientHttpRequest} using a {@link ClientHttpRequestFactory}
- * which wraps an HTTP client implementation.
- *
- * @author Brian Clozel
- */
-public interface HttpRequestBuilder {
-
- /**
- * Build a {@link ClientHttpRequest}
- *
- * @param factory the factory that creates the actual {@link ClientHttpRequest}
- * @param messageEncoders the {@link Encoder}s to use for encoding the request body
- */
- ClientHttpRequest build(ClientHttpRequestFactory factory, List> messageEncoders);
-}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/HttpRequestBuilders.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/HttpRequestBuilders.java
deleted file mode 100644
index 13009f3651c..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/HttpRequestBuilders.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.web.client.reactive;
-
-import org.springframework.http.HttpMethod;
-
-/**
- * Static factory methods for {@link DefaultHttpRequestBuilder RequestBuilders}.
- *
- * @author Brian Clozel
- */
-public abstract class HttpRequestBuilders {
-
- /**
- * Create a {@link DefaultHttpRequestBuilder} for a GET request.
- *
- * @param urlTemplate a URL template; the resulting URL will be encoded
- * @param urlVariables zero or more URL variables
- */
- public static DefaultHttpRequestBuilder get(String urlTemplate, Object... urlVariables) {
- return new DefaultHttpRequestBuilder(HttpMethod.GET, urlTemplate, urlVariables);
- }
-
- /**
- * Create a {@link DefaultHttpRequestBuilder} for a POST request.
- *
- * @param urlTemplate a URL template; the resulting URL will be encoded
- * @param urlVariables zero or more URL variables
- */
- public static DefaultHttpRequestBuilder post(String urlTemplate, Object... urlVariables) {
- return new DefaultHttpRequestBuilder(HttpMethod.POST, urlTemplate, urlVariables);
- }
-
-
- /**
- * Create a {@link DefaultHttpRequestBuilder} for a PUT request.
- *
- * @param urlTemplate a URL template; the resulting URL will be encoded
- * @param urlVariables zero or more URL variables
- */
- public static DefaultHttpRequestBuilder put(String urlTemplate, Object... urlVariables) {
- return new DefaultHttpRequestBuilder(HttpMethod.PUT, urlTemplate, urlVariables);
- }
-
- /**
- * Create a {@link DefaultHttpRequestBuilder} for a PATCH request.
- *
- * @param urlTemplate a URL template; the resulting URL will be encoded
- * @param urlVariables zero or more URL variables
- */
- public static DefaultHttpRequestBuilder patch(String urlTemplate, Object... urlVariables) {
- return new DefaultHttpRequestBuilder(HttpMethod.PATCH, urlTemplate, urlVariables);
- }
-
- /**
- * Create a {@link DefaultHttpRequestBuilder} for a DELETE request.
- *
- * @param urlTemplate a URL template; the resulting URL will be encoded
- * @param urlVariables zero or more URL variables
- */
- public static DefaultHttpRequestBuilder delete(String urlTemplate, Object... urlVariables) {
- return new DefaultHttpRequestBuilder(HttpMethod.DELETE, urlTemplate, urlVariables);
- }
-
- /**
- * Create a {@link DefaultHttpRequestBuilder} for an OPTIONS request.
- *
- * @param urlTemplate a URL template; the resulting URL will be encoded
- * @param urlVariables zero or more URL variables
- */
- public static DefaultHttpRequestBuilder options(String urlTemplate, Object... urlVariables) {
- return new DefaultHttpRequestBuilder(HttpMethod.OPTIONS, urlTemplate, urlVariables);
- }
-
- /**
- * Create a {@link DefaultHttpRequestBuilder} for a HEAD request.
- *
- * @param urlTemplate a URL template; the resulting URL will be encoded
- * @param urlVariables zero or more URL variables
- */
- public static DefaultHttpRequestBuilder head(String urlTemplate, Object... urlVariables) {
- return new DefaultHttpRequestBuilder(HttpMethod.HEAD, urlTemplate, urlVariables);
- }
-
- /**
- * Create a {@link DefaultHttpRequestBuilder} for a request with the given HTTP method.
- *
- * @param httpMethod the HTTP method
- * @param urlTemplate a URL template; the resulting URL will be encoded
- * @param urlVariables zero or more URL variables
- */
- public static DefaultHttpRequestBuilder request(HttpMethod httpMethod, String urlTemplate, Object... urlVariables) {
- return new DefaultHttpRequestBuilder(httpMethod, urlTemplate, urlVariables);
- }
-
-}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponse.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ResponseExtractor.java
similarity index 61%
rename from spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponse.java
rename to spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ResponseExtractor.java
index 6a92847dd19..3ed162c97d3 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponse.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ResponseExtractor.java
@@ -20,25 +20,19 @@ import java.util.List;
import reactor.core.publisher.Mono;
-import org.springframework.core.codec.Decoder;
import org.springframework.http.client.reactive.ClientHttpResponse;
+import org.springframework.http.converter.reactive.HttpMessageConverter;
/**
- * Result of a {@code ClientHttpRequest} sent to a remote server by the {@code WebClient}
+ * A {@code ResponseExtractor} extracts the relevant part of a
+ * raw {@link org.springframework.http.client.reactive.ClientHttpResponse},
+ * optionally decoding the response body and using a target composition API.
*
- * Contains all the required information to extract relevant information from the raw response.
+ *
See static factory methods in {@link ResponseExtractors}.
*
* @author Brian Clozel
*/
-public interface WebResponse {
+public interface ResponseExtractor {
- /**
- * Return the raw response received by the {@code WebClient}
- */
- Mono getClientResponse();
-
- /**
- * Return the configured list of {@link Decoder}s that can be used to decode the raw response body
- */
- List> getMessageDecoders();
+ T extract(Mono clientResponse, List> messageConverters);
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ResponseExtractors.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ResponseExtractors.java
new file mode 100644
index 00000000000..1e650ec91af
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/ResponseExtractors.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.client.reactive;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.reactive.ClientHttpResponse;
+import org.springframework.http.converter.reactive.HttpMessageConverter;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Static factory methods for {@link ResponseExtractor} based on the {@link Flux} and
+ * {@link Mono} API.
+ *
+ * @author Brian Clozel
+ */
+public class ResponseExtractors {
+
+ private static final Object EMPTY_BODY = new Object();
+
+ /**
+ * Extract the response body and decode it, returning it as a {@code Mono}
+ * @see ResolvableType#forClassWithGenerics(Class, Class[])
+ */
+ public static ResponseExtractor> body(ResolvableType bodyType) {
+ // noinspection unchecked
+ return (clientResponse, messageConverters) -> (Mono) clientResponse
+ .flatMap(resp -> decodeResponseBody(resp, bodyType,
+ messageConverters))
+ .next();
+ }
+
+ /**
+ * Extract the response body and decode it, returning it as a {@code Mono}
+ */
+ public static ResponseExtractor> body(Class sourceClass) {
+ ResolvableType bodyType = ResolvableType.forClass(sourceClass);
+ return body(bodyType);
+ }
+
+ /**
+ * Extract the response body and decode it, returning it as a {@code Flux}
+ * @see ResolvableType#forClassWithGenerics(Class, Class[])
+ */
+ public static ResponseExtractor> bodyStream(ResolvableType bodyType) {
+ return (clientResponse, messageConverters) -> clientResponse
+ .flatMap(resp -> decodeResponseBody(resp, bodyType, messageConverters));
+ }
+
+ /**
+ * Extract the response body and decode it, returning it as a {@code Flux}
+ */
+ public static ResponseExtractor> bodyStream(Class sourceClass) {
+ ResolvableType bodyType = ResolvableType.forClass(sourceClass);
+ return bodyStream(bodyType);
+ }
+
+ /**
+ * Extract the full response body as a {@code ResponseEntity} with its body decoded as
+ * a single type {@code T}
+ * @see ResolvableType#forClassWithGenerics(Class, Class[])
+ */
+ public static ResponseExtractor>> response(
+ ResolvableType bodyType) {
+ return (clientResponse, messageConverters) -> clientResponse.then(response -> {
+ return Mono.when(
+ decodeResponseBody(response, bodyType,
+ messageConverters).next().defaultIfEmpty(
+ EMPTY_BODY),
+ Mono.just(response.getHeaders()),
+ Mono.just(response.getStatusCode()));
+ }).map(tuple -> {
+ Object body = (tuple.getT1() != EMPTY_BODY ? tuple.getT1() : null);
+ // noinspection unchecked
+ return new ResponseEntity<>((T) body, tuple.getT2(), tuple.getT3());
+ });
+ }
+
+ /**
+ * Extract the full response body as a {@code ResponseEntity} with its body decoded as
+ * a single type {@code T}
+ */
+ public static ResponseExtractor>> response(
+ Class bodyClass) {
+ ResolvableType bodyType = ResolvableType.forClass(bodyClass);
+ return response(bodyType);
+ }
+
+ /**
+ * Extract the full response body as a {@code ResponseEntity} with its body decoded as
+ * a {@code Flux}
+ * @see ResolvableType#forClassWithGenerics(Class, Class[])
+ */
+ public static ResponseExtractor>>> responseStream(
+ ResolvableType type) {
+ return (clientResponse, messageConverters) -> clientResponse
+ .map(response -> new ResponseEntity<>(
+ decodeResponseBody(response, type,
+ messageConverters),
+ response.getHeaders(), response.getStatusCode()));
+ }
+
+ /**
+ * Extract the full response body as a {@code ResponseEntity} with its body decoded as
+ * a {@code Flux}
+ */
+ public static ResponseExtractor>>> responseStream(
+ Class sourceClass) {
+ ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
+ return responseStream(resolvableType);
+ }
+
+ /**
+ * Extract the response headers as an {@code HttpHeaders} instance
+ */
+ public static ResponseExtractor> headers() {
+ return (clientResponse, messageConverters) -> clientResponse.map(resp -> resp.getHeaders());
+ }
+
+ protected static Flux decodeResponseBody(ClientHttpResponse response,
+ ResolvableType responseType,
+ List> messageConverters) {
+
+ MediaType contentType = response.getHeaders().getContentType();
+ Optional> converter = resolveConverter(messageConverters,
+ responseType, contentType);
+ if (!converter.isPresent()) {
+ return Flux.error(new IllegalStateException(
+ "Could not decode response body of type '" + contentType
+ + "' with target type '" + responseType.toString() + "'"));
+ }
+ // noinspection unchecked
+ return (Flux) converter.get().read(responseType, response);
+ }
+
+ protected static Optional> resolveConverter(
+ List> messageConverters, ResolvableType type,
+ MediaType mediaType) {
+ return messageConverters.stream().filter(e -> e.canRead(type, mediaType))
+ .findFirst();
+ }
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/RxJava1WebResponseExtractors.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/RxJava1WebResponseExtractors.java
deleted file mode 100644
index 03e1bd92580..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/RxJava1WebResponseExtractors.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.web.client.reactive;
-
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Optional;
-
-import reactor.core.converter.RxJava1ObservableConverter;
-import reactor.core.converter.RxJava1SingleConverter;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import rx.Observable;
-import rx.Single;
-
-import org.springframework.core.ResolvableType;
-import org.springframework.core.codec.Decoder;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.http.client.reactive.ClientHttpResponse;
-
-/**
- * Static factory methods for {@link WebResponseExtractor}
- * based on the {@link Observable} and {@link Single} API.
- *
- * @author Brian Clozel
- */
-public class RxJava1WebResponseExtractors {
-
- private static final Charset UTF_8 = Charset.forName("UTF-8");
-
- private static final Object[] HINTS = new Object[] {UTF_8};
-
- /**
- * Extract the response body and decode it, returning it as a {@code Single}
- */
- public static WebResponseExtractor> body(Class sourceClass) {
-
- ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
- //noinspection unchecked
- return webResponse -> (Single) RxJava1SingleConverter.fromPublisher(webResponse.getClientResponse()
- .flatMap(resp -> decodeResponseBody(resp, resolvableType, webResponse.getMessageDecoders()))
- .next());
- }
-
- /**
- * Extract the response body and decode it, returning it as an {@code Observable}
- */
- public static WebResponseExtractor> bodyStream(Class sourceClass) {
-
- ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
- return webResponse -> RxJava1ObservableConverter.fromPublisher(webResponse.getClientResponse()
- .flatMap(resp -> decodeResponseBody(resp, resolvableType, webResponse.getMessageDecoders())));
- }
-
- /**
- * Extract the full response body as a {@code ResponseEntity}
- * with its body decoded as a single type {@code T}
- */
- public static WebResponseExtractor>> response(Class sourceClass) {
-
- ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
- return webResponse -> (Single>)
- RxJava1SingleConverter.fromPublisher(webResponse.getClientResponse()
- .then(response ->
- Mono.when(
- decodeResponseBody(response, resolvableType, webResponse.getMessageDecoders()).next(),
- Mono.just(response.getHeaders()),
- Mono.just(response.getStatusCode())))
- .map(tuple -> {
- //noinspection unchecked
- return new ResponseEntity<>((T) tuple.getT1(), tuple.getT2(), tuple.getT3());
- }));
- }
-
- /**
- * Extract the full response body as a {@code ResponseEntity}
- * with its body decoded as an {@code Observable}
- */
- public static WebResponseExtractor>>> responseStream(Class sourceClass) {
- ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
- return webResponse -> RxJava1SingleConverter.fromPublisher(webResponse.getClientResponse()
- .map(response -> new ResponseEntity<>(
- RxJava1ObservableConverter
- .fromPublisher(decodeResponseBody(response, resolvableType, webResponse.getMessageDecoders())),
- response.getHeaders(),
- response.getStatusCode())));
- }
-
- /**
- * Extract the response headers as an {@code HttpHeaders} instance
- */
- public static WebResponseExtractor> headers() {
- return webResponse -> RxJava1SingleConverter
- .fromPublisher(webResponse.getClientResponse().map(resp -> resp.getHeaders()));
- }
-
- protected static Flux decodeResponseBody(ClientHttpResponse response, ResolvableType responseType,
- List> messageDecoders) {
-
- MediaType contentType = response.getHeaders().getContentType();
- Optional> decoder = resolveDecoder(messageDecoders, responseType, contentType);
- if (!decoder.isPresent()) {
- return Flux.error(new IllegalStateException("Could not decode response body of type '" + contentType +
- "' with target type '" + responseType.toString() + "'"));
- }
- //noinspection unchecked
- return (Flux) decoder.get().decode(response.getBody(), responseType, contentType, HINTS);
- }
-
-
- protected static Optional> resolveDecoder(List> messageDecoders, ResolvableType type,
- MediaType mediaType) {
- return messageDecoders.stream().filter(e -> e.canDecode(type, mediaType)).findFirst();
- }
-}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebClient.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebClient.java
index 270eeb53aa4..94f4840c54e 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebClient.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebClient.java
@@ -16,12 +16,17 @@
package org.springframework.web.client.reactive;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.logging.Level;
-import reactor.core.publisher.Mono;
+import org.reactivestreams.Publisher;
+import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.ByteBufferDecoder;
@@ -31,105 +36,194 @@ import org.springframework.http.codec.json.JacksonJsonEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.codec.StringEncoder;
import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpRequest;
-import org.springframework.http.client.reactive.ClientHttpRequestFactory;
import org.springframework.http.client.reactive.ClientHttpResponse;
+import org.springframework.http.client.reactive.ClientHttpConnector;
+import org.springframework.http.codec.xml.Jaxb2Decoder;
+import org.springframework.http.codec.xml.Jaxb2Encoder;
+import org.springframework.http.converter.reactive.CodecHttpMessageConverter;
+import org.springframework.http.converter.reactive.HttpMessageConverter;
+import org.springframework.http.converter.reactive.ResourceHttpMessageConverter;
+import org.springframework.util.ClassUtils;
+
+import reactor.core.publisher.Mono;
/**
* Reactive Web client supporting the HTTP/1.1 protocol
*
* Here is a simple example of a GET request:
+ *
*
- * WebClient client = new WebClient(new ReactorHttpClientRequestFactory());
+ * // should be shared between HTTP calls
+ * WebClient client = new WebClient(new ReactorHttpClient());
+ *
* Mono<String> result = client
- * .perform(HttpRequestBuilders.get("http://example.org/resource")
- * .accept(MediaType.TEXT_PLAIN))
- * .extract(WebResponseExtractors.body(String.class));
+ * .perform(ClientWebRequestBuilders.get("http://example.org/resource")
+ * .accept(MediaType.TEXT_PLAIN))
+ * .extract(ResponseExtractors.body(String.class));
*
*
* This Web client relies on
*
- * - a {@link ClientHttpRequestFactory} that drives the underlying library (e.g. Reactor-Net, RxNetty...)
- * - an {@link HttpRequestBuilder} which create a Web request with a builder API (see {@link HttpRequestBuilders})
- * - an {@link WebResponseExtractor} which extracts the relevant part of the server response
- * with the composition API of choice (see {@link WebResponseExtractors}
+ * - an {@link ClientHttpConnector} implementation that drives the underlying library (e.g. Reactor-Netty)
+ * - a {@link ClientWebRequestBuilder} which creates a Web request with a builder API (see
+ * {@link ClientWebRequestBuilders})
+ * - an {@link ResponseExtractor} which extracts the relevant part of the server
+ * response with the composition API of choice (see {@link ResponseExtractors}
*
*
* @author Brian Clozel
- * @see HttpRequestBuilders
- * @see WebResponseExtractors
+ * @see ClientWebRequestBuilders
+ * @see ResponseExtractors
*/
public final class WebClient {
- private ClientHttpRequestFactory requestFactory;
+ private static final ClassLoader classLoader = WebClient.class.getClassLoader();
- private List> messageEncoders;
+ private static final boolean jackson2Present = ClassUtils
+ .isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader)
+ && ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator",
+ classLoader);
- private List> messageDecoders;
+ private static final boolean jaxb2Present = ClassUtils
+ .isPresent("javax.xml.bind.Binder", classLoader);
+
+ private ClientHttpConnector clientHttpConnector;
+
+ private List> messageConverters;
/**
- * Create a {@code ReactiveRestClient} instance, using the {@link ClientHttpRequestFactory}
- * implementation given as an argument to drive the underlying HTTP client implementation.
+ * Create a {@code WebClient} instance, using the {@link ClientHttpConnector}
+ * implementation given as an argument to drive the underlying
+ * implementation.
*
* Register by default the following Encoders and Decoders:
*
- * - {@link ByteBufferEncoder} / {@link ByteBufferDecoder}
- * - {@link StringEncoder} / {@link StringDecoder}
- * - {@link JacksonJsonEncoder} / {@link JacksonJsonDecoder}
+ * - {@link ByteBufferEncoder} / {@link ByteBufferDecoder}
+ * - {@link StringEncoder} / {@link StringDecoder}
+ * - {@link Jaxb2Encoder} / {@link Jaxb2Decoder}
+ * - {@link JacksonJsonEncoder} / {@link JacksonJsonDecoder}
*
*
- * @param requestFactory the {@code ClientHttpRequestFactory} to use
+ * @param clientHttpConnector the {@code ClientHttpRequestFactory} to use
*/
- public WebClient(ClientHttpRequestFactory requestFactory) {
- this.requestFactory = requestFactory;
- this.messageEncoders = Arrays.asList(new ByteBufferEncoder(), new StringEncoder(),
- new JacksonJsonEncoder());
- this.messageDecoders = Arrays.asList(new ByteBufferDecoder(), new StringDecoder(),
- new JacksonJsonDecoder());
+ public WebClient(ClientHttpConnector clientHttpConnector) {
+ this.clientHttpConnector = clientHttpConnector;
+ this.messageConverters = new ArrayList<>();
+ addDefaultHttpMessageConverters(this.messageConverters);
}
/**
- * Set the list of {@link Encoder}s to use for encoding messages
+ * Adds default HTTP message converters
*/
- public void setMessageEncoders(List> messageEncoders) {
- this.messageEncoders = messageEncoders;
+ protected final void addDefaultHttpMessageConverters(
+ List> converters) {
+ converters.add(converter(new ByteBufferEncoder(), new ByteBufferDecoder()));
+ converters.add(converter(new StringEncoder(), new StringDecoder()));
+ converters.add(new ResourceHttpMessageConverter());
+ if (jaxb2Present) {
+ converters.add(converter(new Jaxb2Encoder(), new Jaxb2Decoder()));
+ }
+ if (jackson2Present) {
+ converters.add(converter(new JacksonJsonEncoder(), new JacksonJsonDecoder()));
+ }
+ }
+
+ private static HttpMessageConverter converter(Encoder encoder,
+ Decoder decoder) {
+ return new CodecHttpMessageConverter<>(encoder, decoder);
}
/**
- * Set the list of {@link Decoder}s to use for decoding messages
+ * Set the list of {@link HttpMessageConverter}s to use for encoding and decoding HTTP
+ * messages
*/
- public void setMessageDecoders(List> messageDecoders) {
- this.messageDecoders = messageDecoders;
+ public void setMessageConverters(List> messageConverters) {
+ this.messageConverters = messageConverters;
}
/**
* Perform the actual HTTP request/response exchange
*
- * Pulling demand from the exposed {@code Flux} will result in:
+ *
+ * Requesting from the exposed {@code Flux} will result in:
*
- * - building the actual HTTP request using the provided {@code RequestBuilder}
- * - encoding the HTTP request body with the configured {@code Encoder}s
- * - returning the response with a publisher of the body
+ * - building the actual HTTP request using the provided {@code ClientWebRequestBuilder}
+ * - encoding the HTTP request body with the configured {@code HttpMessageConverter}s
+ * - returning the response with a publisher of the body
*
*/
- public WebResponseActions perform(HttpRequestBuilder builder) {
+ public WebResponseActions perform(ClientWebRequestBuilder builder) {
+
+ ClientWebRequest clientWebRequest = builder.build();
- ClientHttpRequest request = builder.build(this.requestFactory, this.messageEncoders);
- final Mono clientResponse = request.execute()
- .log("org.springframework.http.client.reactive");
+ final Mono clientResponse = this.clientHttpConnector
+ .connect(clientWebRequest.getMethod(), clientWebRequest.getUrl(),
+ new DefaultRequestCallback(clientWebRequest))
+ .log("org.springframework.web.client.reactive", Level.FINE);
return new WebResponseActions() {
@Override
public void doWithStatus(Consumer consumer) {
- // TODO: implement
+ clientResponse.doOnNext(clientHttpResponse ->
+ consumer.accept(clientHttpResponse.getStatusCode()));
}
@Override
- public T extract(WebResponseExtractor extractor) {
- return extractor.extract(new DefaultWebResponse(clientResponse, messageDecoders));
+ public T extract(ResponseExtractor extractor) {
+ return extractor.extract(clientResponse, messageConverters);
}
-
};
}
-}
+ protected class DefaultRequestCallback implements Function> {
+
+ private final ClientWebRequest clientWebRequest;
+
+ public DefaultRequestCallback(ClientWebRequest clientWebRequest) {
+ this.clientWebRequest = clientWebRequest;
+ }
+
+ @Override
+ public Mono apply(ClientHttpRequest clientHttpRequest) {
+ clientHttpRequest.getHeaders().putAll(this.clientWebRequest.getHttpHeaders());
+ if (clientHttpRequest.getHeaders().getAccept().isEmpty()) {
+ clientHttpRequest.getHeaders().setAccept(
+ Collections.singletonList(MediaType.ALL));
+ }
+ clientWebRequest.getCookies().values()
+ .stream().flatMap(cookies -> cookies.stream())
+ .forEach(cookie -> clientHttpRequest.getCookies().add(cookie.getName(), cookie));
+ if (this.clientWebRequest.getBody() != null) {
+ return writeRequestBody(this.clientWebRequest.getBody(),
+ this.clientWebRequest.getElementType(), clientHttpRequest, messageConverters);
+ }
+ else {
+ return clientHttpRequest.setComplete();
+ }
+ }
+
+ protected Mono writeRequestBody(Publisher> content,
+ ResolvableType requestType, ClientHttpRequest request,
+ List> messageConverters) {
+
+ MediaType contentType = request.getHeaders().getContentType();
+ Optional> converter = resolveConverter(messageConverters, requestType, contentType);
+ if (!converter.isPresent()) {
+ return Mono.error(new IllegalStateException(
+ "Could not encode request body of type '" + contentType
+ + "' with target type '" + requestType.toString() + "'"));
+ }
+ // noinspection unchecked
+ return converter.get().write((Publisher) content, requestType, contentType, request);
+ }
+
+ protected Optional> resolveConverter(
+ List> messageConverters, ResolvableType type,
+ MediaType mediaType) {
+ return messageConverters.stream().filter(e -> e.canWrite(type, mediaType)).findFirst();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseActions.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseActions.java
index e18e2decdfc..0c6e709ff17 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseActions.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseActions.java
@@ -45,5 +45,5 @@ public interface WebResponseActions {
* .extract(response(String.class));
*
*/
- T extract(WebResponseExtractor extractor);
+ T extract(ResponseExtractor extractor);
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseExtractors.java b/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseExtractors.java
deleted file mode 100644
index 2d3d7aeda83..00000000000
--- a/spring-web-reactive/src/main/java/org/springframework/web/client/reactive/WebResponseExtractors.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright 2002-2016 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.web.client.reactive;
-
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Optional;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import org.springframework.core.ResolvableType;
-import org.springframework.core.codec.Decoder;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.http.client.reactive.ClientHttpResponse;
-
-/**
- * Static factory methods for {@link WebResponseExtractor}
- * based on the {@link Flux} and {@link Mono} API.
- *
- * @author Brian Clozel
- */
-public class WebResponseExtractors {
-
- private static final Charset UTF_8 = Charset.forName("UTF-8");
-
- private static final Object[] HINTS = new Object[] {UTF_8};
-
- private static final Object EMPTY_BODY = new Object();
-
-
- /**
- * Extract the response body and decode it, returning it as a {@code Mono}
- * @see ResolvableType#forClassWithGenerics(Class, Class[])
- */
- public static WebResponseExtractor> body(ResolvableType bodyType) {
- //noinspection unchecked
- return webResponse -> (Mono) webResponse.getClientResponse()
- .flatMap(resp -> decodeResponseBody(resp, bodyType, webResponse.getMessageDecoders()))
- .next();
- }
-
- /**
- * Extract the response body and decode it, returning it as a {@code Mono}
- */
- public static WebResponseExtractor> body(Class sourceClass) {
- ResolvableType bodyType = ResolvableType.forClass(sourceClass);
- return body(bodyType);
- }
-
-
- /**
- * Extract the response body and decode it, returning it as a {@code Flux}
- * @see ResolvableType#forClassWithGenerics(Class, Class[])
- */
- public static WebResponseExtractor> bodyStream(ResolvableType bodyType) {
- return webResponse -> webResponse.getClientResponse()
- .flatMap(resp -> decodeResponseBody(resp, bodyType, webResponse.getMessageDecoders()));
- }
-
- /**
- * Extract the response body and decode it, returning it as a {@code Flux}
- */
- public static WebResponseExtractor> bodyStream(Class sourceClass) {
- ResolvableType bodyType = ResolvableType.forClass(sourceClass);
- return bodyStream(bodyType);
- }
-
- /**
- * Extract the full response body as a {@code ResponseEntity}
- * with its body decoded as a single type {@code T}
- * @see ResolvableType#forClassWithGenerics(Class, Class[])
- */
- public static WebResponseExtractor>> response(ResolvableType bodyType) {
- return webResponse -> webResponse.getClientResponse()
- .then(response -> {
- List> decoders = webResponse.getMessageDecoders();
- return Mono.when(
- decodeResponseBody(response, bodyType, decoders).next().defaultIfEmpty(EMPTY_BODY),
- Mono.just(response.getHeaders()),
- Mono.just(response.getStatusCode()));
- })
- .map(tuple -> {
- Object body = (tuple.getT1() != EMPTY_BODY ? tuple.getT1() : null);
- //noinspection unchecked
- return new ResponseEntity<>((T) body, tuple.getT2(), tuple.getT3());
- });
- }
-
- /**
- * Extract the full response body as a {@code ResponseEntity}
- * with its body decoded as a single type {@code T}
- */
- public static WebResponseExtractor>> response(Class bodyClass) {
- ResolvableType bodyType = ResolvableType.forClass(bodyClass);
- return response(bodyType);
- }
-
- /**
- * Extract the full response body as a {@code ResponseEntity}
- * with its body decoded as a {@code Flux}
- * @see ResolvableType#forClassWithGenerics(Class, Class[])
- */
- public static WebResponseExtractor>>> responseStream(ResolvableType type) {
- return webResponse -> webResponse.getClientResponse()
- .map(response -> new ResponseEntity<>(
- decodeResponseBody(response, type, webResponse.getMessageDecoders()),
- response.getHeaders(), response.getStatusCode()));
- }
-
- /**
- * Extract the full response body as a {@code ResponseEntity}
- * with its body decoded as a {@code Flux}
- */
- public static WebResponseExtractor>>> responseStream(Class sourceClass) {
- ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
- return responseStream(resolvableType);
- }
-
- /**
- * Extract the response headers as an {@code HttpHeaders} instance
- */
- public static WebResponseExtractor> headers() {
- return webResponse -> webResponse.getClientResponse().map(resp -> resp.getHeaders());
- }
-
- protected static Flux decodeResponseBody(ClientHttpResponse response, ResolvableType responseType,
- List> messageDecoders) {
-
- MediaType contentType = response.getHeaders().getContentType();
- Optional> decoder = resolveDecoder(messageDecoders, responseType, contentType);
- if (!decoder.isPresent()) {
- return Flux.error(new IllegalStateException("Could not decode response body of type '" + contentType +
- "' with target type '" + responseType.toString() + "'"));
- }
- //noinspection unchecked
- return (Flux) decoder.get().decode(response.getBody(), responseType, contentType, HINTS);
- }
-
-
- protected static Optional> resolveDecoder(List> messageDecoders, ResolvableType type,
- MediaType mediaType) {
- return messageDecoders.stream().filter(e -> e.canDecode(type, mediaType)).findFirst();
- }
-}
diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java
index d3a45693ecd..831e5a55ad3 100644
--- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java
+++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java
@@ -18,15 +18,16 @@ package org.springframework.http.server.reactive;
import org.junit.Before;
import org.junit.Test;
-import static org.springframework.web.client.reactive.HttpRequestBuilders.get;
-import static org.springframework.web.client.reactive.WebResponseExtractors.bodyStream;
+
+import static org.springframework.web.client.reactive.ClientWebRequestBuilders.get;
+import static org.springframework.web.client.reactive.ResponseExtractors.bodyStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.FlushingDataBuffer;
-import org.springframework.http.client.reactive.ReactorHttpClientRequestFactory;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.client.reactive.WebClient;
/**
@@ -39,7 +40,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
@Before
public void setup() throws Exception {
super.setup();
- this.webClient = new WebClient(new ReactorHttpClientRequestFactory());
+ this.webClient = new WebClient(new ReactorClientHttpConnector());
}
@Test
diff --git a/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilderTests.java b/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/DefaultWebRequestBuilderTests.java
similarity index 55%
rename from spring-web-reactive/src/test/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilderTests.java
rename to spring-web-reactive/src/test/java/org/springframework/web/client/reactive/DefaultWebRequestBuilderTests.java
index 112c62d6d78..ba7364c315f 100644
--- a/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/DefaultHttpRequestBuilderTests.java
+++ b/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/DefaultWebRequestBuilderTests.java
@@ -16,33 +16,41 @@
package org.springframework.web.client.reactive;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import org.springframework.http.HttpMethod;
/**
*
* @author Rob Winch
- *
*/
-public class DefaultHttpRequestBuilderTests {
- private DefaultHttpRequestBuilder builder;
+public class DefaultWebRequestBuilderTests {
+ private DefaultClientWebRequestBuilder builder;
@Before
public void setup() {
- builder = new DefaultHttpRequestBuilder(HttpMethod.GET, "https://example.com/foo");
+ builder = new DefaultClientWebRequestBuilder(HttpMethod.GET, "https://example.com/foo");
}
@Test
public void apply() {
- RequestPostProcessor postProcessor = mock(RequestPostProcessor.class);
+ ClientWebRequestPostProcessor postProcessor = mock(ClientWebRequestPostProcessor.class);
+ when(postProcessor.postProcess(any(ClientWebRequest.class))).thenAnswer(new Answer() {
+ @Override
+ public ClientWebRequest answer(InvocationOnMock invocation) throws Throwable {
+ return (ClientWebRequest) invocation.getArguments()[0];
+ }
+ });
- builder.apply(postProcessor);
+ ClientWebRequest webRequest = builder.apply(postProcessor).build();
- verify(postProcessor).postProcess(builder);
+ verify(postProcessor).postProcess(webRequest);
}
@Test(expected = IllegalArgumentException.class)
diff --git a/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java
index f038ab1cb50..88d97aa84ec 100644
--- a/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java
+++ b/spring-web-reactive/src/test/java/org/springframework/web/client/reactive/WebClientIntegrationTests.java
@@ -17,8 +17,8 @@
package org.springframework.web.client.reactive;
import static org.junit.Assert.*;
-import static org.springframework.web.client.reactive.HttpRequestBuilders.*;
-import static org.springframework.web.client.reactive.WebResponseExtractors.*;
+import static org.springframework.web.client.reactive.ClientWebRequestBuilders.*;
+import static org.springframework.web.client.reactive.ResponseExtractors.*;
import java.util.function.Consumer;
@@ -38,9 +38,11 @@ import org.springframework.http.codec.Pojo;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
-import org.springframework.http.client.reactive.ReactorHttpClientRequestFactory;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
/**
+ * {@link WebClient} integration tests with the {@code Flux} and {@code Mono} API.
+ *
* @author Brian Clozel
*/
public class WebClientIntegrationTests {
@@ -52,7 +54,7 @@ public class WebClientIntegrationTests {
@Before
public void setup() {
this.server = new MockWebServer();
- this.webClient = new WebClient(new ReactorHttpClientRequestFactory());
+ this.webClient = new WebClient(new ReactorClientHttpConnector());
}
@Test
@@ -228,12 +230,14 @@ public class WebClientIntegrationTests {
public void shouldPostPojoAsJson() throws Exception {
HttpUrl baseUrl = server.url("/pojo/capitalize");
- this.server.enqueue(new MockResponse().setBody("{\"bar\":\"BARBAR\",\"foo\":\"FOOFOO\"}"));
+ this.server.enqueue(new MockResponse()
+ .setHeader("Content-Type", "application/json")
+ .setBody("{\"bar\":\"BARBAR\",\"foo\":\"FOOFOO\"}"));
Pojo spring = new Pojo("foofoo", "barbar");
Mono result = this.webClient
.perform(post(baseUrl.toString())
- .content(spring)
+ .body(spring)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON))
.extract(body(Pojo.class));
@@ -252,6 +256,28 @@ public class WebClientIntegrationTests {
assertEquals("application/json", request.getHeader(HttpHeaders.CONTENT_TYPE));
}
+ @Test
+ public void shouldSendCookieHeader() throws Exception {
+ HttpUrl baseUrl = server.url("/test");
+ this.server.enqueue(new MockResponse()
+ .setHeader("Content-Type", "text/plain").setBody("test"));
+
+ Mono result = this.webClient
+ .perform(get(baseUrl.toString())
+ .cookie("testkey", "testvalue"))
+ .extract(body(String.class));
+
+ TestSubscriber
+ .subscribe(result)
+ .awaitAndAssertNextValues("test")
+ .assertComplete();
+
+ RecordedRequest request = server.takeRequest();
+ assertEquals(1, server.getRequestCount());
+ assertEquals("/test", request.getPath());
+ assertEquals("testkey=testvalue", request.getHeader(HttpHeaders.COOKIE));
+ }
+
@Test
public void shouldGetErrorWhen404() throws Exception {
@@ -262,7 +288,6 @@ public class WebClientIntegrationTests {
.perform(get(baseUrl.toString()))
.extract(body(String.class));
- // TODO: error message should be converted to a ClientException
TestSubscriber
.subscribe(result)
.await()
diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java
index 79beae54ef8..9de15cab870 100644
--- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java
+++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java
@@ -16,7 +16,11 @@
package org.springframework.web.reactive.result.method.annotation;
+import static org.springframework.web.client.reactive.ClientWebRequestBuilders.*;
+import static org.springframework.web.client.reactive.ResponseExtractors.*;
+
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -29,14 +33,16 @@ import reactor.core.test.TestSubscriber;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.ByteBufferDecoder;
-import org.springframework.http.codec.json.JacksonJsonDecoder;
-import org.springframework.http.codec.json.JacksonJsonEncoder;
+import org.springframework.core.codec.ByteBufferEncoder;
+import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.StringDecoder;
+import org.springframework.core.codec.StringEncoder;
import org.springframework.http.MediaType;
-import org.springframework.http.client.reactive.ReactorHttpClientRequestFactory;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.SseEventEncoder;
+import org.springframework.http.codec.json.JacksonJsonDecoder;
+import org.springframework.http.codec.json.JacksonJsonEncoder;
import org.springframework.http.converter.reactive.CodecHttpMessageConverter;
import org.springframework.http.converter.reactive.HttpMessageConverter;
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
@@ -49,9 +55,6 @@ import org.springframework.web.reactive.config.WebReactiveConfiguration;
import org.springframework.web.reactive.sse.SseEvent;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
-import static org.springframework.web.client.reactive.HttpRequestBuilders.get;
-import static org.springframework.web.client.reactive.WebResponseExtractors.bodyStream;
-
/**
* @author Sebastien Deleuze
*/
@@ -64,11 +67,12 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Before
public void setup() throws Exception {
super.setup();
- this.webClient = new WebClient(new ReactorHttpClientRequestFactory());
- this.webClient.setMessageDecoders(Arrays.asList(
- new ByteBufferDecoder(),
- new StringDecoder(false),
- new JacksonJsonDecoder()));
+ this.webClient = new WebClient(new ReactorClientHttpConnector());
+ List> converters = new ArrayList<>();
+ converters.add(new CodecHttpMessageConverter<>(new ByteBufferEncoder(), new ByteBufferDecoder()));
+ converters.add(new CodecHttpMessageConverter<>(new StringEncoder(), new StringDecoder(false)));
+ converters.add(new CodecHttpMessageConverter<>(new JacksonJsonEncoder(), new JacksonJsonDecoder()));
+ this.webClient.setMessageConverters(converters);
}
@Override