diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java
index be82720c726..f090eb5ce9e 100644
--- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java
+++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java
@@ -27,6 +27,7 @@ import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
@@ -43,6 +44,7 @@ import org.springframework.util.Assert;
* {@link ClientHttpConnector} implementation for the Apache HttpComponents HttpClient 5.x.
*
* @author Martin Tarjányi
+ * @author Arjen Poutsma
* @since 5.3
* @see Apache HttpComponents
*/
@@ -148,7 +150,12 @@ public class HttpComponentsClientHttpConnector implements ClientHttpConnector {
@Override
public void failed(Exception ex) {
- this.sink.error(ex);
+ Throwable t = ex;
+ if (t instanceof HttpStreamResetException) {
+ HttpStreamResetException httpStreamResetException = (HttpStreamResetException) ex;
+ t = httpStreamResetException.getCause();
+ }
+ this.sink.error(t);
}
@Override
diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java
index b9998217dd6..623ffc188a1 100644
--- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java
+++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java
@@ -20,6 +20,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.function.Function;
import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.impl.cookie.BasicClientCookie;
@@ -39,13 +40,14 @@ import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
import static org.springframework.http.MediaType.ALL_VALUE;
/**
* {@link ClientHttpRequest} implementation for the Apache HttpComponents HttpClient 5.x.
- *
* @author Martin Tarjányi
+ * @author Arjen Poutsma
* @since 5.3
* @see Apache HttpComponents
*/
@@ -72,7 +74,9 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest {
@Override
public HttpMethod getMethod() {
- return HttpMethod.resolve(this.httpRequest.getMethod());
+ HttpMethod method = HttpMethod.resolve(this.httpRequest.getMethod());
+ Assert.state(method != null, "Method must not be null");
+ return method;
}
@Override
@@ -100,7 +104,7 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono writeAndFlushWith(Publisher extends Publisher extends DataBuffer>> body) {
- return writeWith(Flux.from(body).flatMap(p -> p));
+ return writeWith(Flux.from(body).flatMap(Function.identity()));
}
@Override
diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java
index 207993b546e..2ae593121ec 100644
--- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java
+++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java
@@ -39,6 +39,7 @@ import org.springframework.util.MultiValueMap;
* {@link ClientHttpResponse} implementation for the Apache HttpComponents HttpClient 5.x.
*
* @author Martin Tarjányi
+ * @author Arjen Poutsma
* @since 5.3
* @see Apache HttpComponents
*/
diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java
index 481d3c0582f..a45ca479024 100644
--- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java
+++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -91,12 +91,13 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
}
}
-
+ /**
+ * Set the buffer factory to be used.
+ */
public void setBufferFactory(DataBufferFactory bufferFactory) {
this.bufferFactory = bufferFactory;
}
-
@Override
public Mono connect(HttpMethod method, URI uri,
Function super ClientHttpRequest, Mono> requestCallback) {
@@ -125,16 +126,7 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
}
private DataBuffer toDataBuffer(ContentChunk chunk) {
-
- // We must copy until this is resolved:
- // https://github.com/eclipse/jetty.project/issues/2429
-
- // Use copy instead of buffer wrapping because Callback#succeeded() is
- // used not only to release the buffer but also to request more data
- // which is a problem for codecs that buffer data.
-
- DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
- buffer.write(chunk.buffer);
+ DataBuffer buffer = this.bufferFactory.wrap(chunk.buffer);
chunk.callback.succeeded();
return buffer;
}
diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java
index b019a6192cb..de1a0b84b27 100644
--- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java
+++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,14 +26,13 @@ import org.eclipse.jetty.reactive.client.ContentChunk;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.util.Callback;
import org.reactivestreams.Publisher;
-import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
-import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
@@ -87,21 +86,18 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono writeWith(Publisher extends DataBuffer> body) {
- Flux chunks = Flux.from(body).map(this::toContentChunk);
- ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
- this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
- return doCommit(this::completes);
+ return Mono.create(sink -> {
+ Flux chunks = Flux.from(body).map(buffer -> toContentChunk(buffer, sink));
+ ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
+ this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
+ sink.success();
+ })
+ .then(doCommit(this::completes));
}
@Override
public Mono writeAndFlushWith(Publisher extends Publisher extends DataBuffer>> body) {
- Flux chunks = Flux.from(body)
- .flatMap(Function.identity())
- .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
- .map(this::toContentChunk);
- ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
- this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
- return doCommit(this::completes);
+ return writeWith(Flux.from(body).flatMap(Function.identity()));
}
private String getContentType() {
@@ -113,7 +109,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
return Mono.empty();
}
- private ContentChunk toContentChunk(DataBuffer buffer) {
+ private ContentChunk toContentChunk(DataBuffer buffer, MonoSink sink) {
return new ContentChunk(buffer.asByteBuffer(), new Callback() {
@Override
public void succeeded() {
@@ -123,7 +119,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
@Override
public void failed(Throwable x) {
DataBufferUtils.release(buffer);
- throw Exceptions.propagate(x);
+ sink.error(x);
}
});
}
diff --git a/spring-web/src/test/java/org/springframework/http/client/reactive/ClientHttpConnectorTests.java b/spring-web/src/test/java/org/springframework/http/client/reactive/ClientHttpConnectorTests.java
new file mode 100644
index 00000000000..19621e0e680
--- /dev/null
+++ b/spring-web/src/test/java/org/springframework/http/client/reactive/ClientHttpConnectorTests.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2002-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.client.reactive;
+
+import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import okio.Buffer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ReactiveHttpOutputMessage;
+import org.springframework.lang.NonNull;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/**
+ * @author Arjen Poutsma
+ */
+public class ClientHttpConnectorTests {
+
+ private static final int BUF_SIZE = 1024;
+
+ private static final EnumSet METHODS_WITH_BODY =
+ EnumSet.of(HttpMethod.PUT, HttpMethod.POST, HttpMethod.PATCH);
+
+ private final MockWebServer server = new MockWebServer();
+
+ private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
+
+ @BeforeEach
+ void startServer() throws IOException {
+ server.start();
+ }
+
+ @AfterEach
+ void stopServer() throws IOException {
+ server.shutdown();
+ }
+
+ @ParameterizedTest
+ @MethodSource("org.springframework.http.client.reactive.ClientHttpConnectorTests#methodsWithConnectors")
+ void basic(ClientHttpConnector connector, HttpMethod method) throws Exception {
+ URI uri = this.server.url("/").uri();
+
+ String responseBody = "bar\r\n";
+ prepareResponse(response -> {
+ response.setResponseCode(200);
+ response.addHeader("Baz", "Qux");
+ response.setBody(responseBody);
+ });
+
+ String requestBody = "foo\r\n";
+ boolean requestHasBody = METHODS_WITH_BODY.contains(method);
+
+ Mono futureResponse = connector.connect(method, uri, request -> {
+ assertThat(request.getMethod()).isEqualTo(method);
+ assertThat(request.getURI()).isEqualTo(uri);
+ request.getHeaders().add("Foo", "Bar");
+ if (requestHasBody) {
+ Mono body = Mono.fromCallable(() -> {
+ byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
+ return this.bufferFactory.wrap(bytes);
+ });
+ return request.writeWith(body);
+ }
+ else {
+ return request.setComplete();
+ }
+ });
+
+ CountDownLatch latch = new CountDownLatch(1);
+ StepVerifier.create(futureResponse)
+ .assertNext(response -> {
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getHeaders().getFirst("Baz")).isEqualTo("Qux");
+ DataBufferUtils.join(response.getBody())
+ .map(buffer -> {
+ String s = buffer.toString(StandardCharsets.UTF_8);
+ DataBufferUtils.release(buffer);
+ return s;
+ }).subscribe(
+ s -> assertThat(s).isEqualTo(responseBody),
+ throwable -> {
+ latch.countDown();
+ fail(throwable.getMessage(), throwable);
+ },
+ latch::countDown);
+ })
+ .verifyComplete();
+ latch.await();
+
+ expectRequest(request -> {
+ assertThat(request.getMethod()).isEqualTo(method.name());
+ assertThat(request.getHeader("Foo")).isEqualTo("Bar");
+ if (requestHasBody) {
+ assertThat(request.getBody().readUtf8()).isEqualTo(requestBody);
+ }
+ });
+ }
+
+ @ParameterizedConnectorTest
+ void errorInRequestBody(ClientHttpConnector connector) {
+ Exception error = new RuntimeException();
+ Flux body = Flux.concat(
+ stringBuffer("foo"),
+ Mono.error(error)
+ );
+ prepareResponse(response -> response.setResponseCode(200));
+ Mono futureResponse =
+ connector.connect(HttpMethod.POST, this.server.url("/").uri(), request -> request.writeWith(body));
+ StepVerifier.create(futureResponse)
+ .expectErrorSatisfies(throwable -> assertThat(throwable).isSameAs(error))
+ .verify();
+ }
+
+ @ParameterizedConnectorTest
+ void cancelResponseBody(ClientHttpConnector connector) {
+ Buffer responseBody = randomBody(100);
+ prepareResponse(response -> response.setBody(responseBody));
+
+ ClientHttpResponse response = connector.connect(HttpMethod.POST, this.server.url("/").uri(),
+ ReactiveHttpOutputMessage::setComplete).block();
+ assertThat(response).isNotNull();
+
+ StepVerifier.create(response.getBody(), 1)
+ .expectNextCount(1)
+ .thenRequest(1)
+ .thenCancel()
+ .verify();
+ }
+
+ @NonNull
+ private Buffer randomBody(int size) {
+ Buffer responseBody = new Buffer();
+ Random rnd = new Random();
+ for (int i = 0; i < size; i++) {
+ byte[] bytes = new byte[BUF_SIZE];
+ rnd.nextBytes(bytes);
+ responseBody.write(bytes);
+ }
+ return responseBody;
+ }
+
+ private void prepareResponse(Consumer consumer) {
+ MockResponse response = new MockResponse();
+ consumer.accept(response);
+ this.server.enqueue(response);
+ }
+
+ private void expectRequest(Consumer consumer) {
+ try {
+ consumer.accept(this.server.takeRequest());
+ }
+ catch (InterruptedException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @ParameterizedTest
+ @MethodSource("org.springframework.http.client.reactive.ClientHttpConnectorTests#connectors")
+ public @interface ParameterizedConnectorTest {
+
+ }
+
+ static List connectors() {
+ return Arrays.asList(
+ new ReactorClientHttpConnector(),
+ new JettyClientHttpConnector(),
+ new HttpComponentsClientHttpConnector()
+ );
+ }
+
+ static List methodsWithConnectors() {
+ List result = new ArrayList<>();
+ for (ClientHttpConnector connector : connectors()) {
+ for (HttpMethod method : HttpMethod.values()) {
+ result.add(Arguments.of(connector, method));
+ }
+ }
+ return result;
+ }
+
+ private Mono stringBuffer(String value) {
+ return Mono.fromCallable(() -> {
+ byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+ DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
+ buffer.write(bytes);
+ return buffer;
+ });
+ }
+
+}