Browse Source

Merge pull request #351 from poutsma/listenable_future

* listenable_future:
  Added ListenableFuture interface
pull/781/head
Arjen Poutsma 13 years ago
parent
commit
476af2e962
  1. 52
      spring-core/src/main/java/org/springframework/core/task/AsyncListenableTaskExecutor.java
  2. 18
      spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java
  3. 122
      spring-core/src/main/java/org/springframework/util/concurrent/FutureAdapter.java
  4. 41
      spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java
  5. 69
      spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java
  6. 40
      spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java
  7. 99
      spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java
  8. 83
      spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java
  9. 7
      spring-core/src/main/java/org/springframework/util/concurrent/package-info.java
  10. 88
      spring-core/src/test/java/org/springframework/util/concurrent/FutureAdapterTests.java
  11. 82
      spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java
  12. 14
      spring-web/src/main/java/org/springframework/http/client/AbstractAsyncClientHttpRequest.java
  13. 11
      spring-web/src/main/java/org/springframework/http/client/AbstractBufferingAsyncClientHttpRequest.java
  14. 4
      spring-web/src/main/java/org/springframework/http/client/AsyncClientHttpRequest.java
  15. 65
      spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java
  16. 14
      spring-web/src/main/java/org/springframework/http/client/SimpleBufferingAsyncClientHttpRequest.java
  17. 8
      spring-web/src/main/java/org/springframework/http/client/SimpleClientHttpRequestFactory.java
  18. 12
      spring-web/src/main/java/org/springframework/http/client/SimpleStreamingAsyncClientHttpRequest.java
  19. 61
      spring-web/src/main/java/org/springframework/web/client/AsyncRestOperations.java
  20. 235
      spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java
  21. 43
      spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java
  22. 4
      spring-web/src/test/java/org/springframework/http/client/BufferedSimpleAsyncHttpRequestFactoryTests.java
  23. 263
      spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java

52
spring-core/src/main/java/org/springframework/core/task/AsyncListenableTaskExecutor.java

@ -0,0 +1,52 @@ @@ -0,0 +1,52 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.task;
import java.util.concurrent.Callable;
import org.springframework.util.concurrent.ListenableFuture;
/**
* Extension of the {@link AsyncTaskExecutor} interface, adding the capability to submit
* tasks for {@link ListenableFuture}s.
*
* @author Arjen Poutsma
* @since 4.0
* @see ListenableFuture
*/
public interface AsyncListenableTaskExecutor extends AsyncTaskExecutor {
/**
* Submit a {@code Runnable} task for execution, receiving a {@code ListenableFuture}
* representing that task. The Future will return a {@code null} result upon completion.
* @param task the {@code Runnable} to execute (never {@code null})
* @return a {@code ListenableFuture} representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted
*/
ListenableFuture<?> submitListenable(Runnable task);
/**
* Submit a {@code Callable} task for execution, receiving a {@code ListenableFuture}
* representing that task. The Future will return the Callable's result upon
* completion.
* @param task the {@code Callable} to execute (never {@code null})
* @return a {@code ListenableFuture} representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted
*/
<T> ListenableFuture<T> submitListenable(Callable<T> task);
}

18
spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java

