From a780cad12eb3d109b55b7040b98f0e027b47065f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 23 Jul 2019 16:42:06 +0100 Subject: [PATCH 1/4] Updates to RSocket[Strategies|Requester] defaults 1. RSocketStrategies hooks in the basic codecs from spring-core by default. Now that we have support for composite metadata, it makes sense to have multiple codecs available. 2. RSocketStrategies is pre-configured with NettyDataBufferFactory. 3. DefaultRSocketRequesterBuilder configures RSocket with a frame decoder that matches the DataBufferFactory choice, i.e. ensuring consistency of zero copy vs default (copy) choice. 4. DefaultRSocketRequesterBuilder now tries to find a single non-basic decoder to select a default data MimeType (e.g. CBOR), or otherwise fall back on the first default decoder (e.g. String). See gh-23314 --- .../DefaultRSocketRequesterBuilder.java | 49 +++-- .../rsocket/DefaultRSocketStrategies.java | 46 ++++- .../messaging/rsocket/RSocketRequester.java | 33 ++- .../messaging/rsocket/RSocketStrategies.java | 53 ++--- .../DefaultRSocketRequesterBuilderTests.java | 190 +++++++++++++++--- 5 files changed, 286 insertions(+), 85 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 5a936283b2f..247f66abee2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -18,9 +18,9 @@ package org.springframework.messaging.rsocket; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; -import java.util.stream.Stream; import io.rsocket.RSocketFactory; import io.rsocket.frame.decoder.PayloadDecoder; @@ -29,6 +29,9 @@ import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; import reactor.core.publisher.Mono; +import org.springframework.core.codec.Decoder; +import org.springframework.core.codec.StringDecoder; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.MimeType; @@ -110,7 +113,10 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { MimeType dataMimeType = getDataMimeType(rsocketStrategies); rsocketFactory.dataMimeType(dataMimeType.toString()); rsocketFactory.metadataMimeType(this.metadataMimeType.toString()); - rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY); + + if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) { + rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY); + } this.rsocketFactoryConfigurers.forEach(configurer -> { configurer.configureWithStrategies(rsocketStrategies); @@ -139,16 +145,35 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { if (this.dataMimeType != null) { return this.dataMimeType; } - return Stream - .concat( - strategies.encoders().stream() - .flatMap(encoder -> encoder.getEncodableMimeTypes().stream()), - strategies.decoders().stream() - .flatMap(encoder -> encoder.getDecodableMimeTypes().stream()) - ) - .filter(MimeType::isConcrete) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("Failed to select data MimeType to use.")); + // Look for non-basic Decoder (e.g. CBOR, Protobuf) + MimeType selected = null; + List> decoders = strategies.decoders(); + for (Decoder candidate : decoders) { + if (!isCoreCodec(candidate) && !candidate.getDecodableMimeTypes().isEmpty()) { + Assert.state(selected == null, + () -> "Cannot select default data MimeType based on configured decoders: " + decoders); + selected = getMimeType(candidate); + } + } + if (selected != null) { + return selected; + } + // Fall back on 1st decoder (e.g. String) + for (Decoder decoder : decoders) { + if (!decoder.getDecodableMimeTypes().isEmpty()) { + return getMimeType(decoder); + } + } + throw new IllegalArgumentException("Failed to select data MimeType to use."); + } + + private static boolean isCoreCodec(Object codec) { + return codec.getClass().getPackage().equals(StringDecoder.class.getPackage()); + } + + private static MimeType getMimeType(Decoder decoder) { + MimeType mimeType = decoder.getDecodableMimeTypes().get(0); + return new MimeType(mimeType, Collections.emptyMap()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java index ba72af81019..c16958d7e83 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java @@ -22,11 +22,21 @@ import java.util.Collections; import java.util.List; import java.util.function.Consumer; +import io.netty.buffer.PooledByteBufAllocator; + import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.core.codec.ByteArrayDecoder; +import org.springframework.core.codec.ByteArrayEncoder; +import org.springframework.core.codec.ByteBufferDecoder; +import org.springframework.core.codec.ByteBufferEncoder; +import org.springframework.core.codec.CharSequenceEncoder; +import org.springframework.core.codec.DataBufferDecoder; +import org.springframework.core.codec.DataBufferEncoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.StringDecoder; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -90,18 +100,33 @@ final class DefaultRSocketStrategies implements RSocketStrategies { private ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance(); @Nullable - private DataBufferFactory dataBufferFactory; + private DataBufferFactory bufferFactory; + + + DefaultRSocketStrategiesBuilder() { + + // Order of decoders may be significant for default data MimeType + // selection in RSocketRequester.Builder - public DefaultRSocketStrategiesBuilder() { + this.decoders.add(StringDecoder.allMimeTypes()); + this.decoders.add(new ByteBufferDecoder()); + this.decoders.add(new ByteArrayDecoder()); + this.decoders.add(new DataBufferDecoder()); + + this.encoders.add(CharSequenceEncoder.allMimeTypes()); + this.encoders.add(new ByteBufferEncoder()); + this.encoders.add(new ByteArrayEncoder()); + this.encoders.add(new DataBufferEncoder()); } - public DefaultRSocketStrategiesBuilder(RSocketStrategies other) { + DefaultRSocketStrategiesBuilder(RSocketStrategies other) { this.encoders.addAll(other.encoders()); this.decoders.addAll(other.decoders()); this.adapterRegistry = other.reactiveAdapterRegistry(); - this.dataBufferFactory = other.dataBufferFactory(); + this.bufferFactory = other.dataBufferFactory(); } + @Override public Builder encoder(Encoder... encoders) { this.encoders.addAll(Arrays.asList(encoders)); @@ -135,14 +160,19 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @Override public Builder dataBufferFactory(DataBufferFactory bufferFactory) { - this.dataBufferFactory = bufferFactory; + this.bufferFactory = bufferFactory; return this; } @Override public RSocketStrategies build() { - return new DefaultRSocketStrategies(this.encoders, this.decoders, this.adapterRegistry, - this.dataBufferFactory != null ? this.dataBufferFactory : new DefaultDataBufferFactory()); + return new DefaultRSocketStrategies( + this.encoders, this.decoders, this.adapterRegistry, initBufferFactory()); + } + + private DataBufferFactory initBufferFactory() { + return this.bufferFactory != null ? this.bufferFactory : + new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 071e4d481a7..59e9606e2d7 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -28,6 +28,7 @@ import reactor.core.publisher.Mono; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ReactiveAdapterRegistry; +import org.springframework.core.codec.Decoder; import org.springframework.lang.Nullable; import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer; import org.springframework.util.MimeType; @@ -125,32 +126,32 @@ public interface RSocketRequester { interface Builder { /** - * Configure the MimeType for payload data which is then specified - * on the {@code SETUP} frame and applies to the whole connection. - *

By default this is set to the first concrete mime type supported - * by the configured encoders and decoders. - * @param mimeType the data MimeType to use + * Configure the payload data MimeType to specify on the {@code SETUP} + * frame that applies to the whole connection. + *

If this is not set, the builder will try to select the mime type + * based on the presence of a single + * {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default} + * {@code Decoder}, or the first default decoder otherwise + * (i.e. {@code String}) if no others are configured. */ RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType); /** - * Configure the MimeType for payload metadata which is then specified - * on the {@code SETUP} frame and applies to the whole connection. + * Configure the payload metadata MimeType to specify on the {@code SETUP} + * frame and applies to the whole connection. *

By default this is set to * {@code "message/x.rsocket.composite-metadata.v0"} in which case the * route, if provided, is encoded as a - * {@code "message/x.rsocket.routing.v0"} metadata entry, potentially - * with other metadata entries added too. If this is set to any other - * mime type, and a route is provided, it is assumed the mime type is - * for the route. - * @param mimeType the data MimeType to use + * {@code "message/x.rsocket.routing.v0"} composite metadata entry. + * For any other MimeType, it is assumed to be the MimeType for the + * route, if provided. */ RSocketRequester.Builder metadataMimeType(MimeType mimeType); /** - * Set the {@link RSocketStrategies} to use for access to encoders, - * decoders, and a factory for {@code DataBuffer's}. - * @param strategies the codecs strategies to use + * Set the {@link RSocketStrategies} to use. + *

By default this is set to {@code RSocketStrategies.builder().build()} + * but may be further customized via {@link #rsocketStrategies(Consumer)}. */ RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies); @@ -159,7 +160,6 @@ public interface RSocketRequester { *

By default this starts out with an empty builder, i.e. * {@link RSocketStrategies#builder()}, but the strategies can also be * set via {@link #rsocketStrategies(RSocketStrategies)}. - * @param configurer the configurer to apply */ RSocketRequester.Builder rsocketStrategies(Consumer configurer); @@ -172,7 +172,6 @@ public interface RSocketRequester { * {@code ClientRSocketFactory}. Use the shortcuts on this builder * instead since the created {@code RSocketRequester} needs to be aware * of those settings. - * @param configurer consumer to customize the factory * @see AnnotationClientResponderConfigurer */ RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java index 9e665a3bc75..8e6aa27f17c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java @@ -19,13 +19,17 @@ package org.springframework.messaging.rsocket; import java.util.List; import java.util.function.Consumer; -import io.netty.buffer.PooledByteBufAllocator; +import io.rsocket.Payload; +import io.rsocket.RSocketFactory.ClientRSocketFactory; +import io.rsocket.RSocketFactory.ServerRSocketFactory; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ResolvableType; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.MimeType; @@ -120,24 +124,28 @@ public interface RSocketStrategies { interface Builder { /** - * Add encoders to use for serializing Objects. - *

By default this is empty. + * Append to the list of encoders to use for serializing Objects to the + * data or metadata of a {@link Payload}. + *

By default this is initialized with encoders for {@code String}, + * {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}. */ Builder encoder(Encoder... encoder); /** - * Access and manipulate the list of configured {@link #encoder encoders}. + * Apply the consumer to the list of configured encoders, immediately. */ Builder encoders(Consumer>> consumer); /** - * Add decoders for de-serializing Objects. - *

By default this is empty. + * Append to the list of decoders to use for de-serializing Objects from + * the data or metadata of a {@link Payload}. + *

By default this is initialized with decoders for {@code String}, + * {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}. */ Builder decoder(Decoder... decoder); /** - * Access and manipulate the list of configured {@link #encoder decoders}. + * Apply the consumer to the list of configured decoders, immediately. */ Builder decoders(Consumer>> consumer); @@ -146,28 +154,23 @@ public interface RSocketStrategies { * to adapt to, and/or determine the semantics of a given * {@link org.reactivestreams.Publisher Publisher}. *

By default this {@link ReactiveAdapterRegistry#getSharedInstance()}. - * @param registry the registry to use */ Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry); /** - * Configure the DataBufferFactory to use for allocating buffers, for - * example when preparing requests or when responding. The choice here - * must be aligned with the frame decoder configured in - * {@link io.rsocket.RSocketFactory}. - *

By default this property is an instance of - * {@link org.springframework.core.io.buffer.DefaultDataBufferFactory - * DefaultDataBufferFactory} matching to the default frame decoder in - * {@link io.rsocket.RSocketFactory} which copies the payload. This - * comes at cost to performance but does not require reference counting - * and eliminates possibility for memory leaks. - *

To switch to a zero-copy strategy, - * configure RSocket - * accordingly, and then configure this property with an instance of - * {@link org.springframework.core.io.buffer.NettyDataBufferFactory - * NettyDataBufferFactory} with a pooled allocator such as - * {@link PooledByteBufAllocator#DEFAULT}. - * @param bufferFactory the DataBufferFactory to use + * Configure the DataBufferFactory to use for allocating buffers when + * preparing requests or creating responses. + *

By default this is set to {@link NettyDataBufferFactory} with + * pooled, allocated buffers for zero copy. RSocket must also be + * configured + * for zero copy. For client setup, {@link RSocketRequester.Builder} + * adapts automatically to the {@code DataBufferFactory} configured + * here, and sets the frame decoder in {@link ClientRSocketFactory + * ClientRSocketFactory} accordingly. For server setup, the + * {@link ServerRSocketFactory ServerRSocketFactory} must be configured + * accordingly too for zero copy. + *

If using {@link DefaultDataBufferFactory} instead, there is no + * need for related config changes in RSocket. */ Builder dataBufferFactory(DataBufferFactory bufferFactory); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java index 14d7d05ec7f..bd497b58609 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java @@ -17,11 +17,16 @@ package org.springframework.messaging.rsocket; import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.function.Consumer; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import io.rsocket.RSocketFactory; +import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.ClientTransport; import org.junit.Before; import org.junit.Test; @@ -29,13 +34,19 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import org.springframework.core.codec.CharSequenceEncoder; -import org.springframework.core.codec.StringDecoder; +import org.springframework.core.ResolvableType; +import org.springframework.core.codec.Decoder; +import org.springframework.core.codec.DecodingException; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.BDDMockito.given; @@ -52,6 +63,8 @@ public class DefaultRSocketRequesterBuilderTests { private ClientTransport transport; + private final TestRSocketFactoryConfigurer rsocketFactoryConfigurer = new TestRSocketFactoryConfigurer(); + @Before public void setup() { @@ -63,53 +76,113 @@ public class DefaultRSocketRequesterBuilderTests { @Test @SuppressWarnings("unchecked") public void shouldApplyCustomizationsAtSubscription() { - ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class); Consumer strategiesConfigurer = mock(Consumer.class); RSocketRequester.builder() - .rsocketFactory(factoryConfigurer) + .rsocketFactory(this.rsocketFactoryConfigurer) .rsocketStrategies(strategiesConfigurer) .connect(this.transport); - verifyZeroInteractions(this.transport, factoryConfigurer, strategiesConfigurer); + + verifyZeroInteractions(this.transport); + assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNull(); } @Test @SuppressWarnings("unchecked") public void shouldApplyCustomizations() { - RSocketStrategies strategies = RSocketStrategies.builder() - .encoder(CharSequenceEncoder.allMimeTypes()) - .decoder(StringDecoder.allMimeTypes()) - .build(); - ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class); - Consumer strategiesConfigurer = mock(Consumer.class); + Consumer rsocketStrategiesConfigurer = mock(Consumer.class); RSocketRequester.builder() - .rsocketStrategies(strategies) - .rsocketFactory(factoryConfigurer) - .rsocketStrategies(strategiesConfigurer) + .rsocketFactory(this.rsocketFactoryConfigurer) + .rsocketStrategies(rsocketStrategiesConfigurer) .connect(this.transport) .block(); + + // RSocketStrategies and RSocketFactory configurers should have been called + verify(this.transport).connect(anyInt()); - verify(factoryConfigurer).configureWithStrategies(any(RSocketStrategies.class)); - verify(factoryConfigurer).configure(any(RSocketFactory.ClientRSocketFactory.class)); - verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); + verify(rsocketStrategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); + assertThat(this.rsocketFactoryConfigurer.rsocketStrategies()).isNotNull(); + assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNotNull(); + } + + @Test + public void defaultDataMimeType() { + RSocketRequester requester = RSocketRequester.builder() + .connect(this.transport) + .block(); + + assertThat(requester.dataMimeType()) + .as("Default data MimeType, based on the first configured Decoder") + .isEqualTo(MimeTypeUtils.TEXT_PLAIN); } @Test - public void dataMimeType() throws NoSuchFieldException { + public void defaultDataMimeTypeWithCustomDecoderRegitered() { RSocketStrategies strategies = RSocketStrategies.builder() - .encoder(CharSequenceEncoder.allMimeTypes()) - .decoder(StringDecoder.allMimeTypes()) + .decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON)) .build(); RSocketRequester requester = RSocketRequester.builder() .rsocketStrategies(strategies) + .connect(this.transport) + .block(); + + assertThat(requester.dataMimeType()) + .as("Default data MimeType, based on the first configured, non-default Decoder") + .isEqualTo(MimeTypeUtils.APPLICATION_JSON); + } + + @Test + public void defaultDataMimeTypeWithMultipleCustomDecoderRegitered() { + RSocketStrategies strategies = RSocketStrategies.builder() + .decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON)) + .decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_XML)) + .build(); + + assertThatThrownBy(() -> + RSocketRequester + .builder() + .rsocketStrategies(strategies) + .connect(this.transport) + .block()) + .hasMessageContaining("Cannot select default data MimeType"); + } + + @Test + public void dataMimeTypeSet() { + RSocketRequester requester = RSocketRequester.builder() .dataMimeType(MimeTypeUtils.APPLICATION_JSON) .connect(this.transport) .block(); - Field field = DefaultRSocketRequester.class.getDeclaredField("dataMimeType"); + assertThat(requester.dataMimeType()).isEqualTo(MimeTypeUtils.APPLICATION_JSON); + } + + @Test + public void frameDecoderMatchesDataBufferFactory() throws Exception { + testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY); + testFrameDecoder(new DefaultDataBufferFactory(), PayloadDecoder.DEFAULT); + } + + private void testFrameDecoder(DataBufferFactory bufferFactory, PayloadDecoder frameDecoder) + throws NoSuchFieldException { + + RSocketStrategies strategies = RSocketStrategies.builder() + .dataBufferFactory(bufferFactory) + .build(); + + RSocketRequester.builder() + .rsocketStrategies(strategies) + .rsocketFactory(this.rsocketFactoryConfigurer) + .connect(this.transport) + .block(); + + RSocketFactory.ClientRSocketFactory factory = this.rsocketFactoryConfigurer.rsocketFactory(); + assertThat(factory).isNotNull(); + + Field field = RSocketFactory.ClientRSocketFactory.class.getDeclaredField("payloadDecoder"); ReflectionUtils.makeAccessible(field); - MimeType dataMimeType = (MimeType) ReflectionUtils.getField(field, requester); - assertThat(dataMimeType).isEqualTo(MimeTypeUtils.APPLICATION_JSON); + PayloadDecoder decoder = (PayloadDecoder) ReflectionUtils.getField(field, factory); + assertThat(decoder).isSameAs(frameDecoder); } @@ -135,4 +208,75 @@ public class DefaultRSocketRequesterBuilderTests { } } + + static class TestRSocketFactoryConfigurer implements ClientRSocketFactoryConfigurer { + + private RSocketStrategies strategies; + + private RSocketFactory.ClientRSocketFactory rsocketFactory; + + + public RSocketStrategies rsocketStrategies() { + return this.strategies; + } + + public RSocketFactory.ClientRSocketFactory rsocketFactory() { + return this.rsocketFactory; + } + + + @Override + public void configureWithStrategies(RSocketStrategies strategies) { + this.strategies = strategies; + } + + @Override + public void configure(RSocketFactory.ClientRSocketFactory rsocketFactory) { + this.rsocketFactory = rsocketFactory; + } + } + + + static class TestJsonDecoder implements Decoder { + + private final MimeType mimeType; + + + TestJsonDecoder(MimeType mimeType) { + this.mimeType = mimeType; + } + + @Override + public List getDecodableMimeTypes() { + return Collections.singletonList(this.mimeType); + } + + @Override + public boolean canDecode(ResolvableType elementType, MimeType mimeType) { + return false; + } + + @Override + public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, + MimeType mimeType, Map hints) { + + throw new UnsupportedOperationException(); + } + + @Override + public Flux decode(Publisher inputStream, ResolvableType elementType, + MimeType mimeType, Map hints) { + + throw new UnsupportedOperationException(); + } + + + @Override + public Object decode(DataBuffer buffer, ResolvableType targetType, MimeType mimeType, + Map hints) throws DecodingException { + + throw new UnsupportedOperationException(); + } + } + } From 91b040d0bf05c857c022a263b3c076a3020c3875 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 24 Jul 2019 10:25:59 +0100 Subject: [PATCH 2/4] Add responder strategies to RSocketStrategies RouteMatcher and MetadataExtractor can now be configured on and accessed through RSocketStrategies. This simplifies configuration for client and server responders. See gh-23314 --- .../MessageMappingMessageHandler.java | 12 +++- .../rsocket/DefaultRSocketStrategies.java | 70 ++++++++++++++++--- .../messaging/rsocket/RSocketStrategies.java | 40 ++++++++++- .../AnnotationClientResponderConfigurer.java | 32 --------- .../support/RSocketMessageHandler.java | 43 +++++++----- 5 files changed, 134 insertions(+), 63 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/MessageMappingMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/MessageMappingMessageHandler.java index 584bb9ca593..1065939b41f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/MessageMappingMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/MessageMappingMessageHandler.java @@ -88,6 +88,7 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler AnnotatedElementUtils.hasAnnotation(type, Controller.class)); } @@ -150,7 +148,9 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler> decoders; - private final ReactiveAdapterRegistry adapterRegistry; + private final RouteMatcher routeMatcher; + + private final MetadataExtractor metadataExtractor; private final DataBufferFactory bufferFactory; + private final ReactiveAdapterRegistry adapterRegistry; + private DefaultRSocketStrategies(List> encoders, List> decoders, - ReactiveAdapterRegistry adapterRegistry, DataBufferFactory bufferFactory) { + RouteMatcher routeMatcher, MetadataExtractor metadataExtractor, + DataBufferFactory bufferFactory, ReactiveAdapterRegistry adapterRegistry) { this.encoders = Collections.unmodifiableList(encoders); this.decoders = Collections.unmodifiableList(decoders); - this.adapterRegistry = adapterRegistry; + this.routeMatcher = routeMatcher; + this.metadataExtractor = metadataExtractor; this.bufferFactory = bufferFactory; + this.adapterRegistry = adapterRegistry; } @@ -78,8 +89,13 @@ final class DefaultRSocketStrategies implements RSocketStrategies { } @Override - public ReactiveAdapterRegistry reactiveAdapterRegistry() { - return this.adapterRegistry; + public RouteMatcher routeMatcher() { + return this.routeMatcher; + } + + @Override + public MetadataExtractor metadataExtractor() { + return this.metadataExtractor; } @Override @@ -87,6 +103,11 @@ final class DefaultRSocketStrategies implements RSocketStrategies { return this.bufferFactory; } + @Override + public ReactiveAdapterRegistry reactiveAdapterRegistry() { + return this.adapterRegistry; + } + /** * Default RSocketStrategies.Builder implementation. @@ -97,6 +118,12 @@ final class DefaultRSocketStrategies implements RSocketStrategies { private final List> decoders = new ArrayList<>(); + @Nullable + private RouteMatcher routeMatcher; + + @Nullable + private MetadataExtractor metadataExtractor; + private ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance(); @Nullable @@ -151,6 +178,18 @@ final class DefaultRSocketStrategies implements RSocketStrategies { return this; } + @Override + public Builder routeMatcher(RouteMatcher routeMatcher) { + this.routeMatcher = routeMatcher; + return this; + } + + @Override + public Builder metadataExtractor(MetadataExtractor metadataExtractor) { + this.metadataExtractor = metadataExtractor; + return this; + } + @Override public Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry) { Assert.notNull(registry, "ReactiveAdapterRegistry is required"); @@ -167,12 +206,27 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @Override public RSocketStrategies build() { return new DefaultRSocketStrategies( - this.encoders, this.decoders, this.adapterRegistry, initBufferFactory()); + this.encoders, this.decoders, + this.routeMatcher != null ? this.routeMatcher : initRouteMatcher(), + this.metadataExtractor != null ? this.metadataExtractor : initMetadataExtractor(), + this.bufferFactory != null ? this.bufferFactory : initBufferFactory(), + this.adapterRegistry); + } + + private RouteMatcher initRouteMatcher() { + AntPathMatcher pathMatcher = new AntPathMatcher(); + pathMatcher.setPathSeparator("."); + return new SimpleRouteMatcher(pathMatcher); + } + + private MetadataExtractor initMetadataExtractor() { + DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(); + extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY); + return extractor; } private DataBufferFactory initBufferFactory() { - return this.bufferFactory != null ? this.bufferFactory : - new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); + return new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java index 8e6aa27f17c..6c1572aaa8b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java @@ -31,7 +31,10 @@ import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; +import org.springframework.util.AntPathMatcher; import org.springframework.util.MimeType; +import org.springframework.util.RouteMatcher; +import org.springframework.util.SimpleRouteMatcher; /** * Access to strategies for use by RSocket requester and responder components. @@ -90,10 +93,14 @@ public interface RSocketStrategies { } /** - * Return the configured - * {@link Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) reactiveAdapterRegistry}. + * Return the configured {@link Builder#routeMatcher(RouteMatcher)}. */ - ReactiveAdapterRegistry reactiveAdapterRegistry(); + RouteMatcher routeMatcher(); + + /** + * Return the configured {@link Builder#metadataExtractor(MetadataExtractor)}. + */ + MetadataExtractor metadataExtractor(); /** * Return the configured @@ -101,6 +108,12 @@ public interface RSocketStrategies { */ DataBufferFactory dataBufferFactory(); + /** + * Return the configured + * {@link Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) reactiveAdapterRegistry}. + */ + ReactiveAdapterRegistry reactiveAdapterRegistry(); + /** * Return a builder to build a new {@code RSocketStrategies} instance. @@ -149,6 +162,27 @@ public interface RSocketStrategies { */ Builder decoders(Consumer>> consumer); + /** + * Configure a {@code RouteMatcher} for matching routes to message + * handlers based on route patterns. This option is applicable to + * client or server responders. + *

By default, {@link SimpleRouteMatcher} is used, backed by + * {@link AntPathMatcher} with "." as separator. For better + * efficiency consider using the {@code PathPatternRouteMatcher} from + * {@code spring-web} instead. + */ + Builder routeMatcher(RouteMatcher routeMatcher); + + /** + * Configure a {@link MetadataExtractor} to extract the route along with + * other metadata. This option is applicable to client or server + * responders. + *

By default this is {@link DefaultMetadataExtractor} extracting a + * route from {@code "message/x.rsocket.routing.v0"} or + * {@code "text/plain"} metadata entries. + */ + Builder metadataExtractor(MetadataExtractor metadataExtractor); + /** * Configure the registry for reactive type support. This can be used to * to adapt to, and/or determine the semantics of a given diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java index b15e35e50ea..aadc3405abc 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java @@ -25,10 +25,8 @@ import io.rsocket.RSocketFactory; import org.springframework.beans.BeanUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer; -import org.springframework.messaging.rsocket.MetadataExtractor; import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.util.Assert; -import org.springframework.util.RouteMatcher; /** * {@link ClientRSocketFactoryConfigurer} to configure and plug in a responder @@ -44,12 +42,6 @@ public final class AnnotationClientResponderConfigurer implements ClientRSocketF private final List handlers = new ArrayList<>(); - @Nullable - private RouteMatcher routeMatcher; - - @Nullable - private MetadataExtractor extractor; - @Nullable private RSocketStrategies strategies; @@ -61,24 +53,6 @@ public final class AnnotationClientResponderConfigurer implements ClientRSocketF } - /** - * Configure the {@link RouteMatcher} to use. This is used to set - * {@link RSocketMessageHandler#setRouteMatcher(RouteMatcher)}. - */ - public AnnotationClientResponderConfigurer routeMatcher(RouteMatcher routeMatcher) { - this.routeMatcher = routeMatcher; - return this; - } - - /** - * Configure the {@link MetadataExtractor} to use. This is used to set - * {@link RSocketMessageHandler#setMetadataExtractor(MetadataExtractor)}. - */ - public AnnotationClientResponderConfigurer metadataExtractor(MetadataExtractor extractor) { - this.extractor = extractor; - return this; - } - /** * Configure handlers to detect {@code @MessasgeMapping} handler methods on. * This is used to set {@link RSocketMessageHandler#setHandlers(List)}. @@ -101,12 +75,6 @@ public final class AnnotationClientResponderConfigurer implements ClientRSocketF Assert.notEmpty(this.handlers, "No handlers"); RSocketMessageHandler messageHandler = new RSocketMessageHandler(); messageHandler.setHandlers(this.handlers); - if (this.routeMatcher != null) { - messageHandler.setRouteMatcher(this.routeMatcher); - } - if (this.extractor != null) { - messageHandler.setMetadataExtractor(this.extractor); - } messageHandler.setRSocketStrategies(this.strategies); messageHandler.afterPropertiesSet(); factory.acceptor(messageHandler.clientResponder()); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java index 7f7ba9d9a1e..a595e735667 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java @@ -114,10 +114,16 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { /** * Provide configuration in the form of {@link RSocketStrategies} instance * which can also be re-used to initialize a client-side - * {@link RSocketRequester}. When this property is set, it also sets - * {@link #setDecoders(List) decoders}, {@link #setEncoders(List) encoders}, - * and {@link #setReactiveAdapterRegistry(ReactiveAdapterRegistry) - * reactiveAdapterRegistry}. + * {@link RSocketRequester}. + *

When this is set, in turn it sets the following: + *

    + *
  • {@link #setDecoders(List)} + *
  • {@link #setEncoders(List)} + *
  • {@link #setRouteMatcher(RouteMatcher)} + *
  • {@link #setMetadataExtractor(MetadataExtractor)} + *
  • {@link #setReactiveAdapterRegistry(ReactiveAdapterRegistry)} + *
+ *

By default if this is not set, it is initialized from the above. */ public void setRSocketStrategies(@Nullable RSocketStrategies rsocketStrategies) { this.rsocketStrategies = rsocketStrategies; @@ -138,12 +144,10 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { } /** - * Configure a {@link MetadataExtractor} to extract the route and possibly - * other metadata from the first payload of incoming requests. - *

By default this is a {@link DefaultMetadataExtractor} with the - * configured {@link RSocketStrategies} (and decoders), extracting a route - * from {@code "message/x.rsocket.routing.v0"} or {@code "text/plain"} - * metadata entries. + * Configure a {@link MetadataExtractor} to extract the route along with + * other metadata. + *

By default this is {@link DefaultMetadataExtractor} extracting a + * route from {@code "message/x.rsocket.routing.v0"} or {@code "text/plain"}. * @param extractor the extractor to use */ public void setMetadataExtractor(MetadataExtractor extractor) { @@ -200,20 +204,25 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @Override public void afterPropertiesSet() { + + getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver()); + super.afterPropertiesSet(); + + if (getMetadataExtractor() == null) { + DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(); + extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY); + setMetadataExtractor(extractor); + } + if (this.rsocketStrategies == null) { this.rsocketStrategies = RSocketStrategies.builder() .decoder(getDecoders().toArray(new Decoder[0])) .encoder(getEncoders().toArray(new Encoder[0])) + .routeMatcher(getRouteMatcher()) + .metadataExtractor(getMetadataExtractor()) .reactiveAdapterStrategy(getReactiveAdapterRegistry()) .build(); } - if (this.metadataExtractor == null) { - DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(); - extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY); - this.metadataExtractor = extractor; - } - getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver()); - super.afterPropertiesSet(); } @Override From c456950bc3d2611e34154afc87193b3cc9e1f7a8 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 24 Jul 2019 10:37:21 +0100 Subject: [PATCH 3/4] Add create shortcut to RSocketStrategies Now that RSocketStrategies has default settings it makes sense to have a create() shortcut vs builder().build(). This commit also updates tests to take advantage of improvements in this and the previous two commits. See gh-23314 --- .../messaging/rsocket/RSocketStrategies.java | 24 ++++++++++++------ .../rsocket/RSocketBufferLeakTests.java | 15 +++++------ ...RSocketClientToServerIntegrationTests.java | 17 +++++-------- ...RSocketServerToClientIntegrationTests.java | 25 ++++++------------- 4 files changed, 37 insertions(+), 44 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java index 6c1572aaa8b..92eba2b58c6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java @@ -114,20 +114,30 @@ public interface RSocketStrategies { */ ReactiveAdapterRegistry reactiveAdapterRegistry(); + /** + * Return a builder to create a new {@link RSocketStrategies} instance + * replicated from the current instance. + */ + default Builder mutate() { + return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(this); + } + /** - * Return a builder to build a new {@code RSocketStrategies} instance. + * Create an {@code RSocketStrategies} instance with default settings. + * Equivalent to {@code RSocketStrategies.builder().build()}. */ - static Builder builder() { - return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(); + static RSocketStrategies create() { + return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder().build(); } /** - * Return a builder to create a new {@link RSocketStrategies} instance - * replicated from the current instance. + * Return a builder to build a new {@code RSocketStrategies} instance. + * The builder applies default settings, see individual builder methods for + * details. */ - default Builder mutate() { - return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(this); + static Builder builder() { + return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java index d95218707d4..76be98cef14 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java @@ -26,6 +26,7 @@ import io.netty.util.ReferenceCounted; import io.rsocket.AbstractRSocket; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.plugins.RSocketInterceptor; import io.rsocket.transport.netty.server.CloseableChannel; @@ -44,8 +45,6 @@ import reactor.test.StepVerifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.codec.CharSequenceEncoder; -import org.springframework.core.codec.StringDecoder; import org.springframework.core.io.Resource; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; @@ -74,21 +73,21 @@ public class RSocketBufferLeakTests { @BeforeClass @SuppressWarnings("ConstantConditions") public static void setupOnce() { + context = new AnnotationConfigApplicationContext(ServerConfig.class); + RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); + SocketAcceptor responder = messageHandler.serverResponder(); server = RSocketFactory.receive() .frameDecoder(PayloadDecoder.ZERO_COPY) .addResponderPlugin(payloadInterceptor) // intercept responding - .acceptor(context.getBean(RSocketMessageHandler.class).serverResponder()) + .acceptor(responder) .transport(TcpServerTransport.create("localhost", 7000)) .start() .block(); requester = RSocketRequester.builder() - .rsocketFactory(factory -> { - factory.frameDecoder(PayloadDecoder.ZERO_COPY); - factory.addRequesterPlugin(payloadInterceptor); // intercept outgoing requests - }) + .rsocketFactory(factory -> factory.addRequesterPlugin(payloadInterceptor)) .rsocketStrategies(context.getBean(RSocketStrategies.class)) .connectTcp("localhost", 7000) .block(); @@ -215,8 +214,6 @@ public class RSocketBufferLeakTests { @Bean public RSocketStrategies rsocketStrategies() { return RSocketStrategies.builder() - .decoder(StringDecoder.allMimeTypes()) - .encoder(CharSequenceEncoder.allMimeTypes()) .dataBufferFactory(new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)) .build(); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index e6e4e1f2d66..58f807d6c61 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket; import java.time.Duration; -import io.netty.buffer.PooledByteBufAllocator; import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; @@ -34,9 +34,6 @@ import reactor.test.StepVerifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.codec.CharSequenceEncoder; -import org.springframework.core.codec.StringDecoder; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; @@ -63,18 +60,20 @@ public class RSocketClientToServerIntegrationTests { @BeforeClass @SuppressWarnings("ConstantConditions") public static void setupOnce() { + context = new AnnotationConfigApplicationContext(ServerConfig.class); + RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); + SocketAcceptor responder = messageHandler.serverResponder(); server = RSocketFactory.receive() .addResponderPlugin(interceptor) .frameDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(context.getBean(RSocketMessageHandler.class).serverResponder()) + .acceptor(responder) .transport(TcpServerTransport.create("localhost", 7000)) .start() .block(); requester = RSocketRequester.builder() - .rsocketFactory(factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY)) .rsocketStrategies(context.getBean(RSocketStrategies.class)) .connectTcp("localhost", 7000) .block(); @@ -266,11 +265,7 @@ public class RSocketClientToServerIntegrationTests { @Bean public RSocketStrategies rsocketStrategies() { - return RSocketStrategies.builder() - .decoder(StringDecoder.allMimeTypes()) - .encoder(CharSequenceEncoder.allMimeTypes()) - .dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)) - .build(); + return RSocketStrategies.create(); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index db661510008..c153112436d 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket; import java.time.Duration; -import io.netty.buffer.PooledByteBufAllocator; import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; @@ -37,9 +37,6 @@ import reactor.test.StepVerifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.codec.CharSequenceEncoder; -import org.springframework.core.codec.StringDecoder; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.rsocket.annotation.ConnectMapping; import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer; @@ -62,11 +59,14 @@ public class RSocketServerToClientIntegrationTests { @BeforeClass @SuppressWarnings("ConstantConditions") public static void setupOnce() { + context = new AnnotationConfigApplicationContext(RSocketConfig.class); + RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); + SocketAcceptor responder = messageHandler.serverResponder(); server = RSocketFactory.receive() .frameDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(context.getBean(RSocketMessageHandler.class).serverResponder()) + .acceptor(responder) .transport(TcpServerTransport.create("localhost", 0)) .start() .block(); @@ -103,21 +103,16 @@ public class RSocketServerToClientIntegrationTests { ServerController serverController = context.getBean(ServerController.class); serverController.reset(); - RSocketStrategies strategies = context.getBean(RSocketStrategies.class); RSocketRequester requester = null; try { - ClientRSocketFactoryConfigurer responderConfigurer = - AnnotationClientResponderConfigurer.withHandlers(new ClientHandler()); - requester = RSocketRequester.builder() .rsocketFactory(factory -> { factory.metadataMimeType("text/plain"); factory.setupPayload(ByteBufPayload.create("", connectionRoute)); - factory.frameDecoder(PayloadDecoder.ZERO_COPY); }) - .rsocketFactory(responderConfigurer) - .rsocketStrategies(strategies) + .rsocketFactory(AnnotationClientResponderConfigurer.withHandlers(new ClientHandler())) + .rsocketStrategies(context.getBean(RSocketStrategies.class)) .connectTcp("localhost", server.address().getPort()) .block(); @@ -268,11 +263,7 @@ public class RSocketServerToClientIntegrationTests { @Bean public RSocketStrategies rsocketStrategies() { - return RSocketStrategies.builder() - .decoder(StringDecoder.allMimeTypes()) - .encoder(CharSequenceEncoder.allMimeTypes()) - .dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)) - .build(); + return RSocketStrategies.create(); } } From e19e36ae4c65d4d340178808864a1b67a1139ec5 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 24 Jul 2019 11:13:44 +0100 Subject: [PATCH 4/4] Simplify RSocket client responder config Now that responder RSocketStrategies also exposes responder strategies, AnnotationClientResponderConfigurer is reduced and no longer needs to be public. This commit folds it into RSocketMessageHandler as a nested class and exposes it as a ClientRSocketFactoryConfigurer through a static method that accepts the handlers to use. Effectively a shortcut for creating RSocketMessageHandler, giving it RSocketStrategies, calling afterPropertiesSet, and then the instance createResponder. See gh-23314 --- .../messaging/rsocket/RSocketRequester.java | 9 +- .../AnnotationClientResponderConfigurer.java | 101 ------------------ .../support/RSocketMessageHandler.java | 45 +++++++- ...RSocketServerToClientIntegrationTests.java | 3 +- 4 files changed, 50 insertions(+), 108 deletions(-) delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 59e9606e2d7..ef9643632f2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -30,7 +30,7 @@ import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.codec.Decoder; import org.springframework.lang.Nullable; -import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer; +import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.util.MimeType; /** @@ -165,14 +165,15 @@ public interface RSocketRequester { /** * Callback to configure the {@code ClientRSocketFactory} directly. - *

See {@link AnnotationClientResponderConfigurer} for configuring a - * client side responder. + *

See static factory method + * {@link RSocketMessageHandler#clientResponder(Object...)} for + * configuring a client side responder with annotated methods. *

Note: Do not set {@link #dataMimeType(MimeType)} * and {@link #metadataMimeType(MimeType)} directly on the * {@code ClientRSocketFactory}. Use the shortcuts on this builder * instead since the created {@code RSocketRequester} needs to be aware * of those settings. - * @see AnnotationClientResponderConfigurer + * @see RSocketMessageHandler#clientResponder(Object...) */ RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java deleted file mode 100644 index aadc3405abc..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2002-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.messaging.rsocket.annotation.support; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import io.rsocket.RSocketFactory; - -import org.springframework.beans.BeanUtils; -import org.springframework.lang.Nullable; -import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer; -import org.springframework.messaging.rsocket.RSocketStrategies; -import org.springframework.util.Assert; - -/** - * {@link ClientRSocketFactoryConfigurer} to configure and plug in a responder - * that handles requests via annotated handler methods. Effectively a thin layer over - * {@link RSocketMessageHandler} that provides a programmatic way to configure - * it and obtain a responder via {@link RSocketMessageHandler#clientResponder() - * clientResponder()}. - * - * @author Rossen Stoyanchev - * @since 5.2 - */ -public final class AnnotationClientResponderConfigurer implements ClientRSocketFactoryConfigurer { - - private final List handlers = new ArrayList<>(); - - @Nullable - private RSocketStrategies strategies; - - - private AnnotationClientResponderConfigurer(List handlers) { - for (Object obj : handlers) { - this.handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class) obj) : obj); - } - } - - - /** - * Configure handlers to detect {@code @MessasgeMapping} handler methods on. - * This is used to set {@link RSocketMessageHandler#setHandlers(List)}. - */ - public AnnotationClientResponderConfigurer handlers(Object... handlers) { - this.handlers.addAll(Arrays.asList(handlers)); - return this; - } - - - // Implementation of ClientRSocketFactoryConfigurer - - @Override - public void configureWithStrategies(RSocketStrategies strategies) { - this.strategies = strategies; - } - - @Override - public void configure(RSocketFactory.ClientRSocketFactory factory) { - Assert.notEmpty(this.handlers, "No handlers"); - RSocketMessageHandler messageHandler = new RSocketMessageHandler(); - messageHandler.setHandlers(this.handlers); - messageHandler.setRSocketStrategies(this.strategies); - messageHandler.afterPropertiesSet(); - factory.acceptor(messageHandler.clientResponder()); - } - - - // Static factory methods - - /** - * Create an {@code AnnotationClientResponderConfigurer} with the given handlers - * to check for {@code @MessasgeMapping} handler methods. - */ - public static AnnotationClientResponderConfigurer withHandlers(Object... handlers) { - return new AnnotationClientResponderConfigurer(Arrays.asList(handlers)); - } - - /** - * Create an {@code AnnotationClientResponderConfigurer} to set up further. - */ - public static AnnotationClientResponderConfigurer create() { - return new AnnotationClientResponderConfigurer(Collections.emptyList()); - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java index a595e735667..e817ea98852 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java @@ -24,10 +24,12 @@ import java.util.function.Function; import io.rsocket.ConnectionSetupPayload; import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; import io.rsocket.SocketAcceptor; import io.rsocket.frame.FrameType; import reactor.core.publisher.Mono; +import org.springframework.beans.BeanUtils; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.codec.Decoder; @@ -40,6 +42,7 @@ import org.springframework.messaging.handler.DestinationPatternsMessageCondition import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler; import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler; +import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer; import org.springframework.messaging.rsocket.DefaultMetadataExtractor; import org.springframework.messaging.rsocket.MetadataExtractor; import org.springframework.messaging.rsocket.RSocketRequester; @@ -324,10 +327,50 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { Assert.notNull(strategies, "No RSocketStrategies. Was afterPropertiesSet not called?"); RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, metadataMimeType, strategies); - Assert.notNull(this.metadataExtractor, () -> "No MetadataExtractor. Was afterPropertiesSet not called?"); + Assert.state(this.metadataExtractor != null, + () -> "No MetadataExtractor. Was afterPropertiesSet not called?"); + + Assert.state(getRouteMatcher() != null, + () -> "No RouteMatcher. Was afterPropertiesSet not called?"); return new MessagingRSocket(dataMimeType, metadataMimeType, this.metadataExtractor, requester, this, getRouteMatcher(), strategies); } + + public static ClientRSocketFactoryConfigurer clientResponder(Object... handlers) { + return new ResponderConfigurer(handlers); + } + + + private static final class ResponderConfigurer implements ClientRSocketFactoryConfigurer { + + private final List handlers = new ArrayList<>(); + + @Nullable + private RSocketStrategies strategies; + + + private ResponderConfigurer(Object... handlers) { + Assert.notEmpty(handlers, "No handlers"); + for (Object obj : handlers) { + this.handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class) obj) : obj); + } + } + + @Override + public void configureWithStrategies(RSocketStrategies strategies) { + this.strategies = strategies; + } + + @Override + public void configure(RSocketFactory.ClientRSocketFactory factory) { + RSocketMessageHandler handler = new RSocketMessageHandler(); + handler.setHandlers(this.handlers); + handler.setRSocketStrategies(this.strategies); + handler.afterPropertiesSet(); + factory.acceptor(handler.clientResponder()); + } + } + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index c153112436d..818ccdd0f5a 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -39,7 +39,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.rsocket.annotation.ConnectMapping; -import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.stereotype.Controller; @@ -111,7 +110,7 @@ public class RSocketServerToClientIntegrationTests { factory.metadataMimeType("text/plain"); factory.setupPayload(ByteBufPayload.create("", connectionRoute)); }) - .rsocketFactory(AnnotationClientResponderConfigurer.withHandlers(new ClientHandler())) + .rsocketFactory(RSocketMessageHandler.clientResponder(new ClientHandler())) .rsocketStrategies(context.getBean(RSocketStrategies.class)) .connectTcp("localhost", server.address().getPort()) .block();