@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
/ *
* Copyright 2002 - 2016 the original author or authors .
* Copyright 2002 - 2017 the original author or authors .
*
* Licensed under the Apache License , Version 2 . 0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
@ -20,10 +20,8 @@ import java.util.concurrent.Callable;
@@ -20,10 +20,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeoutException ;
import java.util.concurrent.atomic.AtomicReference ;
import org.springframework.util.Assert ;
import org.springframework.util.ReflectionUtils ;
/ * *
* A { @link org . springframework . util . concurrent . ListenableFuture ListenableFuture }
@ -39,15 +37,15 @@ import org.springframework.util.ReflectionUtils;
@@ -39,15 +37,15 @@ import org.springframework.util.ReflectionUtils;
* /
public class SettableListenableFuture < T > implements ListenableFuture < T > {
private final SettableTask < T > settableTask ;
private final ListenableFutureTask < T > listenableFuture ;
private static final Callable < Object > DUMMY_CALLABLE = new Callable < Object > ( ) {
@Override
public Object call ( ) throws Exception {
throw new IllegalStateException ( "Should never be called" ) ;
}
} ;
public SettableListenableFuture ( ) {
this . settableTask = new SettableTask < > ( ) ;
this . listenableFuture = new ListenableFutureTask < > ( this . settableTask ) ;
}
private final SettableTask < T > settableTask = new SettableTask < > ( ) ;
/ * *
@ -58,11 +56,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
@@ -58,11 +56,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
* @return { @code true } if the value was successfully set , else { @code false }
* /
public boolean set ( T value ) {
boolean success = this . settableTask . setValue ( value ) ;
if ( success ) {
this . listenableFuture . run ( ) ;
}
return success ;
return this . settableTask . setResultValue ( value ) ;
}
/ * *
@ -74,27 +68,22 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
@@ -74,27 +68,22 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
* /
public boolean setException ( Throwable exception ) {
Assert . notNull ( exception , "Exception must not be null" ) ;
boolean success = this . settableTask . setException ( exception ) ;
if ( success ) {
this . listenableFuture . run ( ) ;
}
return success ;
return this . settableTask . setExceptionResult ( exception ) ;
}
@Override
public void addCallback ( ListenableFutureCallback < ? super T > callback ) {
this . listenableFuture . addCallback ( callback ) ;
this . settableTask . addCallback ( callback ) ;
}
@Override
public void addCallback ( SuccessCallback < ? super T > successCallback , FailureCallback failureCallback ) {
this . listenableFuture . addCallback ( successCallback , failureCallback ) ;
this . settableTask . addCallback ( successCallback , failureCallback ) ;
}
@Override
public boolean cancel ( boolean mayInterruptIfRunning ) {
boolean cancelled = this . settableTask . setCancelled ( ) ;
this . listenableFuture . cancel ( mayInterruptIfRunning ) ;
boolean cancelled = this . settableTask . cancel ( mayInterruptIfRunning ) ;
if ( cancelled & & mayInterruptIfRunning ) {
interruptTask ( ) ;
}
@ -113,78 +102,76 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
@@ -113,78 +102,76 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
/ * *
* Retrieve the value .
* < p > Will return the value if it has been set via { @link # set ( Object ) } ,
* throw an { @link java . util . concurrent . ExecutionException } if it has been
* set via { @link # setException ( Throwable ) } or throw a
* { @link java . util . concurrent . CancellationException } if it has been cancelled .
* @return T he value associated with this future .
* < p > This method returns the value if it has been set via { @link # set ( Object ) } ,
* throws an { @link java . util . concurrent . ExecutionException } if an exception has
* been set via { @link # setException ( Throwable ) } , or throws a
* { @link java . util . concurrent . CancellationException } if the future has been cancelled .
* @return t he value associated with this future
* /
@Override
public T get ( ) throws InterruptedException , ExecutionException {
return this . listenableFuture . get ( ) ;
return this . settableTask . get ( ) ;
}
/ * *
* Retrieve the value .
* < p > Will return the value if it has been set via { @link # set ( Object ) } ,
* throw an { @link java . util . concurrent . ExecutionException } if it has been
* set via { @link # setException ( Throwable ) } or throw a
* { @link java . util . concurrent . CancellationException } if it has been cancelled .
* @param timeout the maximum time to wait .
* @param unit the time unit of the timeout argument .
* @return T he value associated with this future .
* < p > This method returns the value if it has been set via { @link # set ( Object ) } ,
* throws an { @link java . util . concurrent . ExecutionException } if an exception has
* been set via { @link # setException ( Throwable ) } , or throws a
* { @link java . util . concurrent . CancellationException } if the future has been cancelled .
* @param timeout the maximum time to wait
* @param unit the unit of the timeout argument
* @return t he value associated with this future
* /
@Override
public T get ( long timeout , TimeUnit unit ) throws InterruptedException , ExecutionException , TimeoutException {
return this . listenableFuture . get ( timeout , unit ) ;
return this . settableTask . get ( timeout , unit ) ;
}
/ * *
* Subclasses can override this method to implement interruption of the future ' s
* computation . The method is invoked automatically by a successful call to
* { @link # cancel ( boolean ) cancel ( true ) } .
* < p > The default implementation does nothing .
* < p > The default implementation is empty .
* /
protected void interruptTask ( ) {
}
private static class SettableTask < T > implements Callable < T > {
private static final Object NO_VALUE = new Object ( ) ;
private static final Object CANCELLED = new Object ( ) ;
private static class SettableTask < T > extends ListenableFutureTask < T > {
private final AtomicReference < Object > value = new AtomicReference < > ( NO_VALUE ) ;
private volatile Thread completingThread ;
public boolean setValue ( T value ) {
return this . value . compareAndSet ( NO_VALUE , value ) ;
}
public boolean setException ( Throwable exception ) {
return this . value . compareAndSet ( NO_VALUE , exception ) ;
}
public boolean setCancelled ( ) {
return this . value . compareAndSet ( NO_VALUE , CANCELLED ) ;
@SuppressWarnings ( "unchecked" )
public SettableTask ( ) {
super ( ( Callable < T > ) DUMMY_CALLABLE ) ;
}
public boolean isCancelled ( ) {
return ( this . value . get ( ) = = CANCELLED ) ;
public boolean setResultValue ( T value ) {
set ( value ) ;
return checkCompletingThread ( ) ;
}
public boolean isDone ( ) {
return ( this . value . get ( ) ! = NO_VALUE ) ;
public boolean setExceptionResult ( Throwable exception ) {
setException ( exception ) ;
return checkCompletingThread ( ) ;
}
@SuppressWarnings ( "unchecked" )
@Override
public T call ( ) throws Exception {
Object val = this . value . get ( ) ;
if ( val instanceof Throwable ) {
ReflectionUtils . rethrowException ( ( Throwable ) val ) ;
protected void done ( ) {
if ( ! isCancelled ( ) ) {
// Implicitly invoked by set/setException: store current thread for
// determining whether the given result has actually triggered completion
// (since FutureTask.set/setException unfortunately don't expose that)
this . completingThread = Thread . currentThread ( ) ;
}
return ( T ) val ;
super . done ( ) ;
}
private boolean checkCompletingThread ( ) {
boolean check = ( this . completingThread = = Thread . currentThread ( ) ) ;
this . completingThread = null ; // only first check actually counts
return check ;
}
}