From 8d786e8bba86de45d486c0dd858522ca4190f0d1 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 7 Dec 2016 15:33:14 +0200 Subject: [PATCH 1/2] Refactor the usage of Undertow ByteBufferPool - lazy allocate the PooledByteBuffer, only if there is a request body for reading - close the PooledByteBuffer once the request finishes --- .../AbstractListenerReadPublisher.java | 2 +- .../reactive/UndertowHttpHandlerAdapter.java | 4 +++- .../reactive/UndertowServerHttpRequest.java | 23 ++++++++++++++++--- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 51146b4ee95..321795ed8c7 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -78,7 +78,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { * @see ReadListener#onAllDataRead() * @see org.xnio.ChannelListener#handleEvent(Channel) */ - public final void onAllDataRead() { + public void onAllDataRead() { if (this.logger.isTraceEnabled()) { this.logger.trace(this.state + " onAllDataRead"); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index d7a1f750cb4..273f9549f6f 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -58,7 +58,7 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport @Override public void handleRequest(HttpServerExchange exchange) throws Exception { - ServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory); + UndertowServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory); ServerHttpResponse response = new UndertowServerHttpResponse(exchange, this.dataBufferFactory); getHttpHandler().handle(request, response).subscribe(new Subscriber() { @@ -76,11 +76,13 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport if (!exchange.isResponseStarted() && exchange.getStatusCode() <= 500) { exchange.setStatusCode(500); } + request.close(); exchange.endExchange(); } @Override public void onComplete() { logger.debug("Successfully completed request"); + request.close(); exchange.endExchange(); } }); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 9065bae6a35..e6161230cbd 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -21,6 +21,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import io.undertow.connector.ByteBufferPool; import io.undertow.connector.PooledByteBuffer; import io.undertow.server.HttpServerExchange; import io.undertow.server.handlers.Cookie; @@ -106,6 +107,10 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return Flux.from(this.body); } + void close() { + this.body.onAllDataRead(); + } + private static class RequestBodyPublisher extends AbstractListenerReadPublisher { private final ChannelListener readListener = @@ -118,13 +123,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { private final DataBufferFactory dataBufferFactory; - private final PooledByteBuffer pooledByteBuffer; + private final ByteBufferPool byteBufferPool; + + private PooledByteBuffer pooledByteBuffer; public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory dataBufferFactory) { this.requestChannel = exchange.getRequestChannel(); - this.pooledByteBuffer = - exchange.getConnection().getByteBufferPool().allocate(); + this.byteBufferPool = exchange.getConnection().getByteBufferPool(); this.dataBufferFactory = dataBufferFactory; } @@ -141,6 +147,9 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override protected DataBuffer read() throws IOException { + if (this.pooledByteBuffer == null) { + this.pooledByteBuffer = this.byteBufferPool.allocate(); + } ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer(); int read = this.requestChannel.read(byteBuffer); if (logger.isTraceEnabled()) { @@ -157,6 +166,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return null; } + @Override + public void onAllDataRead() { + if (this.pooledByteBuffer != null && this.pooledByteBuffer.isOpen()) { + this.pooledByteBuffer.close(); + } + super.onAllDataRead(); + } + private class ReadListener implements ChannelListener { @Override From d3e05296e175d13533f4abbb43673ecc1a3582c8 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 14 Dec 2016 13:48:40 -0500 Subject: [PATCH 2/2] Replace close method with Undertow exchange listener The Undertow HttpServerExchange has a complete listener which we can use instead of the close() method UndertowServerHttpRequest. --- .../server/reactive/UndertowHttpHandlerAdapter.java | 2 -- .../server/reactive/UndertowServerHttpRequest.java | 11 ++++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index 273f9549f6f..15eb1d4a68c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -76,13 +76,11 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport if (!exchange.isResponseStarted() && exchange.getStatusCode() <= 500) { exchange.setStatusCode(500); } - request.close(); exchange.endExchange(); } @Override public void onComplete() { logger.debug("Successfully completed request"); - request.close(); exchange.endExchange(); } }); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index e6161230cbd..9ece475de54 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -58,7 +58,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { super(initUri(exchange), initHeaders(exchange)); this.exchange = exchange; this.body = new RequestBodyPublisher(exchange, dataBufferFactory); - this.body.registerListener(); + this.body.registerListener(exchange); } private static URI initUri(HttpServerExchange exchange) { @@ -107,9 +107,6 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return Flux.from(this.body); } - void close() { - this.body.onAllDataRead(); - } private static class RequestBodyPublisher extends AbstractListenerReadPublisher { @@ -134,7 +131,11 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { this.dataBufferFactory = dataBufferFactory; } - private void registerListener() { + private void registerListener(HttpServerExchange exchange) { + exchange.addExchangeCompleteListener((ex, next) -> { + onAllDataRead(); + next.proceed(); + }); this.requestChannel.getReadSetter().set(this.readListener); this.requestChannel.getCloseSetter().set(this.closeListener); this.requestChannel.resumeReads();