|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
/* |
|
|
|
* Copyright 2002-2016 the original author or authors. |
|
|
|
* Copyright 2002-2017 the original author or authors. |
|
|
|
* |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -20,10 +20,8 @@ import java.util.concurrent.Callable; |
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
import java.util.concurrent.ExecutionException; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
|
import java.util.concurrent.TimeoutException; |
|
|
|
import java.util.concurrent.atomic.AtomicReference; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
import org.springframework.util.Assert; |
|
|
|
import org.springframework.util.ReflectionUtils; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture} |
|
|
|
* A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture} |
|
|
|
@ -39,15 +37,15 @@ import org.springframework.util.ReflectionUtils; |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public class SettableListenableFuture<T> implements ListenableFuture<T> { |
|
|
|
public class SettableListenableFuture<T> implements ListenableFuture<T> { |
|
|
|
|
|
|
|
|
|
|
|
private final SettableTask<T> settableTask; |
|
|
|
private static final Callable<Object> DUMMY_CALLABLE = new Callable<Object>() { |
|
|
|
|
|
|
|
@Override |
|
|
|
private final ListenableFutureTask<T> listenableFuture; |
|
|
|
public Object call() throws Exception { |
|
|
|
|
|
|
|
throw new IllegalStateException("Should never be called"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public SettableListenableFuture() { |
|
|
|
private final SettableTask<T> settableTask = new SettableTask<>(); |
|
|
|
this.settableTask = new SettableTask<>(); |
|
|
|
|
|
|
|
this.listenableFuture = new ListenableFutureTask<>(this.settableTask); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
@ -58,11 +56,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { |
|
|
|
* @return {@code true} if the value was successfully set, else {@code false} |
|
|
|
* @return {@code true} if the value was successfully set, else {@code false} |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public boolean set(T value) { |
|
|
|
public boolean set(T value) { |
|
|
|
boolean success = this.settableTask.setValue(value); |
|
|
|
return this.settableTask.setResultValue(value); |
|
|
|
if (success) { |
|
|
|
|
|
|
|
this.listenableFuture.run(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return success; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
@ -74,27 +68,22 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public boolean setException(Throwable exception) { |
|
|
|
public boolean setException(Throwable exception) { |
|
|
|
Assert.notNull(exception, "Exception must not be null"); |
|
|
|
Assert.notNull(exception, "Exception must not be null"); |
|
|
|
boolean success = this.settableTask.setException(exception); |
|
|
|
return this.settableTask.setExceptionResult(exception); |
|
|
|
if (success) { |
|
|
|
|
|
|
|
this.listenableFuture.run(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return success; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void addCallback(ListenableFutureCallback<? super T> callback) { |
|
|
|
public void addCallback(ListenableFutureCallback<? super T> callback) { |
|
|
|
this.listenableFuture.addCallback(callback); |
|
|
|
this.settableTask.addCallback(callback); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) { |
|
|
|
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) { |
|
|
|
this.listenableFuture.addCallback(successCallback, failureCallback); |
|
|
|
this.settableTask.addCallback(successCallback, failureCallback); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public boolean cancel(boolean mayInterruptIfRunning) { |
|
|
|
public boolean cancel(boolean mayInterruptIfRunning) { |
|
|
|
boolean cancelled = this.settableTask.setCancelled(); |
|
|
|
boolean cancelled = this.settableTask.cancel(mayInterruptIfRunning); |
|
|
|
this.listenableFuture.cancel(mayInterruptIfRunning); |
|
|
|
|
|
|
|
if (cancelled && mayInterruptIfRunning) { |
|
|
|
if (cancelled && mayInterruptIfRunning) { |
|
|
|
interruptTask(); |
|
|
|
interruptTask(); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -113,78 +102,76 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Retrieve the value. |
|
|
|
* Retrieve the value. |
|
|
|
* <p>Will return the value if it has been set via {@link #set(Object)}, |
|
|
|
* <p>This method returns the value if it has been set via {@link #set(Object)}, |
|
|
|
* throw an {@link java.util.concurrent.ExecutionException} if it has been |
|
|
|
* throws an {@link java.util.concurrent.ExecutionException} if an exception has |
|
|
|
* set via {@link #setException(Throwable)} or throw a |
|
|
|
* been set via {@link #setException(Throwable)}, or throws a |
|
|
|
* {@link java.util.concurrent.CancellationException} if it has been cancelled. |
|
|
|
* {@link java.util.concurrent.CancellationException} if the future has been cancelled. |
|
|
|
* @return The value associated with this future. |
|
|
|
* @return the value associated with this future |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public T get() throws InterruptedException, ExecutionException { |
|
|
|
public T get() throws InterruptedException, ExecutionException { |
|
|
|
return this.listenableFuture.get(); |
|
|
|
return this.settableTask.get(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Retrieve the value. |
|
|
|
* Retrieve the value. |
|
|
|
* <p>Will return the value if it has been set via {@link #set(Object)}, |
|
|
|
* <p>This method returns the value if it has been set via {@link #set(Object)}, |
|
|
|
* throw an {@link java.util.concurrent.ExecutionException} if it has been |
|
|
|
* throws an {@link java.util.concurrent.ExecutionException} if an exception has |
|
|
|
* set via {@link #setException(Throwable)} or throw a |
|
|
|
* been set via {@link #setException(Throwable)}, or throws a |
|
|
|
* {@link java.util.concurrent.CancellationException} if it has been cancelled. |
|
|
|
* {@link java.util.concurrent.CancellationException} if the future has been cancelled. |
|
|
|
* @param timeout the maximum time to wait. |
|
|
|
* @param timeout the maximum time to wait |
|
|
|
* @param unit the time unit of the timeout argument. |
|
|
|
* @param unit the unit of the timeout argument |
|
|
|
* @return The value associated with this future. |
|
|
|
* @return the value associated with this future |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
|
|
|
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
|
|
|
return this.listenableFuture.get(timeout, unit); |
|
|
|
return this.settableTask.get(timeout, unit); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Subclasses can override this method to implement interruption of the future's |
|
|
|
* Subclasses can override this method to implement interruption of the future's |
|
|
|
* computation. The method is invoked automatically by a successful call to |
|
|
|
* computation. The method is invoked automatically by a successful call to |
|
|
|
* {@link #cancel(boolean) cancel(true)}. |
|
|
|
* {@link #cancel(boolean) cancel(true)}. |
|
|
|
* <p>The default implementation does nothing. |
|
|
|
* <p>The default implementation is empty. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
protected void interruptTask() { |
|
|
|
protected void interruptTask() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class SettableTask<T> implements Callable<T> { |
|
|
|
private static class SettableTask<T> extends ListenableFutureTask<T> { |
|
|
|
|
|
|
|
|
|
|
|
private static final Object NO_VALUE = new Object(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static final Object CANCELLED = new Object(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final AtomicReference<Object> value = new AtomicReference<>(NO_VALUE); |
|
|
|
private volatile Thread completingThread; |
|
|
|
|
|
|
|
|
|
|
|
public boolean setValue(T value) { |
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
return this.value.compareAndSet(NO_VALUE, value); |
|
|
|
public SettableTask() { |
|
|
|
} |
|
|
|
super((Callable<T>) DUMMY_CALLABLE); |
|
|
|
|
|
|
|
|
|
|
|
public boolean setException(Throwable exception) { |
|
|
|
|
|
|
|
return this.value.compareAndSet(NO_VALUE, exception); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public boolean setCancelled() { |
|
|
|
|
|
|
|
return this.value.compareAndSet(NO_VALUE, CANCELLED); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public boolean isCancelled() { |
|
|
|
public boolean setResultValue(T value) { |
|
|
|
return (this.value.get() == CANCELLED); |
|
|
|
set(value); |
|
|
|
|
|
|
|
return checkCompletingThread(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public boolean isDone() { |
|
|
|
public boolean setExceptionResult(Throwable exception) { |
|
|
|
return (this.value.get() != NO_VALUE); |
|
|
|
setException(exception); |
|
|
|
|
|
|
|
return checkCompletingThread(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public T call() throws Exception { |
|
|
|
protected void done() { |
|
|
|
Object val = this.value.get(); |
|
|
|
if (!isCancelled()) { |
|
|
|
if (val instanceof Throwable) { |
|
|
|
// Implicitly invoked by set/setException: store current thread for
|
|
|
|
ReflectionUtils.rethrowException((Throwable) val); |
|
|
|
// determining whether the given result has actually triggered completion
|
|
|
|
|
|
|
|
// (since FutureTask.set/setException unfortunately don't expose that)
|
|
|
|
|
|
|
|
this.completingThread = Thread.currentThread(); |
|
|
|
} |
|
|
|
} |
|
|
|
return (T) val; |
|
|
|
super.done(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private boolean checkCompletingThread() { |
|
|
|
|
|
|
|
boolean check = (this.completingThread == Thread.currentThread()); |
|
|
|
|
|
|
|
this.completingThread = null; // only first check actually counts
|
|
|
|
|
|
|
|
return check; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|