diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java index 1e543f02092..9769674e5b8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java @@ -22,6 +22,7 @@ import java.util.function.Consumer; import io.rsocket.Payload; import io.rsocket.RSocket; +import io.rsocket.RSocketClient; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -48,8 +49,7 @@ final class DefaultRSocketRequester implements RSocketRequester { private static final Map EMPTY_HINTS = Collections.emptyMap(); - - private final RSocket rsocket; + private final RSocketDelegate rsocketDelegate; private final MimeType dataMimeType; @@ -61,15 +61,15 @@ final class DefaultRSocketRequester implements RSocketRequester { DefaultRSocketRequester( - RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType, + RSocketDelegate rsocketDelegate, MimeType dataMimeType, MimeType metadataMimeType, RSocketStrategies strategies) { - Assert.notNull(rsocket, "RSocket is required"); + Assert.notNull(rsocketDelegate, "RSocket or RSocketClient is required"); Assert.notNull(dataMimeType, "'dataMimeType' is required"); Assert.notNull(metadataMimeType, "'metadataMimeType' is required"); Assert.notNull(strategies, "RSocketStrategies is required"); - this.rsocket = rsocket; + this.rsocketDelegate = rsocketDelegate; this.dataMimeType = dataMimeType; this.metadataMimeType = metadataMimeType; this.strategies = strategies; @@ -77,9 +77,11 @@ final class DefaultRSocketRequester implements RSocketRequester { } + @Nullable @Override public RSocket rsocket() { - return this.rsocket; + return (this.rsocketDelegate instanceof ConnectionRSocketDelegate ? + ((ConnectionRSocketDelegate) this.rsocketDelegate).getRSocket() : null); } @Override @@ -102,6 +104,10 @@ final class DefaultRSocketRequester implements RSocketRequester { return new DefaultRequestSpec(metadata, mimeType); } + @Override + public void dispose() { + this.rsocketDelegate.dispose(); + } private static boolean isVoid(ResolvableType elementType) { return (Void.class.equals(elementType.resolve()) || void.class.equals(elementType.resolve())); @@ -250,12 +256,12 @@ final class DefaultRSocketRequester implements RSocketRequester { @Override public Mono sendMetadata() { - return getPayloadMono().flatMap(rsocket::metadataPush); + return rsocketDelegate().metadataPush(getPayloadMono()); } @Override public Mono send() { - return getPayloadMono().flatMap(rsocket::fireAndForget); + return rsocketDelegate().fireAndForget(getPayloadMono()); } @Override @@ -270,7 +276,7 @@ final class DefaultRSocketRequester implements RSocketRequester { @SuppressWarnings("unchecked") private Mono retrieveMono(ResolvableType elementType) { - Mono payloadMono = getPayloadMono().flatMap(rsocket::requestResponse); + Mono payloadMono = rsocketDelegate().requestResponse(getPayloadMono()); if (isVoid(elementType)) { return (Mono) payloadMono.then(); @@ -295,8 +301,8 @@ final class DefaultRSocketRequester implements RSocketRequester { private Flux retrieveFlux(ResolvableType elementType) { Flux payloadFlux = (this.payloadFlux != null ? - rsocket.requestChannel(this.payloadFlux) : - getPayloadMono().flatMapMany(rsocket::requestStream)); + rsocketDelegate().requestChannel(this.payloadFlux) : + rsocketDelegate().requestStream(getPayloadMono())); if (isVoid(elementType)) { return payloadFlux.thenMany(Flux.empty()); @@ -307,6 +313,10 @@ final class DefaultRSocketRequester implements RSocketRequester { (T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)); } + private RSocketDelegate rsocketDelegate() { + return DefaultRSocketRequester.this.rsocketDelegate; + } + private Mono getPayloadMono() { Assert.state(this.payloadFlux == null, "No RSocket interaction with Flux request and Mono response."); return this.payloadMono != null ? this.payloadMono : firstPayload(emptyBufferMono); @@ -316,4 +326,107 @@ final class DefaultRSocketRequester implements RSocketRequester { return PayloadUtils.retainDataAndReleasePayload(payload, bufferFactory()); } } + + + // Contract to avoid a hard dependency on RSocketClient for now. + + interface RSocketDelegate { + + Mono fireAndForget(Mono payloadMono); + + Mono requestResponse(Mono payloadMono); + + Flux requestStream(Mono payloadMono); + + Flux requestChannel(Publisher payloadPublisher); + + Mono metadataPush(Mono payloadMono); + + void dispose(); + } + + static class ConnectionRSocketDelegate implements RSocketDelegate { + + private final RSocket rsocket; + + public ConnectionRSocketDelegate(RSocket rsocket) { + Assert.notNull(rsocket, "RSocket is required"); + this.rsocket = rsocket; + } + + public RSocket getRSocket() { + return this.rsocket; + } + + @Override + public Mono fireAndForget(Mono payloadMono) { + return payloadMono.flatMap(this.rsocket::fireAndForget); + } + + @Override + public Mono requestResponse(Mono payloadMono) { + return payloadMono.flatMap(this.rsocket::requestResponse); + } + + @Override + public Flux requestStream(Mono payloadMono) { + return payloadMono.flatMapMany(this.rsocket::requestStream); + } + + @Override + public Flux requestChannel(Publisher payloadPublisher) { + return this.rsocket.requestChannel(payloadPublisher); + } + + @Override + public Mono metadataPush(Mono payloadMono) { + return payloadMono.flatMap(this.rsocket::metadataPush); + } + + @Override + public void dispose() { + this.rsocket.dispose(); + } + } + + static class ClientRSocketDelegate implements RSocketDelegate { + + private final RSocketClient rsocketClient; + + public ClientRSocketDelegate(RSocketClient rsocketClient) { + Assert.notNull(rsocketClient, "RSocketClient is required"); + this.rsocketClient = rsocketClient; + } + + @Override + public Mono fireAndForget(Mono payloadMono) { + return this.rsocketClient.fireAndForget(payloadMono); + } + + @Override + public Mono requestResponse(Mono payloadMono) { + return this.rsocketClient.requestResponse(payloadMono); + } + + @Override + public Flux requestStream(Mono payloadMono) { + return this.rsocketClient.requestStream(payloadMono); + } + + @Override + public Flux requestChannel(Publisher payloadPublisher) { + return this.rsocketClient.requestChannel(payloadPublisher); + } + + @Override + public Mono metadataPush(Mono payloadMono) { + return this.rsocketClient.metadataPush(payloadMono); + } + + @Override + public void dispose() { + this.rsocketClient.dispose(); + } + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index fcec3a4c130..1ae1051798f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -23,10 +23,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Function; import io.rsocket.Payload; -import io.rsocket.RSocket; +import io.rsocket.core.RSocketConnector; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.metadata.WellKnownMimeType; import io.rsocket.transport.ClientTransport; @@ -45,7 +44,6 @@ import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -59,11 +57,6 @@ import org.springframework.util.MimeTypeUtils; */ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { - private final static boolean rsocketConnectorPresent = - ClassUtils.isPresent("io.rsocket.core.RSocketConnector", - DefaultRSocketRequesterBuilder.class.getClassLoader()); - - private static final Map HINTS = Collections.emptyMap(); private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; @@ -165,47 +158,70 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { } @Override + public RSocketRequester tcp(String host, int port) { + return transport(TcpClientTransport.create(host, port)); + } + + @Override + public RSocketRequester websocket(URI uri) { + return transport(WebsocketClientTransport.create(uri)); + } + + @Override + public RSocketRequester transport(ClientTransport transport) { + RSocketStrategies strategies = getRSocketStrategies(); + Assert.isTrue(!strategies.encoders().isEmpty(), "No encoders"); + Assert.isTrue(!strategies.decoders().isEmpty(), "No decoders"); + + MimeType metaMimeType = (this.metadataMimeType != null ? this.metadataMimeType : + MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())); + + MimeType dataMimeType = getDataMimeType(strategies); + Mono setupPayload = getSetupPayload(dataMimeType, metaMimeType, strategies); + + RSocketConnector connector = initConnector( + this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers, + metaMimeType, dataMimeType, setupPayload, strategies); + + return new DefaultRSocketRequester( + new DefaultRSocketRequester.ClientRSocketDelegate(connector.toRSocketClient(transport)), + dataMimeType, metaMimeType, strategies); + } + + @Override + @SuppressWarnings("deprecation") public Mono connectTcp(String host, int port) { return connect(TcpClientTransport.create(host, port)); } @Override + @SuppressWarnings("deprecation") public Mono connectWebSocket(URI uri) { return connect(WebsocketClientTransport.create(uri)); } @Override + @SuppressWarnings("deprecation") public Mono connect(ClientTransport transport) { + RSocketStrategies rsocketStrategies = getRSocketStrategies(); Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders"); Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders"); - MimeType metaMimeType = this.metadataMimeType != null ? this.metadataMimeType : - MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()); + MimeType metaMimeType = (this.metadataMimeType != null ? this.metadataMimeType : + MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())); MimeType dataMimeType = getDataMimeType(rsocketStrategies); Mono setupPayload = getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies); - Function> connectFunction; - if (rsocketConnectorPresent) { - connectFunction = payload -> new RSocketConnectorHelper().getRSocketMono( - this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers, - metaMimeType, dataMimeType, setupPayload, rsocketStrategies, transport, payload); - } - else { - connectFunction = payload -> new RSocketFactoryHelper().getRSocketMono( - this.rsocketFactoryConfigurers, metaMimeType, dataMimeType, - setupPayload, rsocketStrategies, transport, payload); - } - - // In RSocket 1.0.2 we can pass a Mono for the setup Payload. Until then we have to - // resolve it and then cache the Mono because it may be a ReconnectMono. + RSocketConnector connector = initConnector( + this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers, + metaMimeType, dataMimeType, setupPayload, rsocketStrategies); - return setupPayload - .map(connectFunction) - .cache() - .flatMap(mono -> mono.map(rsocket -> - new DefaultRSocketRequester(rsocket, dataMimeType, metaMimeType, rsocketStrategies))); + return connector.connect(transport).map(rsocket -> + new DefaultRSocketRequester( + new DefaultRSocketRequester.ConnectionRSocketDelegate(rsocket), + dataMimeType, metaMimeType, rsocketStrategies)); } private RSocketStrategies getRSocketStrategies() { @@ -289,60 +305,32 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { .doOnDiscard(Payload.class, Payload::release); } - @SuppressWarnings("deprecation") - private static class RSocketConnectorHelper { - - Mono getRSocketMono(List connectorConfigurers, - List factoryConfigurers, - MimeType metaMimeType, MimeType dataMimeType, Mono setupPayload, - RSocketStrategies rsocketStrategies, ClientTransport transport, Payload payload) { - - io.rsocket.core.RSocketConnector connector = io.rsocket.core.RSocketConnector.create(); - connectorConfigurers.forEach(c -> c.configure(connector)); - - if (!factoryConfigurers.isEmpty()) { - io.rsocket.RSocketFactory.ClientRSocketFactory factory = - new io.rsocket.RSocketFactory.ClientRSocketFactory(connector); - factoryConfigurers.forEach(c -> c.configure(factory)); - } - - if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) { - connector.payloadDecoder(PayloadDecoder.ZERO_COPY); - } - - connector.metadataMimeType(metaMimeType.toString()); - connector.dataMimeType(dataMimeType.toString()); - - if (setupPayload != EMPTY_SETUP_PAYLOAD) { - connector.setupPayload(payload); - } - return connector.connect(transport); + private RSocketConnector initConnector(List connectorConfigurers, + List factoryConfigurers, + MimeType metaMimeType, MimeType dataMimeType, Mono setupPayloadMono, + RSocketStrategies rsocketStrategies) { + + RSocketConnector connector = RSocketConnector.create(); + connectorConfigurers.forEach(c -> c.configure(connector)); + + if (!factoryConfigurers.isEmpty()) { + io.rsocket.RSocketFactory.ClientRSocketFactory factory = + new io.rsocket.RSocketFactory.ClientRSocketFactory(connector); + factoryConfigurers.forEach(c -> c.configure(factory)); } - } - - - @SuppressWarnings("deprecation") - private static class RSocketFactoryHelper { - Mono getRSocketMono(List configurers, - MimeType metaMimeType, MimeType dataMimeType, Mono setupPayload, - RSocketStrategies rsocketStrategies, ClientTransport transport, Payload payload) { + if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) { + connector.payloadDecoder(PayloadDecoder.ZERO_COPY); + } - io.rsocket.RSocketFactory.ClientRSocketFactory factory = io.rsocket.RSocketFactory.connect(); - configurers.forEach(c -> c.configure(factory)); + connector.metadataMimeType(metaMimeType.toString()); + connector.dataMimeType(dataMimeType.toString()); - if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) { - factory.frameDecoder(PayloadDecoder.ZERO_COPY); - } - - factory.metadataMimeType(metaMimeType.toString()); - factory.dataMimeType(dataMimeType.toString()); - if (setupPayload != EMPTY_SETUP_PAYLOAD) { - factory.setupPayload(payload); - } - return factory.transport(transport).start(); + if (setupPayloadMono != EMPTY_SETUP_PAYLOAD) { + connector.setupPayload(setupPayloadMono); } + return connector; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 1188b5431a7..1542aa3b689 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -48,8 +48,15 @@ import org.springframework.util.MimeType; public interface RSocketRequester { /** - * Return the underlying sending RSocket. + * This method returns {@code null} unless the the requester was created + * with a "live" RSocket through one of the (now deprecated) builder connect + * methods or via {@link #wrap(RSocket, MimeType, MimeType, RSocketStrategies)} + * which is mainly for internal use in client and server responder + * implementations. Otherwise in the more common case where there is no + * "live" RSocket, the requester delegates to an + * {@link io.rsocket.RSocketClient}. */ + @Nullable RSocket rsocket(); /** @@ -96,6 +103,12 @@ public interface RSocketRequester { */ RequestSpec metadata(Object metadata, @Nullable MimeType mimeType); + /** + * Invoke the dispose method on the underlying + * {@link io.rsocket.RSocketClient} or {@link RSocket}. + * @since 5.3 + */ + public void dispose(); /** * Obtain a builder to create a client {@link RSocketRequester} by connecting @@ -113,7 +126,9 @@ public interface RSocketRequester { RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType, RSocketStrategies strategies) { - return new DefaultRSocketRequester(rsocket, dataMimeType, metadataMimeType, strategies); + return new DefaultRSocketRequester( + new DefaultRSocketRequester.ConnectionRSocketDelegate(rsocket), + dataMimeType, metadataMimeType, strategies); } @@ -236,28 +251,65 @@ public interface RSocketRequester { */ RSocketRequester.Builder apply(Consumer configurer); + /** + * Build an {@link RSocketRequester} instance for use with a TCP + * transport. Requests are made via {@link io.rsocket.RSocketClient} + * which establishes a shared TCP connection to given host and port. + * @param host the host of the server to connect to + * @param port the port of the server to connect to + * @return the created {@code RSocketRequester} + * @since 5.3 + */ + RSocketRequester tcp(String host, int port); + + /** + * Build an {@link RSocketRequester} instance for use with a WebSocket + * transport. Requests are made via {@link io.rsocket.RSocketClient} + * which establishes a shared WebSocket connection to given URL. + * @param uri the URL of the server to connect to + * @return the created {@code RSocketRequester} + * @since 5.3 + */ + RSocketRequester websocket(URI uri); + + /** + * Build an {@link RSocketRequester} instance for use with the given + * transport. Requests are made via {@link io.rsocket.RSocketClient} + * which establishes a shared connection through the given transport. + * @param transport the transport to use for connecting to the server + * @return the created {@code RSocketRequester} + * @since 5.3 + */ + RSocketRequester transport(ClientTransport transport); + /** * Connect to the server over TCP. * @param host the server host * @param port the server port * @return an {@code RSocketRequester} for the connection + * @deprecated as of 5.3 in favor of {@link #tcp(String, int)} * @see TcpClientTransport */ + @Deprecated Mono connectTcp(String host, int port); /** * Connect to the server over WebSocket. * @param uri the RSocket server endpoint URI * @return an {@code RSocketRequester} for the connection + * @deprecated as of 5.3 in favor of {@link #websocket(URI)} * @see WebsocketClientTransport */ + @Deprecated Mono connectWebSocket(URI uri); /** * Connect to the server with the given {@code ClientTransport}. * @param transport the client transport to use * @return an {@code RSocketRequester} for the connection + * @deprecated as of 5.3 in favor of {@link #transport(ClientTransport)} */ + @Deprecated Mono connect(ClientTransport transport); } diff --git a/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt index 0d7635a5e01..687a94a67fe 100644 --- a/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt +++ b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt @@ -18,9 +18,9 @@ package org.springframework.messaging.rsocket import io.rsocket.transport.ClientTransport import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle -import kotlinx.coroutines.reactive.asFlow import org.reactivestreams.Publisher import org.springframework.core.ParameterizedTypeReference import reactor.core.publisher.Flux @@ -33,6 +33,7 @@ import java.net.URI * @author Sebastien Deleuze * @since 5.2 */ +@Suppress("DEPRECATION") suspend fun RSocketRequester.Builder.connectAndAwait(transport: ClientTransport): RSocketRequester = connect(transport).awaitSingle() @@ -42,6 +43,7 @@ suspend fun RSocketRequester.Builder.connectAndAwait(transport: ClientTransport) * @author Sebastien Deleuze * @since 5.2 */ +@Suppress("DEPRECATION") suspend fun RSocketRequester.Builder.connectTcpAndAwait(host: String, port: Int): RSocketRequester = connectTcp(host, port).awaitSingle() @@ -51,6 +53,7 @@ suspend fun RSocketRequester.Builder.connectTcpAndAwait(host: String, port: Int) * @author Sebastien Deleuze * @since 5.2 */ +@Suppress("DEPRECATION") suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocketRequester = connectWebSocket(uri).awaitSingle() diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java index d2c1abf1531..84bdb562884 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java @@ -55,7 +55,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; /** * Unit tests for {@link DefaultRSocketRequesterBuilder}. @@ -75,22 +74,10 @@ public class DefaultRSocketRequesterBuilderTests { public void setup() { this.transport = mock(ClientTransport.class); given(this.transport.connect()).willReturn(Mono.just(this.connection)); + given(this.transport.maxFrameLength()).willReturn(16777215); } - @Test - @SuppressWarnings("unchecked") - public void rsocketConnectorConfigurerAppliesAtSubscription() { - Consumer strategiesConfigurer = mock(Consumer.class); - RSocketRequester.builder() - .rsocketConnector(this.connectorConfigurer) - .rsocketStrategies(strategiesConfigurer) - .connect(this.transport); - - verifyNoInteractions(this.transport); - assertThat(this.connectorConfigurer.connector()).isNull(); - } - @Test @SuppressWarnings({"unchecked", "deprecation"}) public void rsocketConnectorConfigurer() { @@ -100,12 +87,10 @@ public class DefaultRSocketRequesterBuilderTests { .rsocketConnector(this.connectorConfigurer) .rsocketFactory(factoryConfigurer) .rsocketStrategies(strategiesConfigurer) - .connect(this.transport) - .block(); + .transport(this.transport); // RSocketStrategies and RSocketConnector configurers should have been called - verify(this.transport).connect(); verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); verify(factoryConfigurer).configure(any(io.rsocket.RSocketFactory.ClientRSocketFactory.class)); assertThat(this.connectorConfigurer.connector()).isNotNull(); @@ -113,9 +98,7 @@ public class DefaultRSocketRequesterBuilderTests { @Test public void defaultDataMimeType() { - RSocketRequester requester = RSocketRequester.builder() - .connect(this.transport) - .block(); + RSocketRequester requester = RSocketRequester.builder().transport(this.transport); assertThat(requester.dataMimeType()) .as("Default data MimeType, based on the first Decoder") @@ -130,8 +113,7 @@ public class DefaultRSocketRequesterBuilderTests { RSocketRequester requester = RSocketRequester.builder() .rsocketStrategies(strategies) - .connect(this.transport) - .block(); + .transport(this.transport); assertThat(requester.dataMimeType()) .as("Default data MimeType, based on the first configured, non-default Decoder") @@ -142,12 +124,9 @@ public class DefaultRSocketRequesterBuilderTests { public void dataMimeTypeExplicitlySet() { RSocketRequester requester = RSocketRequester.builder() .dataMimeType(MimeTypeUtils.APPLICATION_JSON) - .connect(this.transport) - .block(); + .transport(this.transport); - ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames()) - .map(DefaultConnectionSetupPayload::new) - .block(); + ConnectionSetupPayload setupPayload = getConnectionSetupPayload(requester); assertThat(setupPayload.dataMimeType()).isEqualTo("application/json"); assertThat(requester.dataMimeType()).isEqualTo(MimeTypeUtils.APPLICATION_JSON); @@ -165,12 +144,9 @@ public class DefaultRSocketRequesterBuilderTests { connector.metadataMimeType("text/plain"); connector.dataMimeType("application/xml"); }) - .connect(this.transport) - .block(); + .transport(this.transport); - ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames()) - .map(DefaultConnectionSetupPayload::new) - .block(); + ConnectionSetupPayload setupPayload = getConnectionSetupPayload(requester); assertThat(setupPayload.dataMimeType()).isEqualTo(dataMimeType.toString()); assertThat(setupPayload.metadataMimeType()).isEqualTo(metaMimeType.toString()); @@ -180,17 +156,14 @@ public class DefaultRSocketRequesterBuilderTests { @Test public void setupRoute() { - RSocketRequester.builder() + RSocketRequester requester = RSocketRequester.builder() .dataMimeType(MimeTypeUtils.TEXT_PLAIN) .metadataMimeType(MimeTypeUtils.TEXT_PLAIN) .setupRoute("toA") .setupData("My data") - .connect(this.transport) - .block(); + .transport(this.transport); - ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames()) - .map(DefaultConnectionSetupPayload::new) - .block(); + ConnectionSetupPayload setupPayload = getConnectionSetupPayload(requester); assertThat(setupPayload.getMetadataUtf8()).isEqualTo("toA"); assertThat(setupPayload.getDataUtf8()).isEqualTo("My data"); @@ -203,18 +176,15 @@ public class DefaultRSocketRequesterBuilderTests { Mono asyncMeta2 = Mono.delay(Duration.ofMillis(1)).map(aLong -> "Async Metadata 2"); Mono data = Mono.delay(Duration.ofMillis(1)).map(aLong -> "Async data"); - RSocketRequester.builder() + RSocketRequester requester = RSocketRequester.builder() .dataMimeType(MimeTypeUtils.TEXT_PLAIN) .setupRoute("toA") .setupMetadata(asyncMeta1, new MimeType("text", "x.test.metadata1")) .setupMetadata(asyncMeta2, new MimeType("text", "x.test.metadata2")) .setupData(data) - .connect(this.transport) - .block(); + .transport(this.transport); - ConnectionSetupPayload payload = Mono.from(this.connection.sentFrames()) - .map(DefaultConnectionSetupPayload::new) - .block(); + ConnectionSetupPayload setupPayload = getConnectionSetupPayload(requester); MimeType compositeMimeType = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()); @@ -222,11 +192,11 @@ public class DefaultRSocketRequesterBuilderTests { DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(StringDecoder.allMimeTypes()); extractor.metadataToExtract(new MimeType("text", "x.test.metadata1"), String.class, "asyncMeta1"); extractor.metadataToExtract(new MimeType("text", "x.test.metadata2"), String.class, "asyncMeta2"); - Map metadataValues = extractor.extract(payload, compositeMimeType); + Map metadataValues = extractor.extract(setupPayload, compositeMimeType); assertThat(metadataValues.get("asyncMeta1")).isEqualTo("Async Metadata 1"); assertThat(metadataValues.get("asyncMeta2")).isEqualTo("Async Metadata 2"); - assertThat(payload.getDataUtf8()).isEqualTo("Async data"); + assertThat(setupPayload.getDataUtf8()).isEqualTo("Async data"); } @Test @@ -235,6 +205,12 @@ public class DefaultRSocketRequesterBuilderTests { testPayloadDecoder(DefaultDataBufferFactory.sharedInstance, PayloadDecoder.DEFAULT); } + private ConnectionSetupPayload getConnectionSetupPayload(RSocketRequester requester) { + // Trigger connection and sending of SETUP frame + requester.route("any-route").data("any-data").send().block(); + return new DefaultConnectionSetupPayload(this.connection.setupFrame()); + } + private void testPayloadDecoder(DataBufferFactory bufferFactory, PayloadDecoder payloadDecoder) throws NoSuchFieldException { @@ -245,8 +221,7 @@ public class DefaultRSocketRequesterBuilderTests { RSocketRequester.builder() .rsocketStrategies(strategies) .rsocketConnector(this.connectorConfigurer) - .connect(this.transport) - .block(); + .transport(this.transport); RSocketConnector connector = this.connectorConfigurer.connector(); assertThat(connector).isNotNull(); @@ -260,16 +235,21 @@ public class DefaultRSocketRequesterBuilderTests { static class MockConnection implements DuplexConnection { - private Publisher sentFrames; + private ByteBuf setupFrame; - public Publisher sentFrames() { - return this.sentFrames; + public ByteBuf setupFrame() { + return this.setupFrame; } @Override public Mono send(Publisher frames) { - this.sentFrames = frames; + return Mono.empty(); + } + + @Override + public Mono sendOne(ByteBuf frame) { + this.setupFrame = frame; return Mono.empty(); } @@ -285,12 +265,17 @@ public class DefaultRSocketRequesterBuilderTests { @Override public Mono onClose() { - return Mono.empty(); + return Mono.never(); } @Override public void dispose() { } + + @Override + public boolean isDisposed() { + return false; + } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java index a7d6f7a8fb3..dc9b116ac7f 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java @@ -89,13 +89,12 @@ class RSocketBufferLeakTests { requester = RSocketRequester.builder() .rsocketConnector(conn -> conn.interceptors(registry -> registry.forRequester(payloadInterceptor))) .rsocketStrategies(context.getBean(RSocketStrategies.class)) - .connectTcp("localhost", 7000) - .block(); + .tcp("localhost", 7000); } @AfterAll void tearDownOnce() { - requester.rsocket().dispose(); + requester.dispose(); server.dispose(); context.close(); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index ceaaf839f50..ddfb7a61f14 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -91,13 +91,12 @@ public class RSocketClientToServerIntegrationTests { requester = RSocketRequester.builder() .metadataMimeType(metadataMimeType) .rsocketStrategies(context.getBean(RSocketStrategies.class)) - .connectTcp("localhost", 7000) - .block(); + .tcp("localhost", 7000); } @AfterAll public static void tearDownOnce() { - requester.rsocket().dispose(); + requester.dispose(); server.dispose(); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index d4e45439ae7..9269af62421 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -108,14 +108,16 @@ public class RSocketServerToClientIntegrationTests { .setupRoute(connectionRoute) .rsocketStrategies(strategies) .rsocketConnector(connector -> connector.acceptor(responder)) - .connectTcp("localhost", server.address().getPort()) - .block(); + .tcp("localhost", server.address().getPort()); + + // Make a request to cause a connection to be established. + requester.route("fnf").send().block(); context.getBean(ServerController.class).await(Duration.ofSeconds(5)); } finally { if (requester != null) { - requester.rsocket().dispose(); + requester.dispose(); } } } @@ -199,7 +201,6 @@ public class RSocketServerToClientIntegrationTests { }); } - private void runTest(Runnable testEcho) { Mono.fromRunnable(testEcho) .doOnError(ex -> result.onError(ex)) @@ -207,6 +208,10 @@ public class RSocketServerToClientIntegrationTests { .subscribeOn(Schedulers.boundedElastic()) // StepVerifier will block .subscribe(); } + + @MessageMapping("fnf") + void handleFireAndForget() { + } } diff --git a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt index cdcea8e46cd..83cae672959 100644 --- a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt +++ b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt @@ -265,14 +265,13 @@ class RSocketClientToServerCoroutinesIntegrationTests { requester = RSocketRequester.builder() .rsocketConnector { connector -> connector.payloadDecoder(PayloadDecoder.ZERO_COPY) } .rsocketStrategies(context.getBean(RSocketStrategies::class.java)) - .connectTcp("localhost", 7000) - .block()!! + .tcp("localhost", 7000) } @AfterAll @JvmStatic fun tearDownOnce() { - requester.rsocket().dispose() + requester.dispose() server.dispose() } } diff --git a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt index 7454c80153e..50aff2d49b1 100644 --- a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt +++ b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt @@ -39,6 +39,7 @@ class RSocketRequesterExtensionsTests { private val stringTypeRefMatcher: (ParameterizedTypeReference<*>) -> Boolean = { it.type == String::class.java } @Test + @Suppress("DEPRECATION") fun connectAndAwait() { val requester = mockk() val builder = mockk() @@ -49,6 +50,7 @@ class RSocketRequesterExtensionsTests { } @Test + @Suppress("DEPRECATION") fun connectTcpAndAwait() { val host = "127.0.0.1" val requester = mockk() @@ -60,6 +62,7 @@ class RSocketRequesterExtensionsTests { } @Test + @Suppress("DEPRECATION") fun connectWebSocketAndAwait() { val requester = mockk() val builder = mockk() diff --git a/src/docs/asciidoc/rsocket.adoc b/src/docs/asciidoc/rsocket.adoc index 0ea88e33bd5..03fc05719df 100644 --- a/src/docs/asciidoc/rsocket.adoc +++ b/src/docs/asciidoc/rsocket.adoc @@ -176,80 +176,33 @@ symmetrically, to make requests from clients and to make requests from servers. [[rsocket-requester-client]] === Client Requester -To obtain an `RSocketRequester` on the client side requires connecting to a server along with -preparing and sending the initial RSocket `SETUP` frame. `RSocketRequester` provides a -builder for that. Internally it builds on `io.rsocket.core.RSocketConnector`. +To obtain an `RSocketRequester` on the client side is to connect to a server which involves +sending an RSocket `SETUP` frame with connection settings. `RSocketRequester` provides a +builder that helps to prepare an `io.rsocket.core.RSocketConnector` including connection +settings for the `SETUP` frame. This is the most basic way to connect with default settings: [source,java,indent=0,subs="verbatim,quotes",role="primary"] .Java ---- - Mono requesterMono = RSocketRequester.builder() - .connectTcp("localhost", 7000); + RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000); - Mono requesterMono = RSocketRequester.builder() - .connectWebSocket(URI.create("https://example.org:8080/rsocket")); + URI url = URI.create("https://example.org:8080/rsocket"); + RSocketRequester requester = RSocketRequester.builder().webSocket(url); ---- [source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] .Kotlin ---- - import org.springframework.messaging.rsocket.connectTcpAndAwait - import org.springframework.messaging.rsocket.connectWebSocketAndAwait + val requester = RSocketRequester.builder().tcp("localhost", 7000) - val requester = RSocketRequester.builder() - .connectTcpAndAwait("localhost", 7000) - - val requester = RSocketRequester.builder() - .connectWebSocketAndAwait(URI.create("https://example.org:8080/rsocket")) ----- - -The above is deferred. To actually connect and use the requester: - -[source,java,indent=0,subs="verbatim,quotes",role="primary"] -.Java ----- - // Connect asynchronously - RSocketRequester.builder().connectTcp("localhost", 7000) - .subscribe(requester -> { - // ... - }); - - // Or block - RSocketRequester requester = RSocketRequester.builder() - .connectTcp("localhost", 7000) - .block(Duration.ofSeconds(5)); + URI url = URI.create("https://example.org:8080/rsocket"); + val requester = RSocketRequester.builder().webSocket(url) ---- -[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] -.Kotlin ----- - // Connect asynchronously - import org.springframework.messaging.rsocket.connectTcpAndAwait - - class MyService { - - private var requester: RSocketRequester? = null - - private suspend fun requester() = requester ?: - RSocketRequester.builder().connectTcpAndAwait("localhost", 7000).also { requester = it } - - suspend fun doSomething() = requester().route(...) - } - - // Or block - import org.springframework.messaging.rsocket.connectTcpAndAwait - - class MyService { - - private val requester = runBlocking { - RSocketRequester.builder().connectTcpAndAwait("localhost", 7000) - } - - suspend fun doSomething() = requester.route(...) - } ----- +The above does not connect immediately. When requests are made, a shared connection is +established transparently and used. [[rsocket-requester-client-setup]] @@ -291,16 +244,14 @@ can be registered as follows: .decoders(decoders -> decoders.add(new Jackson2CborDecoder())) .build(); - Mono requesterMono = RSocketRequester.builder() + RSocketRequester requester = RSocketRequester.builder() .rsocketStrategies(strategies) - .connectTcp("localhost", 7000); + .tcp("localhost", 7000); ---- [source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] .Kotlin ---- - import org.springframework.messaging.rsocket.connectTcpAndAwait - val strategies = RSocketStrategies.builder() .encoders { it.add(Jackson2CborEncoder()) } .decoders { it.add(Jackson2CborDecoder()) } @@ -308,7 +259,7 @@ can be registered as follows: val requester = RSocketRequester.builder() .rsocketStrategies(strategies) - .connectTcpAndAwait("localhost", 7000) + .tcp("localhost", 7000) ---- `RSocketStrategies` is designed for re-use. In some scenarios, e.g. client and server in @@ -334,9 +285,9 @@ infrastructure that's used on a server, but registered programmatically as follo SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler()); // <2> - Mono requesterMono = RSocketRequester.builder() + RSocketRequester requester = RSocketRequester.builder() .rsocketConnector(connector -> connector.acceptor(responder)) // <3> - .connectTcp("localhost", 7000); + .tcp("localhost", 7000); ---- <1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient route matching. @@ -346,8 +297,6 @@ infrastructure that's used on a server, but registered programmatically as follo [source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] .Kotlin ---- - import org.springframework.messaging.rsocket.connectTcpAndAwait - val strategies = RSocketStrategies.builder() .routeMatcher(PathPatternRouteMatcher()) // <1> .build() @@ -357,7 +306,7 @@ infrastructure that's used on a server, but registered programmatically as follo val requester = RSocketRequester.builder() .rsocketConnector { it.acceptor(responder) } // <3> - .connectTcpAndAwait("localhost", 7000) + .tcp("localhost", 7000) ---- <1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient route matching. @@ -374,23 +323,22 @@ you can still declare `RSocketMessageHandler` as a Spring bean and then apply as ApplicationContext context = ... ; RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class); - Mono requesterMono = RSocketRequester.builder() + RSocketRequester requester = RSocketRequester.builder() .rsocketConnector(connector -> connector.acceptor(handler.responder())) - .connectTcp("localhost", 7000); + .tcp("localhost", 7000); ---- [source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] .Kotlin ---- import org.springframework.beans.factory.getBean - import org.springframework.messaging.rsocket.connectTcpAndAwait val context: ApplicationContext = ... val handler = context.getBean() val requester = RSocketRequester.builder() .rsocketConnector { it.acceptor(handler.responder()) } - .connectTcpAndAwait("localhost", 7000) + .tcp("localhost", 7000) ---- For the above you may also need to use `setHandlerPredicate` in `RSocketMessageHandler` to @@ -413,22 +361,21 @@ at that level as follows: [source,java,indent=0,subs="verbatim,quotes",role="primary"] .Java ---- - Mono requesterMono = RSocketRequester.builder() + RSocketRequester requester = RSocketRequester.builder() .rsocketConnector(connector -> { // ... }) - .connectTcp("localhost", 7000); + .tcp("localhost", 7000); ---- [source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] .Kotlin ---- - import org.springframework.messaging.rsocket.connectTcpAndAwait - val requester = RSocketRequester.builder() .rsocketConnector { //... - }.connectTcpAndAwait("localhost", 7000) + } + .tcp("localhost", 7000) ----