@ -25,6 +25,8 @@ import java.util.concurrent.ThreadFactory; @@ -25,6 +25,8 @@ import java.util.concurrent.ThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.ConcurrencyThrottleSupport;
import org.springframework.util.CustomizableThreadCreator;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
/**
* {@link TaskExecutor} implementation that fires up a new Thread for each task,
@ -45,7 +47,7 @@ import org.springframework.util.CustomizableThreadCreator; @@ -45,7 +47,7 @@ import org.springframework.util.CustomizableThreadCreator;
* @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
*/
@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncTaskExecutor, Serializable {
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable {
/**
* Permit any number of concurrent invocations: that is, don't throttle concurrency.
@ -184,6 +186,20 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement @@ -184,6 +186,20 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement
return future;
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
execute(future, TIMEOUT_INDEFINITE);
return future;
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
execute(future, TIMEOUT_INDEFINITE);
return future;
}
/**
* Template method for the actual execution of a task.
* <p>The default implementation creates a new Thread and starts it.

122
spring-core/src/main/java/org/springframework/util/concurrent/FutureAdapter.java

@ -0,0 +1,122 @@ @@ -0,0 +1,122 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.util.Assert;
/**
* Abstract class that adapts a {@link Future} parameterized over S into a {@code
* Future} parameterized over T. All methods are delegated to the adaptee, where {@link
* #get()} and {@link #get(long, TimeUnit)} call {@@link #adapt(Object)} on the adaptee's
* result.
*
* @param <T> the type of this {@code Future}
* @param <S> the type of the adaptee's {@code Future}
* @author Arjen Poutsma
* @since 4.0
*/
public abstract class FutureAdapter<T, S> implements Future<T> {
private final Future<S> adaptee;
private Object result = null;
private State state = State.NEW;
private final Object mutex = new Object();
/**
* Constructs a new {@code FutureAdapter} with the given adaptee.
* @param adaptee the future to delegate to
*/
protected FutureAdapter(Future<S> adaptee) {
Assert.notNull(adaptee, "'delegate' must not be null");
this.adaptee = adaptee;
}
/**
* Returns the adaptee.
*/
protected Future<S> getAdaptee() {
return adaptee;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return adaptee.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return adaptee.isCancelled();
}
@Override
public boolean isDone() {
return adaptee.isDone();
}
@Override
public T get() throws InterruptedException, ExecutionException {
return adaptInternal(adaptee.get());
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return adaptInternal(adaptee.get(timeout, unit));
}
@SuppressWarnings("unchecked")
final T adaptInternal(S adapteeResult) throws ExecutionException {
synchronized (mutex) {
switch (state) {
case SUCCESS:
return (T) result;
case FAILURE:
throw (ExecutionException) result;
case NEW:
try {
T adapted = adapt(adapteeResult);
result = adapted;
state = State.SUCCESS;
return adapted;
} catch (ExecutionException ex) {
result = ex;
state = State.FAILURE;
throw ex;
}
default:
throw new IllegalStateException();
}
}
}
/**
* Adapts the given adaptee's result into T.
* @return the adapted result
*/
protected abstract T adapt(S adapteeResult) throws ExecutionException;
private enum State {NEW, SUCCESS, FAILURE}
}

41
spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java

@ -0,0 +1,41 @@ @@ -0,0 +1,41 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.Future;
/**
* Extends the {@link Future} interface with the capability to accept completion
* callbacks. If the future has already completed when the callback is added, the
* callback will be triggered immediately.
* <p>Inspired by {@link com.google.common.util.concurrent.ListenableFuture}.
* @author Arjen Poutsma
* @since 4.0
*/
public interface ListenableFuture<T> extends Future<T> {
/**
* Registers the given callback to this {@code ListenableFuture}. The callback will
* be triggered when this {@code Future} is complete or, if it is already complete,
* immediately.
*
* @param callback the callback to register
*/
void addCallback(ListenableFutureCallback<? super T> callback);
}

69
spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java

@ -0,0 +1,69 @@ @@ -0,0 +1,69 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.ExecutionException;
/**
* Abstract class that adapts a {@link ListenableFuture} parameterized over S into a
* {@code ListenableFuture} parameterized over T. All methods are delegated to the
* adaptee, where {@link #get()}, {@link #get(long, java.util.concurrent.TimeUnit)}, and
* {@link ListenableFutureCallback#onSuccess(Object)} call {@@link #adapt(Object)} on the
* adaptee's result.
*
* @param <T> the type of this {@code Future}
* @param <S> the type of the adaptee's {@code Future}
* @author Arjen Poutsma
* @since 4.0
*/
public abstract class ListenableFutureAdapter<T, S> extends FutureAdapter<T, S>
implements ListenableFuture<T> {
/**
* Constructs a new {@code ListenableFutureAdapter} with the given adaptee.
* @param adaptee the future to adaptee to
*/
protected ListenableFutureAdapter(ListenableFuture<S> adaptee) {
super(adaptee);
}
@Override
public void addCallback(final ListenableFutureCallback<? super T> callback) {
ListenableFuture<S> listenableAdaptee = (ListenableFuture<S>) getAdaptee();
listenableAdaptee.addCallback(new ListenableFutureCallback<S>() {
@Override
public void onSuccess(S result) {
try {
callback.onSuccess(adaptInternal(result));
}
catch (ExecutionException ex) {
Throwable cause = ex.getCause();
onFailure(cause != null ? cause : ex);
}
catch (Throwable t) {
onFailure(t);
}
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
}

40
spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java

@ -0,0 +1,40 @@ @@ -0,0 +1,40 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
/**
* Defines the contract for callbacks that accept the result of a
* {@link ListenableFuture}.
*
* @author Arjen Poutsma
* @since 4.0
*/
public interface ListenableFutureCallback<T> {
/**
* Called when the {@link ListenableFuture} successfully completes.
* @param result the result
*/
void onSuccess(T result);
/**
* Called when the {@link ListenableFuture} fails to complete.
* @param t the exception that triggered the failure
*/
void onFailure(Throwable t);
}

99
spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java

@ -0,0 +1,99 @@ @@ -0,0 +1,99 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.LinkedList;
import java.util.Queue;
import org.springframework.util.Assert;
/**
* Registry for {@link ListenableFutureCallback} instances.
* <p>Inspired by {@link com.google.common.util.concurrent.ExecutionList}.
* @author Arjen Poutsma
* @since 4.0
*/
public class ListenableFutureCallbackRegistry<T> {
private final Queue<ListenableFutureCallback<? super T>> callbacks =
new LinkedList<ListenableFutureCallback<? super T>>();
private State state = State.NEW;
private Object result = null;
private final Object mutex = new Object();
/**
* Adds the given callback to this registry.
* @param callback the callback to add
*/
@SuppressWarnings("unchecked")
public void addCallback(ListenableFutureCallback<? super T> callback) {
Assert.notNull(callback, "'callback' must not be null");
synchronized (mutex) {
switch (state) {
case NEW:
callbacks.add(callback);
break;
case SUCCESS:
callback.onSuccess((T)result);
break;
case FAILURE:
callback.onFailure((Throwable) result);
break;
}
}
}
/**
* Triggers a {@link ListenableFutureCallback#onSuccess(Object)} call on all added
* callbacks with the given result
* @param result the result to trigger the callbacks with
*/
public void success(T result) {
synchronized (mutex) {
state = State.SUCCESS;
this.result = result;
while (!callbacks.isEmpty()) {
callbacks.poll().onSuccess(result);
}
}
}
/**
* Triggers a {@link ListenableFutureCallback#onFailure(Throwable)} call on all added
* callbacks with the given {@code Throwable}.
* @param t the exception to trigger the callbacks with
*/
public void failure(Throwable t) {
synchronized (mutex) {
state = State.FAILURE;
this.result = t;
while (!callbacks.isEmpty()) {
callbacks.poll().onFailure(t);
}
}
}
private enum State {NEW, SUCCESS, FAILURE}
}

83
spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java

@ -0,0 +1,83 @@ @@ -0,0 +1,83 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* Extension of {@link FutureTask} that implements {@link ListenableFuture}.
*
* @author Arjen Poutsma
* @since 4.0
*/
public class ListenableFutureTask<T> extends FutureTask<T>
implements ListenableFuture<T> {
private final ListenableFutureCallbackRegistry<T> callbacks =
new ListenableFutureCallbackRegistry<T>();
/**
* Creates a new {@code ListenableFutureTask} that will, upon running, execute the given
* {@link Callable}.
* @param callable the callable task
*/
public ListenableFutureTask(Callable<T> callable) {
super(callable);
}
/**
* Creates a {@code ListenableFutureTask} that will, upon running, execute the given
* {@link Runnable}, and arrange that {@link #get()} will return the given result on
* successful completion.
* @param runnable the runnable task
* @param result the result to return on successful completion
*/
public ListenableFutureTask(Runnable runnable, T result) {
super(runnable, result);
}
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
callbacks.addCallback(callback);
}
@Override
protected final void done() {
Throwable cause;
try {
T result = get();
callbacks.success(result);
return;
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
catch (ExecutionException ex) {
cause = ex.getCause();
if (cause == null) {
cause = ex;
}
}
catch (Throwable t) {
cause = t;
}
callbacks.failure(cause);
}
}

7
spring-core/src/main/java/org/springframework/util/concurrent/package-info.java

@ -0,0 +1,7 @@ @@ -0,0 +1,7 @@
/**
*
* Useful generic {@code java.util.concurrent.Future} extension.
*/
package org.springframework.util.concurrent;

88
spring-core/src/test/java/org/springframework/util/concurrent/FutureAdapterTests.java

@ -0,0 +1,88 @@ @@ -0,0 +1,88 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.mock;
/**
* @author Arjen Poutsma
*/
public class FutureAdapterTests {
private FutureAdapter<String, Integer> adapter;
private Future<Integer> adaptee;
@Before
@SuppressWarnings("unchecked")
public void setUp() {
adaptee = mock(Future.class);
adapter = new FutureAdapter<String, Integer>(adaptee) {
@Override
protected String adapt(Integer adapteeResult) throws ExecutionException {
return adapteeResult.toString();
}
};
}
@Test
public void cancel() throws Exception {
given(adaptee.cancel(true)).willReturn(true);
boolean result = adapter.cancel(true);
assertTrue(result);
}
@Test
public void isCancelled() {
given(adaptee.isCancelled()).willReturn(true);
boolean result = adapter.isCancelled();
assertTrue(result);
}
@Test
public void isDone() {
given(adaptee.isDone()).willReturn(true);
boolean result = adapter.isDone();
assertTrue(result);
}
@Test
public void get() throws Exception {
given(adaptee.get()).willReturn(42);
String result = adapter.get();
assertEquals("42", result);
}
@Test
public void getTimeOut() throws Exception {
given(adaptee.get(1, TimeUnit.SECONDS)).willReturn(42);
String result = adapter.get(1, TimeUnit.SECONDS);
assertEquals("42", result);
}
}

82
spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java

@ -0,0 +1,82 @@ @@ -0,0 +1,82 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Test;
/**
* @author Arjen Poutsma
*/
public class ListenableFutureTaskTests {
@Test
public void success() throws ExecutionException, InterruptedException {
final String s = "Hello World";
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
return s;
}
};
ListenableFutureTask<String> task = new ListenableFutureTask<String>(callable);
task.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
assertEquals(s, result);
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
task.run();
}
@Test
public void failure() throws ExecutionException, InterruptedException {
final String s = "Hello World";
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
throw new IOException(s);
}
};
ListenableFutureTask<String> task = new ListenableFutureTask<String>(callable);
task.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
fail("onSuccess not expected");
}
@Override
public void onFailure(Throwable t) {
assertEquals(s, t.getMessage());
}
});
task.run();
}
}

14
spring-web/src/main/java/org/springframework/http/client/AbstractAsyncClientHttpRequest.java

@ -18,10 +18,10 @@ package org.springframework.http.client; @@ -18,10 +18,10 @@ package org.springframework.http.client;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Future;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
/**
* Abstract base for {@link AsyncClientHttpRequest} that makes sure that headers and body
@ -49,15 +49,15 @@ abstract class AbstractAsyncClientHttpRequest implements AsyncClientHttpRequest @@ -49,15 +49,15 @@ abstract class AbstractAsyncClientHttpRequest implements AsyncClientHttpRequest
}
@Override
public Future<ClientHttpResponse> executeAsync() throws IOException {
public ListenableFuture<ClientHttpResponse> executeAsync() throws IOException {
assertNotExecuted();
Future<ClientHttpResponse> result = executeInternal(this.headers);
ListenableFuture<ClientHttpResponse> result = executeInternal(this.headers);
this.executed = true;
return result;
}
/**
* Asserts that this request has not been {@linkplain #execute() executed} yet.
* Asserts that this request has not been {@linkplain #executeAsync() executed} yet.
*
* @throws IllegalStateException if this request has been executed
*/
@ -74,10 +74,12 @@ abstract class AbstractAsyncClientHttpRequest implements AsyncClientHttpRequest @@ -74,10 +74,12 @@ abstract class AbstractAsyncClientHttpRequest implements AsyncClientHttpRequest
protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException;
/**
* Abstract template method that writes the given headers and content to the HTTP request.
* Abstract template method that writes the given headers and content to the HTTP
* request.
* @param headers the HTTP headers
* @return the response object for the executed request
*/
protected abstract Future<ClientHttpResponse> executeInternal(HttpHeaders headers) throws IOException;
protected abstract ListenableFuture<ClientHttpResponse> executeInternal(
HttpHeaders headers) throws IOException;
}

11
spring-web/src/main/java/org/springframework/http/client/AbstractBufferingAsyncClientHttpRequest.java

@ -19,9 +19,9 @@ package org.springframework.http.client; @@ -19,9 +19,9 @@ package org.springframework.http.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Future;
import org.springframework.http.HttpHeaders;
import org.springframework.util.concurrent.ListenableFuture;
/**
* Abstract base for {@link org.springframework.http.client.ClientHttpRequest} that buffers output in a byte array before sending it over the wire.
@ -40,12 +40,13 @@ abstract class AbstractBufferingAsyncClientHttpRequest @@ -40,12 +40,13 @@ abstract class AbstractBufferingAsyncClientHttpRequest
}
@Override
protected Future<ClientHttpResponse> executeInternal(HttpHeaders headers) throws IOException {
protected ListenableFuture<ClientHttpResponse> executeInternal(HttpHeaders headers)
throws IOException {
byte[] bytes = this.bufferedOutput.toByteArray();
if (headers.getContentLength() == -1) {
headers.setContentLength(bytes.length);
}
Future<ClientHttpResponse> result = executeInternal(headers, bytes);
ListenableFuture<ClientHttpResponse> result = executeInternal(headers, bytes);
this.bufferedOutput = null;
return result;
}
@ -58,8 +59,8 @@ abstract class AbstractBufferingAsyncClientHttpRequest @@ -58,8 +59,8 @@ abstract class AbstractBufferingAsyncClientHttpRequest
* @param bufferedOutput the body content
* @return the response object for the executed request
*/
protected abstract Future<ClientHttpResponse> executeInternal(HttpHeaders headers,
byte[] bufferedOutput) throws IOException;
protected abstract ListenableFuture<ClientHttpResponse> executeInternal(
HttpHeaders headers, byte[] bufferedOutput) throws IOException;
}

