From d0aa158aef7647419b0df72c98023c32dea1c050 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Mon, 2 Sep 2013 15:23:57 +0200 Subject: [PATCH] Added ListenableFuture interface Added extension to Future with capabilities for registering callbacks when the future is complete. - Added ListenableFuture, ListenableFutureCallback, ListenableFutureCallbackRegistry, and ListenableFutureTask. - Using ListenableFuture in AsyncRestOperations/AsyncRestTemplate. - Added AsyncListenableTaskExecutor, implemented in SimpleAsyncTaskExecutor. - Added FutureAdapter and ListenableFutureAdapter. --- .../task/AsyncListenableTaskExecutor.java | 52 ++++ .../core/task/SimpleAsyncTaskExecutor.java | 18 +- .../util/concurrent/FutureAdapter.java | 122 ++++++++ .../util/concurrent/ListenableFuture.java | 41 +++ .../concurrent/ListenableFutureAdapter.java | 69 +++++ .../concurrent/ListenableFutureCallback.java | 40 +++ .../ListenableFutureCallbackRegistry.java | 99 +++++++ .../util/concurrent/ListenableFutureTask.java | 83 ++++++ .../util/concurrent/package-info.java | 7 + .../util/concurrent/FutureAdapterTests.java | 88 ++++++ .../concurrent/ListenableFutureTaskTests.java | 82 ++++++ .../AbstractAsyncClientHttpRequest.java | 14 +- ...stractBufferingAsyncClientHttpRequest.java | 11 +- .../http/client/AsyncClientHttpRequest.java | 4 +- .../HttpComponentsAsyncClientHttpRequest.java | 65 +++-- ...SimpleBufferingAsyncClientHttpRequest.java | 14 +- .../SimpleClientHttpRequestFactory.java | 8 +- ...SimpleStreamingAsyncClientHttpRequest.java | 12 +- .../web/client/AsyncRestOperations.java | 61 ++-- .../web/client/AsyncRestTemplate.java | 235 ++++++++-------- ...stractAsyncHttpRequestFactoryTestCase.java | 43 +++ ...redSimpleAsyncHttpRequestFactoryTests.java | 4 +- .../AsyncRestTemplateIntegrationTests.java | 263 +++++++++++++++++- 23 files changed, 1216 insertions(+), 219 deletions(-) create mode 100644 spring-core/src/main/java/org/springframework/core/task/AsyncListenableTaskExecutor.java create mode 100644 spring-core/src/main/java/org/springframework/util/concurrent/FutureAdapter.java create mode 100644 spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java create mode 100644 spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java create mode 100644 spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java create mode 100644 spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java create mode 100644 spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java create mode 100644 spring-core/src/main/java/org/springframework/util/concurrent/package-info.java create mode 100644 spring-core/src/test/java/org/springframework/util/concurrent/FutureAdapterTests.java create mode 100644 spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java diff --git a/spring-core/src/main/java/org/springframework/core/task/AsyncListenableTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/AsyncListenableTaskExecutor.java new file mode 100644 index 00000000000..6adaad38057 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/task/AsyncListenableTaskExecutor.java @@ -0,0 +1,52 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.task; + +import java.util.concurrent.Callable; + +import org.springframework.util.concurrent.ListenableFuture; + +/** + * Extension of the {@link AsyncTaskExecutor} interface, adding the capability to submit + * tasks for {@link ListenableFuture}s. + * + * @author Arjen Poutsma + * @since 4.0 + * @see ListenableFuture + */ +public interface AsyncListenableTaskExecutor extends AsyncTaskExecutor { + + /** + * Submit a {@code Runnable} task for execution, receiving a {@code ListenableFuture} + * representing that task. The Future will return a {@code null} result upon completion. + * @param task the {@code Runnable} to execute (never {@code null}) + * @return a {@code ListenableFuture} representing pending completion of the task + * @throws TaskRejectedException if the given task was not accepted + */ + ListenableFuture submitListenable(Runnable task); + + /** + * Submit a {@code Callable} task for execution, receiving a {@code ListenableFuture} + * representing that task. The Future will return the Callable's result upon + * completion. + * @param task the {@code Callable} to execute (never {@code null}) + * @return a {@code ListenableFuture} representing pending completion of the task + * @throws TaskRejectedException if the given task was not accepted + */ + ListenableFuture submitListenable(Callable task); + +} diff --git a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java index 21622a29cc8..05e759ab874 100644 --- a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java @@ -25,6 +25,8 @@ import java.util.concurrent.ThreadFactory; import org.springframework.util.Assert; import org.springframework.util.ConcurrencyThrottleSupport; import org.springframework.util.CustomizableThreadCreator; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; /** * {@link TaskExecutor} implementation that fires up a new Thread for each task, @@ -45,7 +47,7 @@ import org.springframework.util.CustomizableThreadCreator; * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor */ @SuppressWarnings("serial") -public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncTaskExecutor, Serializable { +public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable { /** * Permit any number of concurrent invocations: that is, don't throttle concurrency. @@ -184,6 +186,20 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement return future; } + @Override + public ListenableFuture submitListenable(Runnable task) { + ListenableFutureTask future = new ListenableFutureTask(task, null); + execute(future, TIMEOUT_INDEFINITE); + return future; + } + + @Override + public ListenableFuture submitListenable(Callable task) { + ListenableFutureTask future = new ListenableFutureTask(task); + execute(future, TIMEOUT_INDEFINITE); + return future; + } + /** * Template method for the actual execution of a task. *

The default implementation creates a new Thread and starts it. diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/FutureAdapter.java b/spring-core/src/main/java/org/springframework/util/concurrent/FutureAdapter.java new file mode 100644 index 00000000000..fe0e7f03dd3 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/FutureAdapter.java @@ -0,0 +1,122 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.util.Assert; + +/** + * Abstract class that adapts a {@link Future} parameterized over S into a {@code + * Future} parameterized over T. All methods are delegated to the adaptee, where {@link + * #get()} and {@link #get(long, TimeUnit)} call {@@link #adapt(Object)} on the adaptee's + * result. + * + * @param the type of this {@code Future} + * @param the type of the adaptee's {@code Future} + * @author Arjen Poutsma + * @since 4.0 + */ +public abstract class FutureAdapter implements Future { + + private final Future adaptee; + + private Object result = null; + + private State state = State.NEW; + + private final Object mutex = new Object(); + + /** + * Constructs a new {@code FutureAdapter} with the given adaptee. + * @param adaptee the future to delegate to + */ + protected FutureAdapter(Future adaptee) { + Assert.notNull(adaptee, "'delegate' must not be null"); + this.adaptee = adaptee; + } + + /** + * Returns the adaptee. + */ + protected Future getAdaptee() { + return adaptee; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return adaptee.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return adaptee.isCancelled(); + } + + @Override + public boolean isDone() { + return adaptee.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return adaptInternal(adaptee.get()); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return adaptInternal(adaptee.get(timeout, unit)); + } + + @SuppressWarnings("unchecked") + final T adaptInternal(S adapteeResult) throws ExecutionException { + synchronized (mutex) { + switch (state) { + case SUCCESS: + return (T) result; + case FAILURE: + throw (ExecutionException) result; + case NEW: + try { + T adapted = adapt(adapteeResult); + result = adapted; + state = State.SUCCESS; + return adapted; + } catch (ExecutionException ex) { + result = ex; + state = State.FAILURE; + throw ex; + } + default: + throw new IllegalStateException(); + } + } + } + + /** + * Adapts the given adaptee's result into T. + * @return the adapted result + */ + protected abstract T adapt(S adapteeResult) throws ExecutionException; + + private enum State {NEW, SUCCESS, FAILURE} + +} diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java new file mode 100644 index 00000000000..5e5ae005621 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java @@ -0,0 +1,41 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import java.util.concurrent.Future; + +/** + * Extends the {@link Future} interface with the capability to accept completion + * callbacks. If the future has already completed when the callback is added, the + * callback will be triggered immediately. + *

Inspired by {@link com.google.common.util.concurrent.ListenableFuture}. + + * @author Arjen Poutsma + * @since 4.0 + */ +public interface ListenableFuture extends Future { + + /** + * Registers the given callback to this {@code ListenableFuture}. The callback will + * be triggered when this {@code Future} is complete or, if it is already complete, + * immediately. + * + * @param callback the callback to register + */ + void addCallback(ListenableFutureCallback callback); + +} diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java new file mode 100644 index 00000000000..8ac2948fa6b --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java @@ -0,0 +1,69 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import java.util.concurrent.ExecutionException; + +/** + * Abstract class that adapts a {@link ListenableFuture} parameterized over S into a + * {@code ListenableFuture} parameterized over T. All methods are delegated to the + * adaptee, where {@link #get()}, {@link #get(long, java.util.concurrent.TimeUnit)}, and + * {@link ListenableFutureCallback#onSuccess(Object)} call {@@link #adapt(Object)} on the + * adaptee's result. + * + * @param the type of this {@code Future} + * @param the type of the adaptee's {@code Future} + * @author Arjen Poutsma + * @since 4.0 + */ +public abstract class ListenableFutureAdapter extends FutureAdapter + implements ListenableFuture { + + /** + * Constructs a new {@code ListenableFutureAdapter} with the given adaptee. + * @param adaptee the future to adaptee to + */ + protected ListenableFutureAdapter(ListenableFuture adaptee) { + super(adaptee); + } + + + @Override + public void addCallback(final ListenableFutureCallback callback) { + ListenableFuture listenableAdaptee = (ListenableFuture) getAdaptee(); + listenableAdaptee.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(S result) { + try { + callback.onSuccess(adaptInternal(result)); + } + catch (ExecutionException ex) { + Throwable cause = ex.getCause(); + onFailure(cause != null ? cause : ex); + } + catch (Throwable t) { + onFailure(t); + } + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); + } +} diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java new file mode 100644 index 00000000000..d112f9fa142 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java @@ -0,0 +1,40 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +/** + * Defines the contract for callbacks that accept the result of a + * {@link ListenableFuture}. + * + * @author Arjen Poutsma + * @since 4.0 + */ +public interface ListenableFutureCallback { + + /** + * Called when the {@link ListenableFuture} successfully completes. + * @param result the result + */ + void onSuccess(T result); + + /** + * Called when the {@link ListenableFuture} fails to complete. + * @param t the exception that triggered the failure + */ + void onFailure(Throwable t); + +} diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java new file mode 100644 index 00000000000..938999e2ce3 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java @@ -0,0 +1,99 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import java.util.LinkedList; +import java.util.Queue; + +import org.springframework.util.Assert; + +/** + * Registry for {@link ListenableFutureCallback} instances. + *

Inspired by {@link com.google.common.util.concurrent.ExecutionList}. + * @author Arjen Poutsma + * @since 4.0 + */ +public class ListenableFutureCallbackRegistry { + + private final Queue> callbacks = + new LinkedList>(); + + private State state = State.NEW; + + private Object result = null; + + private final Object mutex = new Object(); + + + /** + * Adds the given callback to this registry. + * @param callback the callback to add + */ + @SuppressWarnings("unchecked") + public void addCallback(ListenableFutureCallback callback) { + Assert.notNull(callback, "'callback' must not be null"); + + synchronized (mutex) { + switch (state) { + case NEW: + callbacks.add(callback); + break; + case SUCCESS: + callback.onSuccess((T)result); + break; + case FAILURE: + callback.onFailure((Throwable) result); + break; + } + } + } + + /** + * Triggers a {@link ListenableFutureCallback#onSuccess(Object)} call on all added + * callbacks with the given result + * @param result the result to trigger the callbacks with + */ + public void success(T result) { + synchronized (mutex) { + state = State.SUCCESS; + this.result = result; + + while (!callbacks.isEmpty()) { + callbacks.poll().onSuccess(result); + } + } + } + + /** + * Triggers a {@link ListenableFutureCallback#onFailure(Throwable)} call on all added + * callbacks with the given {@code Throwable}. + * @param t the exception to trigger the callbacks with + */ + public void failure(Throwable t) { + synchronized (mutex) { + state = State.FAILURE; + this.result = t; + + while (!callbacks.isEmpty()) { + callbacks.poll().onFailure(t); + } + } + } + + private enum State {NEW, SUCCESS, FAILURE} + +} diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java new file mode 100644 index 00000000000..79b5e334b57 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java @@ -0,0 +1,83 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; + +/** + * Extension of {@link FutureTask} that implements {@link ListenableFuture}. + * + * @author Arjen Poutsma + * @since 4.0 + */ +public class ListenableFutureTask extends FutureTask + implements ListenableFuture { + + private final ListenableFutureCallbackRegistry callbacks = + new ListenableFutureCallbackRegistry(); + + /** + * Creates a new {@code ListenableFutureTask} that will, upon running, execute the given + * {@link Callable}. + * @param callable the callable task + */ + public ListenableFutureTask(Callable callable) { + super(callable); + } + + /** + * Creates a {@code ListenableFutureTask} that will, upon running, execute the given + * {@link Runnable}, and arrange that {@link #get()} will return the given result on + * successful completion. + * @param runnable the runnable task + * @param result the result to return on successful completion + */ + public ListenableFutureTask(Runnable runnable, T result) { + super(runnable, result); + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + callbacks.addCallback(callback); + } + + @Override + protected final void done() { + Throwable cause; + try { + T result = get(); + callbacks.success(result); + return; + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + catch (ExecutionException ex) { + cause = ex.getCause(); + if (cause == null) { + cause = ex; + } + } + catch (Throwable t) { + cause = t; + } + callbacks.failure(cause); + } +} diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/package-info.java b/spring-core/src/main/java/org/springframework/util/concurrent/package-info.java new file mode 100644 index 00000000000..a03788bd271 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/package-info.java @@ -0,0 +1,7 @@ + +/** + * + * Useful generic {@code java.util.concurrent.Future} extension. + */ +package org.springframework.util.concurrent; + diff --git a/spring-core/src/test/java/org/springframework/util/concurrent/FutureAdapterTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/FutureAdapterTests.java new file mode 100644 index 00000000000..f8b8913348c --- /dev/null +++ b/spring-core/src/test/java/org/springframework/util/concurrent/FutureAdapterTests.java @@ -0,0 +1,88 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.mock; + +/** + * @author Arjen Poutsma + */ +public class FutureAdapterTests { + + private FutureAdapter adapter; + + private Future adaptee; + + + @Before + @SuppressWarnings("unchecked") + public void setUp() { + adaptee = mock(Future.class); + adapter = new FutureAdapter(adaptee) { + @Override + protected String adapt(Integer adapteeResult) throws ExecutionException { + return adapteeResult.toString(); + } + }; + } + + @Test + public void cancel() throws Exception { + given(adaptee.cancel(true)).willReturn(true); + boolean result = adapter.cancel(true); + assertTrue(result); + } + + @Test + public void isCancelled() { + given(adaptee.isCancelled()).willReturn(true); + boolean result = adapter.isCancelled(); + assertTrue(result); + } + + @Test + public void isDone() { + given(adaptee.isDone()).willReturn(true); + boolean result = adapter.isDone(); + assertTrue(result); + } + + @Test + public void get() throws Exception { + given(adaptee.get()).willReturn(42); + String result = adapter.get(); + assertEquals("42", result); + } + + @Test + public void getTimeOut() throws Exception { + given(adaptee.get(1, TimeUnit.SECONDS)).willReturn(42); + String result = adapter.get(1, TimeUnit.SECONDS); + assertEquals("42", result); + } + + +} diff --git a/spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java new file mode 100644 index 00000000000..84d351e1817 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java @@ -0,0 +1,82 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import org.junit.Test; + +/** + * @author Arjen Poutsma + */ +public class ListenableFutureTaskTests { + + @Test + public void success() throws ExecutionException, InterruptedException { + final String s = "Hello World"; + Callable callable = new Callable() { + @Override + public String call() throws Exception { + return s; + } + }; + ListenableFutureTask task = new ListenableFutureTask(callable); + task.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + assertEquals(s, result); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + task.run(); + } + + @Test + public void failure() throws ExecutionException, InterruptedException { + final String s = "Hello World"; + Callable callable = new Callable() { + @Override + public String call() throws Exception { + throw new IOException(s); + } + }; + ListenableFutureTask task = new ListenableFutureTask(callable); + task.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("onSuccess not expected"); + } + + @Override + public void onFailure(Throwable t) { + assertEquals(s, t.getMessage()); + } + }); + task.run(); + } + + + +} diff --git a/spring-web/src/main/java/org/springframework/http/client/AbstractAsyncClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/AbstractAsyncClientHttpRequest.java index 9292bcf2804..cc9112034a1 100644 --- a/spring-web/src/main/java/org/springframework/http/client/AbstractAsyncClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/AbstractAsyncClientHttpRequest.java @@ -18,10 +18,10 @@ package org.springframework.http.client; import java.io.IOException; import java.io.OutputStream; -import java.util.concurrent.Future; import org.springframework.http.HttpHeaders; import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; /** * Abstract base for {@link AsyncClientHttpRequest} that makes sure that headers and body @@ -49,15 +49,15 @@ abstract class AbstractAsyncClientHttpRequest implements AsyncClientHttpRequest } @Override - public Future executeAsync() throws IOException { + public ListenableFuture executeAsync() throws IOException { assertNotExecuted(); - Future result = executeInternal(this.headers); + ListenableFuture result = executeInternal(this.headers); this.executed = true; return result; } /** - * Asserts that this request has not been {@linkplain #execute() executed} yet. + * Asserts that this request has not been {@linkplain #executeAsync() executed} yet. * * @throws IllegalStateException if this request has been executed */ @@ -74,10 +74,12 @@ abstract class AbstractAsyncClientHttpRequest implements AsyncClientHttpRequest protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException; /** - * Abstract template method that writes the given headers and content to the HTTP request. + * Abstract template method that writes the given headers and content to the HTTP + * request. * @param headers the HTTP headers * @return the response object for the executed request */ - protected abstract Future executeInternal(HttpHeaders headers) throws IOException; + protected abstract ListenableFuture executeInternal( + HttpHeaders headers) throws IOException; } diff --git a/spring-web/src/main/java/org/springframework/http/client/AbstractBufferingAsyncClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/AbstractBufferingAsyncClientHttpRequest.java index efa290f5360..76658da4021 100644 --- a/spring-web/src/main/java/org/springframework/http/client/AbstractBufferingAsyncClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/AbstractBufferingAsyncClientHttpRequest.java @@ -19,9 +19,9 @@ package org.springframework.http.client; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.concurrent.Future; import org.springframework.http.HttpHeaders; +import org.springframework.util.concurrent.ListenableFuture; /** * Abstract base for {@link org.springframework.http.client.ClientHttpRequest} that buffers output in a byte array before sending it over the wire. @@ -40,12 +40,13 @@ abstract class AbstractBufferingAsyncClientHttpRequest } @Override - protected Future executeInternal(HttpHeaders headers) throws IOException { + protected ListenableFuture executeInternal(HttpHeaders headers) + throws IOException { byte[] bytes = this.bufferedOutput.toByteArray(); if (headers.getContentLength() == -1) { headers.setContentLength(bytes.length); } - Future result = executeInternal(headers, bytes); + ListenableFuture result = executeInternal(headers, bytes); this.bufferedOutput = null; return result; } @@ -58,8 +59,8 @@ abstract class AbstractBufferingAsyncClientHttpRequest * @param bufferedOutput the body content * @return the response object for the executed request */ - protected abstract Future executeInternal(HttpHeaders headers, - byte[] bufferedOutput) throws IOException; + protected abstract ListenableFuture executeInternal( + HttpHeaders headers, byte[] bufferedOutput) throws IOException; } diff --git a/spring-web/src/main/java/org/springframework/http/client/AsyncClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/AsyncClientHttpRequest.java index 55ec53a39f9..06cf17e70cf 100644 --- a/spring-web/src/main/java/org/springframework/http/client/AsyncClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/AsyncClientHttpRequest.java @@ -17,10 +17,10 @@ package org.springframework.http.client; import java.io.IOException; -import java.util.concurrent.Future; import org.springframework.http.HttpOutputMessage; import org.springframework.http.HttpRequest; +import org.springframework.util.concurrent.ListenableFuture; /** * Represents a client-side asynchronous HTTP request. Created via an implementation of @@ -41,7 +41,7 @@ public interface AsyncClientHttpRequest extends HttpRequest, HttpOutputMessage { * @return the future response result of the execution * @throws java.io.IOException in case of I/O errors */ - Future executeAsync() throws IOException; + ListenableFuture executeAsync() throws IOException; } diff --git a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java index ecbc88c8375..0344c0bb728 100644 --- a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java @@ -18,21 +18,23 @@ package org.springframework.http.client; import java.io.IOException; import java.net.URI; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.http.HttpEntity; import org.apache.http.HttpEntityEnclosingRequest; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.concurrent.FutureCallback; import org.apache.http.nio.client.HttpAsyncClient; import org.apache.http.nio.entity.NByteArrayEntity; import org.apache.http.protocol.HttpContext; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; +import org.springframework.util.concurrent.FutureAdapter; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; /** * {@link ClientHttpRequest} implementation that uses Apache HttpComponents HttpClient to @@ -72,7 +74,7 @@ final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncC } @Override - protected Future executeInternal(HttpHeaders headers, + protected ListenableFuture executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException { HttpComponentsClientHttpRequest.addHeaders(this.httpRequest, headers); @@ -83,50 +85,61 @@ final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncC entityEnclosingRequest.setEntity(requestEntity); } + final HttpResponseFutureCallback callback = new HttpResponseFutureCallback(); + final Future futureResponse = - this.httpClient.execute(this.httpRequest, this.httpContext, null); - return new ClientHttpResponseFuture(futureResponse); + this.httpClient.execute(this.httpRequest, this.httpContext, callback); + return new ClientHttpResponseFuture(futureResponse, callback); } + private static class HttpResponseFutureCallback implements FutureCallback { - private static class ClientHttpResponseFuture implements Future { - - private final Future futureResponse; - + private final ListenableFutureCallbackRegistry callbacks = + new ListenableFutureCallbackRegistry(); - public ClientHttpResponseFuture(Future futureResponse) { - this.futureResponse = futureResponse; + public void addCallback( + ListenableFutureCallback callback) { + callbacks.addCallback(callback); } @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return futureResponse.cancel(mayInterruptIfRunning); + public void completed(HttpResponse result) { + callbacks.success(new HttpComponentsAsyncClientHttpResponse(result)); } @Override - public boolean isCancelled() { - return futureResponse.isCancelled(); + public void failed(Exception ex) { + callbacks.failure(ex); } @Override - public boolean isDone() { - return futureResponse.isDone(); + public void cancelled() { } - @Override - public ClientHttpResponse get() - throws InterruptedException, ExecutionException { - HttpResponse response = futureResponse.get(); - return new HttpComponentsAsyncClientHttpResponse(response); + } + + + private static class ClientHttpResponseFuture extends FutureAdapter + implements ListenableFuture { + + private final HttpResponseFutureCallback callback; + + private ClientHttpResponseFuture(Future futureResponse, + HttpResponseFutureCallback callback) { + super(futureResponse); + this.callback = callback; } @Override - public ClientHttpResponse get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - HttpResponse response = futureResponse.get(timeout, unit); + protected ClientHttpResponse adapt(HttpResponse response) { return new HttpComponentsAsyncClientHttpResponse(response); } + @Override + public void addCallback( + ListenableFutureCallback callback) { + this.callback.addCallback(callback); + } } diff --git a/spring-web/src/main/java/org/springframework/http/client/SimpleBufferingAsyncClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/SimpleBufferingAsyncClientHttpRequest.java index 8ceff285cf6..f38221ca5b1 100644 --- a/spring-web/src/main/java/org/springframework/http/client/SimpleBufferingAsyncClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/SimpleBufferingAsyncClientHttpRequest.java @@ -23,12 +23,12 @@ import java.net.URISyntaxException; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.util.FileCopyUtils; +import org.springframework.util.concurrent.ListenableFuture; /** * {@link org.springframework.http.client.ClientHttpRequest} implementation that uses @@ -45,10 +45,10 @@ final class SimpleBufferingAsyncClientHttpRequest extends AbstractBufferingAsync private final boolean outputStreaming; - private final AsyncTaskExecutor taskExecutor; + private final AsyncListenableTaskExecutor taskExecutor; SimpleBufferingAsyncClientHttpRequest(HttpURLConnection connection, - boolean outputStreaming, AsyncTaskExecutor taskExecutor) { + boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) { this.connection = connection; this.outputStreaming = outputStreaming; this.taskExecutor = taskExecutor; @@ -70,9 +70,9 @@ final class SimpleBufferingAsyncClientHttpRequest extends AbstractBufferingAsync } @Override - protected Future executeInternal(final HttpHeaders headers, - final byte[] bufferedOutput) throws IOException { - return taskExecutor.submit(new Callable() { + protected ListenableFuture executeInternal( + final HttpHeaders headers, final byte[] bufferedOutput) throws IOException { + return taskExecutor.submitListenable(new Callable() { @Override public ClientHttpResponse call() throws Exception { for (Map.Entry> entry : headers.entrySet()) { diff --git a/spring-web/src/main/java/org/springframework/http/client/SimpleClientHttpRequestFactory.java b/spring-web/src/main/java/org/springframework/http/client/SimpleClientHttpRequestFactory.java index 006886a2f00..ff67b6e7fab 100644 --- a/spring-web/src/main/java/org/springframework/http/client/SimpleClientHttpRequestFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/SimpleClientHttpRequestFactory.java @@ -23,7 +23,7 @@ import java.net.URI; import java.net.URL; import java.net.URLConnection; -import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.http.HttpMethod; import org.springframework.util.Assert; @@ -54,7 +54,7 @@ public class SimpleClientHttpRequestFactory private boolean outputStreaming = true; - private AsyncTaskExecutor taskExecutor; + private AsyncListenableTaskExecutor taskExecutor; /** @@ -131,7 +131,7 @@ public class SimpleClientHttpRequestFactory * request}. * @param taskExecutor the task executor */ - public void setTaskExecutor(AsyncTaskExecutor taskExecutor) { + public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } @@ -149,7 +149,7 @@ public class SimpleClientHttpRequestFactory /** * {@inheritDoc} - *

Setting the {@link #setTaskExecutor(AsyncTaskExecutor) taskExecutor} property + *

Setting the {@link #setTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor) taskExecutor} property * is required before calling this method. */ @Override diff --git a/spring-web/src/main/java/org/springframework/http/client/SimpleStreamingAsyncClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/SimpleStreamingAsyncClientHttpRequest.java index dfb78502910..6681196d8c3 100644 --- a/spring-web/src/main/java/org/springframework/http/client/SimpleStreamingAsyncClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/SimpleStreamingAsyncClientHttpRequest.java @@ -24,12 +24,12 @@ import java.net.URISyntaxException; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.util.StreamUtils; +import org.springframework.util.concurrent.ListenableFuture; /** * {@link org.springframework.http.client.ClientHttpRequest} implementation that uses @@ -51,10 +51,10 @@ final class SimpleStreamingAsyncClientHttpRequest extends AbstractAsyncClientHtt private final boolean outputStreaming; - private final AsyncTaskExecutor taskExecutor; + private final AsyncListenableTaskExecutor taskExecutor; SimpleStreamingAsyncClientHttpRequest(HttpURLConnection connection, int chunkSize, - boolean outputStreaming, AsyncTaskExecutor taskExecutor) { + boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) { this.connection = connection; this.chunkSize = chunkSize; this.outputStreaming = outputStreaming; @@ -106,9 +106,9 @@ final class SimpleStreamingAsyncClientHttpRequest extends AbstractAsyncClientHtt } @Override - protected Future executeInternal(final HttpHeaders headers) + protected ListenableFuture executeInternal(final HttpHeaders headers) throws IOException { - return taskExecutor.submit(new Callable() { + return taskExecutor.submitListenable(new Callable() { @Override public ClientHttpResponse call() throws Exception { try { diff --git a/spring-web/src/main/java/org/springframework/web/client/AsyncRestOperations.java b/spring-web/src/main/java/org/springframework/web/client/AsyncRestOperations.java index 8c8af1125cc..0c4420366e2 100644 --- a/spring-web/src/main/java/org/springframework/web/client/AsyncRestOperations.java +++ b/spring-web/src/main/java/org/springframework/web/client/AsyncRestOperations.java @@ -26,6 +26,7 @@ import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; +import org.springframework.util.concurrent.ListenableFuture; /** * Interface specifying a basic set of asynchronous RESTful operations. Implemented by @@ -54,7 +55,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand the template * @return the entity wrapped in a {@link Future} */ - Future> getForEntity(String url, Class responseType, + ListenableFuture> getForEntity(String url, Class responseType, Object... uriVariables) throws RestClientException; /** @@ -66,7 +67,7 @@ public interface AsyncRestOperations { * @param uriVariables the map containing variables for the URI template * @return the entity wrapped in a {@link Future} */ - Future> getForEntity(String url, Class responseType, + ListenableFuture> getForEntity(String url, Class responseType, Map uriVariables) throws RestClientException; /** @@ -76,7 +77,7 @@ public interface AsyncRestOperations { * @param responseType the type of the return value * @return the entity wrapped in a {@link Future} */ - Future> getForEntity(URI url, Class responseType) + ListenableFuture> getForEntity(URI url, Class responseType) throws RestClientException; // HEAD @@ -88,7 +89,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand the template * @return all HTTP headers of that resource wrapped in a {@link Future} */ - Future headForHeaders(String url, Object... uriVariables) + ListenableFuture headForHeaders(String url, Object... uriVariables) throws RestClientException; /** @@ -98,7 +99,7 @@ public interface AsyncRestOperations { * @param uriVariables the map containing variables for the URI template * @return all HTTP headers of that resource wrapped in a {@link Future} */ - Future headForHeaders(String url, Map uriVariables) + ListenableFuture headForHeaders(String url, Map uriVariables) throws RestClientException; /** @@ -106,7 +107,7 @@ public interface AsyncRestOperations { * @param url the URL * @return all HTTP headers of that resource wrapped in a {@link Future} */ - Future headForHeaders(URI url) throws RestClientException; + ListenableFuture headForHeaders(URI url) throws RestClientException; // POST @@ -121,7 +122,7 @@ public interface AsyncRestOperations { * @return the value for the {@code Location} header wrapped in a {@link Future} * @see org.springframework.http.HttpEntity */ - Future postForLocation(String url, HttpEntity request, Object... uriVariables) + ListenableFuture postForLocation(String url, HttpEntity request, Object... uriVariables) throws RestClientException; /** @@ -135,7 +136,7 @@ public interface AsyncRestOperations { * @return the value for the {@code Location} header wrapped in a {@link Future} * @see org.springframework.http.HttpEntity */ - Future postForLocation(String url, HttpEntity request, Map uriVariables) + ListenableFuture postForLocation(String url, HttpEntity request, Map uriVariables) throws RestClientException; /** @@ -147,7 +148,7 @@ public interface AsyncRestOperations { * @return the value for the {@code Location} header wrapped in a {@link Future} * @see org.springframework.http.HttpEntity */ - Future postForLocation(URI url, HttpEntity request) throws RestClientException; + ListenableFuture postForLocation(URI url, HttpEntity request) throws RestClientException; /** * Create a new resource by POSTing the given object to the URI template, @@ -159,7 +160,7 @@ public interface AsyncRestOperations { * @return the entity wrapped in a {@link Future} * @see org.springframework.http.HttpEntity */ - Future> postForEntity(String url, HttpEntity request, + ListenableFuture> postForEntity(String url, HttpEntity request, Class responseType, Object... uriVariables) throws RestClientException; /** @@ -172,7 +173,7 @@ public interface AsyncRestOperations { * @return the entity wrapped in a {@link Future} * @see org.springframework.http.HttpEntity */ - Future> postForEntity(String url, HttpEntity request, + ListenableFuture> postForEntity(String url, HttpEntity request, Class responseType, Map uriVariables) throws RestClientException; @@ -184,7 +185,7 @@ public interface AsyncRestOperations { * @return the entity wrapped in a {@link Future} * @see org.springframework.http.HttpEntity */ - Future> postForEntity(URI url, HttpEntity request, + ListenableFuture> postForEntity(URI url, HttpEntity request, Class responseType) throws RestClientException; // PUT @@ -198,7 +199,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand the template * @see HttpEntity */ - Future put(String url, HttpEntity request, Object... uriVariables) + ListenableFuture put(String url, HttpEntity request, Object... uriVariables) throws RestClientException; /** @@ -210,7 +211,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand the template * @see HttpEntity */ - Future put(String url, HttpEntity request, Map uriVariables) + ListenableFuture put(String url, HttpEntity request, Map uriVariables) throws RestClientException; /** @@ -220,7 +221,7 @@ public interface AsyncRestOperations { * @param request the Object to be PUT, may be {@code null} * @see HttpEntity */ - Future put(URI url, HttpEntity request) throws RestClientException; + ListenableFuture put(URI url, HttpEntity request) throws RestClientException; // DELETE @@ -231,7 +232,7 @@ public interface AsyncRestOperations { * @param url the URL * @param uriVariables the variables to expand in the template */ - Future delete(String url, Object... uriVariables) throws RestClientException; + ListenableFuture delete(String url, Object... uriVariables) throws RestClientException; /** * Asynchronously delete the resources at the specified URI. @@ -240,7 +241,7 @@ public interface AsyncRestOperations { * @param url the URL * @param uriVariables the variables to expand in the template */ - Future delete(String url, Map uriVariables) throws RestClientException; + ListenableFuture delete(String url, Map uriVariables) throws RestClientException; /** * Asynchronously delete the resources at the specified URI. @@ -248,7 +249,7 @@ public interface AsyncRestOperations { *

The Future will return a {@code null} result upon completion. * @param url the URL */ - Future delete(URI url) throws RestClientException; + ListenableFuture delete(URI url) throws RestClientException; // OPTIONS @@ -259,7 +260,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand in the template * @return the value of the allow header wrapped in a {@link Future} */ - Future> optionsForAllow(String url, Object... uriVariables) + ListenableFuture> optionsForAllow(String url, Object... uriVariables) throws RestClientException; /** @@ -269,7 +270,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand in the template * @return the value of the allow header wrapped in a {@link Future} */ - Future> optionsForAllow(String url, Map uriVariables) + ListenableFuture> optionsForAllow(String url, Map uriVariables) throws RestClientException; /** @@ -277,7 +278,7 @@ public interface AsyncRestOperations { * @param url the URL * @return the value of the allow header wrapped in a {@link Future} */ - Future> optionsForAllow(URI url) throws RestClientException; + ListenableFuture> optionsForAllow(URI url) throws RestClientException; // exchange @@ -295,7 +296,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand in the template * @return the response as entity wrapped in a {@link Future} */ - Future> exchange(String url, HttpMethod method, + ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, Class responseType, Object... uriVariables) throws RestClientException; @@ -312,7 +313,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand in the template * @return the response as entity wrapped in a {@link Future} */ - Future> exchange(String url, HttpMethod method, + ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, Class responseType, Map uriVariables) throws RestClientException; @@ -327,7 +328,7 @@ public interface AsyncRestOperations { * @param responseType the type of the return value * @return the response as entity wrapped in a {@link Future} */ - Future> exchange(URI url, HttpMethod method, + ListenableFuture> exchange(URI url, HttpMethod method, HttpEntity requestEntity, Class responseType) throws RestClientException; @@ -348,7 +349,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand in the template * @return the response as entity wrapped in a {@link Future} */ - Future> exchange(String url, HttpMethod method, + ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, ParameterizedTypeReference responseType, Object... uriVariables) throws RestClientException; @@ -368,7 +369,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand in the template * @return the response as entity wrapped in a {@link Future} */ - Future> exchange(String url, HttpMethod method, + ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, ParameterizedTypeReference responseType, Map uriVariables) throws RestClientException; @@ -387,7 +388,7 @@ public interface AsyncRestOperations { * @param responseType the type of the return value * @return the response as entity wrapped in a {@link Future} */ - Future> exchange(URI url, HttpMethod method, + ListenableFuture> exchange(URI url, HttpMethod method, HttpEntity requestEntity, ParameterizedTypeReference responseType) throws RestClientException; @@ -406,7 +407,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand in the template * @return an arbitrary object, as returned by the {@link ResponseExtractor} */ - Future execute(String url, HttpMethod method, + ListenableFuture execute(String url, HttpMethod method, AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor, Object... uriVariables) throws RestClientException; @@ -422,7 +423,7 @@ public interface AsyncRestOperations { * @param uriVariables the variables to expand in the template * @return an arbitrary object, as returned by the {@link ResponseExtractor} */ - Future execute(String url, HttpMethod method, + ListenableFuture execute(String url, HttpMethod method, AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor, Map uriVariables) throws RestClientException; @@ -436,7 +437,7 @@ public interface AsyncRestOperations { * @param responseExtractor object that extracts the return value from the response * @return an arbitrary object, as returned by the {@link ResponseExtractor} */ - Future execute(URI url, HttpMethod method, + ListenableFuture execute(URI url, HttpMethod method, AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor) throws RestClientException; diff --git a/spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java b/spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java index 3e8092ef8f7..f2b25284021 100644 --- a/spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java +++ b/spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java @@ -24,11 +24,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.springframework.core.ParameterizedTypeReference; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.http.HttpEntity; @@ -44,12 +44,15 @@ import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.http.client.support.AsyncHttpAccessor; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureAdapter; +import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.util.UriTemplate; /** * Spring's central class for asynchronous client-side HTTP access. - * Exposes similar methods as {@link RestTemplate}, but returns {@link Future} wrappers - * as opposed to concrete results. + * Exposes similar methods as {@link RestTemplate}, but returns {@link ListenableFuture} + * wrappers as opposed to concrete results. * *

The {@code AsyncRestTemplate} exposes a synchronous {@link RestTemplate} via the * {@link #getRestOperations()} method, and it shares its @@ -69,7 +72,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe /** - * Create a new instance of the {@link AsyncRestTemplate} using default settings. + * Create a new instance of the {@code AsyncRestTemplate} using default settings. *

This constructor uses a {@link SimpleClientHttpRequestFactory} in combination * with a {@link SimpleAsyncTaskExecutor} for asynchronous execution. */ @@ -78,21 +81,22 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } /** - * Create a new instance of the {@link AsyncRestTemplate} using the given + * Create a new instance of the {@code AsyncRestTemplate} using the given * {@link AsyncTaskExecutor}. *

This constructor uses a {@link SimpleClientHttpRequestFactory} in combination * with the given {@code AsyncTaskExecutor} for asynchronous execution. */ - public AsyncRestTemplate(AsyncTaskExecutor taskExecutor) { + public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor) { Assert.notNull(taskExecutor, "AsyncTaskExecutor must not be null"); - SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory(); + SimpleClientHttpRequestFactory requestFactory = + new SimpleClientHttpRequestFactory(); requestFactory.setTaskExecutor(taskExecutor); this.syncTemplate = new RestTemplate(requestFactory); setAsyncRequestFactory(requestFactory); } /** - * Create a new instance of the {@link AsyncRestTemplate} using the given + * Create a new instance of the {@code AsyncRestTemplate} using the given * {@link AsyncClientHttpRequestFactory}. *

This constructor will cast the given asynchronous * {@code AsyncClientHttpRequestFactory} to a {@link ClientHttpRequestFactory}. Since @@ -105,22 +109,24 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } /** - * Creates a new instance of the {@link AsyncRestTemplate} using the given + * Creates a new instance of the {@code AsyncRestTemplate} using the given * asynchronous and synchronous request factories. * @param asyncRequestFactory the asynchronous request factory * @param syncRequestFactory the synchronous request factory */ - public AsyncRestTemplate(AsyncClientHttpRequestFactory asyncRequestFactory, ClientHttpRequestFactory syncRequestFactory) { + public AsyncRestTemplate(AsyncClientHttpRequestFactory asyncRequestFactory, + ClientHttpRequestFactory syncRequestFactory) { this(asyncRequestFactory, new RestTemplate(syncRequestFactory)); } /** - * Create a new instance of the {@link AsyncRestTemplate} using the given + * Create a new instance of the {@code AsyncRestTemplate} using the given * {@link AsyncClientHttpRequestFactory} and synchronous {@link RestTemplate}. * @param requestFactory the asynchronous request factory to use * @param restTemplate the synchronous template to use */ - public AsyncRestTemplate(AsyncClientHttpRequestFactory requestFactory, RestTemplate restTemplate) { + public AsyncRestTemplate(AsyncClientHttpRequestFactory requestFactory, + RestTemplate restTemplate) { Assert.notNull(restTemplate, "'restTemplate' must not be null"); this.syncTemplate = restTemplate; setAsyncRequestFactory(requestFactory); @@ -165,7 +171,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // GET @Override - public Future> getForEntity(String url, Class responseType, Object... uriVariables) + public ListenableFuture> getForEntity(String url, Class responseType, Object... uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = acceptHeaderRequestCallback(responseType); @@ -174,7 +180,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> getForEntity(String url, Class responseType, + public ListenableFuture> getForEntity(String url, Class responseType, Map urlVariables) throws RestClientException { AsyncRequestCallback requestCallback = acceptHeaderRequestCallback(responseType); @@ -183,7 +189,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> getForEntity(URI url, Class responseType) throws RestClientException { + public ListenableFuture> getForEntity(URI url, Class responseType) throws RestClientException { AsyncRequestCallback requestCallback = acceptHeaderRequestCallback(responseType); ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); return execute(url, HttpMethod.GET, requestCallback, responseExtractor); @@ -192,19 +198,19 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // HEAD @Override - public Future headForHeaders(String url, Object... uriVariables) throws RestClientException { + public ListenableFuture headForHeaders(String url, Object... uriVariables) throws RestClientException { ResponseExtractor headersExtractor = headersExtractor(); return execute(url, HttpMethod.HEAD, null, headersExtractor, uriVariables); } @Override - public Future headForHeaders(String url, Map uriVariables) throws RestClientException { + public ListenableFuture headForHeaders(String url, Map uriVariables) throws RestClientException { ResponseExtractor headersExtractor = headersExtractor(); return execute(url, HttpMethod.HEAD, null, headersExtractor, uriVariables); } @Override - public Future headForHeaders(URI url) throws RestClientException { + public ListenableFuture headForHeaders(URI url) throws RestClientException { ResponseExtractor headersExtractor = headersExtractor(); return execute(url, HttpMethod.HEAD, null, headersExtractor); } @@ -212,39 +218,55 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // POST @Override - public Future postForLocation(String url, HttpEntity request, + public ListenableFuture postForLocation(String url, HttpEntity request, Object... uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request); ResponseExtractor headersExtractor = headersExtractor(); - Future headersFuture = + ListenableFuture headersFuture = execute(url, HttpMethod.POST, requestCallback, headersExtractor, uriVariables); return extractLocationHeader(headersFuture); } @Override - public Future postForLocation(String url, HttpEntity request, + public ListenableFuture postForLocation(String url, HttpEntity request, Map uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request); ResponseExtractor headersExtractor = headersExtractor(); - Future headersFuture = + ListenableFuture headersFuture = execute(url, HttpMethod.POST, requestCallback, headersExtractor, uriVariables); return extractLocationHeader(headersFuture); } @Override - public Future postForLocation(URI url, HttpEntity request) + public ListenableFuture postForLocation(URI url, HttpEntity request) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request); ResponseExtractor headersExtractor = headersExtractor(); - Future headersFuture = + ListenableFuture headersFuture = execute(url, HttpMethod.POST, requestCallback, headersExtractor); return extractLocationHeader(headersFuture); } - private static Future extractLocationHeader(final Future headersFuture) { - return new Future() { + private static ListenableFuture extractLocationHeader(final ListenableFuture headersFuture) { + return new ListenableFuture() { + + @Override + public void addCallback(final ListenableFutureCallback callback) { + headersFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(HttpHeaders result) { + callback.onSuccess(result.getLocation()); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { return headersFuture.cancel(mayInterruptIfRunning); @@ -272,7 +294,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> postForEntity(String url, HttpEntity request, + public ListenableFuture> postForEntity(String url, HttpEntity request, Class responseType, Object... uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request, responseType); ResponseExtractor> responseExtractor = @@ -282,7 +304,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> postForEntity(String url, HttpEntity request, + public ListenableFuture> postForEntity(String url, HttpEntity request, Class responseType, Map uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request, responseType); @@ -293,7 +315,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> postForEntity(URI url, HttpEntity request, + public ListenableFuture> postForEntity(URI url, HttpEntity request, Class responseType) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request, responseType); ResponseExtractor> responseExtractor = @@ -304,21 +326,21 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // PUT @Override - public Future put(String url, HttpEntity request, Object... uriVariables) + public ListenableFuture put(String url, HttpEntity request, Object... uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request); return execute(url, HttpMethod.PUT, requestCallback, null, uriVariables); } @Override - public Future put(String url, HttpEntity request, + public ListenableFuture put(String url, HttpEntity request, Map uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request); return execute(url, HttpMethod.PUT, requestCallback, null, uriVariables); } @Override - public Future put(URI url, HttpEntity request) throws RestClientException { + public ListenableFuture put(URI url, HttpEntity request) throws RestClientException { AsyncRequestCallback requestCallback = httpEntityCallback(request); return execute(url, HttpMethod.PUT, requestCallback, null); } @@ -326,47 +348,64 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // DELETE @Override - public Future delete(String url, Object... urlVariables) + public ListenableFuture delete(String url, Object... urlVariables) throws RestClientException { return execute(url, HttpMethod.DELETE, null, null, urlVariables); } @Override - public Future delete(String url, Map urlVariables) + public ListenableFuture delete(String url, Map urlVariables) throws RestClientException { return execute(url, HttpMethod.DELETE, null, null, urlVariables); } @Override - public Future delete(URI url) throws RestClientException { + public ListenableFuture delete(URI url) throws RestClientException { return execute(url, HttpMethod.DELETE, null, null); } // OPTIONS @Override - public Future> optionsForAllow(String url, Object... uriVariables) throws RestClientException { + public ListenableFuture> optionsForAllow(String url, Object... uriVariables) throws RestClientException { ResponseExtractor headersExtractor = headersExtractor(); - Future headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables); + ListenableFuture headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables); return extractAllowHeader(headersFuture); } @Override - public Future> optionsForAllow(String url, Map uriVariables) throws RestClientException { + public ListenableFuture> optionsForAllow(String url, Map uriVariables) throws RestClientException { ResponseExtractor headersExtractor = headersExtractor(); - Future headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables); + ListenableFuture headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables); return extractAllowHeader(headersFuture); } @Override - public Future> optionsForAllow(URI url) throws RestClientException { + public ListenableFuture> optionsForAllow(URI url) throws RestClientException { ResponseExtractor headersExtractor = headersExtractor(); - Future headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor); + ListenableFuture headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor); return extractAllowHeader(headersFuture); } - private static Future> extractAllowHeader(final Future headersFuture) { - return new Future>() { + private static ListenableFuture> extractAllowHeader(final ListenableFuture headersFuture) { + return new ListenableFuture>() { + + @Override + public void addCallback( + final ListenableFutureCallback> callback) { + headersFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(HttpHeaders result) { + callback.onSuccess(result.getAllow()); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { return headersFuture.cancel(mayInterruptIfRunning); @@ -397,7 +436,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // exchange @Override - public Future> exchange(String url, HttpMethod method, + public ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, Class responseType, Object... uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = @@ -408,7 +447,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> exchange(String url, HttpMethod method, + public ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, Class responseType, Map uriVariables) throws RestClientException { AsyncRequestCallback requestCallback = @@ -419,7 +458,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> exchange(URI url, HttpMethod method, + public ListenableFuture> exchange(URI url, HttpMethod method, HttpEntity requestEntity, Class responseType) throws RestClientException { AsyncRequestCallback requestCallback = @@ -430,7 +469,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> exchange(String url, HttpMethod method, + public ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, ParameterizedTypeReference responseType, Object... uriVariables) throws RestClientException { Type type = responseType.getType(); @@ -441,7 +480,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> exchange(String url, HttpMethod method, + public ListenableFuture> exchange(String url, HttpMethod method, HttpEntity requestEntity, ParameterizedTypeReference responseType, Map uriVariables) throws RestClientException { Type type = responseType.getType(); @@ -452,7 +491,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future> exchange(URI url, HttpMethod method, + public ListenableFuture> exchange(URI url, HttpMethod method, HttpEntity requestEntity, ParameterizedTypeReference responseType) throws RestClientException { Type type = responseType.getType(); @@ -466,7 +505,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe // general execution @Override - public Future execute(String url, HttpMethod method, + public ListenableFuture execute(String url, HttpMethod method, AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor, Object... urlVariables) throws RestClientException { @@ -475,7 +514,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future execute(String url, HttpMethod method, + public ListenableFuture execute(String url, HttpMethod method, AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor, Map urlVariables) throws RestClientException { @@ -484,7 +523,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } @Override - public Future execute(URI url, HttpMethod method, + public ListenableFuture execute(URI url, HttpMethod method, AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor) throws RestClientException { @@ -504,7 +543,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe * @return an arbitrary object, as returned by the {@link ResponseExtractor} */ @SuppressWarnings("unchecked") - protected Future doExecute(URI url, HttpMethod method, AsyncRequestCallback requestCallback, + protected ListenableFuture doExecute(URI url, HttpMethod method, AsyncRequestCallback requestCallback, ResponseExtractor responseExtractor) throws RestClientException { Assert.notNull(url, "'url' must not be null"); @@ -514,13 +553,9 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe if (requestCallback != null) { requestCallback.doWithRequest(request); } - Future responseFuture = request.executeAsync(); - if (responseExtractor != null) { - return new ResponseExtractorFuture(method, url, responseFuture, responseExtractor); - } - else { - return (Future) new VoidResponseFuture(method, url, responseFuture); - } + ListenableFuture responseFuture = request.executeAsync(); + return new ResponseExtractorFuture(method, url, responseFuture, + responseExtractor); } catch (IOException ex) { throw new ResourceAccessException("I/O error on " + method.name() + @@ -594,47 +629,30 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe return this.syncTemplate.headersExtractor(); } - - private abstract class ResponseFuture implements Future { + /** + * Future returned from + * {@link #doExecute(URI, HttpMethod, AsyncRequestCallback, ResponseExtractor)} + */ + private class ResponseExtractorFuture + extends ListenableFutureAdapter { private final HttpMethod method; private final URI url; - private final Future responseFuture; + private final ResponseExtractor responseExtractor; - public ResponseFuture(HttpMethod method, URI url, Future responseFuture) { + public ResponseExtractorFuture(HttpMethod method, URI url, + ListenableFuture clientHttpResponseFuture, + ResponseExtractor responseExtractor) { + super(clientHttpResponseFuture); this.method = method; this.url = url; - this.responseFuture = responseFuture; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return this.responseFuture.cancel(mayInterruptIfRunning); - } - - @Override - public boolean isCancelled() { - return this.responseFuture.isCancelled(); - } - - @Override - public boolean isDone() { - return this.responseFuture.isDone(); - } - - @Override - public T get() throws InterruptedException, ExecutionException { - return getInternal(this.responseFuture.get()); + this.responseExtractor = responseExtractor; } @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return getInternal(this.responseFuture.get(timeout, unit)); - } - - private T getInternal(ClientHttpResponse response) throws ExecutionException { + protected final T adapt(ClientHttpResponse response) throws ExecutionException { try { if (!getErrorHandler().hasError(response)) { logResponseStatus(this.method, this.url, response); @@ -642,7 +660,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe else { handleResponseError(this.method, this.url, response); } - return extractData(response); + return convertResponse(response); } catch (IOException ex) { throw new ExecutionException(ex); @@ -654,42 +672,13 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe } } - protected abstract T extractData(ClientHttpResponse response) - throws IOException; - - } - - - private class ResponseExtractorFuture extends ResponseFuture { - - private final ResponseExtractor responseExtractor; - - public ResponseExtractorFuture(HttpMethod method, URI url, Future responseFuture, - ResponseExtractor responseExtractor) { - super(method, url, responseFuture); - this.responseExtractor = responseExtractor; + protected T convertResponse(ClientHttpResponse response) throws IOException { + return responseExtractor != null ? responseExtractor.extractData(response) : + null; } - @Override - protected T extractData(ClientHttpResponse response) throws IOException { - return responseExtractor.extractData(response); - } } - - private class VoidResponseFuture extends ResponseFuture { - - public VoidResponseFuture(HttpMethod method, URI url, Future responseFuture) { - super(method, url, responseFuture); - } - - @Override - protected Void extractData(ClientHttpResponse response) throws IOException { - return null; - } - } - - /** * Adapts a {@link RequestCallback} to the {@link AsyncRequestCallback} interface. */ diff --git a/spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java b/spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java index ba99d0c678a..472812b63d8 100644 --- a/spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java +++ b/spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java @@ -34,6 +34,8 @@ import org.springframework.http.HttpStatus; import org.springframework.http.StreamingHttpOutputMessage; import org.springframework.util.FileCopyUtils; import org.springframework.util.StreamUtils; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; public abstract class AbstractAsyncHttpRequestFactoryTestCase extends AbstractJettyServerTestCase { @@ -62,6 +64,47 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends response.getStatusCode()); } + @Test + public void statusCallback() throws Exception { + URI uri = new URI(baseUrl + "/status/notfound"); + AsyncClientHttpRequest request = factory.createAsyncRequest(uri, HttpMethod.GET); + assertEquals("Invalid HTTP method", HttpMethod.GET, request.getMethod()); + assertEquals("Invalid HTTP URI", uri, request.getURI()); + Future futureResponse = request.executeAsync(); + if (futureResponse instanceof ListenableFuture) { + ListenableFuture listenableFuture = + (ListenableFuture) futureResponse; + + + listenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(ClientHttpResponse result) { + try { + System.out.println("SUCCESS! " + result.getStatusCode()); + System.out.println("Callback: " + System.currentTimeMillis()); + System.out.println(Thread.currentThread().getId()); + assertEquals("Invalid status code", HttpStatus.NOT_FOUND, + result.getStatusCode()); + } + catch (IOException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + @Override + public void onFailure(Throwable t) { + System.out.println("FAILURE: " + t); + } + }); + + } + ClientHttpResponse response = futureResponse.get(); + System.out.println("Main thread: " + System.currentTimeMillis()); + assertEquals("Invalid status code", HttpStatus.NOT_FOUND, + response.getStatusCode()); + System.out.println(Thread.currentThread().getId()); + } + @Test public void echo() throws Exception { AsyncClientHttpRequest diff --git a/spring-web/src/test/java/org/springframework/http/client/BufferedSimpleAsyncHttpRequestFactoryTests.java b/spring-web/src/test/java/org/springframework/http/client/BufferedSimpleAsyncHttpRequestFactoryTests.java index 519246b7b6e..f79edea646f 100644 --- a/spring-web/src/test/java/org/springframework/http/client/BufferedSimpleAsyncHttpRequestFactoryTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/BufferedSimpleAsyncHttpRequestFactoryTests.java @@ -20,7 +20,7 @@ import java.net.ProtocolException; import org.junit.Test; -import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.http.HttpMethod; @@ -29,7 +29,7 @@ public class BufferedSimpleAsyncHttpRequestFactoryTests extends AbstractAsyncHtt @Override protected AsyncClientHttpRequestFactory createRequestFactory() { SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory(); - AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); + AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); requestFactory.setTaskExecutor(taskExecutor); return requestFactory; } diff --git a/spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java b/spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java index 33ab3c25ac0..277aaa2398b 100644 --- a/spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java @@ -40,6 +40,8 @@ import org.springframework.http.ResponseEntity; import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; /** @author Arjen Poutsma */ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCase { @@ -63,6 +65,37 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa assertEquals("Invalid status code", HttpStatus.OK, entity.getStatusCode()); } + @Test + public void multipleFutureGets() throws ExecutionException, InterruptedException { + Future> + futureEntity = template.getForEntity(baseUrl + "/{method}", String.class, "get"); + futureEntity.get(); + futureEntity.get(); + } + + @Test + public void getEntityCallback() throws ExecutionException, InterruptedException { + ListenableFuture> + futureEntity = template.getForEntity(baseUrl + "/{method}", String.class, "get"); + futureEntity.addCallback(new ListenableFutureCallback>() { + @Override + public void onSuccess(ResponseEntity entity) { + assertEquals("Invalid content", helloWorld, entity.getBody()); + assertFalse("No headers", entity.getHeaders().isEmpty()); + assertEquals("Invalid content-type", contentType, entity.getHeaders().getContentType()); + assertEquals("Invalid status code", HttpStatus.OK, entity.getStatusCode()); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + // wait till done + while (!futureEntity.isDone()) { + } + } + @Test public void getNoResponse() throws ExecutionException, InterruptedException { Future> @@ -111,27 +144,60 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa assertTrue("No Content-Type header", headers.containsKey("Content-Type")); } + @Test + public void headForHeadersCallback() throws ExecutionException, InterruptedException { + ListenableFuture headersFuture = template.headForHeaders(baseUrl + "/get"); + headersFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(HttpHeaders result) { + assertTrue("No Content-Type header", result.containsKey("Content-Type")); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + while (!headersFuture.isDone()) { + } + } + @Test public void postForLocation() throws URISyntaxException, ExecutionException, InterruptedException { - HttpEntity requestEntity = new HttpEntity<>(helloWorld); - Future locationFuture = template.postForLocation(baseUrl + "/{method}", requestEntity, + HttpHeaders entityHeaders = new HttpHeaders(); + entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15"))); + HttpEntity entity = new HttpEntity(helloWorld, entityHeaders); + Future + locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post"); URI location = locationFuture.get(); assertEquals("Invalid location", new URI(baseUrl + "/post/1"), location); } @Test - public void postForLocationEntity() + public void postForLocationCallback() throws URISyntaxException, ExecutionException, InterruptedException { HttpHeaders entityHeaders = new HttpHeaders(); entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15"))); HttpEntity entity = new HttpEntity(helloWorld, entityHeaders); - Future + final URI expected = new URI(baseUrl + "/post/1"); + ListenableFuture locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post"); - URI location = locationFuture.get(); - assertEquals("Invalid location", new URI(baseUrl + "/post/1"), location); + locationFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(URI result) { + assertEquals("Invalid location", expected, result); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + while (!locationFuture.isDone()) { + } } @Test @@ -145,6 +211,28 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa assertEquals("Invalid content", helloWorld, responseEntity.getBody()); } + @Test + public void postForEntityCallback() + throws URISyntaxException, ExecutionException, InterruptedException { + HttpEntity requestEntity = new HttpEntity<>(helloWorld); + ListenableFuture> + responseEntityFuture = template.postForEntity(baseUrl + "/{method}", requestEntity, + String.class, "post"); + responseEntityFuture.addCallback(new ListenableFutureCallback>() { + @Override + public void onSuccess(ResponseEntity result) { + assertEquals("Invalid content", helloWorld, result.getBody()); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + while (!responseEntityFuture.isDone()) { + } + } + @Test public void put() throws URISyntaxException, ExecutionException, InterruptedException { @@ -155,17 +243,59 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa responseEntityFuture.get(); } + @Test + public void putCallback() + throws URISyntaxException, ExecutionException, InterruptedException { + HttpEntity requestEntity = new HttpEntity<>(helloWorld); + ListenableFuture + responseEntityFuture = template.put(baseUrl + "/{method}", requestEntity, + "put"); + responseEntityFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(Object result) { + assertNull(result); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + while (!responseEntityFuture.isDone()) { + } + } + @Test public void delete() throws URISyntaxException, ExecutionException, InterruptedException { Future deletedFuture = template.delete(new URI(baseUrl + "/delete")); + deletedFuture.get(); } + @Test + public void deleteCallback() + throws URISyntaxException, ExecutionException, InterruptedException { + ListenableFuture deletedFuture = template.delete(new URI(baseUrl + "/delete")); + deletedFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(Object result) { + assertNull(result); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + while (!deletedFuture.isDone()) { + } + } + @Test public void notFound() throws ExecutionException, InterruptedException { try { - Future future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null); + Future future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null); future.get(); fail("HttpClientErrorException expected"); } @@ -176,6 +306,30 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } } + @Test + public void notFoundCallback() throws ExecutionException, InterruptedException { + ListenableFuture future = + template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, + null); + future.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(Object result) { + fail("onSuccess not expected"); + } + + @Override + public void onFailure(Throwable t) { + assertTrue(t instanceof HttpClientErrorException); + HttpClientErrorException ex = (HttpClientErrorException) t; + assertEquals(HttpStatus.NOT_FOUND, ex.getStatusCode()); + assertNotNull(ex.getStatusText()); + assertNotNull(ex.getResponseBodyAsString()); + } + }); + while (!future.isDone()) { + } + } + @Test public void serverError() throws ExecutionException, InterruptedException { try { @@ -190,6 +344,28 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa } } + @Test + public void serverErrorCallback() throws ExecutionException, InterruptedException { + ListenableFuture future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null); + future.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(Void result) { + fail("onSuccess not expected"); + } + + @Override + public void onFailure(Throwable t) { + assertTrue(t instanceof HttpServerErrorException); + HttpServerErrorException ex = (HttpServerErrorException) t; + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, ex.getStatusCode()); + assertNotNull(ex.getStatusText()); + assertNotNull(ex.getResponseBodyAsString()); + } + }); + while (!future.isDone()) { + } + } + @Test public void optionsForAllow() throws URISyntaxException, ExecutionException, InterruptedException { @@ -200,6 +376,27 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.HEAD, HttpMethod.TRACE), allowed); } + @Test + public void optionsForAllowCallback() + throws URISyntaxException, ExecutionException, InterruptedException { + ListenableFuture> + allowedFuture = template.optionsForAllow(new URI(baseUrl + "/get")); + allowedFuture.addCallback(new ListenableFutureCallback>() { + @Override + public void onSuccess(Set result) { + assertEquals("Invalid response", + EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.HEAD, HttpMethod.TRACE), result); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + while (!allowedFuture.isDone()) { + } + } + @Test @SuppressWarnings("unchecked") public void exchangeGet() throws Exception { @@ -213,6 +410,30 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa assertEquals("Invalid content", helloWorld, response.getBody()); } + @Test + @SuppressWarnings("unchecked") + public void exchangeGetCallback() throws Exception { + HttpHeaders requestHeaders = new HttpHeaders(); + requestHeaders.set("MyHeader", "MyValue"); + HttpEntity requestEntity = new HttpEntity(requestHeaders); + ListenableFuture> responseFuture = + template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity, + String.class, "get"); + responseFuture.addCallback(new ListenableFutureCallback>() { + @Override + public void onSuccess(ResponseEntity result) { + assertEquals("Invalid content", helloWorld, result.getBody()); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + while (!responseFuture.isDone()) { + } + } + @Test public void exchangePost() throws Exception { HttpHeaders requestHeaders = new HttpHeaders(); @@ -228,6 +449,34 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa assertFalse(result.hasBody()); } + @Test + public void exchangePostCallback() throws Exception { + HttpHeaders requestHeaders = new HttpHeaders(); + requestHeaders.set("MyHeader", "MyValue"); + requestHeaders.setContentType(MediaType.TEXT_PLAIN); + HttpEntity requestEntity = new HttpEntity(helloWorld, requestHeaders); + ListenableFuture> + resultFuture = template.exchange(baseUrl + "/{method}", HttpMethod.POST, + requestEntity, Void.class, "post"); + final URI expected =new URI(baseUrl + "/post/1"); + resultFuture.addCallback(new ListenableFutureCallback>() { + @Override + public void onSuccess(ResponseEntity result) { + assertEquals("Invalid location", expected, + result.getHeaders().getLocation()); + assertFalse(result.hasBody()); + } + + @Override + public void onFailure(Throwable t) { + fail(t.getMessage()); + } + }); + while (!resultFuture.isDone()) { + } + + } + @Test public void multipart() throws UnsupportedEncodingException, ExecutionException, InterruptedException {