diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 51146b4ee95..321795ed8c7 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -78,7 +78,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { * @see ReadListener#onAllDataRead() * @see org.xnio.ChannelListener#handleEvent(Channel) */ - public final void onAllDataRead() { + public void onAllDataRead() { if (this.logger.isTraceEnabled()) { this.logger.trace(this.state + " onAllDataRead"); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index d7a1f750cb4..15eb1d4a68c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -58,7 +58,7 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport @Override public void handleRequest(HttpServerExchange exchange) throws Exception { - ServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory); + UndertowServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory); ServerHttpResponse response = new UndertowServerHttpResponse(exchange, this.dataBufferFactory); getHttpHandler().handle(request, response).subscribe(new Subscriber() { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 9065bae6a35..9ece475de54 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -21,6 +21,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import io.undertow.connector.ByteBufferPool; import io.undertow.connector.PooledByteBuffer; import io.undertow.server.HttpServerExchange; import io.undertow.server.handlers.Cookie; @@ -57,7 +58,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { super(initUri(exchange), initHeaders(exchange)); this.exchange = exchange; this.body = new RequestBodyPublisher(exchange, dataBufferFactory); - this.body.registerListener(); + this.body.registerListener(exchange); } private static URI initUri(HttpServerExchange exchange) { @@ -106,6 +107,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return Flux.from(this.body); } + private static class RequestBodyPublisher extends AbstractListenerReadPublisher { private final ChannelListener readListener = @@ -118,17 +120,22 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { private final DataBufferFactory dataBufferFactory; - private final PooledByteBuffer pooledByteBuffer; + private final ByteBufferPool byteBufferPool; + + private PooledByteBuffer pooledByteBuffer; public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory dataBufferFactory) { this.requestChannel = exchange.getRequestChannel(); - this.pooledByteBuffer = - exchange.getConnection().getByteBufferPool().allocate(); + this.byteBufferPool = exchange.getConnection().getByteBufferPool(); this.dataBufferFactory = dataBufferFactory; } - private void registerListener() { + private void registerListener(HttpServerExchange exchange) { + exchange.addExchangeCompleteListener((ex, next) -> { + onAllDataRead(); + next.proceed(); + }); this.requestChannel.getReadSetter().set(this.readListener); this.requestChannel.getCloseSetter().set(this.closeListener); this.requestChannel.resumeReads(); @@ -141,6 +148,9 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override protected DataBuffer read() throws IOException { + if (this.pooledByteBuffer == null) { + this.pooledByteBuffer = this.byteBufferPool.allocate(); + } ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer(); int read = this.requestChannel.read(byteBuffer); if (logger.isTraceEnabled()) { @@ -157,6 +167,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return null; } + @Override + public void onAllDataRead() { + if (this.pooledByteBuffer != null && this.pooledByteBuffer.isOpen()) { + this.pooledByteBuffer.close(); + } + super.onAllDataRead(); + } + private class ReadListener implements ChannelListener { @Override