Browse Source

Support `@RSocketExchange` for annotated responders

See gh-30936
pull/30963/head
Olga MaciaszekSharma 2 years ago committed by rstoyanchev
parent
commit
4cd9e2e9b0
  1. 62
      framework-docs/modules/ROOT/pages/rsocket.adoc
  2. 25
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java
  3. 20
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/service/RSocketServiceIntegrationTests.java

62
framework-docs/modules/ROOT/pages/rsocket.adoc

@ -139,8 +139,10 @@ The `spring-messaging` module contains the following:
* xref:rsocket.adoc#rsocket-requester[RSocketRequester] -- fluent API to make requests through an `io.rsocket.RSocket` * xref:rsocket.adoc#rsocket-requester[RSocketRequester] -- fluent API to make requests through an `io.rsocket.RSocket`
with data and metadata encoding/decoding. with data and metadata encoding/decoding.
* xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders] -- `@MessageMapping` annotated handler methods for * xref:rsocket.adoc#rsocket-interface[RSocket Interfaces] -- `@RSocketExchange` annotated
responding. interfaces for making requests.
* xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders] -- `@MessageMapping`
and `@RSocketExchange` annotated handler methods for responding.
The `spring-web` module contains `Encoder` and `Decoder` implementations such as Jackson The `spring-web` module contains `Encoder` and `Decoder` implementations such as Jackson
CBOR/JSON, and Protobuf that RSocket applications will likely need. It also contains the CBOR/JSON, and Protobuf that RSocket applications will likely need. It also contains the
@ -862,6 +864,59 @@ interaction type(s):
|=== |===
[[rsocket-annot-rsocketexchange]]
=== @RSocketExchange
While `@MessageMapping` is only supported for responding, `@RSocketExchange`
can be used both to create an annotated responder
and xref:rsocket.adoc#rsocket-interface[an RSocket Interface] that allows
making requests.
`@RSocketExchange` can be used as follows to create responder methods:
[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
@Controller
public class RadarsController {
@RSocketExchange("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
----
Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
@Controller
class RadarsController {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
----
======
`@RSocketExhange` supports a very similar method signature to `@MessageMapping`,
however, since it needs to be suitable both for requester and responder use,
there are slight differences. Notably, while `@MessageMapping` accepts
a `String` array as its `value` parameter, only a single `String` can be passed
as the `value` of `@RSocketExchange`.
When it comes to possible return values and the way we establish supported
RSocket interaction types, it works in the same way as with `@MessageMapping`.
Similarly to `@MessageMapping`, `@RSocketExchange` can also be used at class
level to specify a common prefix for all the method routes within the class.
[[rsocket-annot-connectmapping]] [[rsocket-annot-connectmapping]]
=== @ConnectMapping === @ConnectMapping
@ -1026,6 +1081,9 @@ Two, create a proxy that will perform the declared RSocket exchanges:
RepositoryService service = factory.createClient(RadarService.class); RepositoryService service = factory.createClient(RadarService.class);
---- ----
NOTE: Apart from RSocket interface services, `@RSocketExchange` can also
be used to create xref:rsocket.adoc#rsocket-annot-rsocketexchange[annotated responders].
[[rsocket-interface-method-parameters]] [[rsocket-interface-method-parameters]]
=== Method Parameters === Method Parameters

25
spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java

@ -52,6 +52,7 @@ import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.ConnectMapping; import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.service.RSocketExchange;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils; import org.springframework.util.MimeTypeUtils;
@ -60,8 +61,9 @@ import org.springframework.util.StringUtils;
/** /**
* Extension of {@link MessageMappingMessageHandler} for handling RSocket * Extension of {@link MessageMappingMessageHandler} for handling RSocket
* requests with {@link ConnectMapping @ConnectMapping} and * requests with {@link ConnectMapping @ConnectMapping},
* {@link MessageMapping @MessageMapping} methods. * {@link MessageMapping @MessageMapping}
* and {@link RSocketExchange @RSocketExchange} methods.
* *
* <p>For server scenarios this class can be declared as a bean in Spring * <p>For server scenarios this class can be declared as a bean in Spring
* configuration and that would detect {@code @MessageMapping} methods in * configuration and that would detect {@code @MessageMapping} methods in
@ -77,13 +79,14 @@ import org.springframework.util.StringUtils;
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketConnector * {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketConnector
* RSocketRequester.Builder}. * RSocketRequester.Builder}.
* *
* <p>For {@code @MessageMapping} methods, this class automatically determines * <p>For {@code @MessageMapping} and {@code @RSocketExchange} methods,
* the RSocket interaction type based on the input and output cardinality of the * this class automatically determines the RSocket interaction type
* method. See the * based on the input and output cardinality of the method. See the
* <a href="https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket-annot-responders"> * <a href="https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket-annot-responders">
* "Annotated Responders"</a> section of the Spring Framework reference for more details. * "Annotated Responders"</a> section of the Spring Framework reference for more details.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @author Olga Maciaszek-Sharma
* @since 5.2 * @since 5.2
*/ */
public class RSocketMessageHandler extends MessageMappingMessageHandler { public class RSocketMessageHandler extends MessageMappingMessageHandler {
@ -322,6 +325,15 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
RSocketFrameTypeMessageCondition.CONNECT_CONDITION, RSocketFrameTypeMessageCondition.CONNECT_CONDITION,
new DestinationPatternsMessageCondition(patterns, obtainRouteMatcher())); new DestinationPatternsMessageCondition(patterns, obtainRouteMatcher()));
} }
RSocketExchange ann3 = AnnotatedElementUtils.findMergedAnnotation(element, RSocketExchange.class);
if (ann3 != null && StringUtils.hasText(ann3.value())) {
String[] destinations = new String[]{ann3.value()};
return new CompositeMessageCondition(
RSocketFrameTypeMessageCondition.EMPTY_CONDITION,
new DestinationPatternsMessageCondition(processDestinations(destinations),
obtainRouteMatcher())
);
}
return null; return null;
} }
@ -402,7 +414,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* connection. Such a method can also start requests to the client but that * connection. Such a method can also start requests to the client but that
* must be done decoupled from handling and from the current thread. * must be done decoupled from handling and from the current thread.
* <p>Subsequent requests on the connection can be handled with * <p>Subsequent requests on the connection can be handled with
* {@link MessageMapping MessageMapping} methods. * {@link MessageMapping MessageMapping}
* and {@link RSocketExchange RSocketExchange} methods.
*/ */
public SocketAcceptor responder() { public SocketAcceptor responder() {
return (setupPayload, sendingRSocket) -> { return (setupPayload, sendingRSocket) -> {

20
spring-messaging/src/test/java/org/springframework/messaging/rsocket/service/RSocketServiceIntegrationTests.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2022 the original author or authors. * Copyright 2002-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -33,7 +33,6 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
@ -45,6 +44,7 @@ import org.springframework.util.MimeTypeUtils;
* Integration tests with RSocket Service client. * Integration tests with RSocket Service client.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @author Olga Maciaszek-Sharma
*/ */
class RSocketServiceIntegrationTests { class RSocketServiceIntegrationTests {
@ -57,7 +57,7 @@ class RSocketServiceIntegrationTests {
@BeforeAll @BeforeAll
@SuppressWarnings("ConstantConditions") @SuppressWarnings("ConstantConditions")
static void setupOnce() throws Exception { static void setupOnce() {
MimeType metadataMimeType = MimeTypeUtils.parseMimeType( MimeType metadataMimeType = MimeTypeUtils.parseMimeType(
WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()); WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
@ -112,28 +112,26 @@ class RSocketServiceIntegrationTests {
} }
@Controller @RSocketExchange("echo")
interface Service { interface Service {
@RSocketExchange("echo-async") @RSocketExchange("async")
Mono<String> echoAsync(String payload); Mono<String> echoAsync(String payload);
@RSocketExchange("echo-stream") @RSocketExchange("stream")
Flux<String> echoStream(String payload); Flux<String> echoStream(String payload);
} }
@Controller @Controller
static class ServerController { static class ServerController implements Service {
@MessageMapping("echo-async") public Mono<String> echoAsync(String payload) {
Mono<String> echoAsync(String payload) {
return Mono.delay(Duration.ofMillis(10)).map(aLong -> payload + " async"); return Mono.delay(Duration.ofMillis(10)).map(aLong -> payload + " async");
} }
@MessageMapping("echo-stream") public Flux<String> echoStream(String payload) {
Flux<String> echoStream(String payload) {
return Flux.interval(Duration.ofMillis(10)).map(aLong -> payload + " " + aLong); return Flux.interval(Duration.ofMillis(10)).map(aLong -> payload + " " + aLong);
} }

Loading…
Cancel
Save