From 0def1640f271a97d8328d35e70b26eb9af20d164 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Mon, 23 Jul 2018 13:37:05 +0200 Subject: [PATCH] Support single-value reactive types in @MessageMapping This commit adds support for single-value reactive types in @MessageMapping by converting them using ReactiveAdapterRegistry and MonoToListenableFutureAdapter. MonoToListenableFutureAdapter previously package private and used only in org.springframework.messaging.tcp.reactor has been moved to org.springframework.messaging.support and made public in order to be used by ReactiveReturnValueHandler as well. Issue: SPR-16634 --- .../ReactiveReturnValueHandler.java | 65 ++++++++++++++ .../SimpAnnotationMethodMessageHandler.java | 2 + ...AbstractMonoToListenableFutureAdapter.java | 2 +- .../MonoToListenableFutureAdapter.java | 4 +- .../tcp/reactor/ReactorNettyTcpClient.java | 1 + .../reactor/ReactorNettyTcpConnection.java | 1 + ...mpAnnotationMethodMessageHandlerTests.java | 88 +++++++++++++++++++ 7 files changed, 160 insertions(+), 3 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java rename spring-messaging/src/main/java/org/springframework/messaging/{tcp/reactor => support}/AbstractMonoToListenableFutureAdapter.java (98%) rename spring-messaging/src/main/java/org/springframework/messaging/{tcp/reactor => support}/MonoToListenableFutureAdapter.java (88%) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java new file mode 100644 index 00000000000..3fdd678d575 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java @@ -0,0 +1,65 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.handler.invocation; + +import reactor.core.publisher.Mono; + +import org.springframework.core.MethodParameter; +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.messaging.support.MonoToListenableFutureAdapter; +import org.springframework.util.concurrent.ListenableFuture; + +/** + * Support for single-value reactive types (like {@code Mono} or {@code Single}) as a + * return value type. + * + * @author Sebastien Deleuze + * @since 5.1 + */ +public class ReactiveReturnValueHandler extends AbstractAsyncReturnValueHandler { + + private final ReactiveAdapterRegistry adapterRegistry; + + + public ReactiveReturnValueHandler() { + this(ReactiveAdapterRegistry.getSharedInstance()); + } + + public ReactiveReturnValueHandler(ReactiveAdapterRegistry adapterRegistry) { + this.adapterRegistry = adapterRegistry; + } + + + @Override + public boolean supportsReturnType(MethodParameter returnType) { + return this.adapterRegistry.getAdapter(returnType.getParameterType()) != null; + } + + @Override + public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { + ReactiveAdapter adapter = this.adapterRegistry.getAdapter(returnType.getParameterType(), returnValue); + return !adapter.isMultiValue() && !adapter.isNoValue(); + } + + @Override + public ListenableFuture toListenableFuture(Object returnValue, MethodParameter returnType) { + ReactiveAdapter adapter = this.adapterRegistry.getAdapter(returnType.getParameterType(), returnValue); + return new MonoToListenableFutureAdapter<>(Mono.from(adapter.toPublisher(returnValue))); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java index 4b5aa03c536..256f6d50fd5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java @@ -60,6 +60,7 @@ import org.springframework.messaging.handler.invocation.HandlerMethodArgumentRes import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler; import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandlerComposite; import org.springframework.messaging.handler.invocation.ListenableFutureReturnValueHandler; +import org.springframework.messaging.handler.invocation.ReactiveReturnValueHandler; import org.springframework.messaging.simp.SimpAttributesContextHolder; import org.springframework.messaging.simp.SimpLogging; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; @@ -337,6 +338,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan handlers.add(new ListenableFutureReturnValueHandler()); handlers.add(new CompletableFutureReturnValueHandler()); + handlers.add(new ReactiveReturnValueHandler()); // Annotation-based return value types diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java similarity index 98% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java rename to spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java index 29e73651c77..9e3870556b6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.tcp.reactor; +package org.springframework.messaging.support; import java.time.Duration; import java.util.concurrent.ExecutionException; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java similarity index 88% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java rename to spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java index ffa405bd84f..712644f55ee 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.tcp.reactor; +package org.springframework.messaging.support; import reactor.core.publisher.Mono; @@ -29,7 +29,7 @@ import org.springframework.lang.Nullable; * @since 5.0 * @param the object type */ -class MonoToListenableFutureAdapter extends AbstractMonoToListenableFutureAdapter { +public class MonoToListenableFutureAdapter extends AbstractMonoToListenableFutureAdapter { public MonoToListenableFutureAdapter(Mono mono) { super(mono); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index d2e66ba5c5c..8ad4774b0d2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -49,6 +49,7 @@ import reactor.netty.tcp.TcpClient; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; +import org.springframework.messaging.support.MonoToListenableFutureAdapter; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 49a3067c5fa..25ec53d67d4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -23,6 +23,7 @@ import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import org.springframework.messaging.Message; +import org.springframework.messaging.support.MonoToListenableFutureAdapter; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.util.concurrent.ListenableFuture; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java index 702c6db6f8c..3118a451987 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java @@ -31,12 +31,18 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxProcessor; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import org.springframework.context.support.StaticApplicationContext; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.handler.HandlerMethod; @@ -336,6 +342,61 @@ public class SimpAnnotationMethodMessageHandlerTests { assertTrue(controller.exceptionCaught); } + @Test + public void monoSuccess() { + Message emptyMessage = MessageBuilder.withPayload(new byte[0]).build(); + given(this.channel.send(any(Message.class))).willReturn(true); + given(this.converter.toMessage(any(), any(MessageHeaders.class))).willReturn(emptyMessage); + + ReactiveController controller = new ReactiveController(); + this.messageHandler.registerHandler(controller); + this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/")); + + Message message = createMessage("/app1/mono"); + this.messageHandler.handleMessage(message); + + assertNotNull(controller.mono); + controller.mono.onNext("foo"); + verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class)); + assertEquals("foo", this.payloadCaptor.getValue()); + } + + @Test + public void monoFailure() { + Message emptyMessage = MessageBuilder.withPayload(new byte[0]).build(); + given(this.channel.send(any(Message.class))).willReturn(true); + given(this.converter.toMessage(any(), any(MessageHeaders.class))).willReturn(emptyMessage); + + ReactiveController controller = new ReactiveController(); + this.messageHandler.registerHandler(controller); + this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/")); + + Message message = createMessage("/app1/mono"); + this.messageHandler.handleMessage(message); + + controller.mono.onError(new IllegalStateException()); + assertTrue(controller.exceptionCaught); + } + + @Test + public void fluxNotHandled() { + Message emptyMessage = MessageBuilder.withPayload(new byte[0]).build(); + given(this.channel.send(any(Message.class))).willReturn(true); + given(this.converter.toMessage(any(), any(MessageHeaders.class))).willReturn(emptyMessage); + + ReactiveController controller = new ReactiveController(); + this.messageHandler.registerHandler(controller); + this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/")); + + Message message = createMessage("/app1/flux"); + this.messageHandler.handleMessage(message); + + assertNotNull(controller.flux); + controller.flux.onNext("foo"); + + verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class)); + } + @Test public void placeholder() throws Exception { Message message = createMessage("/pre/myValue"); @@ -542,6 +603,33 @@ public class SimpAnnotationMethodMessageHandlerTests { } } + @Controller + private static class ReactiveController { + + private MonoProcessor mono; + + private FluxProcessor flux; + + private boolean exceptionCaught = false; + + @MessageMapping("mono") + public Mono handleMono() { + this.mono = MonoProcessor.create(); + return this.mono; + } + + @MessageMapping("flux") + public Flux handleFlux() { + this.flux = EmitterProcessor.create(); + return this.flux; + } + + @MessageExceptionHandler(IllegalStateException.class) + public void handleValidationException() { + this.exceptionCaught = true; + } + } + private static class StringTestValidator implements Validator {