4
spring-web/src/main/java/org/springframework/http/client/AsyncClientHttpRequest.java

@ -17,10 +17,10 @@ @@ -17,10 +17,10 @@
package org.springframework.http.client;
import java.io.IOException;
import java.util.concurrent.Future;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.HttpRequest;
import org.springframework.util.concurrent.ListenableFuture;
/**
* Represents a client-side asynchronous HTTP request. Created via an implementation of
@ -41,7 +41,7 @@ public interface AsyncClientHttpRequest extends HttpRequest, HttpOutputMessage { @@ -41,7 +41,7 @@ public interface AsyncClientHttpRequest extends HttpRequest, HttpOutputMessage {
* @return the future response result of the execution
* @throws java.io.IOException in case of I/O errors
*/
Future<ClientHttpResponse> executeAsync() throws IOException;
ListenableFuture<ClientHttpResponse> executeAsync() throws IOException;
}

65
spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java

@ -18,21 +18,23 @@ package org.springframework.http.client; @@ -18,21 +18,23 @@ package org.springframework.http.client;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.nio.client.HttpAsyncClient;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.http.protocol.HttpContext;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.concurrent.FutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureCallbackRegistry;
/**
* {@link ClientHttpRequest} implementation that uses Apache HttpComponents HttpClient to
@ -72,7 +74,7 @@ final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncC @@ -72,7 +74,7 @@ final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncC
}
@Override
protected Future<ClientHttpResponse> executeInternal(HttpHeaders headers,
protected ListenableFuture<ClientHttpResponse> executeInternal(HttpHeaders headers,
byte[] bufferedOutput) throws IOException {
HttpComponentsClientHttpRequest.addHeaders(this.httpRequest, headers);
@ -83,50 +85,61 @@ final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncC @@ -83,50 +85,61 @@ final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncC
entityEnclosingRequest.setEntity(requestEntity);
}
final HttpResponseFutureCallback callback = new HttpResponseFutureCallback();
final Future<HttpResponse> futureResponse =
this.httpClient.execute(this.httpRequest, this.httpContext, null);
return new ClientHttpResponseFuture(futureResponse);
this.httpClient.execute(this.httpRequest, this.httpContext, callback);
return new ClientHttpResponseFuture(futureResponse, callback);
}
private static class HttpResponseFutureCallback implements FutureCallback<HttpResponse> {
private static class ClientHttpResponseFuture implements Future<ClientHttpResponse> {
private final Future<HttpResponse> futureResponse;
private final ListenableFutureCallbackRegistry<ClientHttpResponse> callbacks =
new ListenableFutureCallbackRegistry<ClientHttpResponse>();
public ClientHttpResponseFuture(Future<HttpResponse> futureResponse) {
this.futureResponse = futureResponse;
public void addCallback(
ListenableFutureCallback<? super ClientHttpResponse> callback) {
callbacks.addCallback(callback);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return futureResponse.cancel(mayInterruptIfRunning);
public void completed(HttpResponse result) {
callbacks.success(new HttpComponentsAsyncClientHttpResponse(result));
}
@Override
public boolean isCancelled() {
return futureResponse.isCancelled();
public void failed(Exception ex) {
callbacks.failure(ex);
}
@Override
public boolean isDone() {
return futureResponse.isDone();
public void cancelled() {
}
@Override
public ClientHttpResponse get()
throws InterruptedException, ExecutionException {
HttpResponse response = futureResponse.get();
return new HttpComponentsAsyncClientHttpResponse(response);
}
private static class ClientHttpResponseFuture extends FutureAdapter<ClientHttpResponse, HttpResponse>
implements ListenableFuture<ClientHttpResponse> {
private final HttpResponseFutureCallback callback;
private ClientHttpResponseFuture(Future<HttpResponse> futureResponse,
HttpResponseFutureCallback callback) {
super(futureResponse);
this.callback = callback;
}
@Override
public ClientHttpResponse get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
HttpResponse response = futureResponse.get(timeout, unit);
protected ClientHttpResponse adapt(HttpResponse response) {
return new HttpComponentsAsyncClientHttpResponse(response);
}
@Override
public void addCallback(
ListenableFutureCallback<? super ClientHttpResponse> callback) {
this.callback.addCallback(callback);
}
}

14
spring-web/src/main/java/org/springframework/http/client/SimpleBufferingAsyncClientHttpRequest.java

@ -23,12 +23,12 @@ import java.net.URISyntaxException; @@ -23,12 +23,12 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.concurrent.ListenableFuture;
/**
* {@link org.springframework.http.client.ClientHttpRequest} implementation that uses
@ -45,10 +45,10 @@ final class SimpleBufferingAsyncClientHttpRequest extends AbstractBufferingAsync @@ -45,10 +45,10 @@ final class SimpleBufferingAsyncClientHttpRequest extends AbstractBufferingAsync
private final boolean outputStreaming;
private final AsyncTaskExecutor taskExecutor;
private final AsyncListenableTaskExecutor taskExecutor;
SimpleBufferingAsyncClientHttpRequest(HttpURLConnection connection,
boolean outputStreaming, AsyncTaskExecutor taskExecutor) {
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.outputStreaming = outputStreaming;
this.taskExecutor = taskExecutor;
@ -70,9 +70,9 @@ final class SimpleBufferingAsyncClientHttpRequest extends AbstractBufferingAsync @@ -70,9 +70,9 @@ final class SimpleBufferingAsyncClientHttpRequest extends AbstractBufferingAsync
}
@Override
protected Future<ClientHttpResponse> executeInternal(final HttpHeaders headers,
final byte[] bufferedOutput) throws IOException {
return taskExecutor.submit(new Callable<ClientHttpResponse>() {
protected ListenableFuture<ClientHttpResponse> executeInternal(
final HttpHeaders headers, final byte[] bufferedOutput) throws IOException {
return taskExecutor.submitListenable(new Callable<ClientHttpResponse>() {
@Override
public ClientHttpResponse call() throws Exception {
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {

8
spring-web/src/main/java/org/springframework/http/client/SimpleClientHttpRequestFactory.java

@ -23,7 +23,7 @@ import java.net.URI; @@ -23,7 +23,7 @@ import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
@ -54,7 +54,7 @@ public class SimpleClientHttpRequestFactory @@ -54,7 +54,7 @@ public class SimpleClientHttpRequestFactory
private boolean outputStreaming = true;
private AsyncTaskExecutor taskExecutor;
private AsyncListenableTaskExecutor taskExecutor;
/**
@ -131,7 +131,7 @@ public class SimpleClientHttpRequestFactory @@ -131,7 +131,7 @@ public class SimpleClientHttpRequestFactory
* request}.
* @param taskExecutor the task executor
*/
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@ -149,7 +149,7 @@ public class SimpleClientHttpRequestFactory @@ -149,7 +149,7 @@ public class SimpleClientHttpRequestFactory
/**
* {@inheritDoc}
* <p>Setting the {@link #setTaskExecutor(AsyncTaskExecutor) taskExecutor} property
* <p>Setting the {@link #setTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor) taskExecutor} property
* is required before calling this method.
*/
@Override

12
spring-web/src/main/java/org/springframework/http/client/SimpleStreamingAsyncClientHttpRequest.java

