diff --git a/build.gradle b/build.gradle index 864f84f1cc3..38834bdd0a3 100644 --- a/build.gradle +++ b/build.gradle @@ -779,6 +779,7 @@ project("spring-web") { optional("javax.xml.ws:jaxws-api:${jaxwsVersion}") optional("javax.mail:javax.mail-api:${javamailVersion}") optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}") + optional("org.apache.tomcat:tomcat-catalina:${tomcatVersion}") testCompile(project(":spring-context-support")) // for JafMediaTypeFactory testCompile("io.projectreactor.addons:reactor-test") testCompile("org.apache.taglibs:taglibs-standard-jstlel:1.2.1") { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java new file mode 100644 index 00000000000..cafdb26f803 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java @@ -0,0 +1,80 @@ +/* + * Copyright 2002-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +import javax.servlet.AsyncContext; +import javax.servlet.ServletOutputStream; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.HttpOutput; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; + +/** + * Adapt {@link HttpHandler} to an {@link HttpServlet} using Servlet Async + * support and Servlet 3.1 non-blocking I/O. Use Jetty API for writing with + * ByteBuffer. + * + * @author Violeta Georgieva + * @since 5.0 + */ +@WebServlet(asyncSupported = true) +public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter { + + public JettyHttpHandlerAdapter(HttpHandler httpHandler) { + super(httpHandler); + } + + public JettyHttpHandlerAdapter(Map handlerMap) { + super(handlerMap); + } + + + @Override + protected ServerHttpResponse createServletServerHttpResponse( + HttpServletResponse response, AsyncContext asyncContext) throws IOException { + return new JettyServerHttpResponse( + response, asyncContext, getDataBufferFactory(), getBufferSize()); + } + + + private static final class JettyServerHttpResponse extends ServletServerHttpResponse { + + public JettyServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext, + DataBufferFactory bufferFactory, int bufferSize) throws IOException { + super(response, asyncContext, bufferFactory, bufferSize); + } + + @Override + protected int writeDataBuffer(DataBuffer dataBuffer) throws IOException { + ServletOutputStream outputStream = getServletResponse().getOutputStream(); + ByteBuffer input = dataBuffer.asByteBuffer(); + int len = input.remaining(); + if (outputStream.isReady() && len > 0) { + ((HttpOutput) outputStream).write(input); + } + return len; + } + } + +} \ No newline at end of file diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index 0f242f781e0..cb049774a36 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -98,16 +98,28 @@ public class ServletHttpHandlerAdapter extends HttpHandlerAdapterSupport impleme // Start async before Read/WriteListener registration AsyncContext asyncContext = request.startAsync(); - ServerHttpRequest httpRequest = new ServletServerHttpRequest( - ((HttpServletRequest) request), asyncContext, getDataBufferFactory(), getBufferSize()); + ServerHttpRequest httpRequest = createServletServerHttpRequest( + ((HttpServletRequest) request), asyncContext); - ServerHttpResponse httpResponse = new ServletServerHttpResponse( - ((HttpServletResponse) response), asyncContext, getDataBufferFactory(), getBufferSize()); + ServerHttpResponse httpResponse = createServletServerHttpResponse( + ((HttpServletResponse) response), asyncContext); HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext); getHttpHandler().handle(httpRequest, httpResponse).subscribe(subscriber); } + protected ServerHttpRequest createServletServerHttpRequest(HttpServletRequest request, + AsyncContext asyncContext) throws IOException { + return new ServletServerHttpRequest( + request, asyncContext, getDataBufferFactory(), getBufferSize()); + } + + protected ServerHttpResponse createServletServerHttpResponse(HttpServletResponse response, + AsyncContext asyncContext) throws IOException { + return new ServletServerHttpResponse( + response, asyncContext, getDataBufferFactory(), getBufferSize()); + } + // Other Servlet methods... @Override diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 05f309d9c46..0a4a72d41c2 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -32,6 +32,8 @@ import javax.servlet.http.HttpServletRequest; import reactor.core.publisher.Flux; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.HttpCookie; @@ -58,6 +60,12 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { private final Object cookieLock = new Object(); + private final DataBufferFactory bufferFactory; + + private final byte[] buffer; + + protected final Log logger = LogFactory.getLog(getClass()); + public ServletServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext, DataBufferFactory bufferFactory, int bufferSize) throws IOException { @@ -68,12 +76,14 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { Assert.isTrue(bufferSize > 0, "'bufferSize' must be higher than 0"); this.request = request; + this.bufferFactory = bufferFactory; + this.buffer = new byte[bufferSize]; asyncContext.addListener(new RequestAsyncListener()); // Tomcat expects ReadListener registration on initial thread ServletInputStream inputStream = request.getInputStream(); - this.bodyPublisher = new RequestBodyPublisher(inputStream, bufferFactory, bufferSize); + this.bodyPublisher = new RequestBodyPublisher(inputStream); this.bodyPublisher.registerReadListener(); } @@ -169,6 +179,21 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { return Flux.from(this.bodyPublisher); } + protected DataBuffer readDataBuffer() throws IOException { + int read = this.request.getInputStream().read(this.buffer); + if (logger.isTraceEnabled()) { + logger.trace("read:" + read); + } + + if (read > 0) { + DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(read); + dataBuffer.write(this.buffer, 0, read); + return dataBuffer; + } + + return null; + } + private final class RequestAsyncListener implements AsyncListener { @@ -193,21 +218,14 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { } } - private static class RequestBodyPublisher extends AbstractListenerReadPublisher { + private class RequestBodyPublisher extends AbstractListenerReadPublisher { private final ServletInputStream inputStream; - private final DataBufferFactory bufferFactory; - - private final byte[] buffer; - - public RequestBodyPublisher(ServletInputStream inputStream, - DataBufferFactory bufferFactory, int bufferSize) { + public RequestBodyPublisher(ServletInputStream inputStream) { this.inputStream = inputStream; - this.bufferFactory = bufferFactory; - this.buffer = new byte[bufferSize]; } public void registerReadListener() throws IOException { @@ -224,16 +242,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { @Override protected DataBuffer read() throws IOException { if (this.inputStream.isReady()) { - int read = this.inputStream.read(this.buffer); - if (logger.isTraceEnabled()) { - logger.trace("read:" + read); - } - - if (read > 0) { - DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(read); - dataBuffer.write(this.buffer, 0, read); - return dataBuffer; - } + return readDataBuffer(); } return null; } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 11be9b81424..fc3fa1e01f1 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -132,6 +132,19 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons return processor; } + protected int writeDataBuffer(DataBuffer dataBuffer) throws IOException { + ServletOutputStream outputStream = response.getOutputStream(); + InputStream input = dataBuffer.asInputStream(); + int bytesWritten = 0; + byte[] buffer = new byte[this.bufferSize]; + int bytesRead = -1; + while (outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + bytesWritten += bytesRead; + } + return bytesWritten; + } + private void flush() throws IOException { ServletOutputStream outputStream = this.response.getOutputStream(); if (outputStream.isReady()) { @@ -215,7 +228,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons protected Processor createWriteProcessor() { try { ServletOutputStream outputStream = response.getOutputStream(); - bodyProcessor = new ResponseBodyProcessor(outputStream, bufferSize); + bodyProcessor = new ResponseBodyProcessor(outputStream); return bodyProcessor; } catch (IOException ex) { @@ -236,12 +249,9 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons private final ServletOutputStream outputStream; - private final int bufferSize; - - public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) { + public ResponseBodyProcessor(ServletOutputStream outputStream) { this.outputStream = outputStream; - this.bufferSize = bufferSize; } @Override @@ -288,18 +298,6 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons return false; } } - - private int writeDataBuffer(DataBuffer dataBuffer) throws IOException { - InputStream input = dataBuffer.asInputStream(); - int bytesWritten = 0; - byte[] buffer = new byte[this.bufferSize]; - int bytesRead = -1; - while (this.outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) { - this.outputStream.write(buffer, 0, bytesRead); - bytesWritten += bytesRead; - } - return bytesWritten; - } } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java new file mode 100644 index 00000000000..e493c62b3ca --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java @@ -0,0 +1,117 @@ +/* + * Copyright 2002-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +import javax.servlet.AsyncContext; +import javax.servlet.ServletOutputStream; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.catalina.connector.CoyoteInputStream; +import org.apache.catalina.connector.CoyoteOutputStream; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; + +/** + * Adapt {@link HttpHandler} to an {@link HttpServlet} using Servlet Async + * support and Servlet 3.1 non-blocking I/O. Use Tomcat API for + * reading/writing with ByteBuffer. + * + * @author Violeta Georgieva + * @since 5.0 + */ +@WebServlet(asyncSupported = true) +public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter { + + public TomcatHttpHandlerAdapter(HttpHandler httpHandler) { + super(httpHandler); + } + + public TomcatHttpHandlerAdapter(Map handlerMap) { + super(handlerMap); + } + + + @Override + protected ServerHttpRequest createServletServerHttpRequest( + HttpServletRequest request, AsyncContext asyncContext) throws IOException { + return new TomcatServerHttpRequest( + request, asyncContext, getDataBufferFactory(), getBufferSize()); + } + + @Override + protected ServerHttpResponse createServletServerHttpResponse( + HttpServletResponse response, AsyncContext asyncContext) throws IOException { + return new TomcatServerHttpResponse( + response, asyncContext, getDataBufferFactory(), getBufferSize()); + } + + + private final class TomcatServerHttpRequest extends ServletServerHttpRequest { + + public TomcatServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext, + DataBufferFactory bufferFactory, int bufferSize) throws IOException { + super(request, asyncContext, bufferFactory, bufferSize); + } + + @Override + protected DataBuffer readDataBuffer() throws IOException { + DataBuffer buffer = getDataBufferFactory().allocateBuffer(getBufferSize()); + ByteBuffer byteBuffer = buffer.asByteBuffer(); + byteBuffer.limit(byteBuffer.capacity()); + + int read = ((CoyoteInputStream) getServletRequest().getInputStream()).read( + byteBuffer); + if (logger.isTraceEnabled()) { + logger.trace("read:" + read); + } + + if (read > 0) { + return getDataBufferFactory().wrap(byteBuffer); + } + + return null; + } + } + + + private static final class TomcatServerHttpResponse extends ServletServerHttpResponse { + + public TomcatServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext, + DataBufferFactory bufferFactory, int bufferSize) throws IOException { + super(response, asyncContext, bufferFactory, bufferSize); + } + + @Override + protected int writeDataBuffer(DataBuffer dataBuffer) throws IOException { + ServletOutputStream outputStream = getServletResponse().getOutputStream(); + ByteBuffer input = dataBuffer.asByteBuffer(); + int len = input.remaining(); + if (outputStream.isReady() && len > 0) { + ((CoyoteOutputStream) outputStream).write(input); + } + return len; + } + } + +} \ No newline at end of file diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/JettyHttpServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/JettyHttpServer.java index 50886c30c81..5e98637bf95 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/JettyHttpServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/JettyHttpServer.java @@ -22,6 +22,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.springframework.beans.factory.InitializingBean; +import org.springframework.http.server.reactive.JettyHttpHandlerAdapter; import org.springframework.http.server.reactive.ServletHttpHandlerAdapter; import org.springframework.util.Assert; @@ -56,11 +57,11 @@ public class JettyHttpServer extends HttpServerSupport implements HttpServer, In private ServletHttpHandlerAdapter initServletHttpHandlerAdapter() { if (getHttpHandlerMap() != null) { - return new ServletHttpHandlerAdapter(getHttpHandlerMap()); + return new JettyHttpHandlerAdapter(getHttpHandlerMap()); } else { Assert.notNull(getHttpHandler()); - return new ServletHttpHandlerAdapter(getHttpHandler()); + return new JettyHttpHandlerAdapter(getHttpHandler()); } } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java index 62337269f5a..958901bbd4a 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java @@ -24,6 +24,7 @@ import org.apache.catalina.startup.Tomcat; import org.springframework.beans.factory.InitializingBean; import org.springframework.http.server.reactive.ServletHttpHandlerAdapter; +import org.springframework.http.server.reactive.TomcatHttpHandlerAdapter; import org.springframework.util.Assert; /** @@ -75,11 +76,11 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I private ServletHttpHandlerAdapter initServletHttpHandlerAdapter() { if (getHttpHandlerMap() != null) { - return new ServletHttpHandlerAdapter(getHttpHandlerMap()); + return new TomcatHttpHandlerAdapter(getHttpHandlerMap()); } else { Assert.notNull(getHttpHandler()); - return new ServletHttpHandlerAdapter(getHttpHandler()); + return new TomcatHttpHandlerAdapter(getHttpHandler()); } }