From 43f2de467173c6eff650047755d9d7ee9d140f0e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 18 Jul 2017 18:13:47 +0200 Subject: [PATCH] Defensive checks in WebClient and Reactor connector Since there is no reason for an exchange to ever complete without a ClientResponse I've added a switchIfEmpty check at the WebClient level. Also, temporarily a second check closer to the problem in the ReactorClientHttpConnector suggesting a workaround and providing a reference to the Reactor Netty issue #138. Issue: SPR-15784 --- .../reactive/ReactorClientHttpConnector.java | 28 ++++++++++++++++--- .../function/client/DefaultWebClient.java | 6 +++- .../client/DefaultWebClientTests.java | 16 +++++++++-- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 6f8edc2a433..94044b93b78 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -23,6 +23,8 @@ import java.util.function.Function; import reactor.core.publisher.Mono; import reactor.ipc.netty.http.client.HttpClient; import reactor.ipc.netty.http.client.HttpClientOptions; +import reactor.ipc.netty.http.client.HttpClientRequest; +import reactor.ipc.netty.http.client.HttpClientResponse; import reactor.ipc.netty.options.ClientOptions; import org.springframework.http.HttpMethod; @@ -36,6 +38,12 @@ import org.springframework.http.HttpMethod; */ public class ReactorClientHttpConnector implements ClientHttpConnector { + private static final Mono NO_CLIENT_RESPONSE_ERROR = Mono.error( + new IllegalStateException("HttpClient completed without a response. " + + "As a temporary workaround try to disable connection pool. " + + "See https://github.com/reactor/reactor-netty/issues/138.")); + + private final HttpClient httpClient; @@ -61,11 +69,23 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { Function> requestCallback) { return this.httpClient - .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()), + .request(adaptHttpMethod(method), uri.toString(), - httpClientRequest -> requestCallback - .apply(new ReactorClientHttpRequest(method, uri, httpClientRequest))) - .map(ReactorClientHttpResponse::new); + request -> requestCallback.apply(adaptRequest(method, uri, request))) + .map(this::adaptResponse) + .switchIfEmpty(NO_CLIENT_RESPONSE_ERROR); + } + + private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) { + return io.netty.handler.codec.http.HttpMethod.valueOf(method.name()); + } + + private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request) { + return new ReactorClientHttpRequest(method, uri, request); + } + + private ClientHttpResponse adaptResponse(HttpClientResponse response) { + return new ReactorClientHttpResponse(response); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index 568b46bbc54..a9484fe027f 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -63,6 +63,10 @@ import org.springframework.web.util.UriBuilderFactory; */ class DefaultWebClient implements WebClient { + private static final Mono NO_HTTP_CLIENT_RESPONSE_ERROR = Mono.error( + new IllegalStateException("The underlying HTTP client completed without emitting a response.")); + + private final ExchangeFunction exchangeFunction; private final UriBuilderFactory uriBuilderFactory; @@ -309,7 +313,7 @@ class DefaultWebClient implements WebClient { ClientRequest request = (this.inserter != null ? initRequestBuilder().body(this.inserter).build() : initRequestBuilder().build()); - return exchangeFunction.exchange(request); + return exchangeFunction.exchange(request).switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR); } private ClientRequest.Builder initRequestBuilder() { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java index a008345488f..4eef4cb27d6 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java @@ -16,6 +16,7 @@ package org.springframework.web.reactive.function.client; +import java.time.Duration; import java.util.Collections; import org.junit.Before; @@ -25,12 +26,15 @@ import org.mockito.Captor; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; /** * Unit tests for {@link DefaultWebClient}. @@ -160,7 +164,7 @@ public class DefaultWebClientTests { @Test public void apply() { WebClient client = builder() - .apply(builder -> builder.defaultHeader("Accept", "application/json").defaultCookie("id", "123")) + .apply(builder -> builder.defaultHeader("Accept", "application/json").defaultCookie("id", "123")) .build(); client.get().uri("/path").exchange(); @@ -170,6 +174,12 @@ public class DefaultWebClientTests { verifyNoMoreInteractions(this.exchangeFunction); } + @Test + public void switchToErrorOnEmptyClientResponseMono() throws Exception { + StepVerifier.create(builder().build().get().uri("/path").exchange()) + .expectErrorMessage("The underlying HTTP client completed without emitting a response.") + .verify(Duration.ofSeconds(5)); + } private WebClient.Builder builder() {