From 1e62ad8665bb0f4138e140974108d693a20ee287 Mon Sep 17 00:00:00 2001 From: Rob Winch Date: Thu, 29 Nov 2012 11:55:53 -0600 Subject: [PATCH] Add beforeConcurrentHandling support Previously CallableProcessingInterceptor and DeferredResultProcessingInterceptor did not have support for capturing the state of the original Thread just prior to processing. This made it difficult to transfer the state of one Thread (i.e. ThreadLocal) to the Thread used to process the Callable. This commit adds a new method to CallableProcessingInterceptor and DeferredResultProcessingInterceptor named beforeConcurrentHandling which will be invoked on the original Thread used to submit the Callable or DeferredResult. This means the state of the original Thread can be captured in beforeConcurrentHandling and transfered to the new Thread in preProcess. Issue: SPR-10052 --- .../async/CallableInterceptorChain.java | 7 ++ .../async/CallableProcessingInterceptor.java | 19 +++++ .../CallableProcessingInterceptorAdapter.java | 7 ++ .../async/DeferredResultInterceptorChain.java | 6 ++ .../DeferredResultProcessingInterceptor.java | 12 +++ ...redResultProcessingInterceptorAdapter.java | 7 ++ .../request/async/WebAsyncManager.java | 13 +++- .../request/async/WebAsyncManagerTests.java | 75 ++++++++++++++++++- .../async/WebAsyncManagerTimeoutTests.java | 4 + 9 files changed, 143 insertions(+), 7 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableInterceptorChain.java b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableInterceptorChain.java index e0e5e6fb0c9..ba3885330d0 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableInterceptorChain.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableInterceptorChain.java @@ -26,6 +26,7 @@ import org.springframework.web.context.request.NativeWebRequest; * Assists with the invocation of {@link CallableProcessingInterceptor}'s. * * @author Rossen Stoyanchev + * @author Rob Winch * @since 3.2 */ class CallableInterceptorChain { @@ -41,6 +42,12 @@ class CallableInterceptorChain { this.interceptors = interceptors; } + public void applyBeforeConcurrentHandling(NativeWebRequest request, Callable task) throws Exception { + for (CallableProcessingInterceptor interceptor : this.interceptors) { + interceptor.beforeConcurrentHandling(request, task); + } + } + public void applyPreProcess(NativeWebRequest request, Callable task) throws Exception { for (CallableProcessingInterceptor interceptor : this.interceptors) { interceptor.preProcess(request, task); diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptor.java b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptor.java index f727b8f8c83..cc7f15e113b 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptor.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptor.java @@ -39,6 +39,7 @@ import org.springframework.web.context.request.NativeWebRequest; * can select a value to be used to resume processing. * * @author Rossen Stoyanchev + * @author Rob Winch * @since 3.2 */ public interface CallableProcessingInterceptor { @@ -47,6 +48,24 @@ public interface CallableProcessingInterceptor { static final Object RESPONSE_HANDLED = new Object(); + /** + * Invoked before the start of concurrent handling in the original + * thread in which the {@code Callable} is submitted for concurrent handling. + * + *

