diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java index 393a6e64c73..11c22ecda91 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,8 +63,9 @@ public class ListenableFutureTask extends FutureTask implements Listenable this.callbacks.addFailureCallback(failureCallback); } + @Override - protected final void done() { + protected void done() { Throwable cause; try { T result = get(); diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java index c49c29d4eda..5f8b29cfe7a 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java @@ -20,10 +20,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import org.springframework.util.Assert; -import org.springframework.util.ReflectionUtils; /** * A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture} @@ -39,15 +37,15 @@ import org.springframework.util.ReflectionUtils; */ public class SettableListenableFuture implements ListenableFuture { - private final SettableTask settableTask; - - private final ListenableFutureTask listenableFuture; + private static final Callable DUMMY_CALLABLE = new Callable() { + @Override + public Object call() throws Exception { + throw new IllegalStateException("Should never be called"); + } + }; - public SettableListenableFuture() { - this.settableTask = new SettableTask(); - this.listenableFuture = new ListenableFutureTask(this.settableTask); - } + private final SettableTask settableTask = new SettableTask(); /** @@ -58,11 +56,7 @@ public class SettableListenableFuture implements ListenableFuture { * @return {@code true} if the value was successfully set, else {@code false} */ public boolean set(T value) { - boolean success = this.settableTask.setValue(value); - if (success) { - this.listenableFuture.run(); - } - return success; + return this.settableTask.setResultValue(value); } /** @@ -74,27 +68,22 @@ public class SettableListenableFuture implements ListenableFuture { */ public boolean setException(Throwable exception) { Assert.notNull(exception, "Exception must not be null"); - boolean success = this.settableTask.setException(exception); - if (success) { - this.listenableFuture.run(); - } - return success; + return this.settableTask.setExceptionResult(exception); } @Override public void addCallback(ListenableFutureCallback callback) { - this.listenableFuture.addCallback(callback); + this.settableTask.addCallback(callback); } @Override public void addCallback(SuccessCallback successCallback, FailureCallback failureCallback) { - this.listenableFuture.addCallback(successCallback, failureCallback); + this.settableTask.addCallback(successCallback, failureCallback); } @Override public boolean cancel(boolean mayInterruptIfRunning) { - boolean cancelled = this.settableTask.setCancelled(); - this.listenableFuture.cancel(mayInterruptIfRunning); + boolean cancelled = this.settableTask.cancel(mayInterruptIfRunning); if (cancelled && mayInterruptIfRunning) { interruptTask(); } @@ -121,7 +110,7 @@ public class SettableListenableFuture implements ListenableFuture { */ @Override public T get() throws InterruptedException, ExecutionException { - return this.listenableFuture.get(); + return this.settableTask.get(); } /** @@ -136,7 +125,7 @@ public class SettableListenableFuture implements ListenableFuture { */ @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return this.listenableFuture.get(timeout, unit); + return this.settableTask.get(timeout, unit); } /** @@ -149,42 +138,40 @@ public class SettableListenableFuture implements ListenableFuture { } - private static class SettableTask implements Callable { - - private static final Object NO_VALUE = new Object(); - - private static final Object CANCELLED = new Object(); + private static class SettableTask extends ListenableFutureTask { - private final AtomicReference value = new AtomicReference(NO_VALUE); + private volatile Thread completingThread; - public boolean setValue(T value) { - return this.value.compareAndSet(NO_VALUE, value); - } - - public boolean setException(Throwable exception) { - return this.value.compareAndSet(NO_VALUE, exception); - } - - public boolean setCancelled() { - return this.value.compareAndSet(NO_VALUE, CANCELLED); + @SuppressWarnings("unchecked") + public SettableTask() { + super((Callable) DUMMY_CALLABLE); } - public boolean isCancelled() { - return (this.value.get() == CANCELLED); + public boolean setResultValue(T value) { + set(value); + return checkCompletingThread(); } - public boolean isDone() { - return (this.value.get() != NO_VALUE); + public boolean setExceptionResult(Throwable exception) { + setException(exception); + return checkCompletingThread(); } - @SuppressWarnings("unchecked") @Override - public T call() throws Exception { - Object val = this.value.get(); - if (val instanceof Throwable) { - ReflectionUtils.rethrowException((Throwable) val); + protected void done() { + if (!isCancelled()) { + // Implicitly invoked by set/setException: store current thread for + // determining whether the given result has actually triggered completion + // (since FutureTask.set/setException unfortunately don't expose that) + this.completingThread = Thread.currentThread(); } - return (T) val; + super.done(); + } + + private boolean checkCompletingThread() { + boolean check = (this.completingThread == Thread.currentThread()); + this.completingThread = null; // only first check actually counts + return check; } }