Browse Source

SettableListenableFuture consistently tracks cancellation state

Issue: SPR-15202
(cherry picked from commit 9666fcc)
pull/1316/head
Juergen Hoeller 9 years ago
parent
commit
dec1c93b24
  1. 63
      spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java
  2. 129
      spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java

63
spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,13 +27,14 @@ import org.springframework.util.ReflectionUtils; @@ -27,13 +27,14 @@ import org.springframework.util.ReflectionUtils;
/**
* A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture}
* whose value can be set via {@link #set(Object)} or
* {@link #setException(Throwable)}. It may also be cancelled.
* whose value can be set via {@link #set(T)} or {@link #setException(Throwable)}.
* It may also be cancelled.
*
* <p>Inspired by {@code com.google.common.util.concurrent.SettableFuture}.
*
* @author Mattias Severson
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 4.1
*/
public class SettableListenableFuture<T> implements ListenableFuture<T> {
@ -92,8 +93,8 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -92,8 +93,8 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
this.settableTask.setCancelled();
boolean cancelled = this.listenableFuture.cancel(mayInterruptIfRunning);
boolean cancelled = this.settableTask.setCancelled();
this.listenableFuture.cancel(mayInterruptIfRunning);
if (cancelled && mayInterruptIfRunning) {
interruptTask();
}
@ -102,21 +103,21 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -102,21 +103,21 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
@Override
public boolean isCancelled() {
return this.listenableFuture.isCancelled();
return this.settableTask.isCancelled();
}
@Override
public boolean isDone() {
return this.listenableFuture.isDone();
return this.settableTask.isDone();
}
/**
* Retrieve the value.
* <p>Will return the value if it has been set via {@link #set(Object)},
* throw an {@link java.util.concurrent.ExecutionException} if it has been
* set via {@link #setException(Throwable)} or throw a
* {@link java.util.concurrent.CancellationException} if it has been cancelled.
* @return The value associated with this future.
* <p>This method returns the value if it has been set via {@link #set(Object)},
* throws an {@link java.util.concurrent.ExecutionException} if an exception has
* been set via {@link #setException(Throwable)}, or throws a
* {@link java.util.concurrent.CancellationException} if the future has been cancelled.
* @return the value associated with this future
*/
@Override
public T get() throws InterruptedException, ExecutionException {
@ -125,13 +126,13 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -125,13 +126,13 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
/**
* Retrieve the value.
* <p>Will return the value if it has been set via {@link #set(Object)},
* throw an {@link java.util.concurrent.ExecutionException} if it has been
* set via {@link #setException(Throwable)} or throw a
* {@link java.util.concurrent.CancellationException} if it has been cancelled.
* @param timeout the maximum time to wait.
* @param unit the time unit of the timeout argument.
* @return The value associated with this future.
* <p>This method returns the value if it has been set via {@link #set(Object)},
* throws an {@link java.util.concurrent.ExecutionException} if an exception has
* been set via {@link #setException(Throwable)}, or throws a
* {@link java.util.concurrent.CancellationException} if the future has been cancelled.
* @param timeout the maximum time to wait
* @param unit the unit of the timeout argument
* @return the value associated with this future
*/
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
@ -142,7 +143,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -142,7 +143,7 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
* Subclasses can override this method to implement interruption of the future's
* computation. The method is invoked automatically by a successful call to
* {@link #cancel(boolean) cancel(true)}.
* <p>The default implementation does nothing.
* <p>The default implementation is empty.
*/
protected void interruptTask() {
}
@ -152,26 +153,28 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -152,26 +153,28 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
private static final Object NO_VALUE = new Object();
private final AtomicReference<Object> value = new AtomicReference<Object>(NO_VALUE);
private static final Object CANCELLED = new Object();
private volatile boolean cancelled = false;
private final AtomicReference<Object> value = new AtomicReference<Object>(NO_VALUE);
public boolean setValue(T value) {
if (this.cancelled) {
return false;
}
return this.value.compareAndSet(NO_VALUE, value);
}
public boolean setException(Throwable exception) {
if (this.cancelled) {
return false;
}
return this.value.compareAndSet(NO_VALUE, exception);
}
public void setCancelled() {
this.cancelled = true;
public boolean setCancelled() {
return this.value.compareAndSet(NO_VALUE, CANCELLED);
}
public boolean isCancelled() {
return (this.value.get() == CANCELLED);
}
public boolean isDone() {
return (this.value.get() != NO_VALUE);
}
@SuppressWarnings("unchecked")

129
spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,38 +30,40 @@ import static org.mockito.Mockito.*; @@ -30,38 +30,40 @@ import static org.mockito.Mockito.*;
/**
* @author Mattias Severson
* @author Juergen Hoeller
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public class SettableListenableFutureTests {
private final SettableListenableFuture<String> settableListenableFuture = new SettableListenableFuture<String>();
private final SettableListenableFuture<String> settableListenableFuture = new SettableListenableFuture<>();
@Test
public void validateInitialValues() {
assertFalse(settableListenableFuture.isDone());
assertFalse(settableListenableFuture.isCancelled());
assertFalse(settableListenableFuture.isDone());
}
@Test
public void returnsSetValue() throws ExecutionException, InterruptedException {
String string = "hello";
boolean wasSet = settableListenableFuture.set(string);
assertTrue(wasSet);
assertTrue(settableListenableFuture.set(string));
assertThat(settableListenableFuture.get(), equalTo(string));
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void setValueUpdatesDoneStatus() {
settableListenableFuture.set("hello");
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void throwsSetExceptionWrappedInExecutionException() throws ExecutionException, InterruptedException {
Throwable exception = new RuntimeException();
boolean wasSet = settableListenableFuture.setException(exception);
assertTrue(wasSet);
assertTrue(settableListenableFuture.setException(exception));
try {
settableListenableFuture.get();
fail("Expected ExecutionException");
@ -69,13 +71,16 @@ public class SettableListenableFutureTests { @@ -69,13 +71,16 @@ public class SettableListenableFutureTests {
catch (ExecutionException ex) {
assertThat(ex.getCause(), equalTo(exception));
}
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void throwsSetErrorWrappedInExecutionException() throws ExecutionException, InterruptedException {
Throwable exception = new OutOfMemoryError();
boolean wasSet = settableListenableFuture.setException(exception);
assertTrue(wasSet);
assertTrue(settableListenableFuture.setException(exception));
try {
settableListenableFuture.get();
fail("Expected ExecutionException");
@ -83,12 +88,16 @@ public class SettableListenableFutureTests { @@ -83,12 +88,16 @@ public class SettableListenableFutureTests {
catch (ExecutionException ex) {
assertThat(ex.getCause(), equalTo(exception));
}
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void setValueTriggersCallback() {
String string = "hello";
final String[] callbackHolder = new String[1];
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
@ -99,14 +108,18 @@ public class SettableListenableFutureTests { @@ -99,14 +108,18 @@ public class SettableListenableFutureTests {
fail("Expected onSuccess() to be called");
}
});
settableListenableFuture.set(string);
assertThat(callbackHolder[0], equalTo(string));
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void setValueTriggersCallbackOnlyOnce() {
String string = "hello";
final String[] callbackHolder = new String[1];
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
@ -117,15 +130,19 @@ public class SettableListenableFutureTests { @@ -117,15 +130,19 @@ public class SettableListenableFutureTests {
fail("Expected onSuccess() to be called");
}
});
settableListenableFuture.set(string);
assertFalse(settableListenableFuture.set("good bye"));
assertThat(callbackHolder[0], equalTo(string));
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void setExceptionTriggersCallback() {
Throwable exception = new RuntimeException();
final Throwable[] callbackHolder = new Throwable[1];
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
@ -136,14 +153,18 @@ public class SettableListenableFutureTests { @@ -136,14 +153,18 @@ public class SettableListenableFutureTests {
callbackHolder[0] = ex;
}
});
settableListenableFuture.setException(exception);
assertThat(callbackHolder[0], equalTo(exception));
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void setExceptionTriggersCallbackOnlyOnce() {
Throwable exception = new RuntimeException();
final Throwable[] callbackHolder = new Throwable[1];
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
@ -154,20 +175,26 @@ public class SettableListenableFutureTests { @@ -154,20 +175,26 @@ public class SettableListenableFutureTests {
callbackHolder[0] = ex;
}
});
settableListenableFuture.setException(exception);
assertFalse(settableListenableFuture.setException(new IllegalArgumentException()));
assertThat(callbackHolder[0], equalTo(exception));
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void nullIsAcceptedAsValueToSet() throws ExecutionException, InterruptedException {
settableListenableFuture.set(null);
assertNull(settableListenableFuture.get());
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void getWaitsForCompletion() throws ExecutionException, InterruptedException {
final String string = "hello";
new Thread(new Runnable() {
@Override
public void run() {
@ -180,8 +207,11 @@ public class SettableListenableFutureTests { @@ -180,8 +207,11 @@ public class SettableListenableFutureTests {
}
}
}).start();
String value = settableListenableFuture.get();
assertThat(value, equalTo(string));
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
@ -198,6 +228,7 @@ public class SettableListenableFutureTests { @@ -198,6 +228,7 @@ public class SettableListenableFutureTests {
@Test
public void getWithTimeoutWaitsForCompletion() throws ExecutionException, InterruptedException, TimeoutException {
final String string = "hello";
new Thread(new Runnable() {
@Override
public void run() {
@ -210,65 +241,74 @@ public class SettableListenableFutureTests { @@ -210,65 +241,74 @@ public class SettableListenableFutureTests {
}
}
}).start();
String value = settableListenableFuture.get(100L, TimeUnit.MILLISECONDS);
String value = settableListenableFuture.get(500L, TimeUnit.MILLISECONDS);
assertThat(value, equalTo(string));
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void cancelPreventsValueFromBeingSet() {
boolean wasCancelled = settableListenableFuture.cancel(true);
assertTrue(wasCancelled);
boolean wasSet = settableListenableFuture.set("hello");
assertFalse(wasSet);
assertTrue(settableListenableFuture.cancel(true));
assertFalse(settableListenableFuture.set("hello"));
assertTrue(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void cancelSetsFutureToDone() {
settableListenableFuture.cancel(true);
assertTrue(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void cancelWithMayInterruptIfRunningTrueCallsOverridenMethod() {
InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture();
tested.cancel(true);
assertTrue(tested.calledInterruptTask());
public void cancelWithMayInterruptIfRunningTrueCallsOverriddenMethod() {
InterruptibleSettableListenableFuture interruptibleFuture = new InterruptibleSettableListenableFuture();
assertTrue(interruptibleFuture.cancel(true));
assertTrue(interruptibleFuture.calledInterruptTask());
assertTrue(interruptibleFuture.isCancelled());
assertTrue(interruptibleFuture.isDone());
}
@Test
public void cancelWithMayInterruptIfRunningFalseDoesNotCallOverridenMethod() {
InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture();
tested.cancel(false);
assertFalse(tested.calledInterruptTask());
public void cancelWithMayInterruptIfRunningFalseDoesNotCallOverriddenMethod() {
InterruptibleSettableListenableFuture interruptibleFuture = new InterruptibleSettableListenableFuture();
assertTrue(interruptibleFuture.cancel(false));
assertFalse(interruptibleFuture.calledInterruptTask());
assertTrue(interruptibleFuture.isCancelled());
assertTrue(interruptibleFuture.isDone());
}
@Test
public void setPreventsCancel() {
boolean wasSet = settableListenableFuture.set("hello");
assertTrue(wasSet);
boolean wasCancelled = settableListenableFuture.cancel(true);
assertFalse(wasCancelled);
assertTrue(settableListenableFuture.set("hello"));
assertFalse(settableListenableFuture.cancel(true));
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void cancelPreventsExceptionFromBeingSet() {
boolean wasCancelled = settableListenableFuture.cancel(true);
assertTrue(wasCancelled);
boolean wasSet = settableListenableFuture.setException(new RuntimeException());
assertFalse(wasSet);
assertTrue(settableListenableFuture.cancel(true));
assertFalse(settableListenableFuture.setException(new RuntimeException()));
assertTrue(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void setExceptionPreventsCancel() {
boolean wasSet = settableListenableFuture.setException(new RuntimeException());
assertTrue(wasSet);
boolean wasCancelled = settableListenableFuture.cancel(true);
assertFalse(wasCancelled);
assertTrue(settableListenableFuture.setException(new RuntimeException()));
assertFalse(settableListenableFuture.cancel(true));
assertFalse(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
public void cancelStateThrowsExceptionWhenCallingGet() throws ExecutionException, InterruptedException {
settableListenableFuture.cancel(true);
try {
settableListenableFuture.get();
fail("Expected CancellationException");
@ -276,6 +316,9 @@ public class SettableListenableFutureTests { @@ -276,6 +316,9 @@ public class SettableListenableFutureTests {
catch (CancellationException ex) {
// expected
}
assertTrue(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
@ -292,16 +335,21 @@ public class SettableListenableFutureTests { @@ -292,16 +335,21 @@ public class SettableListenableFutureTests {
}
}
}).start();
try {
settableListenableFuture.get(100L, TimeUnit.MILLISECONDS);
settableListenableFuture.get(500L, TimeUnit.MILLISECONDS);
fail("Expected CancellationException");
}
catch (CancellationException ex) {
// expected
}
assertTrue(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void cancelDoesNotNotifyCallbacksOnSet() {
ListenableFutureCallback callback = mock(ListenableFutureCallback.class);
settableListenableFuture.addCallback(callback);
@ -312,9 +360,13 @@ public class SettableListenableFutureTests { @@ -312,9 +360,13 @@ public class SettableListenableFutureTests {
settableListenableFuture.set("hello");
verifyNoMoreInteractions(callback);
assertTrue(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void cancelDoesNotNotifyCallbacksOnSetException() {
ListenableFutureCallback callback = mock(ListenableFutureCallback.class);
settableListenableFuture.addCallback(callback);
@ -325,10 +377,13 @@ public class SettableListenableFutureTests { @@ -325,10 +377,13 @@ public class SettableListenableFutureTests {
settableListenableFuture.setException(new RuntimeException());
verifyNoMoreInteractions(callback);
assertTrue(settableListenableFuture.isCancelled());
assertTrue(settableListenableFuture.isDone());
}
private static class InterruptableSettableListenableFuture extends SettableListenableFuture<String> {
private static class InterruptibleSettableListenableFuture extends SettableListenableFuture<String> {
private boolean interrupted = false;

Loading…
Cancel
Save