From 7ee821d3d153c7d528cc29b2ebf295c3877a5dff Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 26 Apr 2012 17:38:31 -0400 Subject: [PATCH] Add ability to handle a timeout to DeferredResult When a controller returns a DeferredResult, the underlying async request will eventually time out. Until now the default behavior was to send a 503 (SERVICE_UNAVAILABLE). However, this is not desirable in all cases. For example if waiting on an event, a timeout simply means there is no new information to send. To handle those cases a DeferredResult now accespts a timeout result Object in its constructor. If the timeout occurs before the DeferredResult is set, the timeout result provided to the constructor is used instead. Issue: SPR-8617 --- .../request/async/AsyncExecutionChain.java | 38 +++-- .../async/AsyncExecutionChainRunnable.java | 1 - .../request/async/AsyncWebRequest.java | 6 + .../context/request/async/DeferredResult.java | 133 +++++++++++++----- .../request/async/NoOpAsyncWebRequest.java | 3 + .../async/StandardServletAsyncWebRequest.java | 13 +- .../async/AsyncExecutionChainTests.java | 10 +- .../request/async/DeferredResultTests.java | 119 ++++++++++++++++ 8 files changed, 270 insertions(+), 53 deletions(-) create mode 100644 spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncExecutionChain.java b/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncExecutionChain.java index 268b337801c..489d2c67313 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncExecutionChain.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncExecutionChain.java @@ -143,7 +143,7 @@ public final class AsyncExecutionChain { } private Callable buildChain() { - Assert.state(this.callable != null, "The callable field is required to complete the chain"); + Assert.state(this.callable != null, "The last callable is required to build the async chain"); this.delegatingCallables.add(new StaleAsyncRequestCheckingCallable(asyncWebRequest)); Callable result = this.callable; for (int i = this.delegatingCallables.size() - 1; i >= 0; i--) { @@ -165,25 +165,39 @@ public final class AsyncExecutionChain { * the threading model, i.e. whether a TaskExecutor is used. * @see DeferredResult */ - public void startDeferredResultProcessing(DeferredResult deferredResult) { - Assert.notNull(deferredResult, "A DeferredResult is required"); + public void startDeferredResultProcessing(final DeferredResult deferredResult) { + Assert.notNull(deferredResult, "DeferredResult is required"); startAsync(); - deferredResult.setValueProcessor(new DeferredResultHandler() { - public void handle(Object value) { + deferredResult.init(new DeferredResultHandler() { + public void handle(Object result) { if (asyncWebRequest.isAsyncCompleted()) { throw new StaleAsyncWebRequestException("Async request processing already completed"); } - setCallable(getSimpleCallable(value)); + setCallable(new PassThroughCallable(result)); new AsyncExecutionChainRunnable(asyncWebRequest, buildChain()).run(); } }); + if (deferredResult.canHandleTimeout()) { + this.asyncWebRequest.setTimeoutHandler(new Runnable() { + public void run() { + deferredResult.handleTimeout(); + } + }); + } } - private Callable getSimpleCallable(final Object value) { - return new Callable() { - public Object call() throws Exception { - return value; - } - }; + + private static class PassThroughCallable implements Callable { + + private final Object value; + + public PassThroughCallable(Object value) { + this.value = value; + } + + public Object call() throws Exception { + return this.value; + } } + } diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncExecutionChainRunnable.java b/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncExecutionChainRunnable.java index 9a326f0e1f5..03e86b24261 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncExecutionChainRunnable.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncExecutionChainRunnable.java @@ -49,7 +49,6 @@ public class AsyncExecutionChainRunnable implements Runnable { public AsyncExecutionChainRunnable(AsyncWebRequest asyncWebRequest, Callable callable) { Assert.notNull(asyncWebRequest, "An AsyncWebRequest is required"); Assert.notNull(callable, "A Callable is required"); - Assert.state(asyncWebRequest.isAsyncStarted(), "Not an async request"); this.asyncWebRequest = asyncWebRequest; this.callable = callable; } diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncWebRequest.java b/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncWebRequest.java index c897281209e..51e6a84a7c8 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncWebRequest.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/AsyncWebRequest.java @@ -35,6 +35,12 @@ public interface AsyncWebRequest extends NativeWebRequest { */ void setTimeout(Long timeout); + /** + * Invoked on a timeout to complete the response instead of the default + * behavior that sets the status to 503 (SERVICE_UNAVAILABLE). + */ + void setTimeoutHandler(Runnable runnable); + /** * Mark the start of async request processing for example ensuring the * request remains open in order to be completed in a separate thread. diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java index 16c5843574b..bf966a84182 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java @@ -16,72 +16,135 @@ package org.springframework.web.context.request.async; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.util.Assert; /** - * DeferredResult provides an alternative to using a Callable to complete async - * request processing. Whereas with a Callable the framework manages a thread on - * behalf of the application through an {@link AsyncTaskExecutor}, with a - * DeferredResult the application can produce a value using a thread of its choice. + * DeferredResult provides an alternative to using a Callable for async request + * processing. With a Callable the framework manages a thread on behalf of the + * application through an {@link AsyncTaskExecutor}. With a DeferredResult the + * application sets the result in a thread of its choice. * - *

The following sequence describes typical use of a DeferredResult: + *

The following sequence describes the intended use scenario: *

    - *
  1. Application method (e.g. controller method) returns a DeferredResult instance - *
  2. The framework completes initialization of the returned DeferredResult in the same thread - *
  3. The application calls {@link DeferredResult#set(Object)} from another thread - *
  4. The framework completes request processing in the thread in which it is invoked + *
  5. thread-1: framework calls application method + *
  6. thread-1: application method returns a DeferredResult + *
  7. thread-1: framework initializes DeferredResult + *
  8. thread-2: application calls {@link #set(Object)} + *
  9. thread-2: framework completes async processing with given result *
* - *

Note: {@link DeferredResult#set(Object)} will block if - * called before the DeferredResult is fully initialized (by the framework). - * Application code should never create a DeferredResult and set it immediately: - * - *

- * DeferredResult value = new DeferredResult();
- * value.set(1);  // blocks
- * 
+ *

If the application calls {@link #set(Object)} in thread-2 before the + * DeferredResult is initialized by the framework in thread-1, then thread-2 + * will block and wait for the initialization to complete. Therefore an + * application should never create and set the DeferredResult in the same + * thread because the initialization will never complete.

* * @author Rossen Stoyanchev * @since 3.2 */ public final class DeferredResult { - private final AtomicReference value = new AtomicReference(); + private final static Object TIMEOUT_RESULT_NONE = new Object(); + + private Object result; + + private final Object timeoutResult; + + private DeferredResultHandler resultHandler; + + private final CountDownLatch readySignal = new CountDownLatch(1); + + private final ReentrantLock timeoutLock = new ReentrantLock(); + + /** + * Create a new instance. + */ + public DeferredResult() { + this(TIMEOUT_RESULT_NONE); + } + + /** + * Create a new instance and also provide a default result to use if a + * timeout occurs before {@link #set(Object)} is called. + */ + public DeferredResult(Object timeoutResult) { + this.timeoutResult = timeoutResult; + } - private final BlockingQueue handlers = new ArrayBlockingQueue(1); + boolean canHandleTimeout() { + return this.timeoutResult != TIMEOUT_RESULT_NONE; + } /** - * Provide a value to use to complete async request processing. - * This method should be invoked only once and usually from a separate - * thread to allow the framework to fully initialize the created - * DeferrredValue. See the class level documentation for more details. + * Complete async processing with the given result. If the DeferredResult is + * not yet fully initialized, this method will block and wait for that to + * occur before proceeding. See the class level javadoc for more details. * * @throws StaleAsyncWebRequestException if the underlying async request - * ended due to a timeout or an error before the value was set. + * has already timed out or ended due to a network error. */ - public void set(Object value) throws StaleAsyncWebRequestException { - Assert.isNull(this.value.get(), "Value already set"); - this.value.set(value); + public void set(Object result) throws StaleAsyncWebRequestException { + if (this.timeoutLock.tryLock() && (this.result != this.timeoutResult)) { + try { + handle(result); + } + finally { + this.timeoutLock.unlock(); + } + } + else { + // A timeout is in progress + throw new StaleAsyncWebRequestException("Async request already timed out"); + } + } + + /** + * Invoked to complete async processing when a timeout occurs before + * {@link #set(Object)} is called. Or if {@link #set(Object)} is already in + * progress, this method blocks, waits for it to complete, and then returns. + */ + void handleTimeout() { + Assert.state(canHandleTimeout(), "Can't handle timeout"); + this.timeoutLock.lock(); + try { + if (this.result == null) { + handle(this.timeoutResult); + } + } + finally { + this.timeoutLock.unlock(); + } + } + + private void handle(Object result) throws StaleAsyncWebRequestException { + Assert.isNull(this.result, "A deferred result can be set once only"); + this.result = result; try { - this.handlers.take().handle(value); + this.readySignal.await(10, TimeUnit.SECONDS); } catch (InterruptedException e) { - throw new IllegalStateException("Failed to process deferred return value: " + value, e); + throw new IllegalStateException( + "Gave up on waiting for DeferredResult to be initialized. " + + "Are you perhaps creating and setting a DeferredResult in the same thread? " + + "The DeferredResult must be fully initialized before you can set it. " + + "See the class javadoc for more details"); } + this.resultHandler.handle(result); } - void setValueProcessor(DeferredResultHandler handler) { - this.handlers.add(handler); + void init(DeferredResultHandler handler) { + this.resultHandler = handler; + this.readySignal.countDown(); } /** - * Puts the set value through processing wiht the async execution chain. + * Completes processing when {@link DeferredResult#set(Object)} is called. */ interface DeferredResultHandler { diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/NoOpAsyncWebRequest.java b/spring-web/src/main/java/org/springframework/web/context/request/async/NoOpAsyncWebRequest.java index d51bbd67ce3..fd425bc1389 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/NoOpAsyncWebRequest.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/NoOpAsyncWebRequest.java @@ -39,6 +39,9 @@ public class NoOpAsyncWebRequest extends ServletWebRequest implements AsyncWebRe public void setTimeout(Long timeout) { } + public void setTimeoutHandler(Runnable runnable) { + } + public boolean isAsyncStarted() { return false; } diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequest.java b/spring-web/src/main/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequest.java index 0c92940d767..3558ae60442 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequest.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/StandardServletAsyncWebRequest.java @@ -48,6 +48,8 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements private AtomicBoolean asyncCompleted = new AtomicBoolean(false); + private Runnable timeoutHandler; + public StandardServletAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) { super(request, response); } @@ -64,6 +66,10 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements return this.asyncCompleted.get(); } + public void setTimeoutHandler(Runnable timeoutHandler) { + this.timeoutHandler = timeoutHandler; + } + public void startAsync() { Assert.state(getRequest().isAsyncSupported(), "Async support must be enabled on a servlet and for all filters involved " + @@ -111,8 +117,13 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements // --------------------------------------------------------------------- public void onTimeout(AsyncEvent event) throws IOException { + if (this.timeoutHandler == null) { + getResponse().sendError(HttpStatus.SERVICE_UNAVAILABLE.value()); + } + else { + this.timeoutHandler.run(); + } completeInternal(); - getResponse().sendError(HttpStatus.SERVICE_UNAVAILABLE.value()); } public void onError(AsyncEvent event) throws IOException { diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/AsyncExecutionChainTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/AsyncExecutionChainTests.java index c411d2f8082..0857ff9df0d 100644 --- a/spring-web/src/test/java/org/springframework/web/context/request/async/AsyncExecutionChainTests.java +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/AsyncExecutionChainTests.java @@ -123,7 +123,7 @@ public class AsyncExecutionChainTests { fail("Expected exception"); } catch (IllegalStateException ex) { - assertThat(ex.getMessage(), containsString("The callable field is required")); + assertThat(ex.getMessage(), containsString("last callable is required")); } } @@ -171,7 +171,7 @@ public class AsyncExecutionChainTests { fail("Expected exception"); } catch (IllegalArgumentException ex) { - assertThat(ex.getMessage(), containsString("A DeferredResult is required")); + assertThat(ex.getMessage(), containsString("DeferredResult is required")); } } @@ -186,6 +186,10 @@ public class AsyncExecutionChainTests { super(request, response); } + public void setTimeout(Long timeout) { } + + public void setTimeoutHandler(Runnable runnable) { } + public void startAsync() { this.asyncStarted = true; } @@ -194,8 +198,6 @@ public class AsyncExecutionChainTests { return this.asyncStarted; } - public void setTimeout(Long timeout) { } - public void complete() { this.asyncStarted = false; this.asyncCompleted = true; diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java new file mode 100644 index 00000000000..fc91196aad7 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java @@ -0,0 +1,119 @@ +/* + * Copyright 2002-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.context.request.async; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.junit.Test; +import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler; + +/** + * DeferredResult tests. + * + * @author Rossen Stoyanchev + */ +public class DeferredResultTests { + + @Test + public void canHandleTimeout() { + assertFalse(new DeferredResult().canHandleTimeout()); + assertTrue(new DeferredResult("foo").canHandleTimeout()); + } + + @Test + public void set() { + DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class); + DeferredResult deferredResult = new DeferredResult(); + deferredResult.init(resultHandler); + + resultHandler.handle("foo"); + replay(resultHandler); + + deferredResult.set("foo"); + + verify(resultHandler); + } + + @Test + public void handleTimeout() { + DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class); + DeferredResult deferredResult = new DeferredResult("foo"); + deferredResult.init(resultHandler); + + resultHandler.handle("foo"); + replay(resultHandler); + + deferredResult.handleTimeout(); + + verify(resultHandler); + } + + @Test(expected=IllegalStateException.class) + public void handleTimeout_timeoutResultNone() { + new DeferredResult().handleTimeout(); + } + + @Test + public void setAfterHandleTimeout() { + DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class); + DeferredResult deferredResult = new DeferredResult("foo"); + deferredResult.init(resultHandler); + + resultHandler.handle("foo"); + replay(resultHandler); + + deferredResult.handleTimeout(); + + verify(resultHandler); + + try { + deferredResult.set("foo"); + fail("Expected exception"); + } + catch (StaleAsyncWebRequestException ex) { + // expected + } + } + + @Test + public void setBeforeHandleTimeout() { + DeferredResultHandler resultHandler = createMock(DeferredResultHandler.class); + DeferredResult deferredResult = new DeferredResult("foo"); + deferredResult.init(resultHandler); + + resultHandler.handle("foo"); + replay(resultHandler); + + deferredResult.set("foo"); + + verify(resultHandler); + + reset(resultHandler); + replay(resultHandler); + + deferredResult.handleTimeout(); + + verify(resultHandler); + } + +}