From 7e9437738272a31e1d764972291de42936194ab1 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 29 Sep 2016 21:04:27 +0300 Subject: [PATCH 1/2] Handle async operation events Problem: The following exception is observed on an async timeout: "java.lang.IllegalStateException: It is invalid to call isReady() when the response has not been put into non-blocking mode" Current Implementation: The async operation events sent by the web container are not propagated to the internal implementation. When timeout/error happens and if the application does not complete the async operation, the web container will complete it. At that point if the application tries to read/write, the operation will fail with an exception (above) that there is not async operation started. Proposed Solution: On async timeout or error, make calls to: - AbstractRequestBodyPublisher.onError, - AbstractResponseBodyProcessor.onError, - AbstractResponseBodyFlushProcessor.onError As a result of these calls the async operation will be completed and no more invocations of read/write will be made. --- .../AbstractResponseBodyFlushProcessor.java | 2 +- .../reactive/ServletHttpHandlerAdapter.java | 39 +++++++++++++++++++ .../reactive/ServletServerHttpRequest.java | 6 +++ .../reactive/ServletServerHttpResponse.java | 17 +++++++- 4 files changed, 62 insertions(+), 2 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java index fdfc577ef89..57510a9c094 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java @@ -120,7 +120,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor, Void> createBodyFlushProcessor() { - Processor, Void> processor = new ResponseBodyFlushProcessor(); + ResponseBodyFlushProcessor processor = new ResponseBodyFlushProcessor(); registerListener(); + bodyFlushProcessor = processor; return processor; } @@ -152,6 +155,18 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } + void onError(Throwable t) { + if (bodyFlushProcessor != null) { + bodyFlushProcessor.cancel(); + bodyFlushProcessor.onError(t); + } + if (bodyProcessor != null) { + bodyProcessor.cancel(); + bodyProcessor.onError(t); + } + } + + private class ResponseBodyProcessor extends AbstractResponseBodyProcessor { private final ServletOutputStream outputStream; From c1e5e3a87e4f0f8296a9da7907066b9ba5733e0d Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 13 Oct 2016 16:31:53 -0400 Subject: [PATCH 2/2] Polish AsyncListener in ServletHttpHandlerAdapter --- .../reactive/ServletHttpHandlerAdapter.java | 39 +++++++++++-------- .../reactive/ServletServerHttpRequest.java | 5 ++- .../reactive/ServletServerHttpResponse.java | 16 ++++---- 3 files changed, 34 insertions(+), 26 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index b965a2f0f36..05e87092b29 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -91,7 +91,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet { servletRequest, this.dataBufferFactory, this.bufferSize); ServletServerHttpResponse response = new ServletServerHttpResponse( servletResponse, this.dataBufferFactory, this.bufferSize); - asyncContext.addListener(new HandlerAsyncEvent(request, response)); + asyncContext.addListener(new ErrorHandlingAsyncListener(request, response)); HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(asyncContext); this.handler.handle(request, response).subscribe(resultSubscriber); } @@ -130,40 +130,47 @@ public class ServletHttpHandlerAdapter extends HttpServlet { } } - private static final class HandlerAsyncEvent implements AsyncListener { + + private static final class ErrorHandlingAsyncListener implements AsyncListener { + private final ServletServerHttpRequest request; + private final ServletServerHttpResponse response; - public HandlerAsyncEvent(ServletServerHttpRequest request, + + public ErrorHandlingAsyncListener(ServletServerHttpRequest request, ServletServerHttpResponse response) { + this.request = request; this.response = response; } + @Override - public void onComplete(AsyncEvent event) throws IOException { - // no op + public void onTimeout(AsyncEvent event) { + Throwable ex = event.getThrowable(); + if (ex == null) { + ex = new IllegalStateException("Async operation timeout."); + } + this.request.handleAsyncListenerError(ex); + this.response.handleAsyncListenerError(ex); } @Override - public void onTimeout(AsyncEvent event) throws IOException { - Throwable t = event.getThrowable(); - if (t == null) { - t = new IllegalStateException("Async operation timeout."); - } - request.onError(t); - response.onError(t); + public void onError(AsyncEvent event) { + this.request.handleAsyncListenerError(event.getThrowable()); + this.response.handleAsyncListenerError(event.getThrowable()); } @Override - public void onError(AsyncEvent event) throws IOException { - request.onError(event.getThrowable()); - response.onError(event.getThrowable()); + public void onStartAsync(AsyncEvent event) { + // no op } @Override - public void onStartAsync(AsyncEvent event) throws IOException { + public void onComplete(AsyncEvent event) { // no op } } + } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 65a22d078aa..acbf881b103 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -170,9 +170,10 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { } } - void onError(Throwable t) { + /** Handle a timeout/error callback from the Servlet container */ + void handleAsyncListenerError(Throwable ex) { if (this.bodyPublisher != null) { - this.bodyPublisher.onError(t); + this.bodyPublisher.onError(ex); } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 7233f417756..2ff1ff99ec8 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -154,15 +154,15 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } - - void onError(Throwable t) { - if (bodyFlushProcessor != null) { - bodyFlushProcessor.cancel(); - bodyFlushProcessor.onError(t); + /** Handle a timeout/error callback from the Servlet container */ + void handleAsyncListenerError(Throwable ex) { + if (this.bodyFlushProcessor != null) { + this.bodyFlushProcessor.cancel(); + this.bodyFlushProcessor.onError(ex); } - if (bodyProcessor != null) { - bodyProcessor.cancel(); - bodyProcessor.onError(t); + if (this.bodyProcessor != null) { + this.bodyProcessor.cancel(); + this.bodyProcessor.onError(ex); } }