@ -38,25 +38,16 @@ import org.springframework.web.util.UrlPathHelper;
@@ -38,25 +38,16 @@ import org.springframework.web.util.UrlPathHelper;
* as an SPI and not typically used directly by application classes .
*
* < p > An async scenario starts with request processing as usual in a thread ( T1 ) .
* When a handler decides to handle the request concurrently , it calls
* Concurrent request handling can be innitiated by calling
* { @linkplain # startCallableProcessing ( Callable , Object . . . ) startCallableProcessing } or
* { @linkplain # startDeferredResultProcessing ( DeferredResult , Object . . . ) startDeferredResultProcessing }
* both of which will process in a separate thread ( T2 ) .
* After the start of concurrent handling { @link # isConcurrentHandlingStarted ( ) }
* returns "true" and this can be used by classes involved in processing on the
* main thread ( T1 ) quickly and with very minimal processing .
* both of which produce a result in a separate thread ( T2 ) . The result is saved
* and the request dispatched to the container , to resume processing with the saved
* result in a third thread ( T3 ) . Within the dispatched thread ( T3 ) , the saved
* result can be accessed via { @link # getConcurrentResult ( ) } or its presence
* detected via { @link # hasConcurrentResult ( ) } .
*
* < p > When the concurrent handling completes in a separate thread ( T2 ) , both
* { @code startCallableProcessing } and { @code startDeferredResultProcessing }
* save the results and dispatched to the container , essentially to the
* same request URI as the one that started concurrent handling . This allows for
* further processing of the concurrent results . Classes in the dispatched
* thread ( T3 ) , can access the results via { @link # getConcurrentResult ( ) } or
* detect their presence via { @link # hasConcurrentResult ( ) } . Also in the
* dispatched thread { @link # isConcurrentHandlingStarted ( ) } will return "false"
* unless concurrent handling is started once again .
*
* TODO . . mention Servlet 3 configuration
* < p > TODO . . Servlet 3 config
*
* @author Rossen Stoyanchev
* @since 3 . 2
@ -73,42 +64,38 @@ public final class WebAsyncManager {
@@ -73,42 +64,38 @@ public final class WebAsyncManager {
private static final Log logger = LogFactory . getLog ( WebAsyncManager . class ) ;
private AsyncWebRequest asyncWebRequest ;
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor ( this . getClass ( ) . getSimpleName ( ) ) ;
private final Map < Object , AsyncThreadInitializer > threadInitializers = new LinkedHashMap < Object , AsyncThreadInitializer > ( ) ;
private Object concurrentResult = RESULT_NONE ;
private Object [ ] concurrentResultContext ;
private final Map < Object , WebAsyncThreadInitializer > threadInitializers = new LinkedHashMap < Object , WebAsyncThreadInitializer > ( ) ;
private static final UrlPathHelper urlPathHelper = new UrlPathHelper ( ) ;
/ * *
* Package private constructor
* @see AsyncWebUtils
* /
WebAsyncManager ( ) {
}
/ * *
* Configure an AsyncTaskExecutor for use with { @link # startCallableProcessing ( Callable ) } .
* < p > By default a { @link SimpleAsyncTaskExecutor } instance is used . Applications
* are advised to provide a TaskExecutor configured for production use .
* @see org . springframework . web . servlet . mvc . method . annotation . RequestMappingHandlerAdapter # setAsyncTaskExecutor
* Package private constructor .
* @see AsyncWebUtils # getAsyncManager ( javax . servlet . ServletRequest )
* @see AsyncWebUtils # getAsyncManager ( org . springframework . web . context . request . WebRequest )
* /
public void setTaskExecutor ( AsyncTaskExecutor taskExecutor ) {
this . taskExecutor = taskExecutor ;
WebAsyncManager ( ) {
}
/ * *
* Provide an { @link AsyncWebRequest } to use to start and to dispatch request .
* This property must be set before the start of concurrent handling .
* @param asyncWebRequest the request to use
* Configure the { @link AsyncWebRequest } to use . This property may be
* set more than once during a single request to accurately reflect the
* current state of the request ( e . g . following a forward , request / response
* wrapping , etc ) . However , it should not be set while concurrent handling is
* in progress , i . e . while { @link # isConcurrentHandlingStarted ( ) } is { @code true } .
* @param asyncWebRequest the web request to use
* /
public void setAsyncWebRequest ( final AsyncWebRequest asyncWebRequest ) {
Assert . notNull ( asyncWebRequest , "Expected AsyncWebRequest" ) ;
Assert . notNull ( asyncWebRequest , "AsyncWebRequest must not be null " ) ;
Assert . state ( ! isConcurrentHandlingStarted ( ) , "Can't set AsyncWebRequest with concurrent handling in progress" ) ;
this . asyncWebRequest = asyncWebRequest ;
this . asyncWebRequest . addCompletionHandler ( new Runnable ( ) {
@ -119,18 +106,27 @@ public final class WebAsyncManager {
@@ -119,18 +106,27 @@ public final class WebAsyncManager {
}
/ * *
* Whether the handler for the current request is executed concurrently .
* Once concurrent handling is done , the result is saved , and the request
* dispatched again to resume processing where the result of concurrent
* handling is available via { @link # getConcurrentResult ( ) } .
* Configure an AsyncTaskExecutor for use with concurrent processing via
* { @link # startCallableProcessing ( Callable , Object . . . ) } .
* < p > By default a { @link SimpleAsyncTaskExecutor } instance is used .
* /
public void setTaskExecutor ( AsyncTaskExecutor taskExecutor ) {
this . taskExecutor = taskExecutor ;
}
/ * *
* Whether the target handler chose to handle the request asynchronously .
* A return value of "true" indicates concurrent handling is under way and the
* response will remain open . A return value of "false" will be returned again after concurrent
* handling produces a result and the request is dispatched to resume processing .
* /
public boolean isConcurrentHandlingStarted ( ) {
return ( ( this . asyncWebRequest ! = null ) & & ( this . asyncWebRequest . isAsyncStarted ( ) ) ) ;
}
/ * *
* Whether the current thread was dispatched to continue processing the result
* of concurrent handler execution .
* Whether the request was dispatched to resum e processing the result of
* concurrent handling .
* /
public boolean hasConcurrentResult ( ) {
@ -142,84 +138,58 @@ public final class WebAsyncManager {
@@ -142,84 +138,58 @@ public final class WebAsyncManager {
}
/ * *
* Return the result of concurrent handler execution . This may be an Object
* value on successful return or an { @code Exception } or { @code Throwable } .
* Provides access to the result from concurrent handling .
* @return an Object , possibly an { @code Exception } or { @code Throwable } if
* concurrent handling raised one .
* /
public Object getConcurrentResult ( ) {
return this . concurrentResult ;
}
/ * *
* Return the processing context saved at the start of concurrent handling .
* Provides access to additional processing context saved at the start of
* concurrent handling .
* /
public Object [ ] getConcurrentResultContext ( ) {
return this . concurrentResultContext ;
}
/ * *
* Reset the { @linkplain # getConcurrentResult ( ) concurrentResult } and the
* Clear { @linkplain # getConcurrentResult ( ) concurrentResult } and
* { @linkplain # getConcurrentResultContext ( ) concurrentResultContext } .
* /
public void reset ConcurrentResult( ) {
public void clear ConcurrentResult( ) {
this . concurrentResult = RESULT_NONE ;
this . concurrentResultContext = null ;
}
/ * *
* Register an { @link AsyncThreadInitializer } with the WebAsyncManager instance
* for the current request . It may later be accessed and applied via
* { @link # applyAsyncThreadInitializer ( String ) } and will also be used to
* initialize and reset threads for concurrent handler execution .
* @param key a unique the key under which to keep the initializer
* @param initializer the initializer instance
* /
public void registerAsyncThreadInitializer ( Object key , AsyncThreadInitializer initializer ) {
Assert . notNull ( initializer , "An AsyncThreadInitializer instance is required" ) ;
this . threadInitializers . put ( key , initializer ) ;
}
/ * *
* Invoke the { @linkplain AsyncThreadInitializer # initialize ( ) initialize ( ) }
* method of the named { @link AsyncThreadInitializer } .
* @param key the key under which the initializer was registered
* @return whether an initializer was found and applied
* /
public boolean applyAsyncThreadInitializer ( Object key ) {
AsyncThreadInitializer initializer = this . threadInitializers . get ( key ) ;
if ( initializer ! = null ) {
initializer . initialize ( ) ;
return true ;
}
return false ;
}
/ * *
* Submit a request handling task for concurrent execution . Returns immediately
* and subsequent calls to { @link # isConcurrentHandlingStarted ( ) } return "true" .
* < p > When concurrent handling is done , the resulting value , which may be an
* Object or a raised { @code Exception } or { @code Throwable } , is saved and the
* request is dispatched for further processing of that result . In the dispatched
* thread , the result can be accessed via { @link # getConcurrentResult ( ) } while
* { @link # hasConcurrentResult ( ) } returns "true" and
* { @link # isConcurrentHandlingStarted ( ) } is back to returning "false" .
* Start concurrent request processing and execute the given task with an
* { @link # setTaskExecutor ( AsyncTaskExecutor ) AsyncTaskExecutor } . The result
* from the task execution is saved and the request dispatched in order to
* resume processing of that result . If the task raises an Exception then
* the saved result will be the raised Exception .
*
* @param callable a unit of work to be executed asynchronously
* @param processingContext additional context to save for later access via
* { @link # getConcurrentResultContext ( ) }
* @param processingContext additional context to save that can be accessed
* via { @link # getConcurrentResultContext ( ) }
*
* @see # getConcurrentResult ( )
* @see # getConcurrentResultContext ( )
* /
public void startCallableProcessing ( final Callable < ? > callable , Object . . . processingContext ) {
Assert . notNull ( callable , "Callable is required" ) ;
Assert . notNull ( callable , "Callable must not be null" ) ;
startAsyncProcessing ( processingContext ) ;
this . taskExecutor . submit ( new Runnable ( ) {
public void run ( ) {
List < AsyncThreadInitializer > initializers =
new ArrayList < AsyncThreadInitializer > ( threadInitializers . values ( ) ) ;
List < WebAsyncThreadInitializer > initializers =
new ArrayList < WebAsyncThreadInitializer > ( threadInitializers . values ( ) ) ;
try {
for ( AsyncThreadInitializer initializer : initializers ) {
for ( WebAsyncThreadInitializer initializer : initializers ) {
initializer . initialize ( ) ;
}
concurrentResult = callable . call ( ) ;
@ -229,7 +199,7 @@ public final class WebAsyncManager {
@@ -229,7 +199,7 @@ public final class WebAsyncManager {
}
finally {
Collections . reverse ( initializers ) ;
for ( AsyncThreadInitializer initializer : initializers ) {
for ( Web AsyncThreadInitializer initializer : initializers ) {
initializer . reset ( ) ;
}
}
@ -250,24 +220,51 @@ public final class WebAsyncManager {
@@ -250,24 +220,51 @@ public final class WebAsyncManager {
}
/ * *
* Initialize the given given { @link DeferredResult } so that whenever the
* DeferredResult is set , the resulting value , which may be an Object or a
* raised { @code Exception } or { @code Throwable } , is saved and the request
* is dispatched for further processing of the result . In the dispatch
* thread , the result value can be accessed via { @link # getConcurrentResult ( ) } .
* < p > The method returns immediately and it ' s up to the caller to set the
* DeferredResult . Subsequent calls to { @link # isConcurrentHandlingStarted ( ) }
* return "true" until after the dispatch when { @link # hasConcurrentResult ( ) }
* returns "true" and { @link # isConcurrentHandlingStarted ( ) } is back to "false" .
* Use the given { @link AsyncTask } to configure the task executor as well as
* the timeout value of the { @code AsyncWebRequest } before delegating to
* { @link # startCallableProcessing ( Callable , Object . . . ) } .
* @param asyncTask an asyncTask containing the target { @code Callable }
* @param processingContext additional context to save that can be accessed
* via { @link # getConcurrentResultContext ( ) }
* /
public void startCallableProcessing ( AsyncTask asyncTask , Object . . . processingContext ) {
Assert . notNull ( asyncTask , "AsyncTask must not be null" ) ;
Long timeout = asyncTask . getTimeout ( ) ;
if ( timeout ! = null ) {
this . asyncWebRequest . setTimeout ( timeout ) ;
}
AsyncTaskExecutor executor = asyncTask . getExecutor ( ) ;
if ( executor ! = null ) {
this . taskExecutor = executor ;
}
startCallableProcessing ( asyncTask . getCallable ( ) , processingContext ) ;
}
/ * *
* Start concurrent request processing and initialize the given { @link DeferredResult }
* with a { @link DeferredResultHandler } that saves the result and dispatches
* the request to resume processing of that result .
* The { @code AsyncWebRequest } is also updated with a completion handler that
* expires the { @code DeferredResult } and a timeout handler assuming the
* { @code DeferredResult } has a default timeout result .
*
* @param deferredResult the DeferredResult instance to initialize
* @param processingContext additional context to save for later access via
* { @link # getConcurrentResultContext ( ) }
* @param processingContext additional context to save that can be accessed
* via { @link # getConcurrentResultContext ( ) }
*
* @see # getConcurrentResult ( )
* @see # getConcurrentResultContext ( )
* /
public void startDeferredResultProcessing ( final DeferredResult < ? > deferredResult , Object . . . processingContext ) {
Assert . notNull ( deferredResult , "DeferredResult is required" ) ;
Assert . notNull ( deferredResult , "DeferredResult must not be null " ) ;
startAsyncProcessing ( processingContext ) ;
Long timeout = deferredResult . getTimeoutMilliseconds ( ) ;
if ( timeout ! = null ) {
this . asyncWebRequest . setTimeout ( timeout ) ;
}
this . asyncWebRequest . addCompletionHandler ( new Runnable ( ) {
public void run ( ) {
@ -283,6 +280,8 @@ public final class WebAsyncManager {
@@ -283,6 +280,8 @@ public final class WebAsyncManager {
} ) ;
}
startAsyncProcessing ( processingContext ) ;
deferredResult . setResultHandler ( new DeferredResultHandler ( ) {
public void handleResult ( Object result ) {
@ -300,13 +299,13 @@ public final class WebAsyncManager {
@@ -300,13 +299,13 @@ public final class WebAsyncManager {
} ) ;
}
private void startAsyncProcessing ( Object . . . c ontext) {
private void startAsyncProcessing ( Object [ ] processingC ontext) {
Assert . state ( this . asyncWebRequest ! = null , "AsyncWebRequest was not set " ) ;
Assert . state ( this . asyncWebRequest ! = null , "AsyncWebRequest must not be null " ) ;
this . asyncWebRequest . startAsync ( ) ;
this . concurrentResult = null ;
this . concurrentResultContext = context ;
this . concurrentResultContext = pro cessingC ontext;
if ( logger . isDebugEnabled ( ) ) {
HttpServletRequest request = asyncWebRequest . getNativeRequest ( HttpServletRequest . class ) ;
@ -315,11 +314,38 @@ public final class WebAsyncManager {
@@ -315,11 +314,38 @@ public final class WebAsyncManager {
}
}
/ * *
* Register an { @link WebAsyncThreadInitializer } for the current request . It may
* later be accessed and applied via { @link # initializeAsyncThread ( String ) }
* and will also be used to initialize and reset threads for concurrent handler execution .
* @param key a unique the key under which to keep the initializer
* @param initializer the initializer instance
* /
public void registerAsyncThreadInitializer ( Object key , WebAsyncThreadInitializer initializer ) {
Assert . notNull ( initializer , "WebAsyncThreadInitializer must not be null" ) ;
this . threadInitializers . put ( key , initializer ) ;
}
/ * *
* Invoke the { @linkplain WebAsyncThreadInitializer # initialize ( ) initialize ( ) }
* method of the named { @link WebAsyncThreadInitializer } .
* @param key the key under which the initializer was registered
* @return whether an initializer was found and applied
* /
public boolean initializeAsyncThread ( Object key ) {
WebAsyncThreadInitializer initializer = this . threadInitializers . get ( key ) ;
if ( initializer ! = null ) {
initializer . initialize ( ) ;
return true ;
}
return false ;
}
/ * *
* A contract for initializing and resetting a thread .
* Initialize and reset thread - bound variables .
* /
public interface AsyncThreadInitializer {
public interface Web AsyncThreadInitializer {
void initialize ( ) ;