diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/HttpHandlerChannelHandler.java similarity index 69% rename from spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorHttpHandlerAdapter.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactor/HttpHandlerChannelHandler.java index e85564f92e8..b77a7e58408 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/HttpHandlerChannelHandler.java @@ -16,7 +16,6 @@ package org.springframework.http.server.reactor; import org.reactivestreams.Publisher; -import reactor.core.publisher.convert.DependencyUtils; import reactor.io.buffer.Buffer; import reactor.io.net.ReactiveChannelHandler; import reactor.io.net.http.HttpChannel; @@ -27,30 +26,22 @@ import org.springframework.util.Assert; /** * @author Stephane Maldini */ -public class ReactorHttpHandlerAdapter +public class HttpHandlerChannelHandler implements ReactiveChannelHandler> { private final ReactiveHttpHandler httpHandler; - public ReactorHttpHandlerAdapter(ReactiveHttpHandler httpHandler) { + public HttpHandlerChannelHandler(ReactiveHttpHandler httpHandler) { Assert.notNull(httpHandler, "'httpHandler' is required."); this.httpHandler = httpHandler; } @Override public Publisher apply(HttpChannel channel) { - final PublisherReactorServerHttpRequest adaptedRequest; - final PublisherReactorServerHttpResponse adaptedResponse; - - if(DependencyUtils.hasReactorStream()){ - adaptedRequest = new ReactorServerHttpRequest(channel); - adaptedResponse = new ReactorServerHttpResponse(channel); - } - else{ - adaptedRequest = new PublisherReactorServerHttpRequest(channel); - adaptedResponse = new PublisherReactorServerHttpResponse(channel); - } + ReactorServerHttpRequest adaptedRequest = new ReactorServerHttpRequest(channel); + ReactorServerHttpResponse adaptedResponse = new ReactorServerHttpResponse(channel); return this.httpHandler.handle(adaptedRequest, adaptedResponse); } + } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/PublisherReactorServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/PublisherReactorServerHttpRequest.java deleted file mode 100644 index 16d883089fb..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/PublisherReactorServerHttpRequest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.http.server.reactor; - -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; - -import org.reactivestreams.Publisher; -import reactor.Publishers; -import reactor.io.buffer.Buffer; -import reactor.io.net.http.HttpChannel; - -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.util.Assert; - -/** - * @author Stephane Maldini - */ -public class PublisherReactorServerHttpRequest implements ReactiveServerHttpRequest { - - private final HttpChannel channel; - - private HttpHeaders headers; - - - public PublisherReactorServerHttpRequest(HttpChannel request) { - Assert.notNull("'request', request must not be null."); - this.channel = request; - } - - - @Override - public HttpHeaders getHeaders() { - if (this.headers == null) { - this.headers = new HttpHeaders(); - for (String name : this.channel.headers().names()) { - for (String value : this.channel.headers().getAll(name)) { - this.headers.add(name, value); - } - } - } - return this.headers; - } - - @Override - public HttpMethod getMethod() { - return HttpMethod.valueOf(this.channel.method().getName()); - } - - @Override - public URI getURI() { - try { - return new URI(this.channel.uri()); - } catch (URISyntaxException ex) { - throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); - } - - } - - @Override - public Publisher getBody() { - return Publishers.map(channel.input(), Buffer::byteBuffer); - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/PublisherReactorServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/PublisherReactorServerHttpResponse.java deleted file mode 100644 index ad7193f528c..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/PublisherReactorServerHttpResponse.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.http.server.reactor; - -import java.nio.ByteBuffer; - -import org.reactivestreams.Publisher; -import reactor.Publishers; -import reactor.io.buffer.Buffer; -import reactor.io.net.http.HttpChannel; -import reactor.io.net.http.model.Status; - -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.server.ReactiveServerHttpResponse; -import org.springframework.util.Assert; - -/** - * @author Stephane Maldini - */ -public class PublisherReactorServerHttpResponse implements ReactiveServerHttpResponse { - - private final HttpChannel channel; - - private final HttpHeaders headers; - - private boolean headersWritten = false; - - - public PublisherReactorServerHttpResponse(HttpChannel response) { - Assert.notNull("'response', response must not be null."); - this.channel = response; - this.headers = new HttpHeaders(); - } - - - @Override - public void setStatusCode(HttpStatus status) { - this.channel.responseStatus(Status.valueOf(status.value())); - } - - @Override - public HttpHeaders getHeaders() { - return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); - } - - @Override - public Publisher writeHeaders() { - if (this.headersWritten) { - return Publishers.empty(); - } - applyHeaders(); - return this.channel.writeHeaders(); - } - - @Override - public Publisher setBody(Publisher contentPublisher) { - applyHeaders(); - return this.channel.writeWith(Publishers.map(contentPublisher, Buffer::new)); - } - - private void applyHeaders() { - if (!this.headersWritten) { - for (String name : this.headers.keySet()) { - for (String value : this.headers.get(name)) { - this.channel.responseHeaders().add(name, value); - } - } - this.headersWritten = true; - } - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpRequest.java index 71fff0002b5..6d5667b6db5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpRequest.java @@ -13,28 +13,69 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.http.server.reactor; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.ByteBuffer; +import org.reactivestreams.Publisher; +import reactor.Publishers; import reactor.io.buffer.Buffer; import reactor.io.net.http.HttpChannel; -import reactor.rx.Stream; -import reactor.rx.Streams; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.util.Assert; /** * @author Stephane Maldini */ -public class ReactorServerHttpRequest extends PublisherReactorServerHttpRequest { +public class ReactorServerHttpRequest implements ReactiveServerHttpRequest { + + private final HttpChannel channel; + + private HttpHeaders headers; + public ReactorServerHttpRequest(HttpChannel request) { - super(request); + Assert.notNull("'request', request must not be null."); + this.channel = request; + } + + + @Override + public HttpHeaders getHeaders() { + if (this.headers == null) { + this.headers = new HttpHeaders(); + for (String name : this.channel.headers().names()) { + for (String value : this.channel.headers().getAll(name)) { + this.headers.add(name, value); + } + } + } + return this.headers; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.valueOf(this.channel.method().getName()); + } + + @Override + public URI getURI() { + try { + return new URI(this.channel.uri()); + } catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + } + } @Override - public Stream getBody() { - return Streams.wrap(super.getBody()); + public Publisher getBody() { + return Publishers.map(this.channel.input(), Buffer::byteBuffer); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpResponse.java index ece8e23cd21..d42992f3b81 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpResponse.java @@ -13,33 +13,73 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.http.server.reactor; import java.nio.ByteBuffer; import org.reactivestreams.Publisher; +import reactor.Publishers; import reactor.io.buffer.Buffer; import reactor.io.net.http.HttpChannel; -import reactor.rx.Stream; -import reactor.rx.Streams; +import reactor.io.net.http.model.Status; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.util.Assert; /** * @author Stephane Maldini */ -public class ReactorServerHttpResponse extends PublisherReactorServerHttpResponse { +public class ReactorServerHttpResponse implements ReactiveServerHttpResponse { + + private final HttpChannel channel; + + private final HttpHeaders headers; + + private boolean headersWritten = false; + public ReactorServerHttpResponse(HttpChannel response) { - super(response); + Assert.notNull("'response', response must not be null."); + this.channel = response; + this.headers = new HttpHeaders(); } + @Override - public Stream writeHeaders() { - return Streams.wrap(super.writeHeaders()); + public void setStatusCode(HttpStatus status) { + this.channel.responseStatus(Status.valueOf(status.value())); } @Override - public Stream setBody(Publisher contentPublisher) { - return Streams.wrap(super.setBody(contentPublisher)); + public HttpHeaders getHeaders() { + return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); + } + + @Override + public Publisher writeHeaders() { + if (this.headersWritten) { + return Publishers.empty(); + } + applyHeaders(); + return this.channel.writeHeaders(); + } + + @Override + public Publisher setBody(Publisher contentPublisher) { + applyHeaders(); + return this.channel.writeWith(Publishers.map(contentPublisher, Buffer::new)); + } + + private void applyHeaders() { + if (!this.headersWritten) { + for (String name : this.headers.keySet()) { + for (String value : this.headers.get(name)) { + this.channel.responseHeaders().add(name, value); + } + } + this.headersWritten = true; + } } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/RxNettyHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/HttpHandlerRequestHandler.java similarity index 93% rename from spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/RxNettyHttpHandlerAdapter.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/HttpHandlerRequestHandler.java index 3d6dde0e486..07866c3f87f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/RxNettyHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/HttpHandlerRequestHandler.java @@ -30,12 +30,12 @@ import org.springframework.util.Assert; /** * @author Rossen Stoyanchev */ -public class RxNettyHttpHandlerAdapter implements RequestHandler { +public class HttpHandlerRequestHandler implements RequestHandler { private final ReactiveHttpHandler httpHandler; - public RxNettyHttpHandlerAdapter(ReactiveHttpHandler httpHandler) { + public HttpHandlerRequestHandler(ReactiveHttpHandler httpHandler) { Assert.notNull(httpHandler, "'httpHandler' is required."); this.httpHandler = httpHandler; } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31HttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/HttpHandlerServlet.java similarity index 95% rename from spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31HttpHandlerAdapter.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/HttpHandlerServlet.java index 47668092617..00945747af5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31HttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/HttpHandlerServlet.java @@ -37,11 +37,11 @@ import org.springframework.http.server.ReactiveHttpHandler; * @author Rossen Stoyanchev */ @WebServlet(asyncSupported = true) -public class Servlet31HttpHandlerAdapter extends HttpServlet { +public class HttpHandlerServlet extends HttpServlet { private static final int BUFFER_SIZE = 8192; - private static Log logger = LogFactory.getLog(Servlet31HttpHandlerAdapter.class); + private static Log logger = LogFactory.getLog(HttpHandlerServlet.class); private ReactiveHttpHandler handler; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/JettyHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/support/JettyHttpServer.java index 38cc7d1a83b..43bdaf62cfe 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/JettyHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/support/JettyHttpServer.java @@ -24,7 +24,7 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; import org.springframework.util.SocketUtils; -import org.springframework.http.server.servlet31.Servlet31HttpHandlerAdapter; +import org.springframework.http.server.servlet31.HttpHandlerServlet; /** * @author Rossen Stoyanchev @@ -51,7 +51,7 @@ public class JettyHttpServer extends HttpServerSupport implements InitializingBe this.jettyServer = new Server(); Assert.notNull(getHttpHandler()); - Servlet31HttpHandlerAdapter servlet = new Servlet31HttpHandlerAdapter(); + HttpHandlerServlet servlet = new HttpHandlerServlet(); servlet.setHandler(getHttpHandler()); ServletHolder servletHolder = new ServletHolder(servlet); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/ReactorHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/support/ReactorHttpServer.java index 868f0f53a6b..91245e1ed94 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/ReactorHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/support/ReactorHttpServer.java @@ -21,7 +21,7 @@ import reactor.io.net.ReactiveNet; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; -import org.springframework.http.server.reactor.ReactorHttpHandlerAdapter; +import org.springframework.http.server.reactor.HttpHandlerChannelHandler; /** * @author Stephane Maldini @@ -29,7 +29,7 @@ import org.springframework.http.server.reactor.ReactorHttpHandlerAdapter; public class ReactorHttpServer extends HttpServerSupport implements InitializingBean, HttpServer { - private ReactorHttpHandlerAdapter reactorHandler; + private HttpHandlerChannelHandler reactorHandler; private reactor.io.net.http.HttpServer reactorServer; @@ -44,7 +44,7 @@ public class ReactorHttpServer extends HttpServerSupport public void afterPropertiesSet() throws Exception { Assert.notNull(getHttpHandler()); - this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler()); + this.reactorHandler = new HttpHandlerChannelHandler(getHttpHandler()); this.reactorServer = (getPort() != -1 ? ReactiveNet.httpServer(getPort()) : ReactiveNet.httpServer()); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/RxNettyHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/support/RxNettyHttpServer.java index fe0bf6323d8..f71859cfab8 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/RxNettyHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/support/RxNettyHttpServer.java @@ -20,7 +20,7 @@ import io.netty.buffer.ByteBuf; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; -import org.springframework.http.server.rxnetty.RxNettyHttpHandlerAdapter; +import org.springframework.http.server.rxnetty.HttpHandlerRequestHandler; /** @@ -28,7 +28,7 @@ import org.springframework.http.server.rxnetty.RxNettyHttpHandlerAdapter; */ public class RxNettyHttpServer extends HttpServerSupport implements InitializingBean, HttpServer { - private RxNettyHttpHandlerAdapter rxNettyHandler; + private HttpHandlerRequestHandler rxNettyHandler; private io.reactivex.netty.protocol.http.server.HttpServer rxNettyServer; @@ -45,7 +45,7 @@ public class RxNettyHttpServer extends HttpServerSupport implements Initializing public void afterPropertiesSet() throws Exception { Assert.notNull(getHttpHandler()); - this.rxNettyHandler = new RxNettyHttpHandlerAdapter(getHttpHandler()); + this.rxNettyHandler = new HttpHandlerRequestHandler(getHttpHandler()); this.rxNettyServer = (getPort() != -1 ? io.reactivex.netty.protocol.http.server.HttpServer.newServer(getPort()) : diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/TomcatHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/support/TomcatHttpServer.java index fa4d989480d..ed73f3d31e4 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/TomcatHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/support/TomcatHttpServer.java @@ -25,7 +25,7 @@ import org.apache.catalina.startup.Tomcat; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; import org.springframework.util.SocketUtils; -import org.springframework.http.server.servlet31.Servlet31HttpHandlerAdapter; +import org.springframework.http.server.servlet31.HttpHandlerServlet; /** @@ -54,7 +54,7 @@ public class TomcatHttpServer extends HttpServerSupport implements InitializingB this.tomcatServer.setPort(getPort()); Assert.notNull(getHttpHandler()); - Servlet31HttpHandlerAdapter servlet = new Servlet31HttpHandlerAdapter(); + HttpHandlerServlet servlet = new HttpHandlerServlet(); servlet.setHandler(getHttpHandler()); File base = new File(System.getProperty("java.io.tmpdir")); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/UndertowHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/support/UndertowHttpServer.java index 8a0cb5b40a0..cc267f4c55d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/UndertowHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/support/UndertowHttpServer.java @@ -18,7 +18,7 @@ package org.springframework.http.server.support; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; -import org.springframework.http.server.undertow.UndertowHttpHandlerAdapter; +import org.springframework.http.server.undertow.HttpHandlerHttpHandler; import io.undertow.Undertow; import io.undertow.server.HttpHandler; @@ -36,7 +36,7 @@ public class UndertowHttpServer extends HttpServerSupport implements Initializin @Override public void afterPropertiesSet() throws Exception { Assert.notNull(getHttpHandler()); - HttpHandler handler = new UndertowHttpHandlerAdapter(getHttpHandler()); + HttpHandler handler = new HttpHandlerHttpHandler(getHttpHandler()); int port = (getPort() != -1 ? getPort() : 8080); this.server = Undertow.builder().addHttpListener(port, "localhost") .setHandler(handler).build(); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/HttpHandlerHttpHandler.java similarity index 91% rename from spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowHttpHandlerAdapter.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/undertow/HttpHandlerHttpHandler.java index 040e208765c..1f9cb090761 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/HttpHandlerHttpHandler.java @@ -32,15 +32,15 @@ import org.reactivestreams.Subscription; * @author Marek Hawrylczak * @author Rossen Stoyanchev */ -public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandler { +public class HttpHandlerHttpHandler implements io.undertow.server.HttpHandler { - private static Log logger = LogFactory.getLog(UndertowHttpHandlerAdapter.class); + private static Log logger = LogFactory.getLog(HttpHandlerHttpHandler.class); private final ReactiveHttpHandler delegate; - public UndertowHttpHandlerAdapter(ReactiveHttpHandler delegate) { + public HttpHandlerHttpHandler(ReactiveHttpHandler delegate) { Assert.notNull(delegate, "'delegate' is required."); this.delegate = delegate; }