+ * This is useful for capturing the state of the current thread just prior to + * invoking the {@link Callable}. Once the state is captured, it can then be + * transfered to the new {@link Thread} in + * {@link #preProcess(NativeWebRequest, Callable)}. Capturing the state of + * Spring Security's SecurityContextHolder and migrating it to the new Thread + * is a concrete example of where this is useful. + *

+ * + * @param request the current request + * @param task the task for the current async request + * @throws Exception in case of errors + */ + void beforeConcurrentHandling(NativeWebRequest request, Callable task) throws Exception; /** * Invoked after the start of concurrent handling in the async diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptorAdapter.java b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptorAdapter.java index 2d27f4711c3..264e44237ff 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptorAdapter.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/CallableProcessingInterceptorAdapter.java @@ -24,10 +24,17 @@ import org.springframework.web.context.request.NativeWebRequest; * for simplified implementation of individual methods. * * @author Rossen Stoyanchev + * @author Rob Winch * @since 3.2 */ public abstract class CallableProcessingInterceptorAdapter implements CallableProcessingInterceptor { + /** + * This implementation is empty. + */ + public void beforeConcurrentHandling(NativeWebRequest request, Callable task) throws Exception { + } + /** * This implementation is empty. */ diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultInterceptorChain.java b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultInterceptorChain.java index 09763330c0f..ffe9bb81265 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultInterceptorChain.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultInterceptorChain.java @@ -40,6 +40,12 @@ class DeferredResultInterceptorChain { this.interceptors = interceptors; } + public void applyBeforeConcurrentHandling(NativeWebRequest request, DeferredResult deferredResult) throws Exception { + for (DeferredResultProcessingInterceptor interceptor : this.interceptors) { + interceptor.beforeConcurrentHandling(request, deferredResult); + } + } + public void applyPreProcess(NativeWebRequest request, DeferredResult deferredResult) throws Exception { for (DeferredResultProcessingInterceptor interceptor : this.interceptors) { interceptor.preProcess(request, deferredResult); diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptor.java b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptor.java index 281e478968b..3adcba96fcd 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptor.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptor.java @@ -36,10 +36,22 @@ import org.springframework.web.context.request.NativeWebRequest; * method can set the {@code DeferredResult} in order to resume processing. * * @author Rossen Stoyanchev + * @author Rob Winch * @since 3.2 */ public interface DeferredResultProcessingInterceptor { + /** + * Invoked immediately before the start of concurrent handling, in the same + * thread that started it. This method may be used to capture state just prior + * to the start of concurrent processing with the given {@code DeferredResult}. + * + * @param request the current request + * @param deferredResult the DeferredResult for the current request + * @throws Exception in case of errors + */ + void beforeConcurrentHandling(NativeWebRequest request, DeferredResult deferredResult) throws Exception; + /** * Invoked immediately after the start of concurrent handling, in the same * thread that started it. This method may be used to detect the start of diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptorAdapter.java b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptorAdapter.java index a6fa80d12d4..774761bdb65 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptorAdapter.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResultProcessingInterceptorAdapter.java @@ -22,10 +22,17 @@ import org.springframework.web.context.request.NativeWebRequest; * interface for simplified implementation of individual methods. * * @author Rossen Stoyanchev + * @author Rob Winch * @since 3.2 */ public abstract class DeferredResultProcessingInterceptorAdapter implements DeferredResultProcessingInterceptor { + /** + * This implementation is empty. + */ + public void beforeConcurrentHandling(NativeWebRequest request, DeferredResult deferredResult) throws Exception { + } + /** * This implementation is empty. */ diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java index a4f0e00e3a3..1b5d9d597aa 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java @@ -249,12 +249,13 @@ public final class WebAsyncManager { * @param callable a unit of work to be executed asynchronously * @param processingContext additional context to save that can be accessed * via {@link #getConcurrentResultContext()} + * @throws Exception If concurrent processing failed to start * * @see #getConcurrentResult() * @see #getConcurrentResultContext() */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public void startCallableProcessing(final Callable callable, Object... processingContext) { + public void startCallableProcessing(final Callable callable, Object... processingContext) throws Exception { Assert.notNull(callable, "Callable must not be null"); startCallableProcessing(new WebAsyncTask(callable), processingContext); } @@ -267,8 +268,9 @@ public final class WebAsyncManager { * @param webAsyncTask an WebAsyncTask containing the target {@code Callable} * @param processingContext additional context to save that can be accessed * via {@link #getConcurrentResultContext()} + * @throws Exception If concurrent processing failed to start */ - public void startCallableProcessing(final WebAsyncTask webAsyncTask, Object... processingContext) { + public void startCallableProcessing(final WebAsyncTask webAsyncTask, Object... processingContext) throws Exception { Assert.notNull(webAsyncTask, "WebAsyncTask must not be null"); Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); @@ -306,6 +308,8 @@ public final class WebAsyncManager { } }); + interceptorChain.applyBeforeConcurrentHandling(asyncWebRequest, callable); + startAsyncProcessing(processingContext); this.taskExecutor.submit(new Runnable() { @@ -356,12 +360,13 @@ public final class WebAsyncManager { * @param deferredResult the DeferredResult instance to initialize * @param processingContext additional context to save that can be accessed * via {@link #getConcurrentResultContext()} + * @throws Exception If concurrent processing failed to start * * @see #getConcurrentResult() * @see #getConcurrentResultContext() */ public void startDeferredResultProcessing( - final DeferredResult deferredResult, Object... processingContext) { + final DeferredResult deferredResult, Object... processingContext) throws Exception { Assert.notNull(deferredResult, "DeferredResult must not be null"); Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); @@ -395,6 +400,8 @@ public final class WebAsyncManager { } }); + interceptorChain.applyBeforeConcurrentHandling(asyncWebRequest, deferredResult); + startAsyncProcessing(processingContext); try { diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java index a8436b651b0..1e5847b8309 100644 --- a/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java @@ -66,7 +66,7 @@ public class WebAsyncManagerTests { } @Test - public void startAsyncProcessingWithoutAsyncWebRequest() { + public void startAsyncProcessingWithoutAsyncWebRequest() throws Exception { WebAsyncManager manager = WebAsyncUtils.getAsyncManager(new MockHttpServletRequest()); try { @@ -118,6 +118,7 @@ public class WebAsyncManagerTests { Callable task = new StubCallable(concurrentResult); CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, task); interceptor.preProcess(this.asyncWebRequest, task); interceptor.postProcess(this.asyncWebRequest, task, new Integer(concurrentResult)); replay(interceptor); @@ -140,6 +141,7 @@ public class WebAsyncManagerTests { Callable task = new StubCallable(concurrentResult); CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, task); interceptor.preProcess(this.asyncWebRequest, task); interceptor.postProcess(this.asyncWebRequest, task, concurrentResult); replay(interceptor); @@ -155,6 +157,34 @@ public class WebAsyncManagerTests { verify(interceptor, this.asyncWebRequest); } + @Test + public void startCallableProcessingBeforeConcurrentHandlingException() throws Exception { + Callable task = new StubCallable(21); + Exception exception = new Exception(); + + CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, task); + expectLastCall().andThrow(exception); + replay(interceptor); + + this.asyncWebRequest.addTimeoutHandler((Runnable) notNull()); + this.asyncWebRequest.addCompletionHandler((Runnable) notNull()); + replay(this.asyncWebRequest); + + this.asyncManager.registerCallableInterceptor("interceptor", interceptor); + + try { + this.asyncManager.startCallableProcessing(task); + fail("Expected Exception"); + }catch(Exception e) { + assertEquals(exception, e); + } + + assertFalse(this.asyncManager.hasConcurrentResult()); + + verify(this.asyncWebRequest, interceptor); + } + @Test public void startCallableProcessingPreProcessException() throws Exception { @@ -162,6 +192,7 @@ public class WebAsyncManagerTests { Exception exception = new Exception(); CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, task); interceptor.preProcess(this.asyncWebRequest, task); expectLastCall().andThrow(exception); replay(interceptor); @@ -184,6 +215,7 @@ public class WebAsyncManagerTests { Exception exception = new Exception(); CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, task); interceptor.preProcess(this.asyncWebRequest, task); interceptor.postProcess(this.asyncWebRequest, task, 21); expectLastCall().andThrow(exception); @@ -207,11 +239,13 @@ public class WebAsyncManagerTests { Exception exception = new Exception(); CallableProcessingInterceptor interceptor1 = createMock(CallableProcessingInterceptor.class); + interceptor1.beforeConcurrentHandling(this.asyncWebRequest, task); interceptor1.preProcess(this.asyncWebRequest, task); interceptor1.postProcess(this.asyncWebRequest, task, 21); replay(interceptor1); CallableProcessingInterceptor interceptor2 = createMock(CallableProcessingInterceptor.class); + interceptor2.beforeConcurrentHandling(this.asyncWebRequest, task); interceptor2.preProcess(this.asyncWebRequest, task); interceptor2.postProcess(this.asyncWebRequest, task, 21); expectLastCall().andThrow(exception); @@ -231,7 +265,7 @@ public class WebAsyncManagerTests { } @Test - public void startCallableProcessingWithAsyncTask() { + public void startCallableProcessingWithAsyncTask() throws Exception { AsyncTaskExecutor executor = createMock(AsyncTaskExecutor.class); expect(executor.submit((Runnable) notNull())).andReturn(null); @@ -251,7 +285,7 @@ public class WebAsyncManagerTests { } @Test - public void startCallableProcessingNullInput() { + public void startCallableProcessingNullInput() throws Exception { try { this.asyncManager.startCallableProcessing((Callable) null); fail("Expected exception"); @@ -268,6 +302,7 @@ public class WebAsyncManagerTests { String concurrentResult = "abc"; DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult); interceptor.preProcess(this.asyncWebRequest, deferredResult); interceptor.postProcess(asyncWebRequest, deferredResult, concurrentResult); replay(interceptor); @@ -284,6 +319,36 @@ public class WebAsyncManagerTests { verify(this.asyncWebRequest, interceptor); } + @Test + public void startDeferredResultProcessingBeforeConcurrentHandlingException() throws Exception { + + DeferredResult deferredResult = new DeferredResult(); + Exception exception = new Exception(); + + DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult); + expectLastCall().andThrow(exception); + replay(interceptor); + + this.asyncWebRequest.addTimeoutHandler((Runnable) notNull()); + this.asyncWebRequest.addCompletionHandler((Runnable) notNull()); + replay(this.asyncWebRequest); + + this.asyncManager.registerDeferredResultInterceptor("interceptor", interceptor); + + try { + this.asyncManager.startDeferredResultProcessing(deferredResult); + fail("Expected Exception"); + } + catch(Exception success) { + assertEquals(exception, success); + } + + assertFalse(this.asyncManager.hasConcurrentResult()); + + verify(this.asyncWebRequest, interceptor); + } + @Test public void startDeferredResultProcessingPreProcessException() throws Exception { @@ -291,6 +356,7 @@ public class WebAsyncManagerTests { Exception exception = new Exception(); DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult); interceptor.preProcess(this.asyncWebRequest, deferredResult); expectLastCall().andThrow(exception); replay(interceptor); @@ -313,6 +379,7 @@ public class WebAsyncManagerTests { Exception exception = new Exception(); DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult); interceptor.preProcess(this.asyncWebRequest, deferredResult); interceptor.postProcess(this.asyncWebRequest, deferredResult, 25); expectLastCall().andThrow(exception); @@ -330,7 +397,7 @@ public class WebAsyncManagerTests { } @Test - public void startDeferredResultProcessingNullInput() { + public void startDeferredResultProcessingNullInput() throws Exception { try { this.asyncManager.startDeferredResultProcessing((DeferredResult) null); fail("Expected exception"); diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTimeoutTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTimeoutTests.java index 0948d831c4b..391b4357248 100644 --- a/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTimeoutTests.java +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTimeoutTests.java @@ -80,6 +80,7 @@ public class WebAsyncManagerTimeoutTests { StubCallable callable = new StubCallable(); CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, callable); expect(interceptor.handleTimeout(this.asyncWebRequest, callable)).andReturn(RESULT_NONE); interceptor.afterCompletion(this.asyncWebRequest, callable); replay(interceptor); @@ -123,6 +124,7 @@ public class WebAsyncManagerTimeoutTests { StubCallable callable = new StubCallable(); CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, callable); expect(interceptor.handleTimeout(this.asyncWebRequest, callable)).andReturn(22); replay(interceptor); @@ -145,6 +147,7 @@ public class WebAsyncManagerTimeoutTests { Exception exception = new Exception(); CallableProcessingInterceptor interceptor = createStrictMock(CallableProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, callable); expect(interceptor.handleTimeout(this.asyncWebRequest, callable)).andThrow(exception); replay(interceptor); @@ -166,6 +169,7 @@ public class WebAsyncManagerTimeoutTests { DeferredResult deferredResult = new DeferredResult(); DeferredResultProcessingInterceptor interceptor = createStrictMock(DeferredResultProcessingInterceptor.class); + interceptor.beforeConcurrentHandling(this.asyncWebRequest, deferredResult); interceptor.preProcess(this.asyncWebRequest, deferredResult); expect(interceptor.handleTimeout(this.asyncWebRequest, deferredResult)).andReturn(true); interceptor.afterCompletion(this.asyncWebRequest, deferredResult);