Browse Source

Support CompletableFuture in @MessageMapping handler methods

Issue: SPR-12207
pull/808/head
Sebastien Deleuze 11 years ago
parent
commit
5255e7ae21
  1. 93
      spring-core/src/main/java/org/springframework/util/concurrent/CompletableToListenableFutureAdapter.java
  2. 46
      spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/CompletableFutureReturnValueHandler.java
  3. 9
      spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java
  4. 64
      spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java

93
spring-core/src/main/java/org/springframework/util/concurrent/CompletableToListenableFutureAdapter.java

@ -0,0 +1,93 @@ @@ -0,0 +1,93 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import org.springframework.lang.UsesJava8;
/**
* Adapts a {@link CompletableFuture} into a {@link ListenableFuture}.
*
* @author Sebastien Deleuze
* @since 4.2
*/
@UsesJava8
public class CompletableToListenableFutureAdapter<T> implements ListenableFuture<T> {
private final CompletableFuture<T> completableFuture;
private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();
public CompletableToListenableFutureAdapter(CompletableFuture<T> completableFuture) {
this.completableFuture = completableFuture;
this.completableFuture.handle(new BiFunction<T, Throwable, Object>() {
@Override
public Object apply(T result, Throwable ex) {
if (ex != null) {
callbacks.failure(ex);
}
else {
callbacks.success(result);
}
return null;
}
});
}
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.callbacks.addCallback(callback);
}
@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.callbacks.addSuccessCallback(successCallback);
this.callbacks.addFailureCallback(failureCallback);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return this.completableFuture.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return this.completableFuture.isCancelled();
}
@Override
public boolean isDone() {
return this.completableFuture.isDone();
}
@Override
public T get() throws InterruptedException, ExecutionException {
return this.completableFuture.get();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return this.completableFuture.get(timeout, unit);
}
}

46
spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/CompletableFutureReturnValueHandler.java

@ -0,0 +1,46 @@ @@ -0,0 +1,46 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.handler.invocation;
import java.util.concurrent.CompletableFuture;
import org.springframework.core.MethodParameter;
import org.springframework.lang.UsesJava8;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
/**
* An {@link AsyncHandlerMethodReturnValueHandler} for {@link CompletableFuture} return type handling.
*
* @author Sebastien Deleuze
* @since 4.2
*/
@UsesJava8
public class CompletableFutureReturnValueHandler extends AbstractAsyncReturnValueHandler {
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return CompletableFuture.class.isAssignableFrom(returnType.getParameterType());
}
@Override
@SuppressWarnings("unchecked")
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
return new CompletableToListenableFutureAdapter<Object>((CompletableFuture<Object>)returnValue);
}
}

9
spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java

@ -50,6 +50,7 @@ import org.springframework.messaging.handler.annotation.support.MessageMethodArg @@ -50,6 +50,7 @@ import org.springframework.messaging.handler.annotation.support.MessageMethodArg
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
import org.springframework.messaging.handler.invocation.AbstractExceptionHandlerMethodResolver;
import org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler;
import org.springframework.messaging.handler.invocation.CompletableFutureReturnValueHandler;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.handler.invocation.ListenableFutureReturnValueHandler;
@ -83,6 +84,10 @@ import org.springframework.validation.Validator; @@ -83,6 +84,10 @@ import org.springframework.validation.Validator;
public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHandler<SimpMessageMappingInfo>
implements SmartLifecycle {
private static final boolean completableFuturePresent = ClassUtils.isPresent("java.util.concurrent.CompletableFuture",
SimpAnnotationMethodMessageHandler.class.getClassLoader());
private final SubscribableChannel clientInboundChannel;
private final SimpMessageSendingOperations clientMessagingTemplate;
@ -318,6 +323,10 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan @@ -318,6 +323,10 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
// Single-purpose return value types
ListenableFutureReturnValueHandler lfh = new ListenableFutureReturnValueHandler();
handlers.add(lfh);
if (completableFuturePresent) {
CompletableFutureReturnValueHandler cfh = new CompletableFutureReturnValueHandler();
handlers.add(cfh);
}
// Annotation-based return value types
SendToMethodReturnValueHandler sth = new SendToMethodReturnValueHandler(this.brokerTemplate, true);

64
spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java

@ -21,6 +21,7 @@ import java.util.HashMap; @@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Before;
@ -294,6 +295,51 @@ public class SimpAnnotationMethodMessageHandlerTests { @@ -294,6 +295,51 @@ public class SimpAnnotationMethodMessageHandlerTests {
assertTrue(controller.exceptionCatched);
}
@Test
@SuppressWarnings("unchecked")
public void completableFutureSuccess() {
given(this.channel.send(any(Message.class))).willReturn(true);
given(this.converter.toMessage(anyObject(), any(MessageHeaders.class)))
.willReturn((Message) MessageBuilder.withPayload(new byte[0]).build());
CompletableFutureController controller = new CompletableFutureController();
this.messageHandler.registerHandler(controller);
this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/"));
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create();
headers.setSessionId("session1");
headers.setSessionAttributes(new HashMap<>());
headers.setDestination("/app1/completable-future");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
this.messageHandler.handleMessage(message);
assertNotNull(controller.future);
controller.future.complete("foo");
verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
assertEquals("foo", this.payloadCaptor.getValue());
}
@Test
@SuppressWarnings("unchecked")
public void completableFutureFailure() {
given(this.channel.send(any(Message.class))).willReturn(true);
given(this.converter.toMessage(anyObject(), any(MessageHeaders.class)))
.willReturn((Message) MessageBuilder.withPayload(new byte[0]).build());
CompletableFutureController controller = new CompletableFutureController();
this.messageHandler.registerHandler(controller);
this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/"));
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create();
headers.setSessionId("session1");
headers.setSessionAttributes(new HashMap<>());
headers.setDestination("/app1/completable-future");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
this.messageHandler.handleMessage(message);
controller.future.completeExceptionally(new IllegalStateException());
assertTrue(controller.exceptionCatched);
}
private static class TestSimpAnnotationMethodMessageHandler extends SimpAnnotationMethodMessageHandler {
@ -413,6 +459,24 @@ public class SimpAnnotationMethodMessageHandlerTests { @@ -413,6 +459,24 @@ public class SimpAnnotationMethodMessageHandlerTests {
}
@Controller
private static class CompletableFutureController {
private CompletableFuture<String> future;
private boolean exceptionCatched = false;
@MessageMapping("completable-future")
public CompletableFuture<String> handleCompletableFuture() {
this.future = new CompletableFuture<>();
return this.future;
}
@MessageExceptionHandler(IllegalStateException.class)
public void handleValidationException() {
this.exceptionCatched = true;
}
}
private static class StringTestValidator implements Validator {

Loading…
Cancel
Save