From 02904121a3af819613dfb2d4f976e512d42734ac Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Tue, 23 Apr 2019 10:55:23 +0200 Subject: [PATCH] Add RSocketRequest.Builder in Spring Messaging Prior to this commit, `RSocketRequester` would have a single `RSocketRequester.create` static method taking a fully built `RSocket` as an argument. Developers need to build an `RSocket` instance using the `RSocketFactory` and then use it to create a requester. To help developers set up a requester, this commit adds a new `RSocketRequester.Builder` interface and implementation. The `RSocket` building phase and codecs configuration are part of a single call chain. Subscribing to the returned `Mono` will configure and connect to the remote RSocket server. This design should be improved in gh-22798, since we will need to support metadata in a broader fashion. Closes gh-22806 --- spring-messaging/spring-messaging.gradle | 2 +- .../DefaultRSocketRequesterBuilder.java | 82 ++++++++++++++ .../messaging/rsocket/RSocketRequester.java | 61 ++++++++++ .../DefaultRSocketRequesterBuilderTests.java | 105 ++++++++++++++++++ ...RSocketClientToServerIntegrationTests.java | 21 ++-- 5 files changed, 257 insertions(+), 14 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle index b8e2df484ae..5248302cb8b 100644 --- a/spring-messaging/spring-messaging.gradle +++ b/spring-messaging/spring-messaging.gradle @@ -16,6 +16,7 @@ dependencies { optional(project(":spring-oxm")) optional("io.projectreactor.netty:reactor-netty") optional("io.rsocket:rsocket-core:${rsocketVersion}") + optional("io.rsocket:rsocket-transport-netty:${rsocketVersion}") optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") optional("javax.xml.bind:jaxb-api:2.3.1") testCompile("javax.inject:javax.inject-tck:1") @@ -29,7 +30,6 @@ dependencies { testCompile("org.apache.activemq:activemq-stomp:5.8.0") testCompile("io.projectreactor:reactor-test") testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}" - testCompile("io.rsocket:rsocket-transport-netty:${rsocketVersion}") testCompile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}") testCompile("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}") testCompile("org.xmlunit:xmlunit-matchers:2.6.2") diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java new file mode 100644 index 00000000000..0ff7b30240c --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -0,0 +1,82 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.rsocket; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import io.rsocket.RSocketFactory; +import io.rsocket.transport.ClientTransport; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.client.WebsocketClientTransport; +import reactor.core.publisher.Mono; + +import org.springframework.lang.Nullable; +import org.springframework.util.MimeType; + +/** + * Default implementation of {@link RSocketRequester.Builder}. + * + * @author Brian Clozel + * @since 5.2 + */ +final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { + + @Nullable + private List> factoryConfigurers = new ArrayList<>(); + + @Nullable + private List> strategiesConfigurers = new ArrayList<>(); + + @Override + public RSocketRequester.Builder rsocketFactory(Consumer configurer) { + this.factoryConfigurers.add(configurer); + return this; + } + + @Override + public RSocketRequester.Builder rsocketStrategies(Consumer configurer) { + this.strategiesConfigurers.add(configurer); + return this; + } + + @Override + public Mono connect(ClientTransport transport, MimeType dataMimeType) { + return Mono.defer(() -> { + RSocketStrategies.Builder strategiesBuilder = RSocketStrategies.builder(); + this.strategiesConfigurers.forEach(configurer -> configurer.accept(strategiesBuilder)); + RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect() + .dataMimeType(dataMimeType.toString()); + this.factoryConfigurers.forEach(configurer -> configurer.accept(clientFactory)); + return clientFactory.transport(transport).start() + .map(rsocket -> RSocketRequester.create(rsocket, dataMimeType, strategiesBuilder.build())); + }); + } + + @Override + public Mono connectTcp(String host, int port, MimeType dataMimeType) { + return connect(TcpClientTransport.create(host, port), dataMimeType); + } + + @Override + public Mono connectWebSocket(URI uri, MimeType dataMimeType) { + return connect(WebsocketClientTransport.create(uri), dataMimeType); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index d895ab2c3a1..b41f3910b36 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -16,7 +16,12 @@ package org.springframework.messaging.rsocket; +import java.net.URI; +import java.util.function.Consumer; + import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.ClientTransport; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -32,6 +37,7 @@ import org.springframework.util.MimeType; * methods specify routing and other metadata. * * @author Rossen Stoyanchev + * @author Brian Clozel * @since 5.2 */ public interface RSocketRequester { @@ -55,6 +61,61 @@ public interface RSocketRequester { return new DefaultRSocketRequester(rsocket, dataMimeType, strategies); } + /** + * Obtain a {@code RSocketRequester} builder. + */ + static RSocketRequester.Builder builder() { + return new DefaultRSocketRequesterBuilder(); + } + + /** + * A mutable builder for creating a client {@link RSocketRequester}. + */ + interface Builder { + + /** + * Configure the client {@code RSocketFactory}. This is useful for + * customizing protocol options and add RSocket plugins. + * @param configurer the configurer to apply + */ + RSocketRequester.Builder rsocketFactory( + Consumer configurer); + + /** + * Configure the builder for {@link RSocketStrategies}. + *

