@ -17,8 +17,13 @@
package org.springframework.http.server.reactive ;
package org.springframework.http.server.reactive ;
import java.io.IOException ;
import java.io.IOException ;
import java.io.InputStream ;
import javax.servlet.AsyncContext ;
import javax.servlet.AsyncContext ;
import javax.servlet.ReadListener ;
import javax.servlet.ServletException ;
import javax.servlet.ServletException ;
import javax.servlet.ServletInputStream ;
import javax.servlet.ServletOutputStream ;
import javax.servlet.WriteListener ;
import javax.servlet.annotation.WebServlet ;
import javax.servlet.annotation.WebServlet ;
import javax.servlet.http.HttpServlet ;
import javax.servlet.http.HttpServlet ;
import javax.servlet.http.HttpServletRequest ;
import javax.servlet.http.HttpServletRequest ;
@ -28,7 +33,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory ;
import org.apache.commons.logging.LogFactory ;
import org.reactivestreams.Subscriber ;
import org.reactivestreams.Subscriber ;
import org.reactivestreams.Subscription ;
import org.reactivestreams.Subscription ;
import reactor.core.publisher.Mono ;
import reactor.core.util.BackpressureUtils ;
import org.springframework.core.io.buffer.DataBuffer ;
import org.springframework.core.io.buffer.DataBufferAllocator ;
import org.springframework.core.io.buffer.DataBufferAllocator ;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator ;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator ;
import org.springframework.http.HttpStatus ;
import org.springframework.http.HttpStatus ;
@ -77,13 +85,19 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
AsyncContext context = servletRequest . startAsync ( ) ;
AsyncContext context = servletRequest . startAsync ( ) ;
ServletAsyncContextSynchronizer synchronizer = new ServletAsyncContextSynchronizer ( context ) ;
ServletAsyncContextSynchronizer synchronizer = new ServletAsyncContextSynchronizer ( context ) ;
RequestBodyPublisher requestBody =
new RequestBodyPublisher ( synchronizer , allocator , bufferSize ) ;
requestBody . registerListener ( ) ;
ServletServerHttpRequest request =
ServletServerHttpRequest request =
new ServletServerHttpRequest ( synchronizer , this . allocator ,
new ServletServerHttpRequest ( servletRequest , requestBody ) ;
this . bufferSize ) ;
ResponseBodySubscriber responseBody =
new ResponseBodySubscriber ( synchronizer , bufferSize ) ;
responseBody . registerListener ( ) ;
ServletServerHttpResponse response =
ServletServerHttpResponse response =
new ServletServerHttpResponse ( synchronizer , this . bufferSize ,
new ServletServerHttpResponse ( servletResponse , allocator ,
this . allocator ) ;
publisher - > Mono
. from ( subscriber - > publisher . subscribe ( responseBody ) ) ) ;
HandlerResultSubscriber resultSubscriber =
HandlerResultSubscriber resultSubscriber =
new HandlerResultSubscriber ( synchronizer ) ;
new HandlerResultSubscriber ( synchronizer ) ;
@ -124,4 +138,238 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
this . synchronizer . complete ( ) ;
this . synchronizer . complete ( ) ;
}
}
}
}
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher {
private static final Log logger = LogFactory . getLog ( RequestBodyPublisher . class ) ;
private final RequestBodyReadListener readListener =
new RequestBodyReadListener ( ) ;
private final ServletAsyncContextSynchronizer synchronizer ;
private final DataBufferAllocator allocator ;
private final byte [ ] buffer ;
public RequestBodyPublisher ( ServletAsyncContextSynchronizer synchronizer ,
DataBufferAllocator allocator , int bufferSize ) {
this . synchronizer = synchronizer ;
this . allocator = allocator ;
this . buffer = new byte [ bufferSize ] ;
}
public void registerListener ( ) throws IOException {
this . synchronizer . getRequest ( ) . getInputStream ( ) . setReadListener ( readListener ) ;
}
@Override
protected void noLongerStalled ( ) {
try {
readListener . onDataAvailable ( ) ;
}
catch ( IOException ex ) {
readListener . onError ( ex ) ;
}
}
private class RequestBodyReadListener implements ReadListener {
@Override
public void onDataAvailable ( ) throws IOException {
if ( isSubscriptionCancelled ( ) ) {
return ;
}
logger . trace ( "onDataAvailable" ) ;
ServletInputStream input = synchronizer . getRequest ( ) . getInputStream ( ) ;
while ( true ) {
if ( ! checkSubscriptionForDemand ( ) ) {
break ;
}
boolean ready = input . isReady ( ) ;
logger . trace (
"Input ready: " + ready + " finished: " + input . isFinished ( ) ) ;
if ( ! ready ) {
break ;
}
int read = input . read ( buffer ) ;
logger . trace ( "Input read:" + read ) ;
if ( read = = - 1 ) {
break ;
}
else if ( read > 0 ) {
DataBuffer dataBuffer = allocator . allocateBuffer ( read ) ;
dataBuffer . write ( buffer , 0 , read ) ;
publishOnNext ( dataBuffer ) ;
}
}
}
@Override
public void onAllDataRead ( ) throws IOException {
logger . trace ( "All data read" ) ;
synchronizer . readComplete ( ) ;
publishOnComplete ( ) ;
}
@Override
public void onError ( Throwable t ) {
logger . trace ( "RequestBodyReadListener Error" , t ) ;
synchronizer . readComplete ( ) ;
publishOnError ( t ) ;
}
}
}
private static class ResponseBodySubscriber implements Subscriber < DataBuffer > {
private static final Log logger = LogFactory . getLog ( ResponseBodySubscriber . class ) ;
private final ResponseBodyWriteListener writeListener =
new ResponseBodyWriteListener ( ) ;
private final ServletAsyncContextSynchronizer synchronizer ;
private final int bufferSize ;
private volatile DataBuffer dataBuffer ;
private volatile boolean completed = false ;
private Subscription subscription ;
public ResponseBodySubscriber ( ServletAsyncContextSynchronizer synchronizer ,
int bufferSize ) {
this . synchronizer = synchronizer ;
this . bufferSize = bufferSize ;
}
public void registerListener ( ) throws IOException {
synchronizer . getResponse ( ) . getOutputStream ( ) . setWriteListener ( writeListener ) ;
}
@Override
public void onSubscribe ( Subscription subscription ) {
logger . trace ( "onSubscribe. Subscription: " + subscription ) ;
if ( BackpressureUtils . validate ( this . subscription , subscription ) ) {
this . subscription = subscription ;
this . subscription . request ( 1 ) ;
}
}
@Override
public void onNext ( DataBuffer dataBuffer ) {
Assert . state ( this . dataBuffer = = null ) ;
logger . trace ( "onNext. buffer: " + dataBuffer ) ;
this . dataBuffer = dataBuffer ;
try {
this . writeListener . onWritePossible ( ) ;
}
catch ( IOException e ) {
onError ( e ) ;
}
}
@Override
public void onError ( Throwable t ) {
logger . error ( "onError" , t ) ;
HttpServletResponse response =
( HttpServletResponse ) this . synchronizer . getResponse ( ) ;
response . setStatus ( HttpStatus . INTERNAL_SERVER_ERROR . value ( ) ) ;
this . synchronizer . complete ( ) ;
}
@Override
public void onComplete ( ) {
logger . trace ( "onComplete. buffer: " + this . dataBuffer ) ;
this . completed = true ;
if ( this . dataBuffer ! = null ) {
try {
this . writeListener . onWritePossible ( ) ;
}
catch ( IOException ex ) {
onError ( ex ) ;
}
}
if ( this . dataBuffer = = null ) {
this . synchronizer . writeComplete ( ) ;
}
}
private class ResponseBodyWriteListener implements WriteListener {
@Override
public void onWritePossible ( ) throws IOException {
logger . trace ( "onWritePossible" ) ;
ServletOutputStream output = synchronizer . getResponse ( ) . getOutputStream ( ) ;
boolean ready = output . isReady ( ) ;
logger . trace ( "ready: " + ready + " buffer: " + dataBuffer ) ;
if ( ready ) {
if ( dataBuffer ! = null ) {
int total = dataBuffer . readableByteCount ( ) ;
int written = writeDataBuffer ( ) ;
logger . trace ( "written: " + written + " total: " + total ) ;
if ( written = = total ) {
releaseBuffer ( ) ;
if ( ! completed ) {
subscription . request ( 1 ) ;
}
else {
synchronizer . writeComplete ( ) ;
}
}
}
else if ( subscription ! = null ) {
subscription . request ( 1 ) ;
}
}
}
private int writeDataBuffer ( ) throws IOException {
InputStream input = dataBuffer . asInputStream ( ) ;
ServletOutputStream output = synchronizer . getResponse ( ) . getOutputStream ( ) ;
int bytesWritten = 0 ;
byte [ ] buffer = new byte [ bufferSize ] ;
int bytesRead = - 1 ;
while ( output . isReady ( ) & & ( bytesRead = input . read ( buffer ) ) ! = - 1 ) {
output . write ( buffer , 0 , bytesRead ) ;
bytesWritten + = bytesRead ;
}
return bytesWritten ;
}
private void releaseBuffer ( ) {
// TODO: call PooledDataBuffer.release() when we it is introduced
dataBuffer = null ;
}
@Override
public void onError ( Throwable ex ) {
logger . error ( "ResponseBodyWriteListener error" , ex ) ;
}
}
}
}
}