Browse Source

SettableListenableFuture centralizes state in ListenableFutureTask subclass

Issue: SPR-15216
(cherry picked from commit c9b99da)
pull/1316/head
Juergen Hoeller 9 years ago
parent
commit
e3cd5c8ef0
  1. 5
      spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java
  2. 89
      spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java

5
spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -63,8 +63,9 @@ public class ListenableFutureTask<T> extends FutureTask<T> implements Listenable @@ -63,8 +63,9 @@ public class ListenableFutureTask<T> extends FutureTask<T> implements Listenable
this.callbacks.addFailureCallback(failureCallback);
}
@Override
protected final void done() {
protected void done() {
Throwable cause;
try {
T result = get();

89
spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java

@ -20,10 +20,8 @@ import java.util.concurrent.Callable; @@ -20,10 +20,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
/**
* A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture}
@ -39,15 +37,15 @@ import org.springframework.util.ReflectionUtils; @@ -39,15 +37,15 @@ import org.springframework.util.ReflectionUtils;
*/
public class SettableListenableFuture<T> implements ListenableFuture<T> {
private final SettableTask<T> settableTask;
private final ListenableFutureTask<T> listenableFuture;
private static final Callable<Object> DUMMY_CALLABLE = new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new IllegalStateException("Should never be called");
}
};
public SettableListenableFuture() {
this.settableTask = new SettableTask<T>();
this.listenableFuture = new ListenableFutureTask<T>(this.settableTask);
}
private final SettableTask<T> settableTask = new SettableTask<T>();
/**
@ -58,11 +56,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -58,11 +56,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
* @return {@code true} if the value was successfully set, else {@code false}
*/
public boolean set(T value) {
boolean success = this.settableTask.setValue(value);
if (success) {
this.listenableFuture.run();
}
return success;
return this.settableTask.setResultValue(value);
}
/**
@ -74,27 +68,22 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -74,27 +68,22 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
*/
public boolean setException(Throwable exception) {
Assert.notNull(exception, "Exception must not be null");
boolean success = this.settableTask.setException(exception);
if (success) {
this.listenableFuture.run();
}
return success;
return this.settableTask.setExceptionResult(exception);
}
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.listenableFuture.addCallback(callback);
this.settableTask.addCallback(callback);
}
@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.listenableFuture.addCallback(successCallback, failureCallback);
this.settableTask.addCallback(successCallback, failureCallback);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = this.settableTask.setCancelled();
this.listenableFuture.cancel(mayInterruptIfRunning);
boolean cancelled = this.settableTask.cancel(mayInterruptIfRunning);
if (cancelled && mayInterruptIfRunning) {
interruptTask();
}
@ -121,7 +110,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -121,7 +110,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
*/
@Override
public T get() throws InterruptedException, ExecutionException {
return this.listenableFuture.get();
return this.settableTask.get();
}
/**
@ -136,7 +125,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -136,7 +125,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
*/
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return this.listenableFuture.get(timeout, unit);
return this.settableTask.get(timeout, unit);
}
/**
@ -149,42 +138,40 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -149,42 +138,40 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
}
private static class SettableTask<T> implements Callable<T> {
private static final Object NO_VALUE = new Object();
private static final Object CANCELLED = new Object();
private static class SettableTask<T> extends ListenableFutureTask<T> {
private final AtomicReference<Object> value = new AtomicReference<Object>(NO_VALUE);
private volatile Thread completingThread;
public boolean setValue(T value) {
return this.value.compareAndSet(NO_VALUE, value);
}
public boolean setException(Throwable exception) {
return this.value.compareAndSet(NO_VALUE, exception);
}
public boolean setCancelled() {
return this.value.compareAndSet(NO_VALUE, CANCELLED);
@SuppressWarnings("unchecked")
public SettableTask() {
super((Callable<T>) DUMMY_CALLABLE);
}
public boolean isCancelled() {
return (this.value.get() == CANCELLED);
public boolean setResultValue(T value) {
set(value);
return checkCompletingThread();
}
public boolean isDone() {
return (this.value.get() != NO_VALUE);
public boolean setExceptionResult(Throwable exception) {
setException(exception);
return checkCompletingThread();
}
@SuppressWarnings("unchecked")
@Override
public T call() throws Exception {
Object val = this.value.get();
if (val instanceof Throwable) {
ReflectionUtils.rethrowException((Throwable) val);
protected void done() {
if (!isCancelled()) {
// Implicitly invoked by set/setException: store current thread for
// determining whether the given result has actually triggered completion
// (since FutureTask.set/setException unfortunately don't expose that)
this.completingThread = Thread.currentThread();
}
return (T) val;
super.done();
}
private boolean checkCompletingThread() {
boolean check = (this.completingThread == Thread.currentThread());
this.completingThread = null; // only first check actually counts
return check;
}
}

Loading…
Cancel
Save