From a87764f1fd218356690e05cf2aa964e92202eed6 Mon Sep 17 00:00:00 2001 From: sdeleuze Date: Wed, 11 Apr 2018 10:42:36 +0200 Subject: [PATCH] Add support for Jetty Reactive Streams HTTP client Leverage https://github.com/jetty-project/jetty-reactive-httpclient to add support for Jetty in WebClient via JettyClientHttpConnector. Implemented with buffer copy instead of optimized buffer wrapping because the latter hangs since Callback#succeeded doesn't allow releasing the buffer and requesting more data at different times (required for Mono for example). See https://github.com/eclipse/jetty.project/issues/2429. Issue: SPR-15092 --- spring-web/spring-web.gradle | 1 + .../reactive/JettyClientHttpConnector.java | 143 +++++++++++++++++ .../reactive/JettyClientHttpRequest.java | 148 ++++++++++++++++++ .../reactive/JettyClientHttpResponse.java | 94 +++++++++++ spring-webflux/spring-webflux.gradle | 1 + .../client/WebClientIntegrationTests.java | 46 +++++- .../annotation/SseIntegrationTests.java | 30 +++- src/docs/asciidoc/web/webflux-webclient.adoc | 5 +- 8 files changed, 465 insertions(+), 3 deletions(-) create mode 100644 spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java create mode 100644 spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java create mode 100644 spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index 7c97c2f829a..fddab6b6ee6 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -62,6 +62,7 @@ dependencies { optional("org.codehaus.groovy:groovy-all:${groovyVersion}") optional("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}") optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}") + optional("org.eclipse.jetty:jetty-reactive-httpclient:1.0.0") testCompile("io.projectreactor:reactor-test") testCompile("org.apache.taglibs:taglibs-standard-jstlel:1.2.5") { exclude group: "org.apache.taglibs", module: "taglibs-standard-spec" diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java new file mode 100644 index 00000000000..3fb96c4d8c0 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java @@ -0,0 +1,143 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.http.client.reactive; + +import java.net.URI; +import java.util.function.Function; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.util.Callback; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.context.SmartLifecycle; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpMethod; + +/** + * Jetty ReactiveStreams HttpClient implementation of {@link ClientHttpConnector}. + * + * Implemented with buffer copy instead of optimized buffer wrapping because the latter + * hangs since {@link Callback#succeeded()} doesn't allow releasing the buffer and + * requesting more data at different times (required for {@code Mono} for example). + * See https://github.com/eclipse/jetty.project/issues/2429 for more details. + * + * @author Sebastien Deleuze + * @since 5.1 + * @see Jetty ReactiveStreams HttpClient + */ +public class JettyClientHttpConnector implements ClientHttpConnector, SmartLifecycle { + + private final HttpClient httpClient; + + private DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + + + /** + * Create a Jetty {@link ClientHttpConnector} with the default {@link HttpClient}. + */ + public JettyClientHttpConnector() { + this(new HttpClient()); + } + + /** + * Create a Jetty {@link ClientHttpConnector} with the given {@link HttpClient}. + */ + public JettyClientHttpConnector(HttpClient httpClient) { + this.httpClient = httpClient; + } + + + public void setBufferFactory(DataBufferFactory bufferFactory) { + this.bufferFactory = bufferFactory; + } + + @Override + public int getPhase() { + return Integer.MAX_VALUE; + } + + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public boolean isRunning() { + return this.httpClient.isRunning(); + } + + @Override + public void start() { + try { + // HttpClient is internally synchronized and protected with state checks + this.httpClient.start(); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void stop() { + try { + this.httpClient.stop(); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + @Override + public Mono connect(HttpMethod method, URI uri, + Function> requestCallback) { + + if (!uri.isAbsolute()) { + return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri)); + } + + if (!this.httpClient.isStarted()) { + try { + this.httpClient.start(); + } + catch (Exception ex) { + return Mono.error(ex); + } + } + + JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest( + this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory); + return requestCallback.apply(clientHttpRequest).then(Mono.from( + clientHttpRequest.getReactiveRequest().response((reactiveResponse, contentChunks) -> { + Flux content = Flux.from(contentChunks).map(chunk -> { + DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity()); + buffer.write(chunk.buffer); + chunk.callback.succeeded(); + return buffer; + }); + return Mono.just(new JettyClientHttpResponse(reactiveResponse, content)); + }))); + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java new file mode 100644 index 00000000000..b2a51b1681d --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java @@ -0,0 +1,148 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.http.client.reactive; + +import java.net.HttpCookie; +import java.net.URI; +import java.util.Collection; +import java.util.function.Function; + +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.reactive.client.ContentChunk; +import org.eclipse.jetty.reactive.client.ReactiveRequest; +import org.eclipse.jetty.util.Callback; +import org.reactivestreams.Publisher; +import reactor.core.Exceptions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * {@link ClientHttpRequest} implementation for the Jetty ReactiveStreams HTTP client. + * + * @author Sebastien Deleuze + * @since 5.1 + * @see Jetty ReactiveStreams HttpClient + */ +class JettyClientHttpRequest extends AbstractClientHttpRequest { + + private final Request jettyRequest; + + private final DataBufferFactory bufferFactory; + + @Nullable + private ReactiveRequest reactiveRequest; + + + public JettyClientHttpRequest(Request jettyRequest, DataBufferFactory bufferFactory) { + this.jettyRequest = jettyRequest; + this.bufferFactory = bufferFactory; + } + + + @Override + public HttpMethod getMethod() { + HttpMethod method = HttpMethod.resolve(this.jettyRequest.getMethod()); + Assert.state(method != null, "Method must not be null"); + return method; + } + + @Override + public URI getURI() { + return this.jettyRequest.getURI(); + } + + @Override + public Mono setComplete() { + return doCommit(this::completes); + } + + @Override + public DataBufferFactory bufferFactory() { + return this.bufferFactory; + } + + @Override + public Mono writeWith(Publisher publisher) { + Flux chunks = Flux.from(publisher).map(this::toContentChunk); + MediaType contentType = getHeaders().getContentType(); + ReactiveRequest.Content requestContent = ReactiveRequest.Content.fromPublisher(chunks, + (contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE)); + this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(requestContent).build(); + return doCommit(this::completes); + } + + @Override + public Mono writeAndFlushWith(Publisher> body) { + String contentType = this.jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue(); + Flux chunks = Flux.from(body).flatMap(Function.identity()).map(this::toContentChunk); + ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, contentType); + this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build(); + return doCommit(this::completes); + } + + private Mono completes() { + return Mono.empty(); + } + + private ContentChunk toContentChunk(DataBuffer buffer) { + return new ContentChunk(buffer.asByteBuffer(), new Callback() { + @Override + public void succeeded() { + DataBufferUtils.release(buffer); + } + + @Override + public void failed(Throwable x) { + DataBufferUtils.release(buffer); + throw Exceptions.propagate(x); + } + }); + } + + + @Override + protected void applyCookies() { + getCookies().values().stream().flatMap(Collection::stream) + .map(cookie -> new HttpCookie(cookie.getName(), cookie.getValue())) + .forEach(this.jettyRequest::cookie); + } + + @Override + protected void applyHeaders() { + HttpHeaders headers = getHeaders(); + headers.forEach((key, value) -> value.forEach(v -> this.jettyRequest.header(key, v))); + if (!headers.containsKey(HttpHeaders.ACCEPT)) { + this.jettyRequest.header(HttpHeaders.ACCEPT, "*/*"); + } + } + + ReactiveRequest getReactiveRequest() { + if (this.reactiveRequest == null) { + this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).build(); + } + return this.reactiveRequest; + } +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java new file mode 100644 index 00000000000..074c772ec24 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java @@ -0,0 +1,94 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.http.client.reactive; + +import java.net.HttpCookie; + +import org.eclipse.jetty.reactive.client.ReactiveResponse; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseCookie; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +/** + * {@link ClientHttpResponse} implementation for the Jetty ReactiveStreams HTTP client. + * + * @author Sebastien Deleuze + * @since 5.1 + * @see Jetty ReactiveStreams HttpClient + */ +class JettyClientHttpResponse implements ClientHttpResponse { + + private final ReactiveResponse reactiveResponse; + + private final Flux content; + + + public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Publisher content) { + Assert.notNull(reactiveResponse, "reactiveResponse should not be null"); + Assert.notNull(content, "content should not be null"); + this.reactiveResponse = reactiveResponse; + this.content = Flux.from(content); + } + + + @Override + public HttpStatus getStatusCode() { + return HttpStatus.valueOf(getRawStatusCode()); + } + + @Override + public int getRawStatusCode() { + return this.reactiveResponse.getStatus(); + } + + @Override + public MultiValueMap getCookies() { + MultiValueMap result = new LinkedMultiValueMap<>(); + getHeaders().get(HttpHeaders.SET_COOKIE).forEach(header -> { + HttpCookie.parse(header).forEach(cookie -> result.add(cookie.getName(), ResponseCookie.from(cookie.getName(), cookie.getValue()) + .domain(cookie.getDomain()) + .path(cookie.getPath()) + .maxAge(cookie.getMaxAge()) + .secure(cookie.getSecure()) + .httpOnly(cookie.isHttpOnly()) + .build())); + + }); + return CollectionUtils.unmodifiableMultiValueMap(result); + } + + @Override + public Flux getBody() { + return this.content; + } + + @Override + public HttpHeaders getHeaders() { + HttpHeaders headers = new HttpHeaders(); + this.reactiveResponse.getHeaders().stream() + .forEach(e -> headers.add(e.getName(), e.getValue())); + return headers; + } + +} diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index 4e33a440489..b51475dcd40 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -45,6 +45,7 @@ dependencies { testCompile("io.projectreactor:reactor-test") testCompile("org.apache.tomcat:tomcat-util:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") + testCompile("org.eclipse.jetty:jetty-reactive-httpclient:1.0.0") testCompile("org.eclipse.jetty:jetty-server") testCompile("org.eclipse.jetty:jetty-servlet") testCompile("io.undertow:undertow-core:${undertowVersion}") diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index c92dce5b125..79100893a1a 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -32,6 +32,8 @@ import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -46,6 +48,9 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.http.client.reactive.JettyClientHttpConnector; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.codec.Pojo; import static org.junit.Assert.*; @@ -56,18 +61,35 @@ import static org.junit.Assert.*; * @author Brian Clozel * @author Rossen Stoyanchev * @author Denys Ivano + * @author Sebastien Deleuze */ +@RunWith(Parameterized.class) public class WebClientIntegrationTests { private MockWebServer server; private WebClient webClient; + @Parameterized.Parameter(0) + public ClientHttpConnector connector; + + @Parameterized.Parameters(name = "webClient [{0}]") + public static Object[][] arguments() { + + return new Object[][] { + {new JettyClientHttpConnector()}, + {new ReactorClientHttpConnector()} + }; + } @Before public void setup() { this.server = new MockWebServer(); - this.webClient = WebClient.create(this.server.url("/").toString()); + this.webClient = WebClient + .builder() + .clientConnector(this.connector) + .baseUrl(this.server.url("/").toString()) + .build(); } @After @@ -124,6 +146,28 @@ public class WebClientIntegrationTests { }); } + @Test + public void shouldReceivePlainTextFlux() throws Exception { + prepareResponse(response -> response.setBody("Hello Spring!")); + + Flux result = this.webClient.get() + .uri("/greeting?name=Spring") + .header("X-Test-Header", "testvalue") + .exchange() + .flatMapMany(response -> response.bodyToFlux(String.class)); + + StepVerifier.create(result) + .expectNext("Hello Spring!") + .expectComplete().verify(Duration.ofSeconds(3)); + + expectRequestCount(1); + expectRequest(request -> { + assertEquals("testvalue", request.getHeader("X-Test-Header")); + assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT)); + assertEquals("/greeting?name=Spring", request.getPath()); + }); + } + @Test public void shouldReceiveJsonAsString() { String content = "{\"bar\":\"barbar\",\"foo\":\"foofoo\"}"; diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index c43534f2578..29128d50eb3 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -16,12 +16,14 @@ package org.springframework.web.reactive.result.method.annotation; +import java.io.File; import java.time.Duration; import org.junit.Assume; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; import reactor.core.publisher.Flux; import reactor.core.publisher.MonoProcessor; import reactor.test.StepVerifier; @@ -30,11 +32,16 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.http.client.reactive.JettyClientHttpConnector; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer; +import org.springframework.http.server.reactive.bootstrap.TomcatHttpServer; +import org.springframework.http.server.reactive.bootstrap.UndertowHttpServer; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -56,12 +63,33 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { private WebClient webClient; + @Parameterized.Parameter(1) + public ClientHttpConnector connector; + + @Parameterized.Parameters(name = "server [{0}] webClient [{1}]") + public static Object[][] arguments() { + File base = new File(System.getProperty("java.io.tmpdir")); + return new Object[][] { + {new JettyHttpServer(), new ReactorClientHttpConnector()}, + {new JettyHttpServer(), new JettyClientHttpConnector()}, + {new ReactorHttpServer(), new ReactorClientHttpConnector()}, + {new ReactorHttpServer(), new JettyClientHttpConnector()}, + {new TomcatHttpServer(base.getAbsolutePath()), new ReactorClientHttpConnector()}, + {new TomcatHttpServer(base.getAbsolutePath()), new JettyClientHttpConnector()}, + {new UndertowHttpServer(), new ReactorClientHttpConnector()}, + {new UndertowHttpServer(), new JettyClientHttpConnector()} + }; + } @Override @Before public void setup() throws Exception { super.setup(); - this.webClient = WebClient.create("http://localhost:" + this.port + "/sse"); + this.webClient = WebClient + .builder() + .clientConnector(this.connector) + .baseUrl("http://localhost:" + this.port + "/sse") + .build(); } diff --git a/src/docs/asciidoc/web/webflux-webclient.adoc b/src/docs/asciidoc/web/webflux-webclient.adoc index 85a8b3fb501..e8a880e206b 100644 --- a/src/docs/asciidoc/web/webflux-webclient.adoc +++ b/src/docs/asciidoc/web/webflux-webclient.adoc @@ -12,7 +12,10 @@ server <>. The API exposes Reactor `Flux` and `Mono` types, also see <>. By default it uses it uses https://github.com/reactor/reactor-netty[Reactor Netty] as the HTTP client -library but others can be plugged in through a custom `ClientHttpConnector`. +library and +https://github.com/jetty-project/jetty-reactive-httpclient[Jetty ReactiveStreams HttpClient] +is supported as well via `JettyClientHttpConnector`, but others can be plugged in +through a custom `ClientHttpConnector`. By comparison to the <>, the `WebClient` is: