Browse Source

Use method signature to refine RSocket @MessageMapping

Before this change an @MessageMapping could be matched to any RSocket
interaction type, which is arguably too flexible, makes it difficult to
reason what would happen in case of a significant mismatch of
cardinality, e.g. request for Fire-And-Forget (1-to-0) mapped to a
method that returns Flux, and could result in payloads being ignored,
or not seen unintentionally.

This commit checks @ConnectMapping method on startup and rejects them
if they return any values (sync or async). It also refines each
@MessageMapping to match only the RSocket interaction type it fits
based on the input and output cardinality of the handler method.
Subsequently if a request is not matched, we'll do a second search to
identify partial matches (by route only) and raise a helpful error that
explains which interaction type is actually supported.

The reference docs has been updated to explain the options.

Closes gh-23999
pull/24019/head
Rossen Stoyanchev 6 years ago
parent
commit
842b424acd
  1. 36
      spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/MessageMapping.java
  2. 33
      spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java
  3. 6
      spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/HandlerMethodArgumentResolverComposite.java
  4. 8
      spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/InvocableHelper.java
  5. 88
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketFrameTypeMessageCondition.java
  6. 101
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java
  7. 15
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java
  8. 4
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java
  9. 27
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketFrameTypeMessageConditionTests.java
  10. 100
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandlerTests.java
  11. 4
      spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt
  12. 94
      src/docs/asciidoc/rsocket.adoc

36
spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/MessageMapping.java

@ -64,18 +64,34 @@ import org.springframework.messaging.Message;
* authenticated user.</li> * authenticated user.</li>
* </ul> * </ul>
* *
* <p>How the return value is handled depends on the processing scenario. For * <p>Return value handling depends on the processing scenario:
* STOMP over WebSocket, it is turned into a message and sent to a default response * <ul>
* destination or to a custom destination specified with an {@link SendTo @SendTo} * <li>STOMP over WebSocket -- the value is turned into a message and sent to a
* or {@link org.springframework.messaging.simp.annotation.SendToUser @SendToUser} * default response destination or to a custom destination specified with an
* annotation. For RSocket, the response is used to reply to the stream request. * {@link SendTo @SendTo} or
* {@link org.springframework.messaging.simp.annotation.SendToUser @SendToUser}
* annotation.
* <li>RSocket -- the response is used to reply to the stream request.
* </ul>
* *
* <p>Specializations of this annotation including * <p>Specializations of this annotation include
* {@link org.springframework.messaging.simp.annotation.SubscribeMapping @SubscribeMapping} or * {@link org.springframework.messaging.simp.annotation.SubscribeMapping @SubscribeMapping}
* (e.g. STOMP subscriptions) and
* {@link org.springframework.messaging.rsocket.annotation.ConnectMapping @ConnectMapping} * {@link org.springframework.messaging.rsocket.annotation.ConnectMapping @ConnectMapping}
* further narrow the mapping by message type. Both can be combined with a * (e.g. RSocket connections). Both narrow the primary mapping further and also match
* type-level {@code @MessageMapping} for declaring a common pattern prefix * against the message type. Both can be combined with a type-level
* (or prefixes). * {@code @MessageMapping} that declares a common pattern prefix (or prefixes).
*
* <p>For further details on the use of this annotation in different contexts,
* see the following sections of the Spring Framework reference:
* <ul>
* <li>STOMP over WebSocket
* <a href="https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#websocket-stomp-handle-annotations">
* "Annotated Controllers"</a>.
* <li>RSocket
* <a href="https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket-annot-responders">
* "Annotated Responders"</a>.
* </ul>
* *
* <p><b>NOTE:</b> When using controller interfaces (e.g. for AOP proxying), * <p><b>NOTE:</b> When using controller interfaces (e.g. for AOP proxying),
* make sure to consistently put <i>all</i> your mapping annotations - such as * make sure to consistently put <i>all</i> your mapping annotations - such as

33
spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java

@ -232,6 +232,15 @@ public abstract class AbstractMethodMessageHandler<T>
return CollectionUtils.unmodifiableMultiValueMap(this.destinationLookup); return CollectionUtils.unmodifiableMultiValueMap(this.destinationLookup);
} }
/**
* Return the argument resolvers initialized during {@link #afterPropertiesSet()}.
* Primarily for internal use in sub-classes.
* @since 5.2.2
*/
protected HandlerMethodArgumentResolverComposite getArgumentResolvers() {
return this.invocableHelper.getArgumentResolvers();
}
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
@ -377,6 +386,7 @@ public abstract class AbstractMethodMessageHandler<T>
oldHandlerMethod.getBean() + "' bean method\n" + oldHandlerMethod + " mapped."); oldHandlerMethod.getBean() + "' bean method\n" + oldHandlerMethod + " mapped.");
} }
mapping = extendMapping(mapping, newHandlerMethod);
this.handlerMethods.put(mapping, newHandlerMethod); this.handlerMethods.put(mapping, newHandlerMethod);
for (String pattern : getDirectLookupMappings(mapping)) { for (String pattern : getDirectLookupMappings(mapping)) {
@ -402,6 +412,21 @@ public abstract class AbstractMethodMessageHandler<T>
return handlerMethod; return handlerMethod;
} }
/**
* This method is invoked just before mappings are added. It allows
* sub-classes to update the mapping with the {@link HandlerMethod} in mind.
* This can be useful when the method signature is used to refine the
* mapping, e.g. based on the cardinality of input and output.
* <p>By default this method returns the mapping that is passed in.
* @param mapping the mapping to be added
* @param handlerMethod the target handler for the mapping
* @return a new mapping or the same
* @since 5.2.2
*/
protected T extendMapping(T mapping, HandlerMethod handlerMethod) {
return mapping;
}
/** /**
* Return String-based destinations for the given mapping, if any, that can * Return String-based destinations for the given mapping, if any, that can
* be used to find matches with a direct lookup (i.e. non-patterns). * be used to find matches with a direct lookup (i.e. non-patterns).
@ -414,7 +439,13 @@ public abstract class AbstractMethodMessageHandler<T>
@Override @Override
public Mono<Void> handleMessage(Message<?> message) throws MessagingException { public Mono<Void> handleMessage(Message<?> message) throws MessagingException {
Match<T> match = getHandlerMethod(message); Match<T> match = null;
try {
match = getHandlerMethod(message);
}
catch (Exception ex) {
return Mono.error(ex);
}
if (match == null) { if (match == null) {
// handleNoMatch would have been invoked already // handleNoMatch would have been invoked already
return Mono.empty(); return Mono.empty();

6
spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/HandlerMethodArgumentResolverComposite.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2019 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -38,7 +38,7 @@ import org.springframework.messaging.Message;
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.2 * @since 5.2
*/ */
class HandlerMethodArgumentResolverComposite implements HandlerMethodArgumentResolver { public class HandlerMethodArgumentResolverComposite implements HandlerMethodArgumentResolver {
protected final Log logger = LogFactory.getLog(getClass()); protected final Log logger = LogFactory.getLog(getClass());
@ -125,7 +125,7 @@ class HandlerMethodArgumentResolverComposite implements HandlerMethodArgumentRes
* the given method parameter. * the given method parameter.
*/ */
@Nullable @Nullable
private HandlerMethodArgumentResolver getArgumentResolver(MethodParameter parameter) { public HandlerMethodArgumentResolver getArgumentResolver(MethodParameter parameter) {
HandlerMethodArgumentResolver result = this.argumentResolverCache.get(parameter); HandlerMethodArgumentResolver result = this.argumentResolverCache.get(parameter);
if (result == null) { if (result == null) {
for (HandlerMethodArgumentResolver methodArgumentResolver : this.argumentResolvers) { for (HandlerMethodArgumentResolver methodArgumentResolver : this.argumentResolvers) {

8
spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/InvocableHelper.java

@ -80,6 +80,14 @@ class InvocableHelper {
this.argumentResolvers.addResolvers(resolvers); this.argumentResolvers.addResolvers(resolvers);
} }
/**
* Return the configured resolvers.
* @since 5.2.2
*/
public HandlerMethodArgumentResolverComposite getArgumentResolvers() {
return this.argumentResolvers;
}
/** /**
* Add the return value handlers to use for message handling and exception * Add the return value handlers to use for message handling and exception
* handling methods. * handling methods.

88
spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketFrameTypeMessageCondition.java

@ -56,6 +56,13 @@ public class RSocketFrameTypeMessageCondition extends AbstractMessageCondition<R
} }
static final RSocketFrameTypeMessageCondition CONNECT_CONDITION =
new RSocketFrameTypeMessageCondition(FrameType.SETUP, FrameType.METADATA_PUSH);
static final RSocketFrameTypeMessageCondition EMPTY_CONDITION = new RSocketFrameTypeMessageCondition();
private final Set<FrameType> frameTypes; private final Set<FrameType> frameTypes;
@ -68,6 +75,10 @@ public class RSocketFrameTypeMessageCondition extends AbstractMessageCondition<R
this.frameTypes = Collections.unmodifiableSet(new LinkedHashSet<>(frameTypes)); this.frameTypes = Collections.unmodifiableSet(new LinkedHashSet<>(frameTypes));
} }
private RSocketFrameTypeMessageCondition() {
this.frameTypes = Collections.emptySet();
}
public Set<FrameType> getFrameTypes() { public Set<FrameType> getFrameTypes() {
return this.frameTypes; return this.frameTypes;
@ -124,18 +135,71 @@ public class RSocketFrameTypeMessageCondition extends AbstractMessageCondition<R
} }
/** Condition to match the initial SETUP frame and subsequent metadata pushes. */ /**
public static final RSocketFrameTypeMessageCondition CONNECT_CONDITION = * Return a condition for matching the RSocket request interaction type with
new RSocketFrameTypeMessageCondition( * that is selected based on the delcared request and response cardinality
FrameType.SETUP, * of some handler method.
FrameType.METADATA_PUSH); * <p>The table below shows the selections made:
* <table>
* <tr>
* <th>Request Cardinality</th>
* <th>Response Cardinality</th>
* <th>Interaction Types</th>
* </tr>
* <tr>
* <td>0,1</td>
* <td>0</td>
* <td>Fire-And-Forget, Request-Response</td>
* </tr>
* <tr>
* <td>0,1</td>
* <td>1</td>
* <td>Request-Response</td>
* </tr>
* <tr>
* <td>0,1</td>
* <td>2</td>
* <td>Request-Stream</td>
* </tr>
* <tr>
* <td>2</td>
* <td>Any</td>
* <td>Request-Channel</td>
* </tr>
* </table>
* @param cardinalityIn -- the request cardinality: 1 for a single payload,
* 2 for many payloads, and 0 if input is not handled.
* @param cardinalityOut -- the response cardinality: 0 for no output
* payloads, 1 for a single payload, and 2 for many payloads.
* @return a condition to use for matching the interaction type
* @since 5.2.2
*/
public static RSocketFrameTypeMessageCondition getCondition(int cardinalityIn, int cardinalityOut) {
switch (cardinalityIn) {
case 0:
case 1:
switch (cardinalityOut) {
case 0: return FF_RR_CONDITION;
case 1: return RR_CONDITION;
case 2: return RS_CONDITION;
default: throw new IllegalStateException("Invalid cardinality: " + cardinalityOut);
}
case 2:
return RC_CONDITION;
default:
throw new IllegalStateException("Invalid cardinality: " + cardinalityIn);
}
}
private static final RSocketFrameTypeMessageCondition FF_CONDITION = from(FrameType.REQUEST_FNF);
private static final RSocketFrameTypeMessageCondition RR_CONDITION = from(FrameType.REQUEST_RESPONSE);
private static final RSocketFrameTypeMessageCondition RS_CONDITION = from(FrameType.REQUEST_STREAM);
private static final RSocketFrameTypeMessageCondition RC_CONDITION = from(FrameType.REQUEST_CHANNEL);
private static final RSocketFrameTypeMessageCondition FF_RR_CONDITION = FF_CONDITION.combine(RR_CONDITION);
/** Condition to match one of the 4 stream request types. */ private static RSocketFrameTypeMessageCondition from(FrameType... frameTypes) {
public static final RSocketFrameTypeMessageCondition REQUEST_CONDITION = return new RSocketFrameTypeMessageCondition(frameTypes);
new RSocketFrameTypeMessageCondition( }
FrameType.REQUEST_FNF,
FrameType.REQUEST_RESPONSE,
FrameType.REQUEST_STREAM,
FrameType.REQUEST_CHANNEL);
} }

101
spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java

@ -19,6 +19,9 @@ package org.springframework.messaging.rsocket.annotation.support;
import java.lang.reflect.AnnotatedElement; import java.lang.reflect.AnnotatedElement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import io.rsocket.ConnectionSetupPayload; import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket; import io.rsocket.RSocket;
@ -28,6 +31,8 @@ import io.rsocket.metadata.WellKnownMimeType;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Decoder;
@ -37,8 +42,11 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.handler.CompositeMessageCondition; import org.springframework.messaging.handler.CompositeMessageCondition;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition; import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.HandlerMethod;
import org.springframework.messaging.handler.MessageCondition;
import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler; import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler;
import org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler; import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer; import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer;
import org.springframework.messaging.rsocket.MetadataExtractor; import org.springframework.messaging.rsocket.MetadataExtractor;
@ -55,12 +63,27 @@ import org.springframework.util.StringUtils;
* Extension of {@link MessageMappingMessageHandler} for handling RSocket * Extension of {@link MessageMappingMessageHandler} for handling RSocket
* requests with {@link ConnectMapping @ConnectMapping} and * requests with {@link ConnectMapping @ConnectMapping} and
* {@link MessageMapping @MessageMapping} methods. * {@link MessageMapping @MessageMapping} methods.
* <p>Use {@link #responder()} to obtain a {@link SocketAcceptor} adapter to *
* plug in as responder into an {@link io.rsocket.RSocketFactory}. * <p>For server scenarios this class can be declared as a bean in Spring
* <p>Use {@link #clientResponder(RSocketStrategies, Object...)} to obtain a * configuration and that would detect {@code @MessageMapping} methods in
* client responder configurer * {@code @Controller} beans. What beans are checked can be changed through a
* {@link #setHandlerPredicate(Predicate) handlerPredicate}. Given an instance
* of this class, you can then use {@link #responder()} to obtain a
* {@link SocketAcceptor} adapter to register with the
* {@link io.rsocket.RSocketFactory}.
*
* <p>For client scenarios, possibly in the same process as a server, consider
* consider using the static factory method
* {@link #clientResponder(RSocketStrategies, Object...)} to obtain a client
* responder to be registered with an
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketFactory
* RSocketRequester}. * RSocketRequester.Builder}.
*
* <p>For {@code @MessageMapping} methods, this class automatically determines
* the RSocket interaction type based on the input and output cardinality of the
* method. See the
* <a href="https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket-annot-responders">
* "Annotated Responders"</a> section of the Spring Framework reference for more details.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.2 * @since 5.2
@ -263,6 +286,17 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver()); getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());
super.afterPropertiesSet(); super.afterPropertiesSet();
getHandlerMethods().forEach((composite, handler) -> {
if (composite.getMessageConditions().contains(RSocketFrameTypeMessageCondition.CONNECT_CONDITION)) {
MethodParameter returnType = handler.getReturnType();
if (getCardinality(returnType) > 0) {
throw new IllegalStateException(
"Invalid @ConnectMapping method. " +
"Return type must be void or a void async type: " + handler);
}
}
});
} }
@Override @Override
@ -279,10 +313,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
protected CompositeMessageCondition getCondition(AnnotatedElement element) { protected CompositeMessageCondition getCondition(AnnotatedElement element) {
MessageMapping ann1 = AnnotatedElementUtils.findMergedAnnotation(element, MessageMapping.class); MessageMapping ann1 = AnnotatedElementUtils.findMergedAnnotation(element, MessageMapping.class);
if (ann1 != null && ann1.value().length > 0) { if (ann1 != null && ann1.value().length > 0) {
String[] patterns = processDestinations(ann1.value());
return new CompositeMessageCondition( return new CompositeMessageCondition(
RSocketFrameTypeMessageCondition.REQUEST_CONDITION, RSocketFrameTypeMessageCondition.EMPTY_CONDITION,
new DestinationPatternsMessageCondition(patterns, obtainRouteMatcher())); new DestinationPatternsMessageCondition(processDestinations(ann1.value()), obtainRouteMatcher()));
} }
ConnectMapping ann2 = AnnotatedElementUtils.findMergedAnnotation(element, ConnectMapping.class); ConnectMapping ann2 = AnnotatedElementUtils.findMergedAnnotation(element, ConnectMapping.class);
if (ann2 != null) { if (ann2 != null) {
@ -294,6 +327,45 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
return null; return null;
} }
@Override
protected CompositeMessageCondition extendMapping(CompositeMessageCondition composite, HandlerMethod handler) {
List<MessageCondition<?>> conditions = composite.getMessageConditions();
Assert.isTrue(conditions.size() == 2 &&
conditions.get(0) instanceof RSocketFrameTypeMessageCondition &&
conditions.get(1) instanceof DestinationPatternsMessageCondition,
"Unexpected message condition types");
if (conditions.get(0) != RSocketFrameTypeMessageCondition.EMPTY_CONDITION) {
return composite;
}
int responseCardinality = getCardinality(handler.getReturnType());
int requestCardinality = 0;
for (MethodParameter parameter : handler.getMethodParameters()) {
if (getArgumentResolvers().getArgumentResolver(parameter) instanceof PayloadMethodArgumentResolver) {
requestCardinality = getCardinality(parameter);
}
}
return new CompositeMessageCondition(
RSocketFrameTypeMessageCondition.getCondition(requestCardinality, responseCardinality),
conditions.get(1));
}
private int getCardinality(MethodParameter parameter) {
Class<?> clazz = parameter.getParameterType();
ReactiveAdapter adapter = getReactiveAdapterRegistry().getAdapter(clazz);
if (adapter == null) {
return clazz.equals(void.class) ? 0 : 1;
}
else if (parameter.nested().getNestedParameterType().equals(Void.class)) {
return 0;
}
else {
return adapter.isMultiValue() ? 2 : 1;
}
}
@Override @Override
protected void handleNoMatch(@Nullable RouteMatcher.Route destination, Message<?> message) { protected void handleNoMatch(@Nullable RouteMatcher.Route destination, Message<?> message) {
@ -306,7 +378,18 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
logger.warn("No handler for fireAndForget to '" + destination + "'"); logger.warn("No handler for fireAndForget to '" + destination + "'");
return; return;
} }
throw new MessageDeliveryException("No handler for destination '" + destination + "'");
Set<FrameType> frameTypes = getHandlerMethods().keySet().stream()
.map(CompositeMessageCondition::getMessageConditions)
.filter(conditions -> conditions.get(1).getMatchingCondition(message) != null)
.map(conditions -> (RSocketFrameTypeMessageCondition) conditions.get(0))
.flatMap(condition -> condition.getFrameTypes().stream())
.collect(Collectors.toSet());
throw new MessageDeliveryException(frameTypes.isEmpty() ?
"No handler for destination '" + destination + "'" :
"Destination '" + destination + "' does not support " + frameType + ". " +
"Supported interaction(s): " + frameTypes);
} }
/** /**

15
spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java

@ -137,22 +137,16 @@ public class RSocketBufferLeakTests {
@Test @Test
public void errorSignalWithExceptionHandler() { public void errorSignalWithExceptionHandler() {
Mono<String> result = requester.route("error-signal").data("foo").retrieveMono(String.class); Flux<String> result = requester.route("error-signal").data("foo").retrieveFlux(String.class);
StepVerifier.create(result).expectNext("Handled 'bad input'").expectComplete().verify(Duration.ofSeconds(5)); StepVerifier.create(result).expectNext("Handled 'bad input'").expectComplete().verify(Duration.ofSeconds(5));
} }
@Test @Test
public void ignoreInput() { public void ignoreInput() {
Flux<String> result = requester.route("ignore-input").data("a").retrieveFlux(String.class); Mono<String> result = requester.route("ignore-input").data("a").retrieveMono(String.class);
StepVerifier.create(result).expectNext("bar").thenCancel().verify(Duration.ofSeconds(5)); StepVerifier.create(result).expectNext("bar").thenCancel().verify(Duration.ofSeconds(5));
} }
@Test
public void retrieveMonoFromFluxResponderMethod() {
Mono<String> result = requester.route("request-stream").data("foo").retrieveMono(String.class);
StepVerifier.create(result).expectNext("foo-1").expectComplete().verify(Duration.ofSeconds(5));
}
@Controller @Controller
static class ServerController { static class ServerController {
@ -188,11 +182,6 @@ public class RSocketBufferLeakTests {
Mono<String> ignoreInput() { Mono<String> ignoreInput() {
return Mono.delay(Duration.ofMillis(10)).map(l -> "bar"); return Mono.delay(Duration.ofMillis(10)).map(l -> "bar");
} }
@MessageMapping("request-stream")
Flux<String> stream(String payload) {
return Flux.range(1,100).delayElements(Duration.ofMillis(10)).map(idx -> payload + "-" + idx);
}
} }

4
spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java

@ -159,13 +159,13 @@ public class RSocketClientToServerIntegrationTests {
@Test @Test
public void voidReturnValue() { public void voidReturnValue() {
Flux<String> result = requester.route("void-return-value").data("Hello").retrieveFlux(String.class); Mono<String> result = requester.route("void-return-value").data("Hello").retrieveMono(String.class);
StepVerifier.create(result).expectComplete().verify(Duration.ofSeconds(5)); StepVerifier.create(result).expectComplete().verify(Duration.ofSeconds(5));
} }
@Test @Test
public void voidReturnValueFromExceptionHandler() { public void voidReturnValueFromExceptionHandler() {
Flux<String> result = requester.route("void-return-value").data("bad").retrieveFlux(String.class); Mono<String> result = requester.route("void-return-value").data("bad").retrieveMono(String.class);
StepVerifier.create(result).expectComplete().verify(Duration.ofSeconds(5)); StepVerifier.create(result).expectComplete().verify(Duration.ofSeconds(5));
} }

27
spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketFrameTypeMessageConditionTests.java

@ -26,6 +26,8 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.messaging.rsocket.annotation.support.RSocketFrameTypeMessageCondition.CONNECT_CONDITION;
import static org.springframework.messaging.rsocket.annotation.support.RSocketFrameTypeMessageCondition.EMPTY_CONDITION;
/** /**
* Unit tests for {@link RSocketFrameTypeMessageCondition}. * Unit tests for {@link RSocketFrameTypeMessageCondition}.
@ -33,16 +35,37 @@ import static org.assertj.core.api.Assertions.assertThat;
*/ */
public class RSocketFrameTypeMessageConditionTests { public class RSocketFrameTypeMessageConditionTests {
private static final RSocketFrameTypeMessageCondition FNF_RR_CONDITION =
new RSocketFrameTypeMessageCondition(FrameType.REQUEST_FNF, FrameType.REQUEST_RESPONSE);
@Test @Test
public void getMatchingCondition() { public void getMatchingCondition() {
Message<?> message = message(FrameType.REQUEST_RESPONSE); Message<?> message = message(FrameType.REQUEST_RESPONSE);
RSocketFrameTypeMessageCondition condition = condition(FrameType.REQUEST_FNF, FrameType.REQUEST_RESPONSE); RSocketFrameTypeMessageCondition actual = FNF_RR_CONDITION.getMatchingCondition(message);
RSocketFrameTypeMessageCondition actual = condition.getMatchingCondition(message);
assertThat(actual).isNotNull(); assertThat(actual).isNotNull();
assertThat(actual.getFrameTypes()).hasSize(1).containsOnly(FrameType.REQUEST_RESPONSE); assertThat(actual.getFrameTypes()).hasSize(1).containsOnly(FrameType.REQUEST_RESPONSE);
} }
@Test
public void getMatchingConditionEmpty() {
Message<?> message = message(FrameType.REQUEST_RESPONSE);
RSocketFrameTypeMessageCondition actual = EMPTY_CONDITION.getMatchingCondition(message);
assertThat(actual).isNull();
}
@Test
public void combine() {
assertThat(EMPTY_CONDITION.combine(CONNECT_CONDITION).getFrameTypes())
.containsExactly(FrameType.SETUP, FrameType.METADATA_PUSH);
assertThat(EMPTY_CONDITION.combine(new RSocketFrameTypeMessageCondition(FrameType.REQUEST_FNF)).getFrameTypes())
.containsExactly(FrameType.REQUEST_FNF);
}
@Test @Test
public void compareTo() { public void compareTo() {
Message<byte[]> message = message(null); Message<byte[]> message = message(null);

100
spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandlerTests.java

@ -21,6 +21,9 @@ import java.util.Map;
import io.rsocket.frame.FrameType; import io.rsocket.frame.FrameType;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.ByteArrayDecoder; import org.springframework.core.codec.ByteArrayDecoder;
@ -170,6 +173,69 @@ public class RSocketMessageHandlerTests {
} }
} }
@Test
public void rejectConnectMappingMethodsThatCanReply() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setHandlers(Collections.singletonList(new InvalidConnectMappingController()));
assertThatThrownBy(handler::afterPropertiesSet)
.hasMessage("Invalid @ConnectMapping method. " +
"Return type must be void or a void async type: " +
"public java.lang.String org.springframework.messaging.rsocket.annotation.support." +
"RSocketMessageHandlerTests$InvalidConnectMappingController.connectString()");
handler = new RSocketMessageHandler();
handler.setHandlers(Collections.singletonList(new AnotherInvalidConnectMappingController()));
assertThatThrownBy(handler::afterPropertiesSet)
.hasMessage("Invalid @ConnectMapping method. " +
"Return type must be void or a void async type: " +
"public reactor.core.publisher.Mono<java.lang.String> " +
"org.springframework.messaging.rsocket.annotation.support." +
"RSocketMessageHandlerTests$AnotherInvalidConnectMappingController.connectString()");
}
@Test
public void ignoreFireAndForgetToHandlerThatCanReply() {
InteractionMismatchController controller = new InteractionMismatchController();
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setHandlers(Collections.singletonList(controller));
handler.afterPropertiesSet();
MessageHeaderAccessor headers = new MessageHeaderAccessor();
headers.setLeaveMutable(true);
RouteMatcher.Route route = handler.getRouteMatcher().parseRoute("mono-string");
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, route);
headers.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, FrameType.REQUEST_FNF);
Message<?> message = MessageBuilder.createMessage(Mono.empty(), headers.getMessageHeaders());
// Simply dropped and logged (error cannot propagate to client)
StepVerifier.create(handler.handleMessage(message)).expectComplete().verify();
assertThat(controller.invokeCount).isEqualTo(0);
}
@Test
public void rejectRequestResponseToStreamingHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setHandlers(Collections.singletonList(new InteractionMismatchController()));
handler.afterPropertiesSet();
MessageHeaderAccessor headers = new MessageHeaderAccessor();
headers.setLeaveMutable(true);
RouteMatcher.Route route = handler.getRouteMatcher().parseRoute("flux-string");
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, route);
headers.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, FrameType.REQUEST_RESPONSE);
Message<?> message = MessageBuilder.createMessage(Mono.empty(), headers.getMessageHeaders());
StepVerifier.create(handler.handleMessage(message))
.expectErrorMessage(
"Destination 'flux-string' does not support REQUEST_RESPONSE. " +
"Supported interaction(s): [REQUEST_STREAM]")
.verify();
}
@Test @Test
public void handleNoMatch() { public void handleNoMatch() {
@ -222,4 +288,38 @@ public class RSocketMessageHandlerTests {
} }
} }
private static class InvalidConnectMappingController {
@ConnectMapping
public String connectString() {
return "";
}
}
private static class AnotherInvalidConnectMappingController {
@ConnectMapping
public Mono<String> connectString() {
return Mono.empty();
}
}
private static class InteractionMismatchController {
private int invokeCount;
@MessageMapping("mono-string")
public Mono<String> messageMonoString() {
this.invokeCount++;
return Mono.empty();
}
@MessageMapping("flux-string")
public Flux<String> messageFluxString() {
this.invokeCount++;
return Flux.empty();
}
}
} }