The builder starts with an empty {@code RSocketStrategies}. + * @param configurer the configurer to apply + */ + RSocketRequester.Builder rsocketStrategies(Consumer configurer); + + /** + * Configure the {@code ClientTransport} for the RSocket connection + * and connect to the RSocket server + * @param transport the chosen client transport + * @return a mono containing the connected {@code RSocketRequester} + */ + Mono connect(ClientTransport transport, MimeType dataMimeType); + + /** + * Connect to the RSocket server over TCP transport using the + * provided connection parameters + * @param host the RSocket server host + * @param port the RSocket server port + * @param dataMimeType the data MimeType + * @return a mono containing the connected {@code RSocketRequester} + */ + Mono connectTcp(String host, int port, MimeType dataMimeType); + + /** + * Connect to the RSocket server over WebSocket transport using the + * provided connection parameters + * @param uri the RSocket server endpoint URI + * @param dataMimeType the data MimeType + * @return a mono containing the connected {@code RSocketRequester} + */ + Mono connectWebSocket(URI uri, MimeType dataMimeType); + + } // For now we treat metadata as a simple string that is the route. // This will change after the resolution of: diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java new file mode 100644 index 00000000000..c282accdd86 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java @@ -0,0 +1,105 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.rsocket; + +import java.util.function.Consumer; + +import io.netty.buffer.ByteBuf; +import io.rsocket.DuplexConnection; +import io.rsocket.RSocketFactory; +import io.rsocket.transport.ClientTransport; +import org.junit.Before; +import org.junit.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.util.MimeTypeUtils; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link DefaultRSocketRequesterBuilder}. + * + * @author Brian Clozel + */ +public class DefaultRSocketRequesterBuilderTests { + + private ClientTransport transport; + + @Before + public void setup() { + this.transport = mock(ClientTransport.class); + when(this.transport.connect(anyInt())).thenReturn(Mono.just(new MockConnection())); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldApplyCustomizationsAtSubscription() { + Consumer factoryConfigurer = mock(Consumer.class); + Consumer strategiesConfigurer = mock(Consumer.class); + Mono requester = RSocketRequester.builder() + .rsocketFactory(factoryConfigurer) + .rsocketStrategies(strategiesConfigurer) + .connect(this.transport, MimeTypeUtils.APPLICATION_JSON); + verifyZeroInteractions(this.transport, factoryConfigurer, strategiesConfigurer); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldApplyCustomizations() { + Consumer factoryConfigurer = mock(Consumer.class); + Consumer strategiesConfigurer = mock(Consumer.class); + RSocketRequester requester = RSocketRequester.builder() + .rsocketFactory(factoryConfigurer) + .rsocketStrategies(strategiesConfigurer) + .connect(this.transport, MimeTypeUtils.APPLICATION_JSON) + .block(); + verify(this.transport).connect(anyInt()); + verify(factoryConfigurer).accept(any(RSocketFactory.ClientRSocketFactory.class)); + verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); + } + + static class MockConnection implements DuplexConnection { + + @Override + public Mono send(Publisher frames) { + return Mono.empty(); + } + + @Override + public Flux receive() { + return Flux.empty(); + } + + @Override + public Mono onClose() { + return Mono.empty(); + } + + @Override + public void dispose() { + + } + } + +} \ No newline at end of file diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index ac5698ffc76..3639622f9dd 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -19,10 +19,8 @@ package org.springframework.messaging.rsocket; import java.time.Duration; import io.netty.buffer.PooledByteBufAllocator; -import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import org.junit.AfterClass; @@ -59,8 +57,6 @@ public class RSocketClientToServerIntegrationTests { private static FireAndForgetCountingInterceptor interceptor = new FireAndForgetCountingInterceptor(); - private static RSocket client; - private static RSocketRequester requester; @@ -77,20 +73,19 @@ public class RSocketClientToServerIntegrationTests { .start() .block(); - client = RSocketFactory.connect() - .dataMimeType(MimeTypeUtils.TEXT_PLAIN_VALUE) - .frameDecoder(PayloadDecoder.ZERO_COPY) - .transport(TcpClientTransport.create("localhost", 7000)) - .start() + requester = RSocketRequester.builder() + .rsocketFactory(factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY)) + .rsocketStrategies(strategies -> strategies + .decoder(StringDecoder.allMimeTypes()) + .encoder(CharSequenceEncoder.allMimeTypes()) + .dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))) + .connectTcp("localhost", 7000, MimeTypeUtils.TEXT_PLAIN) .block(); - - requester = RSocketRequester.create( - client, MimeTypeUtils.TEXT_PLAIN, context.getBean(RSocketStrategies.class)); } @AfterClass public static void tearDownOnce() { - client.dispose(); + requester.rsocket().dispose(); server.dispose(); }