@ -24,12 +24,12 @@ import java.net.URISyntaxException; @@ -24,12 +24,12 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.StreamUtils;
import org.springframework.util.concurrent.ListenableFuture;
/**
* {@link org.springframework.http.client.ClientHttpRequest} implementation that uses
@ -51,10 +51,10 @@ final class SimpleStreamingAsyncClientHttpRequest extends AbstractAsyncClientHtt @@ -51,10 +51,10 @@ final class SimpleStreamingAsyncClientHttpRequest extends AbstractAsyncClientHtt
private final boolean outputStreaming;
private final AsyncTaskExecutor taskExecutor;
private final AsyncListenableTaskExecutor taskExecutor;
SimpleStreamingAsyncClientHttpRequest(HttpURLConnection connection, int chunkSize,
boolean outputStreaming, AsyncTaskExecutor taskExecutor) {
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.chunkSize = chunkSize;
this.outputStreaming = outputStreaming;
@ -106,9 +106,9 @@ final class SimpleStreamingAsyncClientHttpRequest extends AbstractAsyncClientHtt @@ -106,9 +106,9 @@ final class SimpleStreamingAsyncClientHttpRequest extends AbstractAsyncClientHtt
}
@Override
protected Future<ClientHttpResponse> executeInternal(final HttpHeaders headers)
protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers)
throws IOException {
return taskExecutor.submit(new Callable<ClientHttpResponse>() {
return taskExecutor.submitListenable(new Callable<ClientHttpResponse>() {
@Override
public ClientHttpResponse call() throws Exception {
try {

61
spring-web/src/main/java/org/springframework/web/client/AsyncRestOperations.java

@ -26,6 +26,7 @@ import org.springframework.http.HttpEntity; @@ -26,6 +26,7 @@ import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
/**
* Interface specifying a basic set of asynchronous RESTful operations. Implemented by
@ -54,7 +55,7 @@ public interface AsyncRestOperations { @@ -54,7 +55,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand the template
* @return the entity wrapped in a {@link Future}
*/
<T> Future<ResponseEntity<T>> getForEntity(String url, Class<T> responseType,
<T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType,
Object... uriVariables) throws RestClientException;
/**
@ -66,7 +67,7 @@ public interface AsyncRestOperations { @@ -66,7 +67,7 @@ public interface AsyncRestOperations {
* @param uriVariables the map containing variables for the URI template
* @return the entity wrapped in a {@link Future}
*/
<T> Future<ResponseEntity<T>> getForEntity(String url, Class<T> responseType,
<T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType,
Map<String, ?> uriVariables) throws RestClientException;
/**
@ -76,7 +77,7 @@ public interface AsyncRestOperations { @@ -76,7 +77,7 @@ public interface AsyncRestOperations {
* @param responseType the type of the return value
* @return the entity wrapped in a {@link Future}
*/
<T> Future<ResponseEntity<T>> getForEntity(URI url, Class<T> responseType)
<T> ListenableFuture<ResponseEntity<T>> getForEntity(URI url, Class<T> responseType)
throws RestClientException;
// HEAD
@ -88,7 +89,7 @@ public interface AsyncRestOperations { @@ -88,7 +89,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand the template
* @return all HTTP headers of that resource wrapped in a {@link Future}
*/
Future<HttpHeaders> headForHeaders(String url, Object... uriVariables)
ListenableFuture<HttpHeaders> headForHeaders(String url, Object... uriVariables)
throws RestClientException;
/**
@ -98,7 +99,7 @@ public interface AsyncRestOperations { @@ -98,7 +99,7 @@ public interface AsyncRestOperations {
* @param uriVariables the map containing variables for the URI template
* @return all HTTP headers of that resource wrapped in a {@link Future}
*/
Future<HttpHeaders> headForHeaders(String url, Map<String, ?> uriVariables)
ListenableFuture<HttpHeaders> headForHeaders(String url, Map<String, ?> uriVariables)
throws RestClientException;
/**
@ -106,7 +107,7 @@ public interface AsyncRestOperations { @@ -106,7 +107,7 @@ public interface AsyncRestOperations {
* @param url the URL
* @return all HTTP headers of that resource wrapped in a {@link Future}
*/
Future<HttpHeaders> headForHeaders(URI url) throws RestClientException;
ListenableFuture<HttpHeaders> headForHeaders(URI url) throws RestClientException;
// POST
@ -121,7 +122,7 @@ public interface AsyncRestOperations { @@ -121,7 +122,7 @@ public interface AsyncRestOperations {
* @return the value for the {@code Location} header wrapped in a {@link Future}
* @see org.springframework.http.HttpEntity
*/
Future<URI> postForLocation(String url, HttpEntity<?> request, Object... uriVariables)
ListenableFuture<URI> postForLocation(String url, HttpEntity<?> request, Object... uriVariables)
throws RestClientException;
/**
@ -135,7 +136,7 @@ public interface AsyncRestOperations { @@ -135,7 +136,7 @@ public interface AsyncRestOperations {
* @return the value for the {@code Location} header wrapped in a {@link Future}
* @see org.springframework.http.HttpEntity
*/
Future<URI> postForLocation(String url, HttpEntity<?> request, Map<String, ?> uriVariables)
ListenableFuture<URI> postForLocation(String url, HttpEntity<?> request, Map<String, ?> uriVariables)
throws RestClientException;
/**
@ -147,7 +148,7 @@ public interface AsyncRestOperations { @@ -147,7 +148,7 @@ public interface AsyncRestOperations {
* @return the value for the {@code Location} header wrapped in a {@link Future}
* @see org.springframework.http.HttpEntity
*/
Future<URI> postForLocation(URI url, HttpEntity<?> request) throws RestClientException;
ListenableFuture<URI> postForLocation(URI url, HttpEntity<?> request) throws RestClientException;
/**
* Create a new resource by POSTing the given object to the URI template,
@ -159,7 +160,7 @@ public interface AsyncRestOperations { @@ -159,7 +160,7 @@ public interface AsyncRestOperations {
* @return the entity wrapped in a {@link Future}
* @see org.springframework.http.HttpEntity
*/
<T> Future<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request,
<T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request,
Class<T> responseType, Object... uriVariables) throws RestClientException;
/**
@ -172,7 +173,7 @@ public interface AsyncRestOperations { @@ -172,7 +173,7 @@ public interface AsyncRestOperations {
* @return the entity wrapped in a {@link Future}
* @see org.springframework.http.HttpEntity
*/
<T> Future<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request,
<T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request,
Class<T> responseType, Map<String, ?> uriVariables)
throws RestClientException;
@ -184,7 +185,7 @@ public interface AsyncRestOperations { @@ -184,7 +185,7 @@ public interface AsyncRestOperations {
* @return the entity wrapped in a {@link Future}
* @see org.springframework.http.HttpEntity
*/
<T> Future<ResponseEntity<T>> postForEntity(URI url, HttpEntity<?> request,
<T> ListenableFuture<ResponseEntity<T>> postForEntity(URI url, HttpEntity<?> request,
Class<T> responseType) throws RestClientException;
// PUT
@ -198,7 +199,7 @@ public interface AsyncRestOperations { @@ -198,7 +199,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand the template
* @see HttpEntity
*/
Future<?> put(String url, HttpEntity<?> request, Object... uriVariables)
ListenableFuture<?> put(String url, HttpEntity<?> request, Object... uriVariables)
throws RestClientException;
/**
@ -210,7 +211,7 @@ public interface AsyncRestOperations { @@ -210,7 +211,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand the template
* @see HttpEntity
*/
Future<?> put(String url, HttpEntity<?> request, Map<String, ?> uriVariables)
ListenableFuture<?> put(String url, HttpEntity<?> request, Map<String, ?> uriVariables)
throws RestClientException;
/**
@ -220,7 +221,7 @@ public interface AsyncRestOperations { @@ -220,7 +221,7 @@ public interface AsyncRestOperations {
* @param request the Object to be PUT, may be {@code null}
* @see HttpEntity
*/
Future<?> put(URI url, HttpEntity<?> request) throws RestClientException;
ListenableFuture<?> put(URI url, HttpEntity<?> request) throws RestClientException;
// DELETE
@ -231,7 +232,7 @@ public interface AsyncRestOperations { @@ -231,7 +232,7 @@ public interface AsyncRestOperations {
* @param url the URL
* @param uriVariables the variables to expand in the template
*/
Future<?> delete(String url, Object... uriVariables) throws RestClientException;
ListenableFuture<?> delete(String url, Object... uriVariables) throws RestClientException;
/**
* Asynchronously delete the resources at the specified URI.
@ -240,7 +241,7 @@ public interface AsyncRestOperations { @@ -240,7 +241,7 @@ public interface AsyncRestOperations {
* @param url the URL
* @param uriVariables the variables to expand in the template
*/
Future<?> delete(String url, Map<String, ?> uriVariables) throws RestClientException;
ListenableFuture<?> delete(String url, Map<String, ?> uriVariables) throws RestClientException;
/**
* Asynchronously delete the resources at the specified URI.
@ -248,7 +249,7 @@ public interface AsyncRestOperations { @@ -248,7 +249,7 @@ public interface AsyncRestOperations {
* <p>The Future will return a {@code null} result upon completion.
* @param url the URL
*/
Future<?> delete(URI url) throws RestClientException;
ListenableFuture<?> delete(URI url) throws RestClientException;
// OPTIONS
@ -259,7 +260,7 @@ public interface AsyncRestOperations { @@ -259,7 +260,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand in the template
* @return the value of the allow header wrapped in a {@link Future}
*/
Future<Set<HttpMethod>> optionsForAllow(String url, Object... uriVariables)
ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Object... uriVariables)
throws RestClientException;
/**
@ -269,7 +270,7 @@ public interface AsyncRestOperations { @@ -269,7 +270,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand in the template
* @return the value of the allow header wrapped in a {@link Future}
*/
Future<Set<HttpMethod>> optionsForAllow(String url, Map<String, ?> uriVariables)
ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Map<String, ?> uriVariables)
throws RestClientException;
/**
@ -277,7 +278,7 @@ public interface AsyncRestOperations { @@ -277,7 +278,7 @@ public interface AsyncRestOperations {
* @param url the URL
* @return the value of the allow header wrapped in a {@link Future}
*/
Future<Set<HttpMethod>> optionsForAllow(URI url) throws RestClientException;
ListenableFuture<Set<HttpMethod>> optionsForAllow(URI url) throws RestClientException;
// exchange
@ -295,7 +296,7 @@ public interface AsyncRestOperations { @@ -295,7 +296,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand in the template
* @return the response as entity wrapped in a {@link Future}
*/
<T> Future<ResponseEntity<T>> exchange(String url, HttpMethod method,
<T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
HttpEntity<?> requestEntity, Class<T> responseType, Object... uriVariables)
throws RestClientException;
@ -312,7 +313,7 @@ public interface AsyncRestOperations { @@ -312,7 +313,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand in the template
* @return the response as entity wrapped in a {@link Future}
*/
<T> Future<ResponseEntity<T>> exchange(String url, HttpMethod method,
<T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
HttpEntity<?> requestEntity, Class<T> responseType,
Map<String, ?> uriVariables) throws RestClientException;
@ -327,7 +328,7 @@ public interface AsyncRestOperations { @@ -327,7 +328,7 @@ public interface AsyncRestOperations {
* @param responseType the type of the return value
* @return the response as entity wrapped in a {@link Future}
*/
<T> Future<ResponseEntity<T>> exchange(URI url, HttpMethod method,
<T> ListenableFuture<ResponseEntity<T>> exchange(URI url, HttpMethod method,
HttpEntity<?> requestEntity, Class<T> responseType)
throws RestClientException;
@ -348,7 +349,7 @@ public interface AsyncRestOperations { @@ -348,7 +349,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand in the template
* @return the response as entity wrapped in a {@link Future}
*/
<T> Future<ResponseEntity<T>> exchange(String url, HttpMethod method,
<T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
HttpEntity<?> requestEntity, ParameterizedTypeReference<T> responseType,
Object... uriVariables) throws RestClientException;
@ -368,7 +369,7 @@ public interface AsyncRestOperations { @@ -368,7 +369,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand in the template
* @return the response as entity wrapped in a {@link Future}
*/
<T> Future<ResponseEntity<T>> exchange(String url, HttpMethod method,
<T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
HttpEntity<?> requestEntity, ParameterizedTypeReference<T> responseType,
Map<String, ?> uriVariables) throws RestClientException;
@ -387,7 +388,7 @@ public interface AsyncRestOperations { @@ -387,7 +388,7 @@ public interface AsyncRestOperations {
* @param responseType the type of the return value
* @return the response as entity wrapped in a {@link Future}
*/
<T> Future<ResponseEntity<T>> exchange(URI url, HttpMethod method,
<T> ListenableFuture<ResponseEntity<T>> exchange(URI url, HttpMethod method,
HttpEntity<?> requestEntity, ParameterizedTypeReference<T> responseType)
throws RestClientException;
@ -406,7 +407,7 @@ public interface AsyncRestOperations { @@ -406,7 +407,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand in the template
* @return an arbitrary object, as returned by the {@link ResponseExtractor}
*/
<T> Future<T> execute(String url, HttpMethod method,
<T> ListenableFuture<T> execute(String url, HttpMethod method,
AsyncRequestCallback requestCallback, ResponseExtractor<T> responseExtractor,
Object... uriVariables) throws RestClientException;
@ -422,7 +423,7 @@ public interface AsyncRestOperations { @@ -422,7 +423,7 @@ public interface AsyncRestOperations {
* @param uriVariables the variables to expand in the template
* @return an arbitrary object, as returned by the {@link ResponseExtractor}
*/
<T> Future<T> execute(String url, HttpMethod method,
<T> ListenableFuture<T> execute(String url, HttpMethod method,
AsyncRequestCallback requestCallback, ResponseExtractor<T> responseExtractor,
Map<String, ?> uriVariables) throws RestClientException;
@ -436,7 +437,7 @@ public interface AsyncRestOperations { @@ -436,7 +437,7 @@ public interface AsyncRestOperations {
* @param responseExtractor object that extracts the return value from the response
* @return an arbitrary object, as returned by the {@link ResponseExtractor}
*/
<T> Future<T> execute(URI url, HttpMethod method,
<T> ListenableFuture<T> execute(URI url, HttpMethod method,
AsyncRequestCallback requestCallback, ResponseExtractor<T> responseExtractor)
throws RestClientException;

235
spring-web/src/main/java/org/springframework/web/client/AsyncRestTemplate.java

@ -24,11 +24,11 @@ import java.util.List; @@ -24,11 +24,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.http.HttpEntity;
@ -44,12 +44,15 @@ import org.springframework.http.client.SimpleClientHttpRequestFactory; @@ -44,12 +44,15 @@ import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.client.support.AsyncHttpAccessor;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.util.UriTemplate;
/**
* <strong>Spring's central class for asynchronous client-side HTTP access.</strong>
* Exposes similar methods as {@link RestTemplate}, but returns {@link Future} wrappers
* as opposed to concrete results.
* Exposes similar methods as {@link RestTemplate}, but returns {@link ListenableFuture}
* wrappers as opposed to concrete results.
*
* <p>The {@code AsyncRestTemplate} exposes a synchronous {@link RestTemplate} via the
* {@link #getRestOperations()} method, and it shares its
@ -69,7 +72,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -69,7 +72,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
/**
* Create a new instance of the {@link AsyncRestTemplate} using default settings.
* Create a new instance of the {@code AsyncRestTemplate} using default settings.
* <p>This constructor uses a {@link SimpleClientHttpRequestFactory} in combination
* with a {@link SimpleAsyncTaskExecutor} for asynchronous execution.
*/
@ -78,21 +81,22 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -78,21 +81,22 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
/**
* Create a new instance of the {@link AsyncRestTemplate} using the given
* Create a new instance of the {@code AsyncRestTemplate} using the given
* {@link AsyncTaskExecutor}.
* <p>This constructor uses a {@link SimpleClientHttpRequestFactory} in combination
* with the given {@code AsyncTaskExecutor} for asynchronous execution.
*/
public AsyncRestTemplate(AsyncTaskExecutor taskExecutor) {
public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "AsyncTaskExecutor must not be null");
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
SimpleClientHttpRequestFactory requestFactory =
new SimpleClientHttpRequestFactory();
requestFactory.setTaskExecutor(taskExecutor);
this.syncTemplate = new RestTemplate(requestFactory);
setAsyncRequestFactory(requestFactory);
}
/**
* Create a new instance of the {@link AsyncRestTemplate} using the given
* Create a new instance of the {@code AsyncRestTemplate} using the given
* {@link AsyncClientHttpRequestFactory}.
* <p>This constructor will cast the given asynchronous
* {@code AsyncClientHttpRequestFactory} to a {@link ClientHttpRequestFactory}. Since
@ -105,22 +109,24 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -105,22 +109,24 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
/**
* Creates a new instance of the {@link AsyncRestTemplate} using the given
* Creates a new instance of the {@code AsyncRestTemplate} using the given
* asynchronous and synchronous request factories.
* @param asyncRequestFactory the asynchronous request factory
* @param syncRequestFactory the synchronous request factory
*/
public AsyncRestTemplate(AsyncClientHttpRequestFactory asyncRequestFactory, ClientHttpRequestFactory syncRequestFactory) {
public AsyncRestTemplate(AsyncClientHttpRequestFactory asyncRequestFactory,
ClientHttpRequestFactory syncRequestFactory) {
this(asyncRequestFactory, new RestTemplate(syncRequestFactory));
}
/**
* Create a new instance of the {@link AsyncRestTemplate} using the given
* Create a new instance of the {@code AsyncRestTemplate} using the given
* {@link AsyncClientHttpRequestFactory} and synchronous {@link RestTemplate}.
* @param requestFactory the asynchronous request factory to use
* @param restTemplate the synchronous template to use
*/
public AsyncRestTemplate(AsyncClientHttpRequestFactory requestFactory, RestTemplate restTemplate) {
public AsyncRestTemplate(AsyncClientHttpRequestFactory requestFactory,
RestTemplate restTemplate) {
Assert.notNull(restTemplate, "'restTemplate' must not be null");
this.syncTemplate = restTemplate;
setAsyncRequestFactory(requestFactory);
@ -165,7 +171,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -165,7 +171,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
// GET
@Override
public <T> Future<ResponseEntity<T>> getForEntity(String url, Class<T> responseType, Object... uriVariables)
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType, Object... uriVariables)
throws RestClientException {
AsyncRequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
@ -174,7 +180,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -174,7 +180,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> getForEntity(String url, Class<T> responseType,
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType,
Map<String, ?> urlVariables) throws RestClientException {
AsyncRequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
@ -183,7 +189,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -183,7 +189,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> getForEntity(URI url, Class<T> responseType) throws RestClientException {
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(URI url, Class<T> responseType) throws RestClientException {
AsyncRequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
return execute(url, HttpMethod.GET, requestCallback, responseExtractor);
@ -192,19 +198,19 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -192,19 +198,19 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
// HEAD
@Override
public Future<HttpHeaders> headForHeaders(String url, Object... uriVariables) throws RestClientException {
public ListenableFuture<HttpHeaders> headForHeaders(String url, Object... uriVariables) throws RestClientException {
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
return execute(url, HttpMethod.HEAD, null, headersExtractor, uriVariables);
}
@Override
public Future<HttpHeaders> headForHeaders(String url, Map<String, ?> uriVariables) throws RestClientException {
public ListenableFuture<HttpHeaders> headForHeaders(String url, Map<String, ?> uriVariables) throws RestClientException {
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
return execute(url, HttpMethod.HEAD, null, headersExtractor, uriVariables);
}
@Override
public Future<HttpHeaders> headForHeaders(URI url) throws RestClientException {
public ListenableFuture<HttpHeaders> headForHeaders(URI url) throws RestClientException {
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
return execute(url, HttpMethod.HEAD, null, headersExtractor);
}
@ -212,39 +218,55 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -212,39 +218,55 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
// POST
@Override
public Future<URI> postForLocation(String url, HttpEntity<?> request,
public ListenableFuture<URI> postForLocation(String url, HttpEntity<?> request,
Object... uriVariables) throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request);
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
Future<HttpHeaders> headersFuture =
ListenableFuture<HttpHeaders> headersFuture =
execute(url, HttpMethod.POST, requestCallback, headersExtractor,
uriVariables);
return extractLocationHeader(headersFuture);
}
@Override
public Future<URI> postForLocation(String url, HttpEntity<?> request,
public ListenableFuture<URI> postForLocation(String url, HttpEntity<?> request,
Map<String, ?> uriVariables) throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request);
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
Future<HttpHeaders> headersFuture =
ListenableFuture<HttpHeaders> headersFuture =
execute(url, HttpMethod.POST, requestCallback, headersExtractor,
uriVariables);
return extractLocationHeader(headersFuture);
}
@Override
public Future<URI> postForLocation(URI url, HttpEntity<?> request)
public ListenableFuture<URI> postForLocation(URI url, HttpEntity<?> request)
throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request);
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
Future<HttpHeaders> headersFuture =
ListenableFuture<HttpHeaders> headersFuture =
execute(url, HttpMethod.POST, requestCallback, headersExtractor);
return extractLocationHeader(headersFuture);
}
private static Future<URI> extractLocationHeader(final Future<HttpHeaders> headersFuture) {
return new Future<URI>() {
private static ListenableFuture<URI> extractLocationHeader(final ListenableFuture<HttpHeaders> headersFuture) {
return new ListenableFuture<URI>() {
@Override
public void addCallback(final ListenableFutureCallback<? super URI> callback) {
headersFuture.addCallback(new ListenableFutureCallback<HttpHeaders>() {
@Override
public void onSuccess(HttpHeaders result) {
callback.onSuccess(result.getLocation());
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return headersFuture.cancel(mayInterruptIfRunning);
@ -272,7 +294,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -272,7 +294,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request,
public <T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request,
Class<T> responseType, Object... uriVariables) throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request, responseType);
ResponseExtractor<ResponseEntity<T>> responseExtractor =
@ -282,7 +304,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -282,7 +304,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request,
public <T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request,
Class<T> responseType, Map<String, ?> uriVariables)
throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request, responseType);
@ -293,7 +315,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -293,7 +315,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> postForEntity(URI url, HttpEntity<?> request,
public <T> ListenableFuture<ResponseEntity<T>> postForEntity(URI url, HttpEntity<?> request,
Class<T> responseType) throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request, responseType);
ResponseExtractor<ResponseEntity<T>> responseExtractor =
@ -304,21 +326,21 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -304,21 +326,21 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
// PUT
@Override
public Future<?> put(String url, HttpEntity<?> request, Object... uriVariables)
public ListenableFuture<?> put(String url, HttpEntity<?> request, Object... uriVariables)
throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request);
return execute(url, HttpMethod.PUT, requestCallback, null, uriVariables);
}
@Override
public Future<?> put(String url, HttpEntity<?> request,
public ListenableFuture<?> put(String url, HttpEntity<?> request,
Map<String, ?> uriVariables) throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request);
return execute(url, HttpMethod.PUT, requestCallback, null, uriVariables);
}
@Override
public Future<?> put(URI url, HttpEntity<?> request) throws RestClientException {
public ListenableFuture<?> put(URI url, HttpEntity<?> request) throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request);
return execute(url, HttpMethod.PUT, requestCallback, null);
}
@ -326,47 +348,64 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -326,47 +348,64 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
// DELETE
@Override
public Future<?> delete(String url, Object... urlVariables)
public ListenableFuture<?> delete(String url, Object... urlVariables)
throws RestClientException {
return execute(url, HttpMethod.DELETE, null, null, urlVariables);
}
@Override
public Future<?> delete(String url, Map<String, ?> urlVariables)
public ListenableFuture<?> delete(String url, Map<String, ?> urlVariables)
throws RestClientException {
return execute(url, HttpMethod.DELETE, null, null, urlVariables);
}
@Override
public Future<?> delete(URI url) throws RestClientException {
public ListenableFuture<?> delete(URI url) throws RestClientException {
return execute(url, HttpMethod.DELETE, null, null);
}
// OPTIONS
@Override
public Future<Set<HttpMethod>> optionsForAllow(String url, Object... uriVariables) throws RestClientException {
public ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Object... uriVariables) throws RestClientException {
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
Future<HttpHeaders> headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables);
ListenableFuture<HttpHeaders> headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables);
return extractAllowHeader(headersFuture);
}
@Override
public Future<Set<HttpMethod>> optionsForAllow(String url, Map<String, ?> uriVariables) throws RestClientException {
public ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Map<String, ?> uriVariables) throws RestClientException {
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
Future<HttpHeaders> headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables);
ListenableFuture<HttpHeaders> headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables);
return extractAllowHeader(headersFuture);
}
@Override
public Future<Set<HttpMethod>> optionsForAllow(URI url) throws RestClientException {
public ListenableFuture<Set<HttpMethod>> optionsForAllow(URI url) throws RestClientException {
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
Future<HttpHeaders> headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor);
ListenableFuture<HttpHeaders> headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor);
return extractAllowHeader(headersFuture);
}
private static Future<Set<HttpMethod>> extractAllowHeader(final Future<HttpHeaders> headersFuture) {
return new Future<Set<HttpMethod>>() {
private static ListenableFuture<Set<HttpMethod>> extractAllowHeader(final ListenableFuture<HttpHeaders> headersFuture) {
return new ListenableFuture<Set<HttpMethod>>() {
@Override
public void addCallback(
final ListenableFutureCallback<? super Set<HttpMethod>> callback) {
headersFuture.addCallback(new ListenableFutureCallback<HttpHeaders>() {
@Override
public void onSuccess(HttpHeaders result) {
callback.onSuccess(result.getAllow());
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return headersFuture.cancel(mayInterruptIfRunning);
@ -397,7 +436,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -397,7 +436,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
// exchange
@Override
public <T> Future<ResponseEntity<T>> exchange(String url, HttpMethod method,
public <T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
HttpEntity<?> requestEntity, Class<T> responseType, Object... uriVariables)
throws RestClientException {
AsyncRequestCallback requestCallback =
@ -408,7 +447,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -408,7 +447,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> exchange(String url, HttpMethod method,
public <T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
HttpEntity<?> requestEntity, Class<T> responseType,
Map<String, ?> uriVariables) throws RestClientException {
AsyncRequestCallback requestCallback =
@ -419,7 +458,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -419,7 +458,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> exchange(URI url, HttpMethod method,
public <T> ListenableFuture<ResponseEntity<T>> exchange(URI url, HttpMethod method,
HttpEntity<?> requestEntity, Class<T> responseType)
throws RestClientException {
AsyncRequestCallback requestCallback =
@ -430,7 +469,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -430,7 +469,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> exchange(String url, HttpMethod method,
public <T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
HttpEntity<?> requestEntity, ParameterizedTypeReference<T> responseType,
Object... uriVariables) throws RestClientException {
Type type = responseType.getType();
@ -441,7 +480,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -441,7 +480,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> exchange(String url, HttpMethod method,
public <T> ListenableFuture<ResponseEntity<T>> exchange(String url, HttpMethod method,
HttpEntity<?> requestEntity, ParameterizedTypeReference<T> responseType,
Map<String, ?> uriVariables) throws RestClientException {
Type type = responseType.getType();
@ -452,7 +491,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -452,7 +491,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<ResponseEntity<T>> exchange(URI url, HttpMethod method,
public <T> ListenableFuture<ResponseEntity<T>> exchange(URI url, HttpMethod method,
HttpEntity<?> requestEntity, ParameterizedTypeReference<T> responseType)
throws RestClientException {
Type type = responseType.getType();
@ -466,7 +505,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -466,7 +505,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
// general execution
@Override
public <T> Future<T> execute(String url, HttpMethod method,
public <T> ListenableFuture<T> execute(String url, HttpMethod method,
AsyncRequestCallback requestCallback, ResponseExtractor<T> responseExtractor,
Object... urlVariables) throws RestClientException {
@ -475,7 +514,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -475,7 +514,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<T> execute(String url, HttpMethod method,
public <T> ListenableFuture<T> execute(String url, HttpMethod method,
AsyncRequestCallback requestCallback, ResponseExtractor<T> responseExtractor,
Map<String, ?> urlVariables) throws RestClientException {
@ -484,7 +523,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -484,7 +523,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
@Override
public <T> Future<T> execute(URI url, HttpMethod method,
public <T> ListenableFuture<T> execute(URI url, HttpMethod method,
AsyncRequestCallback requestCallback, ResponseExtractor<T> responseExtractor)
throws RestClientException {
@ -504,7 +543,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -504,7 +543,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
* @return an arbitrary object, as returned by the {@link ResponseExtractor}
*/
@SuppressWarnings("unchecked")
protected <T> Future<T> doExecute(URI url, HttpMethod method, AsyncRequestCallback requestCallback,
protected <T> ListenableFuture<T> doExecute(URI url, HttpMethod method, AsyncRequestCallback requestCallback,
ResponseExtractor<T> responseExtractor) throws RestClientException {
Assert.notNull(url, "'url' must not be null");
@ -514,13 +553,9 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -514,13 +553,9 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
Future<ClientHttpResponse> responseFuture = request.executeAsync();
if (responseExtractor != null) {
return new ResponseExtractorFuture<T>(method, url, responseFuture, responseExtractor);
}
else {
return (Future<T>) new VoidResponseFuture(method, url, responseFuture);
}
ListenableFuture<ClientHttpResponse> responseFuture = request.executeAsync();
return new ResponseExtractorFuture<T>(method, url, responseFuture,
responseExtractor);
}
catch (IOException ex) {
throw new ResourceAccessException("I/O error on " + method.name() +
@ -594,47 +629,30 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -594,47 +629,30 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
return this.syncTemplate.headersExtractor();
}
private abstract class ResponseFuture<T> implements Future<T> {
/**
* Future returned from
* {@link #doExecute(URI, HttpMethod, AsyncRequestCallback, ResponseExtractor)}
*/
private class ResponseExtractorFuture<T>
extends ListenableFutureAdapter<T, ClientHttpResponse> {
private final HttpMethod method;
private final URI url;
private final Future<ClientHttpResponse> responseFuture;
private final ResponseExtractor<T> responseExtractor;
public ResponseFuture(HttpMethod method, URI url, Future<ClientHttpResponse> responseFuture) {
public ResponseExtractorFuture(HttpMethod method, URI url,
ListenableFuture<ClientHttpResponse> clientHttpResponseFuture,
ResponseExtractor<T> responseExtractor) {
super(clientHttpResponseFuture);
this.method = method;
this.url = url;
this.responseFuture = responseFuture;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return this.responseFuture.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return this.responseFuture.isCancelled();
}
@Override
public boolean isDone() {
return this.responseFuture.isDone();
}
@Override
public T get() throws InterruptedException, ExecutionException {
return getInternal(this.responseFuture.get());
this.responseExtractor = responseExtractor;
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return getInternal(this.responseFuture.get(timeout, unit));
}
private T getInternal(ClientHttpResponse response) throws ExecutionException {
protected final T adapt(ClientHttpResponse response) throws ExecutionException {
try {
if (!getErrorHandler().hasError(response)) {
logResponseStatus(this.method, this.url, response);
@ -642,7 +660,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -642,7 +660,7 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
else {
handleResponseError(this.method, this.url, response);
}
return extractData(response);
return convertResponse(response);
}
catch (IOException ex) {
throw new ExecutionException(ex);
@ -654,42 +672,13 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe @@ -654,42 +672,13 @@ public class AsyncRestTemplate extends AsyncHttpAccessor implements AsyncRestOpe
}
}
protected abstract T extractData(ClientHttpResponse response)
throws IOException;
}
private class ResponseExtractorFuture<T> extends ResponseFuture<T> {
private final ResponseExtractor<T> responseExtractor;
public ResponseExtractorFuture(HttpMethod method, URI url, Future<ClientHttpResponse> responseFuture,
ResponseExtractor<T> responseExtractor) {
super(method, url, responseFuture);
this.responseExtractor = responseExtractor;
protected T convertResponse(ClientHttpResponse response) throws IOException {
return responseExtractor != null ? responseExtractor.extractData(response) :
null;
}
@Override
protected T extractData(ClientHttpResponse response) throws IOException {
return responseExtractor.extractData(response);
}
}
private class VoidResponseFuture extends ResponseFuture<Void> {
public VoidResponseFuture(HttpMethod method, URI url, Future<ClientHttpResponse> responseFuture) {
super(method, url, responseFuture);
}
@Override
protected Void extractData(ClientHttpResponse response) throws IOException {
return null;
}
}
/**
* Adapts a {@link RequestCallback} to the {@link AsyncRequestCallback} interface.
*/

43
spring-web/src/test/java/org/springframework/http/client/AbstractAsyncHttpRequestFactoryTestCase.java

@ -34,6 +34,8 @@ import org.springframework.http.HttpStatus; @@ -34,6 +34,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.StreamingHttpOutputMessage;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.StreamUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
public abstract class AbstractAsyncHttpRequestFactoryTestCase extends
AbstractJettyServerTestCase {
@ -62,6 +64,47 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends @@ -62,6 +64,47 @@ public abstract class AbstractAsyncHttpRequestFactoryTestCase extends
response.getStatusCode());
}
@Test
public void statusCallback() throws Exception {
URI uri = new URI(baseUrl + "/status/notfound");
AsyncClientHttpRequest request = factory.createAsyncRequest(uri, HttpMethod.GET);
assertEquals("Invalid HTTP method", HttpMethod.GET, request.getMethod());
assertEquals("Invalid HTTP URI", uri, request.getURI());
Future<ClientHttpResponse> futureResponse = request.executeAsync();
if (futureResponse instanceof ListenableFuture) {
ListenableFuture<ClientHttpResponse> listenableFuture =
(ListenableFuture<ClientHttpResponse>) futureResponse;
listenableFuture.addCallback(new ListenableFutureCallback<ClientHttpResponse>() {
@Override
public void onSuccess(ClientHttpResponse result) {
try {
System.out.println("SUCCESS! " + result.getStatusCode());
System.out.println("Callback: " + System.currentTimeMillis());
System.out.println(Thread.currentThread().getId());
assertEquals("Invalid status code", HttpStatus.NOT_FOUND,
result.getStatusCode());
}
catch (IOException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
@Override
public void onFailure(Throwable t) {
System.out.println("FAILURE: " + t);
}
});
}
ClientHttpResponse response = futureResponse.get();
System.out.println("Main thread: " + System.currentTimeMillis());
assertEquals("Invalid status code", HttpStatus.NOT_FOUND,
response.getStatusCode());
System.out.println(Thread.currentThread().getId());
}
@Test
public void echo() throws Exception {
AsyncClientHttpRequest

4
spring-web/src/test/java/org/springframework/http/client/BufferedSimpleAsyncHttpRequestFactoryTests.java

@ -20,7 +20,7 @@ import java.net.ProtocolException; @@ -20,7 +20,7 @@ import java.net.ProtocolException;
import org.junit.Test;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.http.HttpMethod;
@ -29,7 +29,7 @@ public class BufferedSimpleAsyncHttpRequestFactoryTests extends AbstractAsyncHtt @@ -29,7 +29,7 @@ public class BufferedSimpleAsyncHttpRequestFactoryTests extends AbstractAsyncHtt
@Override
protected AsyncClientHttpRequestFactory createRequestFactory() {
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
requestFactory.setTaskExecutor(taskExecutor);
return requestFactory;
}

263
spring-web/src/test/java/org/springframework/web/client/AsyncRestTemplateIntegrationTests.java

@ -40,6 +40,8 @@ import org.springframework.http.ResponseEntity; @@ -40,6 +40,8 @@ import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/** @author Arjen Poutsma */
public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCase {
@ -63,6 +65,37 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa @@ -63,6 +65,37 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
assertEquals("Invalid status code", HttpStatus.OK, entity.getStatusCode());
}
@Test
public void multipleFutureGets() throws ExecutionException, InterruptedException {
Future<ResponseEntity<String>>
futureEntity = template.getForEntity(baseUrl + "/{method}", String.class, "get");
futureEntity.get();
futureEntity.get();
}
@Test
public void getEntityCallback() throws ExecutionException, InterruptedException {
ListenableFuture<ResponseEntity<String>>
futureEntity = template.getForEntity(baseUrl + "/{method}", String.class, "get");
futureEntity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
@Override
public void onSuccess(ResponseEntity<String> entity) {
assertEquals("Invalid content", helloWorld, entity.getBody());
assertFalse("No headers", entity.getHeaders().isEmpty());
assertEquals("Invalid content-type", contentType, entity.getHeaders().getContentType());
assertEquals("Invalid status code", HttpStatus.OK, entity.getStatusCode());
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
// wait till done
while (!futureEntity.isDone()) {
}
}
@Test
public void getNoResponse() throws ExecutionException, InterruptedException {
Future<ResponseEntity<String>>
@ -111,27 +144,60 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa @@ -111,27 +144,60 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
assertTrue("No Content-Type header", headers.containsKey("Content-Type"));
}
@Test
public void headForHeadersCallback() throws ExecutionException, InterruptedException {
ListenableFuture<HttpHeaders> headersFuture = template.headForHeaders(baseUrl + "/get");
headersFuture.addCallback(new ListenableFutureCallback<HttpHeaders>() {
@Override
public void onSuccess(HttpHeaders result) {
assertTrue("No Content-Type header", result.containsKey("Content-Type"));
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
while (!headersFuture.isDone()) {
}
}
@Test
public void postForLocation()
throws URISyntaxException, ExecutionException, InterruptedException {
HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld);
Future<URI> locationFuture = template.postForLocation(baseUrl + "/{method}", requestEntity,
HttpHeaders entityHeaders = new HttpHeaders();
entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15")));
HttpEntity<String> entity = new HttpEntity<String>(helloWorld, entityHeaders);
Future<URI>
locationFuture = template.postForLocation(baseUrl + "/{method}", entity,
"post");
URI location = locationFuture.get();
assertEquals("Invalid location", new URI(baseUrl + "/post/1"), location);
}
@Test
public void postForLocationEntity()
public void postForLocationCallback()
throws URISyntaxException, ExecutionException, InterruptedException {
HttpHeaders entityHeaders = new HttpHeaders();
entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15")));
HttpEntity<String> entity = new HttpEntity<String>(helloWorld, entityHeaders);
Future<URI>
final URI expected = new URI(baseUrl + "/post/1");
ListenableFuture<URI>
locationFuture = template.postForLocation(baseUrl + "/{method}", entity,
"post");
URI location = locationFuture.get();
assertEquals("Invalid location", new URI(baseUrl + "/post/1"), location);
locationFuture.addCallback(new ListenableFutureCallback<URI>() {
@Override
public void onSuccess(URI result) {
assertEquals("Invalid location", expected, result);
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
while (!locationFuture.isDone()) {
}
}
@Test
@ -145,6 +211,28 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa @@ -145,6 +211,28 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
assertEquals("Invalid content", helloWorld, responseEntity.getBody());
}
@Test
public void postForEntityCallback()
throws URISyntaxException, ExecutionException, InterruptedException {
HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld);
ListenableFuture<ResponseEntity<String>>
responseEntityFuture = template.postForEntity(baseUrl + "/{method}", requestEntity,
String.class, "post");
responseEntityFuture.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
@Override
public void onSuccess(ResponseEntity<String> result) {
assertEquals("Invalid content", helloWorld, result.getBody());
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
while (!responseEntityFuture.isDone()) {
}
}
@Test
public void put()
throws URISyntaxException, ExecutionException, InterruptedException {
@ -155,17 +243,59 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa @@ -155,17 +243,59 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
responseEntityFuture.get();
}
@Test
public void putCallback()
throws URISyntaxException, ExecutionException, InterruptedException {
HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld);
ListenableFuture<?>
responseEntityFuture = template.put(baseUrl + "/{method}", requestEntity,
"put");
responseEntityFuture.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
assertNull(result);
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
while (!responseEntityFuture.isDone()) {
}
}
@Test
public void delete()
throws URISyntaxException, ExecutionException, InterruptedException {
Future<?> deletedFuture = template.delete(new URI(baseUrl + "/delete"));
deletedFuture.get();
}
@Test
public void deleteCallback()
throws URISyntaxException, ExecutionException, InterruptedException {
ListenableFuture<?> deletedFuture = template.delete(new URI(baseUrl + "/delete"));
deletedFuture.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
assertNull(result);
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
while (!deletedFuture.isDone()) {
}
}
@Test
public void notFound() throws ExecutionException, InterruptedException {
try {
Future<Void> future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null);
Future<?> future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null);
future.get();
fail("HttpClientErrorException expected");
}
@ -176,6 +306,30 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa @@ -176,6 +306,30 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
}
}
@Test
public void notFoundCallback() throws ExecutionException, InterruptedException {
ListenableFuture<?> future =
template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null,
null);
future.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
fail("onSuccess not expected");
}
@Override
public void onFailure(Throwable t) {
assertTrue(t instanceof HttpClientErrorException);
HttpClientErrorException ex = (HttpClientErrorException) t;
assertEquals(HttpStatus.NOT_FOUND, ex.getStatusCode());
assertNotNull(ex.getStatusText());
assertNotNull(ex.getResponseBodyAsString());
}
});
while (!future.isDone()) {
}
}
@Test
public void serverError() throws ExecutionException, InterruptedException {
try {
@ -190,6 +344,28 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa @@ -190,6 +344,28 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
}
}
@Test
public void serverErrorCallback() throws ExecutionException, InterruptedException {
ListenableFuture<Void> future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null);
future.addCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
fail("onSuccess not expected");
}
@Override
public void onFailure(Throwable t) {
assertTrue(t instanceof HttpServerErrorException);
HttpServerErrorException ex = (HttpServerErrorException) t;
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, ex.getStatusCode());
assertNotNull(ex.getStatusText());
assertNotNull(ex.getResponseBodyAsString());
}
});
while (!future.isDone()) {
}
}
@Test
public void optionsForAllow()
throws URISyntaxException, ExecutionException, InterruptedException {
@ -200,6 +376,27 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa @@ -200,6 +376,27 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.HEAD, HttpMethod.TRACE), allowed);
}
@Test
public void optionsForAllowCallback()
throws URISyntaxException, ExecutionException, InterruptedException {
ListenableFuture<Set<HttpMethod>>
allowedFuture = template.optionsForAllow(new URI(baseUrl + "/get"));
allowedFuture.addCallback(new ListenableFutureCallback<Set<HttpMethod>>() {
@Override
public void onSuccess(Set<HttpMethod> result) {
assertEquals("Invalid response",
EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.HEAD, HttpMethod.TRACE), result);
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
while (!allowedFuture.isDone()) {
}
}
@Test
@SuppressWarnings("unchecked")
public void exchangeGet() throws Exception {
@ -213,6 +410,30 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa @@ -213,6 +410,30 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
assertEquals("Invalid content", helloWorld, response.getBody());
}
@Test
@SuppressWarnings("unchecked")
public void exchangeGetCallback() throws Exception {
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.set("MyHeader", "MyValue");
HttpEntity<?> requestEntity = new HttpEntity(requestHeaders);
ListenableFuture<ResponseEntity<String>> responseFuture =
template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity,
String.class, "get");
responseFuture.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
@Override
public void onSuccess(ResponseEntity<String> result) {
assertEquals("Invalid content", helloWorld, result.getBody());
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
while (!responseFuture.isDone()) {
}
}
@Test
public void exchangePost() throws Exception {
HttpHeaders requestHeaders = new HttpHeaders();
@ -228,6 +449,34 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa @@ -228,6 +449,34 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
assertFalse(result.hasBody());
}
@Test
public void exchangePostCallback() throws Exception {
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.set("MyHeader", "MyValue");
requestHeaders.setContentType(MediaType.TEXT_PLAIN);
HttpEntity<String> requestEntity = new HttpEntity<String>(helloWorld, requestHeaders);
ListenableFuture<ResponseEntity<Void>>
resultFuture = template.exchange(baseUrl + "/{method}", HttpMethod.POST,
requestEntity, Void.class, "post");
final URI expected =new URI(baseUrl + "/post/1");
resultFuture.addCallback(new ListenableFutureCallback<ResponseEntity<Void>>() {
@Override
public void onSuccess(ResponseEntity<Void> result) {
assertEquals("Invalid location", expected,
result.getHeaders().getLocation());
assertFalse(result.hasBody());
}
@Override
public void onFailure(Throwable t) {
fail(t.getMessage());
}
});
while (!resultFuture.isDone()) {
}
}
@Test
public void multipart() throws UnsupportedEncodingException, ExecutionException,
InterruptedException {

Loading…
Cancel
Save