4
spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt

@ -84,13 +84,13 @@ class RSocketClientToServerCoroutinesIntegrationTests {
@Test @Test
fun unitReturnValue() { fun unitReturnValue() {
val result = requester.route("unit-return-value").data("Hello").retrieveFlux(String::class.java) val result = requester.route("unit-return-value").data("Hello").retrieveMono(String::class.java)
StepVerifier.create(result).expectComplete().verify(Duration.ofSeconds(5)) StepVerifier.create(result).expectComplete().verify(Duration.ofSeconds(5))
} }
@Test @Test
fun unitReturnValueFromExceptionHandler() { fun unitReturnValueFromExceptionHandler() {
val result = requester.route("unit-return-value").data("bad").retrieveFlux(String::class.java) val result = requester.route("unit-return-value").data("bad").retrieveMono(String::class.java)
StepVerifier.create(result).expectComplete().verify(Duration.ofSeconds(5)) StepVerifier.create(result).expectComplete().verify(Duration.ofSeconds(5))
} }

94
src/docs/asciidoc/rsocket.adoc

@ -586,7 +586,7 @@ indicates only that the message was successfully sent, and not that it was handl
== Annotated Responders == Annotated Responders
RSocket responders can be implemented as `@MessageMapping` and `@ConnectMapping` methods. RSocket responders can be implemented as `@MessageMapping` and `@ConnectMapping` methods.
`@MessageMapping` methods handle individual requests, and `@ConnectMapping` methods handle `@MessageMapping` methods handle individual requests while `@ConnectMapping` methods handle
connection-level events (setup and metadata push). Annotated responders are supported connection-level events (setup and metadata push). Annotated responders are supported
symmetrically, for responding from the server side and for responding from the client side. symmetrically, for responding from the server side and for responding from the client side.
@ -760,20 +760,90 @@ class RadarsController {
} }
---- ----
You don't need to explicit specify the RSocket interaction type. Simply declare the The above `@MessageMapping` method responds to a Request-Stream interaction having the
expected input and output, and a route pattern. The supporting infrastructure will adapt route "locate.radars.within". It supports a flexible method signature with the option to
matching requests. use the following method arguments:
The following additional arguments are supported for `@MessageMapping` methods: [cols="1,3",options="header"]
|===
| Method Argument
| Description
* `RSocketRequester` -- the requester for the connection associated with the request, | `@Payload`
to make requests to the remote end. | The payload of the request. This can be a concrete value of asynchronous types like
* `@DestinationVariable` -- the value for a variable from the pattern, e.g. `Mono` or `Flux`.
*Note:* Use of the annotation is optional. A method argument that is not a simple type
and is not any of the other supported arguments, is assumed to be the expected payload.
| `RSocketRequester`
| Requester for making requests to the remote end.
| `@DestinationVariable`
| Value extracted from the route based on variables in the mapping pattern, e.g.
`@MessageMapping("find.radar.{id}")`. `@MessageMapping("find.radar.{id}")`.
* `@Header` -- access to a metadata value registered for extraction, as described in
<<rsocket-metadata-extractor>>. | `@Header`
* `@Headers Map<String, Object>` -- access to all metadata values registered for | Metadata value registered for extraction as described in <<rsocket-metadata-extractor>>.
extraction, as described in <<rsocket-metadata-extractor>>.
| `@Headers Map<String, Object>`
| All metadata values registered for extraction as described in <<rsocket-metadata-extractor>>.
|===
The return value is expected to be one or more Objects to be serialized as response
payloads. That can be asynchronous types like `Mono` or `Flux`, a concrete value, or
either `void` or a no-value asynchronous type such as `Mono<Void>`.
The RSocket interaction type that an `@MessageMapping` method supports is determined from
the cardinality of the input (i.e. `@Payload` argument) and of the output, where
cardinality means the following:
[%autowidth]
[cols=2*,options="header"]
|===
| Cardinality
| Description
| 1
| Either an explicit value, or a single-value asynchronous type such as `Mono<T>`.
| Many
| A multi-value asynchronous type such as `Flux<T>`.
| 0
| For input this means the method does not have an `@Payload` argument.
For output this is `void` or a no-value asynchronous type such as `Mono<Void>`.
|===
The table below shows all input and output cardinality combinations and the corresponding
interaction type(s):
[%autowidth]
[cols=3*,options="header"]
|===
| Input Cardinality
| Output Cardinality
| Interaction Types
| 0, 1
| 0
| Fire-and-Forget, Request-Response
| 0, 1
| 1
| Request-Response
| 0, 1
| Many
| Request-Stream
| Many
| 0, 1, Many
| Request-Channel
|===

Loading…
Cancel
Save