Browse Source

Merge branch '25751-exchange-deprecation' into master

Closes gh-25751
pull/25823/head
Rossen Stoyanchev 5 years ago
parent
commit
bedf2de614
  1. 201
      spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java
  2. 229
      spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClientBuilder.java
  3. 24
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java
  4. 129
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
  5. 14
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java
  6. 97
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java
  7. 3
      spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt
  8. 16
      spring-webflux/src/test/java/org/springframework/web/reactive/function/MultipartIntegrationTests.java
  9. 39
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java
  10. 7
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java
  11. 45
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java
  12. 14
      spring-webflux/src/test/java/org/springframework/web/reactive/function/server/LocaleContextResolverIntegrationTests.java
  13. 11
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java
  14. 52
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ProtobufIntegrationTests.java
  15. 15
      spring-webflux/src/test/java/org/springframework/web/reactive/result/view/LocaleContextResolverIntegrationTests.java
  16. 1
      spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/WebClientExtensionsTests.kt
  17. 171
      src/docs/asciidoc/web/webflux-webclient.adoc

201
spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java

@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets; @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -46,12 +47,17 @@ import org.springframework.test.util.AssertionErrors; @@ -46,12 +47,17 @@ import org.springframework.test.util.AssertionErrors;
import org.springframework.test.util.JsonExpectationsHelper;
import org.springframework.test.util.XmlExpectationsHelper;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MimeType;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.util.UriBuilder;
import org.springframework.web.util.UriBuilderFactory;
/**
* Default implementation of {@link WebTestClient}.
@ -61,30 +67,42 @@ import org.springframework.web.util.UriBuilder; @@ -61,30 +67,42 @@ import org.springframework.web.util.UriBuilder;
*/
class DefaultWebTestClient implements WebTestClient {
private final WebClient webClient;
private final WiretapConnector wiretapConnector;
private final Duration timeout;
private final ExchangeFunction exchangeFunction;
private final UriBuilderFactory uriBuilderFactory;
@Nullable
private final HttpHeaders defaultHeaders;
@Nullable
private final MultiValueMap<String, String> defaultCookies;
private final Duration responseTimeout;
private final DefaultWebTestClientBuilder builder;
private final AtomicLong requestIndex = new AtomicLong();
DefaultWebTestClient(WebClient.Builder clientBuilder, ClientHttpConnector connector,
@Nullable Duration timeout, DefaultWebTestClientBuilder webTestClientBuilder) {
DefaultWebTestClient(ClientHttpConnector connector,
Function<ClientHttpConnector, ExchangeFunction> exchangeFactory, UriBuilderFactory uriBuilderFactory,
@Nullable HttpHeaders headers, @Nullable MultiValueMap<String, String> cookies,
@Nullable Duration responseTimeout, DefaultWebTestClientBuilder clientBuilder) {
Assert.notNull(clientBuilder, "WebClient.Builder is required");
this.wiretapConnector = new WiretapConnector(connector);
this.webClient = clientBuilder.clientConnector(this.wiretapConnector).build();
this.timeout = (timeout != null ? timeout : Duration.ofSeconds(5));
this.builder = webTestClientBuilder;
this.exchangeFunction = exchangeFactory.apply(this.wiretapConnector);
this.uriBuilderFactory = uriBuilderFactory;
this.defaultHeaders = headers;
this.defaultCookies = cookies;
this.responseTimeout = (responseTimeout != null ? responseTimeout : Duration.ofSeconds(5));
this.builder = clientBuilder;
}
private Duration getTimeout() {
return this.timeout;
private Duration getResponseTimeout() {
return this.responseTimeout;
}
@ -124,12 +142,12 @@ class DefaultWebTestClient implements WebTestClient { @@ -124,12 +142,12 @@ class DefaultWebTestClient implements WebTestClient {
}
@Override
public RequestBodyUriSpec method(HttpMethod method) {
return methodInternal(method);
public RequestBodyUriSpec method(HttpMethod httpMethod) {
return methodInternal(httpMethod);
}
private RequestBodyUriSpec methodInternal(HttpMethod method) {
return new DefaultRequestBodyUriSpec(this.webClient.method(method));
private RequestBodyUriSpec methodInternal(HttpMethod httpMethod) {
return new DefaultRequestBodyUriSpec(httpMethod);
}
@Override
@ -145,154 +163,180 @@ class DefaultWebTestClient implements WebTestClient { @@ -145,154 +163,180 @@ class DefaultWebTestClient implements WebTestClient {
private class DefaultRequestBodyUriSpec implements RequestBodyUriSpec {
private final WebClient.RequestBodyUriSpec bodySpec;
private final HttpMethod httpMethod;
@Nullable
private URI uri;
private final HttpHeaders headers;
@Nullable
private MultiValueMap<String, String> cookies;
@Nullable
private BodyInserter<?, ? super ClientHttpRequest> inserter;
private final Map<String, Object> attributes = new LinkedHashMap<>(4);
@Nullable
private Consumer<ClientHttpRequest> httpRequestConsumer;
@Nullable
private String uriTemplate;
private final String requestId;
DefaultRequestBodyUriSpec(WebClient.RequestBodyUriSpec spec) {
this.bodySpec = spec;
DefaultRequestBodyUriSpec(HttpMethod httpMethod) {
this.httpMethod = httpMethod;
this.requestId = String.valueOf(requestIndex.incrementAndGet());
this.bodySpec.header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, this.requestId);
this.headers = new HttpHeaders();
this.headers.add(WebTestClient.WEBTESTCLIENT_REQUEST_ID, this.requestId);
}
@Override
public RequestBodySpec uri(String uriTemplate, Object... uriVariables) {
this.bodySpec.uri(uriTemplate, uriVariables);
this.uriTemplate = uriTemplate;
return this;
return uri(uriBuilderFactory.expand(uriTemplate, uriVariables));
}
@Override
public RequestBodySpec uri(String uriTemplate, Map<String, ?> uriVariables) {
this.bodySpec.uri(uriTemplate, uriVariables);
this.uriTemplate = uriTemplate;
return this;
return uri(uriBuilderFactory.expand(uriTemplate, uriVariables));
}
@Override
public RequestBodySpec uri(Function<UriBuilder, URI> uriFunction) {
this.bodySpec.uri(uriFunction);
this.uriTemplate = null;
return this;
return uri(uriFunction.apply(uriBuilderFactory.builder()));
}
@Override
public RequestBodySpec uri(URI uri) {
this.bodySpec.uri(uri);
this.uriTemplate = null;
this.uri = uri;
return this;
}
private HttpHeaders getHeaders() {
return this.headers;
}
private MultiValueMap<String, String> getCookies() {
if (this.cookies == null) {
this.cookies = new LinkedMultiValueMap<>(3);
}
return this.cookies;
}
@Override
public RequestBodySpec header(String headerName, String... headerValues) {
this.bodySpec.header(headerName, headerValues);
for (String headerValue : headerValues) {
getHeaders().add(headerName, headerValue);
}
return this;
}
@Override
public RequestBodySpec headers(Consumer<HttpHeaders> headersConsumer) {
this.bodySpec.headers(headersConsumer);
headersConsumer.accept(getHeaders());
return this;
}
@Override
public RequestBodySpec attribute(String name, Object value) {
this.bodySpec.attribute(name, value);
this.attributes.put(name, value);
return this;
}
@Override
public RequestBodySpec attributes(
Consumer<Map<String, Object>> attributesConsumer) {
this.bodySpec.attributes(attributesConsumer);
public RequestBodySpec attributes(Consumer<Map<String, Object>> attributesConsumer) {
attributesConsumer.accept(this.attributes);
return this;
}
@Override
public RequestBodySpec accept(MediaType... acceptableMediaTypes) {
this.bodySpec.accept(acceptableMediaTypes);
getHeaders().setAccept(Arrays.asList(acceptableMediaTypes));
return this;
}
@Override
public RequestBodySpec acceptCharset(Charset... acceptableCharsets) {
this.bodySpec.acceptCharset(acceptableCharsets);
getHeaders().setAcceptCharset(Arrays.asList(acceptableCharsets));
return this;
}
@Override
public RequestBodySpec contentType(MediaType contentType) {
this.bodySpec.contentType(contentType);
getHeaders().setContentType(contentType);
return this;
}
@Override
public RequestBodySpec contentLength(long contentLength) {
this.bodySpec.contentLength(contentLength);
getHeaders().setContentLength(contentLength);
return this;
}
@Override
public RequestBodySpec cookie(String name, String value) {
this.bodySpec.cookie(name, value);
getCookies().add(name, value);
return this;
}
@Override
public RequestBodySpec cookies(
Consumer<MultiValueMap<String, String>> cookiesConsumer) {
this.bodySpec.cookies(cookiesConsumer);
public RequestBodySpec cookies(Consumer<MultiValueMap<String, String>> cookiesConsumer) {
cookiesConsumer.accept(getCookies());
return this;
}
@Override
public RequestBodySpec ifModifiedSince(ZonedDateTime ifModifiedSince) {
this.bodySpec.ifModifiedSince(ifModifiedSince);
getHeaders().setIfModifiedSince(ifModifiedSince);
return this;
}
@Override
public RequestBodySpec ifNoneMatch(String... ifNoneMatches) {
this.bodySpec.ifNoneMatch(ifNoneMatches);
getHeaders().setIfNoneMatch(Arrays.asList(ifNoneMatches));
return this;
}
@Override
public RequestHeadersSpec<?> bodyValue(Object body) {
this.bodySpec.bodyValue(body);
this.inserter = BodyInserters.fromValue(body);
return this;
}
@Override
public <T, S extends Publisher<T>> RequestHeadersSpec<?> body(S publisher, Class<T> elementClass) {
this.bodySpec.body(publisher, elementClass);
public <T, P extends Publisher<T>> RequestHeadersSpec<?> body(
P publisher, ParameterizedTypeReference<T> elementTypeRef) {
this.inserter = BodyInserters.fromPublisher(publisher, elementTypeRef);
return this;
}
@Override
public <T, S extends Publisher<T>> RequestHeadersSpec<?> body(S publisher, ParameterizedTypeReference<T> elementTypeRef) {
this.bodySpec.body(publisher, elementTypeRef);
public <T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher, Class<T> elementClass) {
this.inserter = BodyInserters.fromPublisher(publisher, elementClass);
return this;
}
@Override
public RequestHeadersSpec<?> body(Object producer, Class<?> elementClass) {
this.bodySpec.body(producer, elementClass);
this.inserter = BodyInserters.fromProducer(producer, elementClass);
return this;
}
@Override
public RequestHeadersSpec<?> body(Object producer, ParameterizedTypeReference<?> elementTypeRef) {
this.bodySpec.body(producer, elementTypeRef);
this.inserter = BodyInserters.fromProducer(producer, elementTypeRef);
return this;
}
@Override
public RequestHeadersSpec<?> body(BodyInserter<?, ? super ClientHttpRequest> inserter) {
this.bodySpec.body(inserter);
this.inserter = inserter;
return this;
}
@ -304,10 +348,57 @@ class DefaultWebTestClient implements WebTestClient { @@ -304,10 +348,57 @@ class DefaultWebTestClient implements WebTestClient {
@Override
public ResponseSpec exchange() {
ClientResponse clientResponse = this.bodySpec.exchange().block(getTimeout());
Assert.state(clientResponse != null, "No ClientResponse");
ExchangeResult result = wiretapConnector.getExchangeResult(this.requestId, this.uriTemplate, getTimeout());
return new DefaultResponseSpec(result, clientResponse, getTimeout());
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
ClientResponse response = exchangeFunction.exchange(request).block(getResponseTimeout());
Assert.state(response != null, "No ClientResponse");
ExchangeResult result = wiretapConnector.getExchangeResult(
this.requestId, this.uriTemplate, getResponseTimeout());
return new DefaultResponseSpec(result, response, getResponseTimeout());
}
private ClientRequest.Builder initRequestBuilder() {
ClientRequest.Builder builder = ClientRequest.create(this.httpMethod, initUri())
.headers(headers -> headers.addAll(initHeaders()))
.cookies(cookies -> cookies.addAll(initCookies()))
.attributes(attributes -> attributes.putAll(this.attributes));
if (this.httpRequestConsumer != null) {
builder.httpRequest(this.httpRequestConsumer);
}
return builder;
}
private URI initUri() {
return (this.uri != null ? this.uri : uriBuilderFactory.expand(""));
}
private HttpHeaders initHeaders() {
if (CollectionUtils.isEmpty(defaultHeaders)) {
return this.headers;
}
HttpHeaders result = new HttpHeaders();
result.putAll(defaultHeaders);
result.putAll(this.headers);
return result;
}
private MultiValueMap<String, String> initCookies() {
if (CollectionUtils.isEmpty(this.cookies)) {
return (defaultCookies != null ? defaultCookies : new LinkedMultiValueMap<>());
}
else if (CollectionUtils.isEmpty(defaultCookies)) {
return this.cookies;
}
else {
MultiValueMap<String, String> result = new LinkedMultiValueMap<>();
result.putAll(defaultCookies);
result.putAll(this.cookies);
return result;
}
}
}

229
spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClientBuilder.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,20 +17,30 @@ @@ -17,20 +17,30 @@
package org.springframework.test.web.reactive.server;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.http.HttpHeaders;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector;
import org.springframework.http.client.reactive.JettyClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.ClientCodecConfigurer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFunctions;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import org.springframework.web.util.DefaultUriBuilderFactory;
import org.springframework.web.util.UriBuilderFactory;
/**
@ -41,7 +51,21 @@ import org.springframework.web.util.UriBuilderFactory; @@ -41,7 +51,21 @@ import org.springframework.web.util.UriBuilderFactory;
*/
class DefaultWebTestClientBuilder implements WebTestClient.Builder {
private final WebClient.Builder webClientBuilder;
private static final boolean reactorClientPresent;
private static final boolean jettyClientPresent;
private static final boolean httpComponentsClientPresent;
static {
ClassLoader loader = DefaultWebTestClientBuilder.class.getClassLoader();
reactorClientPresent = ClassUtils.isPresent("reactor.netty.http.client.HttpClient", loader);
jettyClientPresent = ClassUtils.isPresent("org.eclipse.jetty.client.HttpClient", loader);
httpComponentsClientPresent =
ClassUtils.isPresent("org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient", loader) &&
ClassUtils.isPresent("org.apache.hc.core5.reactive.ReactiveDataConsumer", loader);
}
@Nullable
private final WebHttpHandlerBuilder httpHandlerBuilder;
@ -49,136 +73,243 @@ class DefaultWebTestClientBuilder implements WebTestClient.Builder { @@ -49,136 +73,243 @@ class DefaultWebTestClientBuilder implements WebTestClient.Builder {
@Nullable
private final ClientHttpConnector connector;
@Nullable
private String baseUrl;
@Nullable
private UriBuilderFactory uriBuilderFactory;
@Nullable
private HttpHeaders defaultHeaders;
@Nullable
private MultiValueMap<String, String> defaultCookies;
@Nullable
private List<ExchangeFilterFunction> filters;
@Nullable
private ExchangeStrategies strategies;
@Nullable
private List<Consumer<ExchangeStrategies.Builder>> strategiesConfigurers;
@Nullable
private Duration responseTimeout;
/** Connect to server via Reactor Netty. */
/** Determine connector via classpath detection. */
DefaultWebTestClientBuilder() {
this(new ReactorClientHttpConnector());
}
/** Connect to server through the given connector. */
DefaultWebTestClientBuilder(ClientHttpConnector connector) {
this(null, null, connector, null);
this(null, null);
}
/** Connect to given mock server with mock request and response. */
/** Use HttpHandlerConnector with mock server. */
DefaultWebTestClientBuilder(WebHttpHandlerBuilder httpHandlerBuilder) {
this(null, httpHandlerBuilder, null, null);
this(httpHandlerBuilder, null);
}
/** Copy constructor. */
DefaultWebTestClientBuilder(DefaultWebTestClientBuilder other) {
this(other.webClientBuilder.clone(), other.httpHandlerBuilder, other.connector,
other.responseTimeout);
/** Use given connector. */
DefaultWebTestClientBuilder(ClientHttpConnector connector) {
this(null, connector);
}
private DefaultWebTestClientBuilder(@Nullable WebClient.Builder webClientBuilder,
@Nullable WebHttpHandlerBuilder httpHandlerBuilder, @Nullable ClientHttpConnector connector,
@Nullable Duration responseTimeout) {
DefaultWebTestClientBuilder(
@Nullable WebHttpHandlerBuilder httpHandlerBuilder, @Nullable ClientHttpConnector connector) {
Assert.isTrue(httpHandlerBuilder != null || connector != null,
"Either WebHttpHandlerBuilder or ClientHttpConnector must be provided");
Assert.isTrue(httpHandlerBuilder == null || connector == null,
"Expected WebHttpHandlerBuilder or ClientHttpConnector but not both.");
this.webClientBuilder = (webClientBuilder != null ? webClientBuilder : WebClient.builder());
this.httpHandlerBuilder = (httpHandlerBuilder != null ? httpHandlerBuilder.clone() : null);
this.connector = connector;
this.responseTimeout = responseTimeout;
this.httpHandlerBuilder = (httpHandlerBuilder != null ? httpHandlerBuilder.clone() : null);
}
/** Copy constructor. */
DefaultWebTestClientBuilder(DefaultWebTestClientBuilder other) {
this.httpHandlerBuilder = (other.httpHandlerBuilder != null ? other.httpHandlerBuilder.clone() : null);
this.connector = other.connector;
this.responseTimeout = other.responseTimeout;
this.baseUrl = other.baseUrl;
this.uriBuilderFactory = other.uriBuilderFactory;
if (other.defaultHeaders != null) {
this.defaultHeaders = new HttpHeaders();
this.defaultHeaders.putAll(other.defaultHeaders);
}
else {
this.defaultHeaders = null;
}
this.defaultCookies = (other.defaultCookies != null ?
new LinkedMultiValueMap<>(other.defaultCookies) : null);
this.filters = (other.filters != null ? new ArrayList<>(other.filters) : null);
this.strategies = other.strategies;
this.strategiesConfigurers = (other.strategiesConfigurers != null ?
new ArrayList<>(other.strategiesConfigurers) : null);
}
@Override
public WebTestClient.Builder baseUrl(String baseUrl) {
this.webClientBuilder.baseUrl(baseUrl);
this.baseUrl = baseUrl;
return this;
}
@Override
public WebTestClient.Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory) {
this.webClientBuilder.uriBuilderFactory(uriBuilderFactory);
this.uriBuilderFactory = uriBuilderFactory;
return this;
}
@Override
public WebTestClient.Builder defaultHeader(String headerName, String... headerValues) {
this.webClientBuilder.defaultHeader(headerName, headerValues);
public WebTestClient.Builder defaultHeader(String header, String... values) {
initHeaders().put(header, Arrays.asList(values));
return this;
}
@Override
public WebTestClient.Builder defaultHeaders(Consumer<HttpHeaders> headersConsumer) {
this.webClientBuilder.defaultHeaders(headersConsumer);
headersConsumer.accept(initHeaders());
return this;
}
private HttpHeaders initHeaders() {
if (this.defaultHeaders == null) {
this.defaultHeaders = new HttpHeaders();
}
return this.defaultHeaders;
}
@Override
public WebTestClient.Builder defaultCookie(String cookieName, String... cookieValues) {
this.webClientBuilder.defaultCookie(cookieName, cookieValues);
public WebTestClient.Builder defaultCookie(String cookie, String... values) {
initCookies().addAll(cookie, Arrays.asList(values));
return this;
}
@Override
public WebTestClient.Builder defaultCookies(
Consumer<MultiValueMap<String, String>> cookiesConsumer) {
this.webClientBuilder.defaultCookies(cookiesConsumer);
public WebTestClient.Builder defaultCookies(Consumer<MultiValueMap<String, String>> cookiesConsumer) {
cookiesConsumer.accept(initCookies());
return this;
}
private MultiValueMap<String, String> initCookies() {
if (this.defaultCookies == null) {
this.defaultCookies = new LinkedMultiValueMap<>(3);
}
return this.defaultCookies;
}
@Override
public WebTestClient.Builder filter(ExchangeFilterFunction filter) {
this.webClientBuilder.filter(filter);
Assert.notNull(filter, "ExchangeFilterFunction must not be null");
initFilters().add(filter);
return this;
}
@Override
public WebTestClient.Builder filters(Consumer<List<ExchangeFilterFunction>> filtersConsumer) {
this.webClientBuilder.filters(filtersConsumer);
filtersConsumer.accept(initFilters());
return this;
}
private List<ExchangeFilterFunction> initFilters() {
if (this.filters == null) {
this.filters = new ArrayList<>();
}
return this.filters;
}
@Override
public WebTestClient.Builder codecs(Consumer<ClientCodecConfigurer> configurer) {
this.webClientBuilder.codecs(configurer);
if (this.strategiesConfigurers == null) {
this.strategiesConfigurers = new ArrayList<>(4);
}
this.strategiesConfigurers.add(builder -> builder.codecs(configurer));
return this;
}
@Override
public WebTestClient.Builder exchangeStrategies(ExchangeStrategies strategies) {
this.webClientBuilder.exchangeStrategies(strategies);
this.strategies = strategies;
return this;
}
@SuppressWarnings("deprecation")
@Override
@SuppressWarnings("deprecation")
public WebTestClient.Builder exchangeStrategies(Consumer<ExchangeStrategies.Builder> configurer) {
this.webClientBuilder.exchangeStrategies(configurer);
if (this.strategiesConfigurers == null) {
this.strategiesConfigurers = new ArrayList<>(4);
}
this.strategiesConfigurers.add(configurer);
return this;
}
@Override
public WebTestClient.Builder responseTimeout(Duration timeout) {
this.responseTimeout = timeout;
public WebTestClient.Builder apply(WebTestClientConfigurer configurer) {
configurer.afterConfigurerAdded(this, this.httpHandlerBuilder, this.connector);
return this;
}
@Override
public WebTestClient.Builder apply(WebTestClientConfigurer configurer) {
configurer.afterConfigurerAdded(this, this.httpHandlerBuilder, this.connector);
public WebTestClient.Builder responseTimeout(Duration timeout) {
this.responseTimeout = timeout;
return this;
}
@Override
public WebTestClient build() {
ClientHttpConnector connectorToUse = this.connector;
if (connectorToUse == null) {
Assert.state(this.httpHandlerBuilder != null, "No WebHttpHandlerBuilder available");
connectorToUse = new HttpHandlerConnector(this.httpHandlerBuilder.build());
if (this.httpHandlerBuilder != null) {
connectorToUse = new HttpHandlerConnector(this.httpHandlerBuilder.build());
}
}
if (connectorToUse == null) {
connectorToUse = initConnector();
}
Function<ClientHttpConnector, ExchangeFunction> exchangeFactory = connector -> {
ExchangeFunction exchange = ExchangeFunctions.create(connector, initExchangeStrategies());
if (CollectionUtils.isEmpty(this.filters)) {
return exchange;
}
return this.filters.stream()
.reduce(ExchangeFilterFunction::andThen)
.map(filter -> filter.apply(exchange))
.orElse(exchange);
};
return new DefaultWebTestClient(connectorToUse, exchangeFactory, initUriBuilderFactory(),
this.defaultHeaders != null ? HttpHeaders.readOnlyHttpHeaders(this.defaultHeaders) : null,
this.defaultCookies != null ? CollectionUtils.unmodifiableMultiValueMap(this.defaultCookies) : null,
this.responseTimeout, new DefaultWebTestClientBuilder(this));
}
private static ClientHttpConnector initConnector() {
if (reactorClientPresent) {
return new ReactorClientHttpConnector();
}
else if (jettyClientPresent) {
return new JettyClientHttpConnector();
}
else if (httpComponentsClientPresent) {
return new HttpComponentsClientHttpConnector();
}
throw new IllegalStateException("No suitable default ClientHttpConnector found");
}
return new DefaultWebTestClient(this.webClientBuilder,
connectorToUse, this.responseTimeout, new DefaultWebTestClientBuilder(this));
private ExchangeStrategies initExchangeStrategies() {
if (CollectionUtils.isEmpty(this.strategiesConfigurers)) {
return (this.strategies != null ? this.strategies : ExchangeStrategies.withDefaults());
}
ExchangeStrategies.Builder builder =
(this.strategies != null ? this.strategies.mutate() : ExchangeStrategies.builder());
this.strategiesConfigurers.forEach(configurer -> configurer.accept(builder));
return builder.build();
}
private UriBuilderFactory initUriBuilderFactory() {
if (this.uriBuilderFactory != null) {
return this.uriBuilderFactory;
}
return (this.baseUrl != null ?
new DefaultUriBuilderFactory(this.baseUrl) : new DefaultUriBuilderFactory());
}
}

24
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java

@ -45,30 +45,6 @@ import org.springframework.web.reactive.function.BodyExtractor; @@ -45,30 +45,6 @@ import org.springframework.web.reactive.function.BodyExtractor;
* {@link ExchangeFunction}. Provides access to the response status and
* headers, and also methods to consume the response body.
*
* <p><strong>NOTE:</strong> When using a {@link ClientResponse}
* through the {@code WebClient}
* {@link WebClient.RequestHeadersSpec#exchange() exchange()} method,
* you have to make sure that the body is consumed or released by using
* one of the following methods:
* <ul>
* <li>{@link #body(BodyExtractor)}</li>
* <li>{@link #bodyToMono(Class)} or
* {@link #bodyToMono(ParameterizedTypeReference)}</li>
* <li>{@link #bodyToFlux(Class)} or
* {@link #bodyToFlux(ParameterizedTypeReference)}</li>
* <li>{@link #toEntity(Class)} or
* {@link #toEntity(ParameterizedTypeReference)}</li>
* <li>{@link #toEntityList(Class)} or
* {@link #toEntityList(ParameterizedTypeReference)}</li>
* <li>{@link #toBodilessEntity()}</li>
* <li>{@link #releaseBody()}</li>
* </ul>
* You can also use {@code bodyToMono(Void.class)} if no response content is
* expected. However keep in mind the connection will be closed, instead of
* being placed back in the pool, if any content does arrive. This is in
* contrast to {@link #releaseBody()} which does consume the full body and
* releases any content received.
*
* @author Brian Clozel
* @author Arjen Poutsma
* @since 5.0

129
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -49,7 +49,6 @@ import org.springframework.util.LinkedMultiValueMap; @@ -49,7 +49,6 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.util.DefaultUriBuilderFactory;
import org.springframework.web.util.UriBuilder;
import org.springframework.web.util.UriBuilderFactory;
@ -85,12 +84,12 @@ class DefaultWebClient implements WebClient { @@ -85,12 +84,12 @@ class DefaultWebClient implements WebClient {
private final DefaultWebClientBuilder builder;
DefaultWebClient(ExchangeFunction exchangeFunction, @Nullable UriBuilderFactory factory,
DefaultWebClient(ExchangeFunction exchangeFunction, UriBuilderFactory uriBuilderFactory,
@Nullable HttpHeaders defaultHeaders, @Nullable MultiValueMap<String, String> defaultCookies,
@Nullable Consumer<RequestHeadersSpec<?>> defaultRequest, DefaultWebClientBuilder builder) {
this.exchangeFunction = exchangeFunction;
this.uriBuilderFactory = (factory != null ? factory : new DefaultUriBuilderFactory());
this.uriBuilderFactory = uriBuilderFactory;
this.defaultHeaders = defaultHeaders;
this.defaultCookies = defaultCookies;
this.defaultRequest = defaultRequest;
@ -147,6 +146,14 @@ class DefaultWebClient implements WebClient { @@ -147,6 +146,14 @@ class DefaultWebClient implements WebClient {
return new DefaultWebClientBuilder(this.builder);
}
private static Mono<Void> releaseIfNotConsumed(ClientResponse response) {
return response.releaseBody().onErrorResume(ex2 -> Mono.empty());
}
private static <T> Mono<T> releaseIfNotConsumed(ClientResponse response, Throwable ex) {
return response.releaseBody().onErrorResume(ex2 -> Mono.empty()).then(Mono.error(ex));
}
private class DefaultRequestBodyUriSpec implements RequestBodyUriSpec {
@ -243,13 +250,6 @@ class DefaultWebClient implements WebClient { @@ -243,13 +250,6 @@ class DefaultWebClient implements WebClient {
return this;
}
@Override
public RequestBodySpec httpRequest(Consumer<ClientHttpRequest> requestConsumer) {
this.httpRequestConsumer = (this.httpRequestConsumer != null ?
this.httpRequestConsumer.andThen(requestConsumer) : requestConsumer);
return this;
}
@Override
public DefaultRequestBodyUriSpec accept(MediaType... acceptableMediaTypes) {
getHeaders().setAccept(Arrays.asList(acceptableMediaTypes));
@ -298,6 +298,13 @@ class DefaultWebClient implements WebClient { @@ -298,6 +298,13 @@ class DefaultWebClient implements WebClient {
return this;
}
@Override
public RequestBodySpec httpRequest(Consumer<ClientHttpRequest> requestConsumer) {
this.httpRequestConsumer = (this.httpRequestConsumer != null ?
this.httpRequestConsumer.andThen(requestConsumer) : requestConsumer);
return this;
}
@Override
public RequestHeadersSpec<?> bodyValue(Object body) {
this.inserter = BodyInserters.fromValue(body);
@ -342,6 +349,65 @@ class DefaultWebClient implements WebClient { @@ -342,6 +349,65 @@ class DefaultWebClient implements WebClient {
}
@Override
public ResponseSpec retrieve() {
return new DefaultResponseSpec(exchange(), this::createRequest);
}
private HttpRequest createRequest() {
return new HttpRequest() {
private final URI uri = initUri();
private final HttpHeaders headers = initHeaders();
@Override
public HttpMethod getMethod() {
return httpMethod;
}
@Override
public String getMethodValue() {
return httpMethod.name();
}
@Override
public URI getURI() {
return this.uri;
}
@Override
public HttpHeaders getHeaders() {
return this.headers;
}
};
}
@Override
public <V> Mono<V> exchangeToMono(Function<ClientResponse, ? extends Mono<V>> responseHandler) {
return exchange().flatMap(response -> {
try {
return responseHandler.apply(response)
.flatMap(value -> releaseIfNotConsumed(response).thenReturn(value))
.switchIfEmpty(Mono.defer(() -> releaseIfNotConsumed(response).then(Mono.empty())))
.onErrorResume(ex -> releaseIfNotConsumed(response, ex));
}
catch (Throwable ex) {
return releaseIfNotConsumed(response, ex);
}
});
}
@Override
public <V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> responseHandler) {
return exchange().flatMapMany(response -> {
try {
return responseHandler.apply(response)
.concatWith(Flux.defer(() -> releaseIfNotConsumed(response).then(Mono.empty())))
.onErrorResume(ex -> releaseIfNotConsumed(response, ex));
}
catch (Throwable ex) {
return releaseIfNotConsumed(response, ex);
}
});
}
@Override
@SuppressWarnings("deprecation")
public Mono<ClientResponse> exchange() {
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
@ -398,35 +464,6 @@ class DefaultWebClient implements WebClient { @@ -398,35 +464,6 @@ class DefaultWebClient implements WebClient {
return result;
}
}
@Override
public ResponseSpec retrieve() {
return new DefaultResponseSpec(exchange(), this::createRequest);
}
private HttpRequest createRequest() {
return new HttpRequest() {
private final URI uri = initUri();
private final HttpHeaders headers = initHeaders();
@Override
public HttpMethod getMethod() {
return httpMethod;
}
@Override
public String getMethodValue() {
return httpMethod.name();
}
@Override
public URI getURI() {
return this.uri;
}
@Override
public HttpHeaders getHeaders() {
return this.headers;
}
};
}
}
@ -530,11 +567,11 @@ class DefaultWebClient implements WebClient { @@ -530,11 +567,11 @@ class DefaultWebClient implements WebClient {
Mono<? extends Throwable> exMono;
try {
exMono = handler.apply(response);
exMono = exMono.flatMap(ex -> drainBody(response, ex));
exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
exMono = exMono.flatMap(ex -> releaseIfNotConsumed(response, ex));
exMono = exMono.onErrorResume(ex -> releaseIfNotConsumed(response, ex));
}
catch (Throwable ex2) {
exMono = drainBody(response, ex2);
exMono = releaseIfNotConsumed(response, ex2);
}
Mono<T> result = exMono.flatMap(Mono::error);
HttpRequest request = this.requestSupplier.get();
@ -544,14 +581,6 @@ class DefaultWebClient implements WebClient { @@ -544,14 +581,6 @@ class DefaultWebClient implements WebClient {
return null;
}
@SuppressWarnings("unchecked")
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
// Ensure the body is drained, even if the StatusHandler didn't consume it,
// but ignore exception, in case the handler did consume.
return (Mono<T>) response.releaseBody()
.onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}
private <T> Mono<T> insertCheckpoint(Mono<T> result, int statusCode, HttpRequest request) {
String httpMethod = request.getMethodValue();
URI uri = request.getURI();

14
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java

@ -262,24 +262,26 @@ final class DefaultWebClientBuilder implements WebClient.Builder { @@ -262,24 +262,26 @@ final class DefaultWebClientBuilder implements WebClient.Builder {
@Override
public WebClient build() {
ClientHttpConnector connectorToUse =
(this.connector != null ? this.connector : initConnector());
ExchangeFunction exchange = (this.exchangeFunction == null ?
ExchangeFunctions.create(getOrInitConnector(), initExchangeStrategies()) :
ExchangeFunctions.create(connectorToUse, initExchangeStrategies()) :
this.exchangeFunction);
ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream()
.reduce(ExchangeFilterFunction::andThen)
.map(filter -> filter.apply(exchange))
.orElse(exchange) : exchange);
return new DefaultWebClient(filteredExchange, initUriBuilderFactory(),
this.defaultHeaders != null ? HttpHeaders.readOnlyHttpHeaders(this.defaultHeaders) : null,
this.defaultCookies != null ? CollectionUtils.unmodifiableMultiValueMap(this.defaultCookies) : null,
this.defaultRequest, new DefaultWebClientBuilder(this));
}
private ClientHttpConnector getOrInitConnector() {
if (this.connector != null) {
return this.connector;
}
else if (reactorClientPresent) {
private ClientHttpConnector initConnector() {
if (reactorClientPresent) {
return new ReactorClientHttpConnector();
}
else if (jettyClientPresent) {

97
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java

@ -57,7 +57,8 @@ import org.springframework.web.util.UriBuilderFactory; @@ -57,7 +57,8 @@ import org.springframework.web.util.UriBuilderFactory;
* <p>For examples with a response body see:
* <ul>
* <li>{@link RequestHeadersSpec#retrieve() retrieve()}
* <li>{@link RequestHeadersSpec#exchange() exchange()}
* <li>{@link RequestHeadersSpec#exchangeToMono(Function) exchangeToMono()}
* <li>{@link RequestHeadersSpec#exchangeToFlux(Function) exchangeToFlux()}
* </ul>
* <p>For examples with a request body see:
* <ul>
@ -252,8 +253,7 @@ public interface WebClient { @@ -252,8 +253,7 @@ public interface WebClient {
Builder defaultCookies(Consumer<MultiValueMap<String, String>> cookiesConsumer);
/**
* Provide a consumer to modify every request being built just before the
* call to {@link RequestHeadersSpec#exchange() exchange()}.
* Provide a consumer to customize every request being built.
* @param defaultRequest the consumer to use for modifying requests
* @since 5.1
*/
@ -483,21 +483,93 @@ public interface WebClient { @@ -483,21 +483,93 @@ public interface WebClient {
S httpRequest(Consumer<ClientHttpRequest> requestConsumer);
/**
* Perform the HTTP request and retrieve the response body:
* Proceed to declare how to extract the response. For example to extract
* a {@link ResponseEntity} with status, headers, and body:
* <p><pre>
* Mono&lt;Person&gt; bodyMono = client.get()
* Mono&lt;ResponseEntity&lt;Person&gt;&gt; entityMono = client.get()
* .uri("/persons/1")
* .accept(MediaType.APPLICATION_JSON)
* .retrieve()
* .toEntity(Person.class);
* </pre>
* <p>Or if interested only in the body:
* <p><pre>
* Mono&lt;Person&gt; entityMono = client.get()
* .uri("/persons/1")
* .accept(MediaType.APPLICATION_JSON)
* .retrieve()
* .bodyToMono(Person.class);
* </pre>
* <p>This method is a shortcut to using {@link #exchange()} and
* decoding the response body through {@link ClientResponse}.
* @return {@code ResponseSpec} to specify how to decode the body
* @see #exchange()
* <p>By default, 4xx and 5xx responses result in a
* {@link WebClientResponseException}. To customize error handling, use
* {@link ResponseSpec#onStatus(Predicate, Function) onStatus} handlers.
*/
ResponseSpec retrieve();
/**
* An alternative to {@link #retrieve()} that provides more control via
* access to the {@link ClientResponse}. This can be useful for advanced
* scenarios, for example to decode the response differently depending
* on the response status:
* <p><pre>
* Mono&lt;Object&gt; entityMono = client.get()
* .uri("/persons/1")
* .accept(MediaType.APPLICATION_JSON)
* .exchangeToMono(response -> {
* if (response.statusCode().equals(HttpStatus.OK)) {
* return response.bodyToMono(Person.class);
* }
* else if (response.statusCode().is4xxClientError()) {
* return response.bodyToMono(ErrorContainer.class);
* }
* else {
* return Mono.error(response.createException());
* }
* });
* </pre>
* <p><strong>Note:</strong> After the returned {@code Mono} completes,
* the response body is automatically released if it hasn't been consumed.
* If the response content is needed, the provided function must declare
* how to decode it.
* @param responseHandler the function to handle the response with
* @param <V> the type of Object the response will be transformed to
* @return a {@code Mono} produced from the response
* @since 5.3
*/
<V> Mono<V> exchangeToMono(Function<ClientResponse, ? extends Mono<V>> responseHandler);
/**
* An alternative to {@link #retrieve()} that provides more control via
* access to the {@link ClientResponse}. This can be useful for advanced
* scenarios, for example to decode the response differently depending
* on the response status:
* <p><pre>
* Mono&lt;Object&gt; entityMono = client.get()
* .uri("/persons")
* .accept(MediaType.APPLICATION_JSON)
* .exchangeToFlux(response -> {
* if (response.statusCode().equals(HttpStatus.OK)) {
* return response.bodyToFlux(Person.class);
* }
* else if (response.statusCode().is4xxClientError()) {
* return response.bodyToMono(ErrorContainer.class).flux();
* }
* else {
* return Flux.error(response.createException());
* }
* });
* </pre>
* <p><strong>Note:</strong> After the returned {@code Flux} completes,
* the response body is automatically released if it hasn't been consumed.
* If the response content is needed, the provided function must declare
* how to decode it.
* @param responseHandler the function to handle the response with
* @param <V> the type of Objects the response will be transformed to
* @return a {@code Flux} of Objects produced from the response
* @since 5.3
*/
<V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> responseHandler);
/**
* Perform the HTTP request and return a {@link ClientResponse} with the
* response status and headers. You can then use methods of the response
@ -526,7 +598,14 @@ public interface WebClient { @@ -526,7 +598,14 @@ public interface WebClient {
* if to consume the response.
* @return a {@code Mono} for the response
* @see #retrieve()
* @deprecated since 5.3 due to the possibility to leak memory and/or
* connections; please, use {@link #exchangeToMono(Function)},
* {@link #exchangeToFlux(Function)}; consider also using
* {@link #retrieve()} which provides access to the response status
* and headers via {@link ResponseEntity} along with error status
* handling.
*/
@Deprecated
Mono<ClientResponse> exchange();
}

3
spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt

@ -17,8 +17,8 @@ @@ -17,8 +17,8 @@
package org.springframework.web.reactive.function.client
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitSingle
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec
@ -69,6 +69,7 @@ inline fun <reified T : Any> RequestBodySpec.body(producer: Any): RequestHeaders @@ -69,6 +69,7 @@ inline fun <reified T : Any> RequestBodySpec.body(producer: Any): RequestHeaders
* @author Sebastien Deleuze
* @since 5.2
*/
@Suppress("DEPRECATION")
suspend fun RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(): ClientResponse =
exchange().awaitSingle()

16
spring-webflux/src/test/java/org/springframework/web/reactive/function/MultipartIntegrationTests.java

@ -29,13 +29,13 @@ import reactor.test.StepVerifier; @@ -29,13 +29,13 @@ import reactor.test.StepVerifier;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
import org.springframework.http.codec.multipart.Part;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.AbstractRouterFunctionIntegrationTests;
import org.springframework.web.reactive.function.server.RouterFunction;
@ -62,15 +62,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests { @@ -62,15 +62,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests {
void multipartData(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.post()
.uri("http://localhost:" + this.port + "/multipartData")
.bodyValue(generateBody())
.exchange();
.retrieve()
.toEntity(Void.class);
StepVerifier
.create(result)
.consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.OK))
.consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK))
.verifyComplete();
}
@ -78,15 +79,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests { @@ -78,15 +79,16 @@ class MultipartIntegrationTests extends AbstractRouterFunctionIntegrationTests {
void parts(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.post()
.uri("http://localhost:" + this.port + "/parts")
.bodyValue(generateBody())
.exchange();
.retrieve()
.toEntity(Void.class);
StepVerifier
.create(result)
.consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.OK))
.consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK))
.verifyComplete();
}

39
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java

@ -43,6 +43,7 @@ import static org.assertj.core.api.Assertions.assertThat; @@ -43,6 +43,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.when;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@ -69,6 +70,7 @@ public class DefaultWebClientTests { @@ -69,6 +70,7 @@ public class DefaultWebClientTests {
@BeforeEach
public void setup() {
ClientResponse mockResponse = mock(ClientResponse.class);
when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty());
given(this.exchangeFunction.exchange(this.captor.capture())).willReturn(Mono.just(mockResponse));
this.builder = WebClient.builder().baseUrl("/base").exchangeFunction(this.exchangeFunction);
}
@ -77,7 +79,7 @@ public class DefaultWebClientTests { @@ -77,7 +79,7 @@ public class DefaultWebClientTests {
@Test
public void basic() {
this.builder.build().get().uri("/path")
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.url().toString()).isEqualTo("/base/path");
@ -89,7 +91,7 @@ public class DefaultWebClientTests { @@ -89,7 +91,7 @@ public class DefaultWebClientTests {
public void uriBuilder() {
this.builder.build().get()
.uri(builder -> builder.path("/path").queryParam("q", "12").build())
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.url().toString()).isEqualTo("/base/path?q=12");
@ -98,8 +100,8 @@ public class DefaultWebClientTests { @@ -98,8 +100,8 @@ public class DefaultWebClientTests {
@Test // gh-22705
public void uriBuilderWithUriTemplate() {
this.builder.build().get()
.uri("/path/{id}", builder -> builder.queryParam("q", "12").build("identifier"))
.exchange().block(Duration.ofSeconds(10));
.uri("/path/{id}", builder -> builder.queryParam("q", "12").build("identifier"))
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.url().toString()).isEqualTo("/base/path/identifier?q=12");
@ -110,7 +112,7 @@ public class DefaultWebClientTests { @@ -110,7 +112,7 @@ public class DefaultWebClientTests {
public void uriBuilderWithPathOverride() {
this.builder.build().get()
.uri(builder -> builder.replacePath("/path").build())
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.url().toString()).isEqualTo("/path");
@ -120,7 +122,7 @@ public class DefaultWebClientTests { @@ -120,7 +122,7 @@ public class DefaultWebClientTests {
public void requestHeaderAndCookie() {
this.builder.build().get().uri("/path").accept(MediaType.APPLICATION_JSON)
.cookies(cookies -> cookies.add("id", "123")) // SPR-16178
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/json");
@ -131,7 +133,7 @@ public class DefaultWebClientTests { @@ -131,7 +133,7 @@ public class DefaultWebClientTests {
public void httpRequest() {
this.builder.build().get().uri("/path")
.httpRequest(httpRequest -> {})
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.httpRequest()).isNotNull();
@ -143,7 +145,8 @@ public class DefaultWebClientTests { @@ -143,7 +145,8 @@ public class DefaultWebClientTests {
.defaultHeader("Accept", "application/json").defaultCookie("id", "123")
.build();
client.get().uri("/path").exchange().block(Duration.ofSeconds(10));
client.get().uri("/path")
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/json");
@ -160,7 +163,7 @@ public class DefaultWebClientTests { @@ -160,7 +163,7 @@ public class DefaultWebClientTests {
client.get().uri("/path")
.header("Accept", "application/xml")
.cookie("id", "456")
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/xml");
@ -185,7 +188,7 @@ public class DefaultWebClientTests { @@ -185,7 +188,7 @@ public class DefaultWebClientTests {
try {
context.set("bar");
client.get().uri("/path").attribute("foo", "bar")
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
}
finally {
context.remove();
@ -271,7 +274,7 @@ public class DefaultWebClientTests { @@ -271,7 +274,7 @@ public class DefaultWebClientTests {
this.builder.filter(filter).build()
.get().uri("/path")
.attribute("foo", "bar")
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
assertThat(actual.get("foo")).isEqualTo("bar");
@ -290,7 +293,7 @@ public class DefaultWebClientTests { @@ -290,7 +293,7 @@ public class DefaultWebClientTests {
this.builder.filter(filter).build()
.get().uri("/path")
.attribute("foo", null)
.exchange().block(Duration.ofSeconds(10));
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
assertThat(actual.get("foo")).isNull();
@ -306,7 +309,7 @@ public class DefaultWebClientTests { @@ -306,7 +309,7 @@ public class DefaultWebClientTests {
.defaultCookie("id", "123"))
.build();
client.get().uri("/path").exchange().block(Duration.ofSeconds(10));
client.get().uri("/path").retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Accept")).isEqualTo("application/json");
@ -317,8 +320,8 @@ public class DefaultWebClientTests { @@ -317,8 +320,8 @@ public class DefaultWebClientTests {
public void switchToErrorOnEmptyClientResponseMono() {
ExchangeFunction exchangeFunction = mock(ExchangeFunction.class);
given(exchangeFunction.exchange(any())).willReturn(Mono.empty());
WebClient.Builder builder = WebClient.builder().baseUrl("/base").exchangeFunction(exchangeFunction);
StepVerifier.create(builder.build().get().uri("/path").exchange())
WebClient client = WebClient.builder().baseUrl("/base").exchangeFunction(exchangeFunction).build();
StepVerifier.create(client.get().uri("/path").retrieve().bodyToMono(Void.class))
.expectErrorMessage("The underlying HTTP client completed without emitting a response.")
.verify(Duration.ofSeconds(5));
}
@ -333,9 +336,11 @@ public class DefaultWebClientTests { @@ -333,9 +336,11 @@ public class DefaultWebClientTests {
.build())
)
.build();
Mono<ClientResponse> exchange = client.get().uri("/path").exchange();
Mono<Void> result = client.get().uri("/path").retrieve().bodyToMono(Void.class);
verifyNoInteractions(this.exchangeFunction);
exchange.block(Duration.ofSeconds(10));
result.block(Duration.ofSeconds(10));
ClientRequest request = verifyAndGetRequest();
assertThat(request.headers().getFirst("Custom")).isEqualTo("value");
}

7
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java

@ -182,9 +182,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes @@ -182,9 +182,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes
.setBody("foo bar"));
Mono<Void> result = this.webClient.get()
.exchange()
.flatMap(ClientResponse::releaseBody);
.exchangeToMono(ClientResponse::releaseBody);
StepVerifier.create(result)
.expectComplete()
@ -201,8 +199,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes @@ -201,8 +199,7 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes
.setBody("foo bar"));
Mono<ResponseEntity<Void>> result = this.webClient.get()
.exchange()
.flatMap(ClientResponse::toBodilessEntity);
.exchangeToMono(ClientResponse::toBodilessEntity);
StepVerifier.create(result)
.assertNext(entity -> {

45
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

@ -837,8 +837,7 @@ class WebClientIntegrationTests { @@ -837,8 +837,7 @@ class WebClientIntegrationTests {
Mono<String> result = this.webClient.get()
.uri("/greeting")
.header("X-Test-Header", "testvalue")
.exchange()
.flatMap(response -> response.bodyToMono(String.class));
.retrieve().bodyToMono(String.class);
StepVerifier.create(result)
.expectNext("Hello Spring!")
@ -862,8 +861,7 @@ class WebClientIntegrationTests { @@ -862,8 +861,7 @@ class WebClientIntegrationTests {
Mono<ResponseEntity<Pojo>> result = this.webClient.get()
.uri("/json").accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.toEntity(Pojo.class));
.retrieve().toEntity(Pojo.class);
StepVerifier.create(result)
.consumeNextWith(entity -> {
@ -890,8 +888,7 @@ class WebClientIntegrationTests { @@ -890,8 +888,7 @@ class WebClientIntegrationTests {
Mono<ResponseEntity<Void>> result = this.webClient.get()
.uri("/json").accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(ClientResponse::toBodilessEntity);
.retrieve().toBodilessEntity();
StepVerifier.create(result)
.consumeNextWith(entity -> {
@ -919,8 +916,7 @@ class WebClientIntegrationTests { @@ -919,8 +916,7 @@ class WebClientIntegrationTests {
Mono<ResponseEntity<List<Pojo>>> result = this.webClient.get()
.uri("/json").accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.toEntityList(Pojo.class));
.retrieve().toEntityList(Pojo.class);
StepVerifier.create(result)
.consumeNextWith(entity -> {
@ -948,8 +944,7 @@ class WebClientIntegrationTests { @@ -948,8 +944,7 @@ class WebClientIntegrationTests {
Mono<ResponseEntity<Void>> result = this.webClient.get()
.uri("/noContent")
.exchange()
.flatMap(response -> response.toEntity(Void.class));
.retrieve().toBodilessEntity();
StepVerifier.create(result)
.assertNext(r -> assertThat(r.getStatusCode().is2xxSuccessful()).isTrue())
@ -963,10 +958,11 @@ class WebClientIntegrationTests { @@ -963,10 +958,11 @@ class WebClientIntegrationTests {
prepareResponse(response -> response.setResponseCode(404)
.setHeader("Content-Type", "text/plain").setBody("Not Found"));
Mono<ClientResponse> result = this.webClient.get().uri("/greeting").exchange();
Mono<ResponseEntity<Void>> result = this.webClient.get().uri("/greeting")
.exchangeToMono(ClientResponse::toBodilessEntity);
StepVerifier.create(result)
.consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.NOT_FOUND))
.consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND))
.expectComplete()
.verify(Duration.ofSeconds(3));
@ -987,12 +983,12 @@ class WebClientIntegrationTests { @@ -987,12 +983,12 @@ class WebClientIntegrationTests {
prepareResponse(response -> response.setResponseCode(errorStatus)
.setHeader("Content-Type", "text/plain").setBody(errorMessage));
Mono<ClientResponse> result = this.webClient.get()
Mono<ResponseEntity<Void>> result = this.webClient.get()
.uri("/unknownPage")
.exchange();
.exchangeToMono(ClientResponse::toBodilessEntity);
StepVerifier.create(result)
.consumeNextWith(response -> assertThat(response.rawStatusCode()).isEqualTo(555))
.consumeNextWith(entity -> assertThat(entity.getStatusCodeValue()).isEqualTo(555))
.expectComplete()
.verify(Duration.ofSeconds(3));
@ -1008,7 +1004,8 @@ class WebClientIntegrationTests { @@ -1008,7 +1004,8 @@ class WebClientIntegrationTests {
startServer(connector);
String uri = "/api/v4/groups/1";
Mono<ClientResponse> responseMono = WebClient.builder().build().get().uri(uri).exchange();
Mono<ResponseEntity<Void>> responseMono = WebClient.builder().build().get().uri(uri)
.retrieve().toBodilessEntity();
StepVerifier.create(responseMono)
.expectErrorSatisfies(throwable -> {
@ -1103,12 +1100,9 @@ class WebClientIntegrationTests { @@ -1103,12 +1100,9 @@ class WebClientIntegrationTests {
.addHeader("Set-Cookie", "testkey2=testvalue2; Max-Age=42; HttpOnly; SameSite=Lax; Secure")
.setBody("test"));
Mono<ClientResponse> result = this.webClient.get()
this.webClient.get()
.uri("/test")
.exchange();
StepVerifier.create(result)
.consumeNextWith(response -> {
.exchangeToMono(response -> {
assertThat(response.cookies()).containsOnlyKeys("testkey1", "testkey2");
ResponseCookie cookie1 = response.cookies().get("testkey1").get(0);
@ -1123,9 +1117,10 @@ class WebClientIntegrationTests { @@ -1123,9 +1117,10 @@ class WebClientIntegrationTests {
assertThat(cookie2.isHttpOnly()).isTrue();
assertThat(cookie2.getSameSite()).isEqualTo("Lax");
assertThat(cookie2.getMaxAge().getSeconds()).isEqualTo(42);
return response.releaseBody();
})
.expectComplete()
.verify(Duration.ofSeconds(3));
.block(Duration.ofSeconds(3));
expectRequestCount(1);
}
@ -1135,9 +1130,7 @@ class WebClientIntegrationTests { @@ -1135,9 +1130,7 @@ class WebClientIntegrationTests {
startServer(connector);
String url = "http://example.invalid";
Mono<ClientResponse> result = this.webClient.get().
uri(url)
.exchange();
Mono<Void> result = this.webClient.get().uri(url).retrieve().bodyToMono(Void.class);
StepVerifier.create(result)
.expectErrorSatisfies(throwable -> {

14
spring-webflux/src/test/java/org/springframework/web/reactive/function/server/LocaleContextResolverIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,8 +26,8 @@ import reactor.test.StepVerifier; @@ -26,8 +26,8 @@ import reactor.test.StepVerifier;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.result.view.View;
import org.springframework.web.reactive.result.view.ViewResolver;
@ -66,16 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRouterFunctionIntegr @@ -66,16 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRouterFunctionIntegr
void fixedLocale(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.get()
.uri("http://localhost:" + this.port + "/")
.exchange();
.retrieve().toBodilessEntity();
StepVerifier
.create(result)
.consumeNextWith(response -> {
assertThat(response.statusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.headers().asHttpHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY);
.consumeNextWith(entity -> {
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(entity.getHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY);
})
.verifyComplete();
}

11
spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,6 +37,7 @@ import org.springframework.core.io.ClassPathResource; @@ -37,6 +37,7 @@ import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
@ -52,7 +53,6 @@ import org.springframework.web.bind.annotation.RequestPart; @@ -52,7 +53,6 @@ import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests;
@ -85,15 +85,16 @@ class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTests { @@ -85,15 +85,16 @@ class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTests {
void requestPart(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.post()
.uri("/requestPart")
.bodyValue(generateBody())
.exchange();
.retrieve()
.toBodilessEntity();
StepVerifier
.create(result)
.consumeNextWith(response -> assertThat(response.statusCode()).isEqualTo(HttpStatus.OK))
.consumeNextWith(entity -> assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK))
.verifyComplete();
}

52
spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ProtobufIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.web.reactive.result.method.annotation;
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -26,6 +27,8 @@ import org.springframework.context.ApplicationContext; @@ -26,6 +27,8 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.config.EnableWebFlux;
@ -66,18 +69,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests { @@ -66,18 +69,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests {
void value(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<Msg> result = this.webClient.get()
Mono<ResponseEntity<Msg>> result = this.webClient.get()
.uri("/message")
.exchange()
.doOnNext(response -> {
assertThat(response.headers().contentType().get().getParameters().containsKey("delimited")).isFalse();
assertThat(response.headers().header("X-Protobuf-Schema").get(0)).isEqualTo("sample.proto");
assertThat(response.headers().header("X-Protobuf-Message").get(0)).isEqualTo("Msg");
})
.flatMap(response -> response.bodyToMono(Msg.class));
.retrieve()
.toEntity(Msg.class);
StepVerifier.create(result)
.expectNext(TEST_MSG)
.consumeNextWith(entity -> {
HttpHeaders headers = entity.getHeaders();
assertThat(headers.getContentType().getParameters().containsKey("delimited")).isFalse();
assertThat(headers.getFirst("X-Protobuf-Schema")).isEqualTo("sample.proto");
assertThat(headers.getFirst("X-Protobuf-Message")).isEqualTo("Msg");
assertThat(entity.getBody()).isEqualTo(TEST_MSG);
})
.verifyComplete();
}
@ -85,20 +89,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests { @@ -85,20 +89,19 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests {
void values(HttpServer httpServer) throws Exception {
startServer(httpServer);
Flux<Msg> result = this.webClient.get()
Mono<ResponseEntity<List<Msg>>> result = this.webClient.get()
.uri("/messages")
.exchange()
.doOnNext(response -> {
assertThat(response.headers().contentType().get().getParameters().get("delimited")).isEqualTo("true");
assertThat(response.headers().header("X-Protobuf-Schema").get(0)).isEqualTo("sample.proto");
assertThat(response.headers().header("X-Protobuf-Message").get(0)).isEqualTo("Msg");
})
.flatMapMany(response -> response.bodyToFlux(Msg.class));
.retrieve()
.toEntityList(Msg.class);
StepVerifier.create(result)
.expectNext(TEST_MSG)
.expectNext(TEST_MSG)
.expectNext(TEST_MSG)
.consumeNextWith(entity -> {
HttpHeaders headers = entity.getHeaders();
assertThat(headers.getContentType().getParameters().get("delimited")).isEqualTo("true");
assertThat(headers.getFirst("X-Protobuf-Schema")).isEqualTo("sample.proto");
assertThat(headers.getFirst("X-Protobuf-Message")).isEqualTo("Msg");
assertThat(entity.getBody()).containsExactly(TEST_MSG, TEST_MSG, TEST_MSG);
})
.verifyComplete();
}
@ -108,13 +111,12 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests { @@ -108,13 +111,12 @@ class ProtobufIntegrationTests extends AbstractRequestMappingIntegrationTests {
Flux<Msg> result = this.webClient.get()
.uri("/message-stream")
.exchange()
.doOnNext(response -> {
.exchangeToFlux(response -> {
assertThat(response.headers().contentType().get().getParameters().get("delimited")).isEqualTo("true");
assertThat(response.headers().header("X-Protobuf-Schema").get(0)).isEqualTo("sample.proto");
assertThat(response.headers().header("X-Protobuf-Message").get(0)).isEqualTo("Msg");
})
.flatMapMany(response -> response.bodyToFlux(Msg.class));
return response.bodyToFlux(Msg.class);
});
StepVerifier.create(result)
.expectNext(Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(0).build()).build())

15
spring-webflux/src/test/java/org/springframework/web/reactive/result/view/LocaleContextResolverIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,12 +30,12 @@ import org.springframework.context.annotation.ComponentScan; @@ -30,12 +30,12 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.reactive.config.ViewResolverRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurationSupport;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.result.method.annotation.AbstractRequestMappingIntegrationTests;
import org.springframework.web.server.ServerWebExchange;
@ -66,15 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRequestMappingIntegr @@ -66,15 +66,16 @@ class LocaleContextResolverIntegrationTests extends AbstractRequestMappingIntegr
void fixedLocale(HttpServer httpServer) throws Exception {
startServer(httpServer);
Mono<ClientResponse> result = webClient
Mono<ResponseEntity<Void>> result = webClient
.get()
.uri("http://localhost:" + this.port + "/")
.exchange();
.retrieve()
.toBodilessEntity();
StepVerifier.create(result)
.consumeNextWith(response -> {
assertThat(response.statusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.headers().asHttpHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY);
.consumeNextWith(entity -> {
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(entity.getHeaders().getContentLanguage()).isEqualTo(Locale.GERMANY);
})
.verifyComplete();
}

1
spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/WebClientExtensionsTests.kt

@ -80,6 +80,7 @@ class WebClientExtensionsTests { @@ -80,6 +80,7 @@ class WebClientExtensionsTests {
}
@Test
@Suppress("DEPRECATION")
fun awaitExchange() {
val response = mockk<ClientResponse>()
every { requestBodySpec.exchange() } returns Mono.just(response)

171
src/docs/asciidoc/web/webflux-webclient.adoc

@ -1,16 +1,20 @@ @@ -1,16 +1,20 @@
[[webflux-client]]
= WebClient
Spring WebFlux includes a reactive, non-blocking `WebClient` for HTTP requests. The client
has a functional, fluent API with reactive types for declarative composition, see
<<web-reactive.adoc#webflux-reactive-libraries>>. WebFlux client and server rely on the
same non-blocking <<web-reactive.adoc#webflux-codecs, codecs>> to encode and decode request
and response content.
Spring WebFlux includes a client to perform HTTP requests with. `WebClient` has a
functional, fluent API based on Reactor, see <<web-reactive.adoc#webflux-reactive-libraries>>,
which enables declarative composition of asynchronous logic without the need to deal with
threads or concurrency. It is fully non-blocking, it supports streaming, and relies on
the same <<web-reactive.adoc#webflux-codecs, codecs>> that are also used to encode and
decode request and response content on the server side.
Internally `WebClient` delegates to an HTTP client library. By default, it uses
https://github.com/reactor/reactor-netty[Reactor Netty], there is built-in support for
the Jetty https://github.com/jetty-project/jetty-reactive-httpclient[reactive HttpClient],
and others can be plugged in through a `ClientHttpConnector`.
`WebClient` needs an HTTP client library to perform requests with. There is built-in
support for the following:
* https://github.com/reactor/reactor-netty[Reactor Netty]
* https://github.com/jetty-project/jetty-reactive-httpclient[Jetty Reactive HttpClient]
* https://hc.apache.org/index.html[Apache HttpComponents]
* Others can be plugged via `ClientHttpConnector`.
@ -23,12 +27,10 @@ The simplest way to create a `WebClient` is through one of the static factory me @@ -23,12 +27,10 @@ The simplest way to create a `WebClient` is through one of the static factory me
* `WebClient.create()`
* `WebClient.create(String baseUrl)`
The above methods use the Reactor Netty `HttpClient` with default settings and expect
`io.projectreactor.netty:reactor-netty` to be on the classpath.
You can also use `WebClient.builder()` with further options:
* `uriBuilderFactory`: Customized `UriBuilderFactory` to use as a base URL.
* `defaultUriVariables`: default values to use when expanding URI templates.
* `defaultHeader`: Headers for every request.
* `defaultCookie`: Cookies for every request.
* `defaultRequest`: `Consumer` to customize every request.
@ -36,33 +38,25 @@ You can also use `WebClient.builder()` with further options: @@ -36,33 +38,25 @@ You can also use `WebClient.builder()` with further options:
* `exchangeStrategies`: HTTP message reader/writer customizations.
* `clientConnector`: HTTP client library settings.
The following example configures <<web-reactive.adoc#webflux-codecs, HTTP codecs>>:
For example:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
WebClient client = WebClient.builder()
.exchangeStrategies(builder -> {
return builder.codecs(codecConfigurer -> {
//...
});
})
.codecs(configurer -> ... )
.build();
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
val webClient = WebClient.builder()
.exchangeStrategies { strategies ->
strategies.codecs {
//...
}
}
.codecs { configurer -> ... }
.build()
----
Once built, a `WebClient` instance is immutable. However, you can clone it and build a
modified copy without affecting the original instance, as the following example shows:
Once built, a `WebClient` is immutable. However, you can clone it and build a
modified copy as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
@ -94,37 +88,29 @@ modified copy without affecting the original instance, as the following example @@ -94,37 +88,29 @@ modified copy without affecting the original instance, as the following example
[[webflux-client-builder-maxinmemorysize]]
=== MaxInMemorySize
Spring WebFlux configures <<web-reactive.adoc#webflux-codecs-limits,limits>> for buffering
data in-memory in codec to avoid application memory issues. By the default this is
configured to 256KB and if that's not enough for your use case, you'll see the following:
Codecs have <<web-reactive.adoc#webflux-codecs-limits,limits>> for buffering data in
memory to avoid application memory issues. By the default those are set to 256KB.
If that's not enough you'll get the following error:
----
org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer
----
You can configure this limit on all default codecs with the following code sample:
To change the limit for default codecs, use the following:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
WebClient webClient = WebClient.builder()
.exchangeStrategies(builder ->
builder.codecs(codecs ->
codecs.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)
)
)
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024));
.build();
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
val webClient = WebClient.builder()
.exchangeStrategies { builder ->
builder.codecs {
it.defaultCodecs().maxInMemorySize(2 * 1024 * 1024)
}
}
.build()
.codecs { configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024) }
.build()
----
@ -132,7 +118,7 @@ You can configure this limit on all default codecs with the following code sampl @@ -132,7 +118,7 @@ You can configure this limit on all default codecs with the following code sampl
[[webflux-client-builder-reactor]]
=== Reactor Netty
To customize Reactor Netty settings, simple provide a pre-configured `HttpClient`:
To customize Reactor Netty settings, provide a pre-configured `HttpClient`:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
@ -457,8 +443,30 @@ The following example shows how to customize Apache HttpComponents `HttpClient` @@ -457,8 +443,30 @@ The following example shows how to customize Apache HttpComponents `HttpClient`
[[webflux-client-retrieve]]
== `retrieve()`
The `retrieve()` method is the easiest way to get a response body and decode it.
The following example shows how to do so:
The `retrieve()` method can be used to declare how to extract the response. For example:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
WebClient client = WebClient.create("https://example.org");
Mono<ResponseEntity<Person>> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity(Person.class);
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
val client = WebClient.create("https://example.org")
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity<Person>().awaitSingle()
----
Or to get only the body:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
@ -481,7 +489,7 @@ The following example shows how to do so: @@ -481,7 +489,7 @@ The following example shows how to do so:
.awaitBody<Person>()
----
You can also get a stream of objects decoded from the response, as the following example shows:
To get a stream of decoded objects:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
@ -500,11 +508,9 @@ You can also get a stream of objects decoded from the response, as the following @@ -500,11 +508,9 @@ You can also get a stream of objects decoded from the response, as the following
.bodyToFlow<Quote>()
----
By default, responses with 4xx or 5xx status codes result in an
`WebClientResponseException` or one of its HTTP status specific sub-classes, such as
`WebClientResponseException.BadRequest`, `WebClientResponseException.NotFound`, and others.
You can also use the `onStatus` method to customize the resulting exception,
as the following example shows:
By default, 4xx or 5xx responses result in an `WebClientResponseException`, including
sub-classes for specific HTTP status codes. To customize the handling of error
responses, use `onStatus` handlers as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
@ -527,67 +533,44 @@ as the following example shows: @@ -527,67 +533,44 @@ as the following example shows:
.awaitBody<Person>()
----
When `onStatus` is used, if the response is expected to have content, then the `onStatus`
callback should consume it. If not, the content will be automatically drained to ensure
resources are released.
[[webflux-client-exchange]]
== `exchange()`
== `exchangeToMono()`
The `exchange()` method provides more control than the `retrieve` method. The following example is equivalent
to `retrieve()` but also provides access to the `ClientResponse`:
The `exchangeToMono()` and `exchangeToFlux()` methods are useful for more advanced
cases that require more control, such as to decode the response differently
depending on the response status:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
Mono<Person> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.bodyToMono(Person.class));
Mono<Object> entityMono = client.get()
.uri("/persons/1")
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
return response.bodyToMono(Person.class);
}
else if (response.statusCode().is4xxClientError()) {
return response.bodyToMono(ErrorContainer.class);
}
else {
return Mono.error(response.createException());
}
});
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Person>()
----
At this level, you can also create a full `ResponseEntity`:
When using the above, after the returned `Mono` or `Flux` completes, the response body
is checked and if not consumed it is released to prevent memory and connection leaks.
Therefore the response cannot be decoded further downstream. It is up to the provided
function to declare how to decode the response if needed.
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
Mono<ResponseEntity<Person>> result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.toEntity(Person.class));
----
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
val result = client.get()
.uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.toEntity<Person>()
----
Note that (unlike `retrieve()`), with `exchange()`, there are no automatic error signals for
4xx and 5xx responses. You have to check the status code and decide how to proceed.
[CAUTION]
====
Unlike `retrieve()`, when using `exchange()`, it is the responsibility of the application
to consume any response content regardless of the scenario (success, error, unexpected
data, etc). Not doing so can cause a memory leak. The Javadoc for `ClientResponse` lists
all the available options for consuming the body. Generally prefer using `retrieve()`
unless you have a good reason for using `exchange()` which does allow to check the
response status and headers before deciding how to or if to consume the response.
====

Loading…
Cancel
Save