Browse Source

Polish

pull/1307/head
Rossen Stoyanchev 9 years ago
parent
commit
a2aaa05592
  1. 2
      build.gradle
  2. 23
      spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java
  3. 23
      spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java
  4. 9
      spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java
  5. 6
      spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java
  6. 44
      spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java

2
build.gradle

@ -748,6 +748,7 @@ project("spring-web") {
optional("io.reactivex:rxjava:${rxjavaVersion}") optional("io.reactivex:rxjava:${rxjavaVersion}")
optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}" optional "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
optional("org.apache.tomcat:tomcat-catalina:${tomcatVersion}")
optional("io.undertow:undertow-core:${undertowVersion}") optional("io.undertow:undertow-core:${undertowVersion}")
optional("org.jboss.xnio:xnio-api:${xnioVersion}") optional("org.jboss.xnio:xnio-api:${xnioVersion}")
optional("io.netty:netty-buffer:${nettyVersion}") // temporarily for JsonObjectDecoder optional("io.netty:netty-buffer:${nettyVersion}") // temporarily for JsonObjectDecoder
@ -779,7 +780,6 @@ project("spring-web") {
optional("javax.xml.ws:jaxws-api:${jaxwsVersion}") optional("javax.xml.ws:jaxws-api:${jaxwsVersion}")
optional("javax.mail:javax.mail-api:${javamailVersion}") optional("javax.mail:javax.mail-api:${javamailVersion}")
optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}") optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
optional("org.apache.tomcat:tomcat-catalina:${tomcatVersion}")
testCompile(project(":spring-context-support")) // for JafMediaTypeFactory testCompile(project(":spring-context-support")) // for JafMediaTypeFactory
testCompile("io.projectreactor.addons:reactor-test") testCompile("io.projectreactor.addons:reactor-test")
testCompile("org.apache.taglibs:taglibs-standard-jstlel:1.2.1") { testCompile("org.apache.taglibs:taglibs-standard-jstlel:1.2.1") {

23
spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java

@ -31,9 +31,8 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
/** /**
* Adapt {@link HttpHandler} to an {@link HttpServlet} using Servlet Async * {@link ServletHttpHandlerAdapter} extension that uses Jetty APIs for writing
* support and Servlet 3.1 non-blocking I/O. Use Jetty API for writing with * to the response with {@link ByteBuffer}.
* ByteBuffer.
* *
* @author Violeta Georgieva * @author Violeta Georgieva
* @since 5.0 * @since 5.0
@ -41,6 +40,7 @@ import org.springframework.core.io.buffer.DataBufferFactory;
@WebServlet(asyncSupported = true) @WebServlet(asyncSupported = true)
public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter { public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
public JettyHttpHandlerAdapter(HttpHandler httpHandler) { public JettyHttpHandlerAdapter(HttpHandler httpHandler) {
super(httpHandler); super(httpHandler);
} }
@ -51,22 +51,23 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override @Override
protected ServerHttpResponse createServletServerHttpResponse( protected ServerHttpResponse createResponse(HttpServletResponse response,
HttpServletResponse response, AsyncContext asyncContext) throws IOException { AsyncContext context) throws IOException {
return new JettyServerHttpResponse(
response, asyncContext, getDataBufferFactory(), getBufferSize()); return new JettyServerHttpResponse(response, context, getDataBufferFactory(), getBufferSize());
} }
private static final class JettyServerHttpResponse extends ServletServerHttpResponse { private static final class JettyServerHttpResponse extends ServletServerHttpResponse {
public JettyServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext, public JettyServerHttpResponse(HttpServletResponse response, AsyncContext context,
DataBufferFactory bufferFactory, int bufferSize) throws IOException { DataBufferFactory factory, int bufferSize) throws IOException {
super(response, asyncContext, bufferFactory, bufferSize);
super(response, context, factory, bufferSize);
} }
@Override @Override
protected int writeDataBuffer(DataBuffer dataBuffer) throws IOException { protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
ServletOutputStream outputStream = getServletResponse().getOutputStream(); ServletOutputStream outputStream = getServletResponse().getOutputStream();
ByteBuffer input = dataBuffer.asByteBuffer(); ByteBuffer input = dataBuffer.asByteBuffer();
int len = input.remaining(); int len = input.remaining();

23
spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java

@ -98,26 +98,23 @@ public class ServletHttpHandlerAdapter extends HttpHandlerAdapterSupport impleme
// Start async before Read/WriteListener registration // Start async before Read/WriteListener registration
AsyncContext asyncContext = request.startAsync(); AsyncContext asyncContext = request.startAsync();
ServerHttpRequest httpRequest = createServletServerHttpRequest( ServerHttpRequest httpRequest = createRequest(((HttpServletRequest) request), asyncContext);
((HttpServletRequest) request), asyncContext); ServerHttpResponse httpResponse = createResponse(((HttpServletResponse) response), asyncContext);
ServerHttpResponse httpResponse = createServletServerHttpResponse(
((HttpServletResponse) response), asyncContext);
HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext); HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext);
getHttpHandler().handle(httpRequest, httpResponse).subscribe(subscriber); getHttpHandler().handle(httpRequest, httpResponse).subscribe(subscriber);
} }
protected ServerHttpRequest createServletServerHttpRequest(HttpServletRequest request, protected ServerHttpRequest createRequest(HttpServletRequest request, AsyncContext context)
AsyncContext asyncContext) throws IOException { throws IOException {
return new ServletServerHttpRequest(
request, asyncContext, getDataBufferFactory(), getBufferSize()); return new ServletServerHttpRequest(request, context, getDataBufferFactory(), getBufferSize());
} }
protected ServerHttpResponse createServletServerHttpResponse(HttpServletResponse response, protected ServerHttpResponse createResponse(HttpServletResponse response, AsyncContext context)
AsyncContext asyncContext) throws IOException { throws IOException {
return new ServletServerHttpResponse(
response, asyncContext, getDataBufferFactory(), getBufferSize()); return new ServletServerHttpResponse(response, context, getDataBufferFactory(), getBufferSize());
} }
// Other Servlet methods... // Other Servlet methods...

9
spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java

@ -54,6 +54,9 @@ import org.springframework.util.StringUtils;
*/ */
public class ServletServerHttpRequest extends AbstractServerHttpRequest { public class ServletServerHttpRequest extends AbstractServerHttpRequest {
protected final Log logger = LogFactory.getLog(getClass());
private final HttpServletRequest request; private final HttpServletRequest request;
private final RequestBodyPublisher bodyPublisher; private final RequestBodyPublisher bodyPublisher;
@ -64,8 +67,6 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
private final byte[] buffer; private final byte[] buffer;
protected final Log logger = LogFactory.getLog(getClass());
public ServletServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext, public ServletServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize) throws IOException { DataBufferFactory bufferFactory, int bufferSize) throws IOException {
@ -179,7 +180,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
return Flux.from(this.bodyPublisher); return Flux.from(this.bodyPublisher);
} }
protected DataBuffer readDataBuffer() throws IOException { protected DataBuffer readFromInputStream() throws IOException {
int read = this.request.getInputStream().read(this.buffer); int read = this.request.getInputStream().read(this.buffer);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("read:" + read); logger.trace("read:" + read);
@ -242,7 +243,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
@Override @Override
protected DataBuffer read() throws IOException { protected DataBuffer read() throws IOException {
if (this.inputStream.isReady()) { if (this.inputStream.isReady()) {
return readDataBuffer(); return readFromInputStream();
} }
return null; return null;
} }

6
spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

@ -132,12 +132,12 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
return processor; return processor;
} }
protected int writeDataBuffer(DataBuffer dataBuffer) throws IOException { protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
ServletOutputStream outputStream = response.getOutputStream(); ServletOutputStream outputStream = response.getOutputStream();
InputStream input = dataBuffer.asInputStream(); InputStream input = dataBuffer.asInputStream();
int bytesWritten = 0; int bytesWritten = 0;
byte[] buffer = new byte[this.bufferSize]; byte[] buffer = new byte[this.bufferSize];
int bytesRead = -1; int bytesRead;
while (outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) { while (outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead); outputStream.write(buffer, 0, bytesRead);
bytesWritten += bytesRead; bytesWritten += bytesRead;
@ -287,7 +287,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
} }
if (ready) { if (ready) {
int total = dataBuffer.readableByteCount(); int total = dataBuffer.readableByteCount();
int written = writeDataBuffer(dataBuffer); int written = writeToOutputStream(dataBuffer);
if (this.logger.isTraceEnabled()) { if (this.logger.isTraceEnabled()) {
this.logger.trace("written: " + written + " total: " + total); this.logger.trace("written: " + written + " total: " + total);

44
spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java

@ -19,30 +19,29 @@ package org.springframework.http.server.reactive;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet; import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.apache.catalina.connector.CoyoteInputStream; import org.apache.catalina.connector.CoyoteInputStream;
import org.apache.catalina.connector.CoyoteOutputStream; import org.apache.catalina.connector.CoyoteOutputStream;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
/** /**
* Adapt {@link HttpHandler} to an {@link HttpServlet} using Servlet Async * {@link ServletHttpHandlerAdapter} extension that uses Jetty APIs for reading
* support and Servlet 3.1 non-blocking I/O. Use Tomcat API for * from the request and writing to the response with {@link ByteBuffer}.
* reading/writing with ByteBuffer. *
*
* @author Violeta Georgieva * @author Violeta Georgieva
* @since 5.0 * @since 5.0
*/ */
@WebServlet(asyncSupported = true) @WebServlet(asyncSupported = true)
public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter { public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
public TomcatHttpHandlerAdapter(HttpHandler httpHandler) { public TomcatHttpHandlerAdapter(HttpHandler httpHandler) {
super(httpHandler); super(httpHandler);
} }
@ -53,35 +52,31 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override @Override
protected ServerHttpRequest createServletServerHttpRequest( protected ServerHttpRequest createRequest(HttpServletRequest request, AsyncContext cxt) throws IOException {
HttpServletRequest request, AsyncContext asyncContext) throws IOException { return new TomcatServerHttpRequest(request, cxt, getDataBufferFactory(), getBufferSize());
return new TomcatServerHttpRequest(
request, asyncContext, getDataBufferFactory(), getBufferSize());
} }
@Override @Override
protected ServerHttpResponse createServletServerHttpResponse( protected ServerHttpResponse createResponse(HttpServletResponse response, AsyncContext cxt) throws IOException {
HttpServletResponse response, AsyncContext asyncContext) throws IOException { return new TomcatServerHttpResponse(response, cxt, getDataBufferFactory(), getBufferSize());
return new TomcatServerHttpResponse(
response, asyncContext, getDataBufferFactory(), getBufferSize());
} }
private final class TomcatServerHttpRequest extends ServletServerHttpRequest { private final class TomcatServerHttpRequest extends ServletServerHttpRequest {
public TomcatServerHttpRequest(HttpServletRequest request, AsyncContext asyncContext, public TomcatServerHttpRequest(HttpServletRequest request, AsyncContext context,
DataBufferFactory bufferFactory, int bufferSize) throws IOException { DataBufferFactory factory, int bufferSize) throws IOException {
super(request, asyncContext, bufferFactory, bufferSize);
super(request, context, factory, bufferSize);
} }
@Override @Override
protected DataBuffer readDataBuffer() throws IOException { protected DataBuffer readFromInputStream() throws IOException {
DataBuffer buffer = getDataBufferFactory().allocateBuffer(getBufferSize()); DataBuffer buffer = getDataBufferFactory().allocateBuffer(getBufferSize());
ByteBuffer byteBuffer = buffer.asByteBuffer(); ByteBuffer byteBuffer = buffer.asByteBuffer();
byteBuffer.limit(byteBuffer.capacity()); byteBuffer.limit(byteBuffer.capacity());
int read = ((CoyoteInputStream) getServletRequest().getInputStream()).read( int read = ((CoyoteInputStream) getServletRequest().getInputStream()).read(byteBuffer);
byteBuffer);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("read:" + read); logger.trace("read:" + read);
} }
@ -97,13 +92,14 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
private static final class TomcatServerHttpResponse extends ServletServerHttpResponse { private static final class TomcatServerHttpResponse extends ServletServerHttpResponse {
public TomcatServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext, public TomcatServerHttpResponse(HttpServletResponse response, AsyncContext context,
DataBufferFactory bufferFactory, int bufferSize) throws IOException { DataBufferFactory factory, int bufferSize) throws IOException {
super(response, asyncContext, bufferFactory, bufferSize);
super(response, context, factory, bufferSize);
} }
@Override @Override
protected int writeDataBuffer(DataBuffer dataBuffer) throws IOException { protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
ServletOutputStream outputStream = getServletResponse().getOutputStream(); ServletOutputStream outputStream = getServletResponse().getOutputStream();
ByteBuffer input = dataBuffer.asByteBuffer(); ByteBuffer input = dataBuffer.asByteBuffer();
int len = input.remaining(); int len = input.remaining();

Loading…
Cancel
Save