Browse Source

Merge branch 'rs'

pull/23353/head
Rossen Stoyanchev 7 years ago
parent
commit
2bb510588d
  1. 12
      spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/MessageMappingMessageHandler.java
  2. 49
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java
  3. 110
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java
  4. 42
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java
  5. 113
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java
  6. 133
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java
  7. 88
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java
  8. 190
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java
  9. 15
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java
  10. 17
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java
  11. 26
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

12
spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/MessageMappingMessageHandler.java

@ -88,6 +88,7 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C @@ -88,6 +88,7 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
@Nullable
private Validator validator;
@Nullable
private RouteMatcher routeMatcher;
private ConversionService conversionService = new DefaultFormattingConversionService();
@ -97,9 +98,6 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C @@ -97,9 +98,6 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
public MessageMappingMessageHandler() {
AntPathMatcher pathMatcher = new AntPathMatcher();
pathMatcher.setPathSeparator(".");
this.routeMatcher = new SimpleRouteMatcher(pathMatcher);
setHandlerPredicate(type -> AnnotatedElementUtils.hasAnnotation(type, Controller.class));
}
@ -150,7 +148,9 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C @@ -150,7 +148,9 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
/**
* Return the {@code RouteMatcher} used to map messages to handlers.
* May be {@code null} before component is initialized.
*/
@Nullable
public RouteMatcher getRouteMatcher() {
return this.routeMatcher;
}
@ -203,6 +203,12 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C @@ -203,6 +203,12 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
resolvers.add(new PayloadMethodArgumentResolver(
getDecoders(), this.validator, getReactiveAdapterRegistry(), true));
if (this.routeMatcher == null) {
AntPathMatcher pathMatcher = new AntPathMatcher();
pathMatcher.setPathSeparator(".");
this.routeMatcher = new SimpleRouteMatcher(pathMatcher);
}
return resolvers;
}

49
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

@ -18,9 +18,9 @@ package org.springframework.messaging.rsocket; @@ -18,9 +18,9 @@ package org.springframework.messaging.rsocket;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
@ -29,6 +29,9 @@ import io.rsocket.transport.netty.client.TcpClientTransport; @@ -29,6 +29,9 @@ import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import reactor.core.publisher.Mono;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
@ -110,7 +113,10 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @@ -110,7 +113,10 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
rsocketFactory.dataMimeType(dataMimeType.toString());
rsocketFactory.metadataMimeType(this.metadataMimeType.toString());
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
}
this.rsocketFactoryConfigurers.forEach(configurer -> {
configurer.configureWithStrategies(rsocketStrategies);
@ -139,16 +145,35 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @@ -139,16 +145,35 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
if (this.dataMimeType != null) {
return this.dataMimeType;
}
return Stream
.concat(
strategies.encoders().stream()
.flatMap(encoder -> encoder.getEncodableMimeTypes().stream()),
strategies.decoders().stream()
.flatMap(encoder -> encoder.getDecodableMimeTypes().stream())
)
.filter(MimeType::isConcrete)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Failed to select data MimeType to use."));
// Look for non-basic Decoder (e.g. CBOR, Protobuf)
MimeType selected = null;
List<Decoder<?>> decoders = strategies.decoders();
for (Decoder<?> candidate : decoders) {
if (!isCoreCodec(candidate) && !candidate.getDecodableMimeTypes().isEmpty()) {
Assert.state(selected == null,
() -> "Cannot select default data MimeType based on configured decoders: " + decoders);
selected = getMimeType(candidate);
}
}
if (selected != null) {
return selected;
}
// Fall back on 1st decoder (e.g. String)
for (Decoder<?> decoder : decoders) {
if (!decoder.getDecodableMimeTypes().isEmpty()) {
return getMimeType(decoder);
}
}
throw new IllegalArgumentException("Failed to select data MimeType to use.");
}
private static boolean isCoreCodec(Object codec) {
return codec.getClass().getPackage().equals(StringDecoder.class.getPackage());
}
private static MimeType getMimeType(Decoder<?> decoder) {
MimeType mimeType = decoder.getDecodableMimeTypes().get(0);
return new MimeType(mimeType, Collections.emptyMap());
}
}

110
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java

@ -22,13 +22,27 @@ import java.util.Collections; @@ -22,13 +22,27 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import io.netty.buffer.PooledByteBufAllocator;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.ByteArrayDecoder;
import org.springframework.core.codec.ByteArrayEncoder;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.RouteMatcher;
import org.springframework.util.SimpleRouteMatcher;
/**
* Default, package-private {@link RSocketStrategies} implementation.
@ -42,18 +56,25 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -42,18 +56,25 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
private final List<Decoder<?>> decoders;
private final ReactiveAdapterRegistry adapterRegistry;
private final RouteMatcher routeMatcher;
private final MetadataExtractor metadataExtractor;
private final DataBufferFactory bufferFactory;
private final ReactiveAdapterRegistry adapterRegistry;
private DefaultRSocketStrategies(List<Encoder<?>> encoders, List<Decoder<?>> decoders,
ReactiveAdapterRegistry adapterRegistry, DataBufferFactory bufferFactory) {
RouteMatcher routeMatcher, MetadataExtractor metadataExtractor,
DataBufferFactory bufferFactory, ReactiveAdapterRegistry adapterRegistry) {
this.encoders = Collections.unmodifiableList(encoders);
this.decoders = Collections.unmodifiableList(decoders);
this.adapterRegistry = adapterRegistry;
this.routeMatcher = routeMatcher;
this.metadataExtractor = metadataExtractor;
this.bufferFactory = bufferFactory;
this.adapterRegistry = adapterRegistry;
}
@ -68,8 +89,13 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -68,8 +89,13 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
}
@Override
public ReactiveAdapterRegistry reactiveAdapterRegistry() {
return this.adapterRegistry;
public RouteMatcher routeMatcher() {
return this.routeMatcher;
}
@Override
public MetadataExtractor metadataExtractor() {
return this.metadataExtractor;
}
@Override
@ -77,6 +103,11 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -77,6 +103,11 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
return this.bufferFactory;
}
@Override
public ReactiveAdapterRegistry reactiveAdapterRegistry() {
return this.adapterRegistry;
}
/**
* Default RSocketStrategies.Builder implementation.
@ -87,21 +118,42 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -87,21 +118,42 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
private final List<Decoder<?>> decoders = new ArrayList<>();
@Nullable
private RouteMatcher routeMatcher;
@Nullable
private MetadataExtractor metadataExtractor;
private ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
@Nullable
private DataBufferFactory dataBufferFactory;
private DataBufferFactory bufferFactory;
DefaultRSocketStrategiesBuilder() {
// Order of decoders may be significant for default data MimeType
// selection in RSocketRequester.Builder
this.decoders.add(StringDecoder.allMimeTypes());
this.decoders.add(new ByteBufferDecoder());
this.decoders.add(new ByteArrayDecoder());
this.decoders.add(new DataBufferDecoder());
public DefaultRSocketStrategiesBuilder() {
this.encoders.add(CharSequenceEncoder.allMimeTypes());
this.encoders.add(new ByteBufferEncoder());
this.encoders.add(new ByteArrayEncoder());
this.encoders.add(new DataBufferEncoder());
}
public DefaultRSocketStrategiesBuilder(RSocketStrategies other) {
DefaultRSocketStrategiesBuilder(RSocketStrategies other) {
this.encoders.addAll(other.encoders());
this.decoders.addAll(other.decoders());
this.adapterRegistry = other.reactiveAdapterRegistry();
this.dataBufferFactory = other.dataBufferFactory();
this.bufferFactory = other.dataBufferFactory();
}
@Override
public Builder encoder(Encoder<?>... encoders) {
this.encoders.addAll(Arrays.asList(encoders));
@ -126,6 +178,18 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -126,6 +178,18 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
return this;
}
@Override
public Builder routeMatcher(RouteMatcher routeMatcher) {
this.routeMatcher = routeMatcher;
return this;
}
@Override
public Builder metadataExtractor(MetadataExtractor metadataExtractor) {
this.metadataExtractor = metadataExtractor;
return this;
}
@Override
public Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry) {
Assert.notNull(registry, "ReactiveAdapterRegistry is required");
@ -135,14 +199,34 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -135,14 +199,34 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
@Override
public Builder dataBufferFactory(DataBufferFactory bufferFactory) {
this.dataBufferFactory = bufferFactory;
this.bufferFactory = bufferFactory;
return this;
}
@Override
public RSocketStrategies build() {
return new DefaultRSocketStrategies(this.encoders, this.decoders, this.adapterRegistry,
this.dataBufferFactory != null ? this.dataBufferFactory : new DefaultDataBufferFactory());
return new DefaultRSocketStrategies(
this.encoders, this.decoders,
this.routeMatcher != null ? this.routeMatcher : initRouteMatcher(),
this.metadataExtractor != null ? this.metadataExtractor : initMetadataExtractor(),
this.bufferFactory != null ? this.bufferFactory : initBufferFactory(),
this.adapterRegistry);
}
private RouteMatcher initRouteMatcher() {
AntPathMatcher pathMatcher = new AntPathMatcher();
pathMatcher.setPathSeparator(".");
return new SimpleRouteMatcher(pathMatcher);
}
private MetadataExtractor initMetadataExtractor() {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
return extractor;
}
private DataBufferFactory initBufferFactory() {
return new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
}
}

42
spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

@ -28,8 +28,9 @@ import reactor.core.publisher.Mono; @@ -28,8 +28,9 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.Decoder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.MimeType;
/**
@ -125,32 +126,32 @@ public interface RSocketRequester { @@ -125,32 +126,32 @@ public interface RSocketRequester {
interface Builder {
/**
* Configure the MimeType for payload data which is then specified
* on the {@code SETUP} frame and applies to the whole connection.
* <p>By default this is set to the first concrete mime type supported
* by the configured encoders and decoders.
* @param mimeType the data MimeType to use
* Configure the payload data MimeType to specify on the {@code SETUP}
* frame that applies to the whole connection.
* <p>If this is not set, the builder will try to select the mime type
* based on the presence of a single
* {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default}
* {@code Decoder}, or the first default decoder otherwise
* (i.e. {@code String}) if no others are configured.
*/
RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType);
/**
* Configure the MimeType for payload metadata which is then specified
* on the {@code SETUP} frame and applies to the whole connection.
* Configure the payload metadata MimeType to specify on the {@code SETUP}
* frame and applies to the whole connection.
* <p>By default this is set to
* {@code "message/x.rsocket.composite-metadata.v0"} in which case the
* route, if provided, is encoded as a
* {@code "message/x.rsocket.routing.v0"} metadata entry, potentially
* with other metadata entries added too. If this is set to any other
* mime type, and a route is provided, it is assumed the mime type is
* for the route.
* @param mimeType the data MimeType to use
* {@code "message/x.rsocket.routing.v0"} composite metadata entry.
* For any other MimeType, it is assumed to be the MimeType for the
* route, if provided.
*/
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
/**
* Set the {@link RSocketStrategies} to use for access to encoders,
* decoders, and a factory for {@code DataBuffer's}.
* @param strategies the codecs strategies to use
* Set the {@link RSocketStrategies} to use.
* <p>By default this is set to {@code RSocketStrategies.builder().build()}
* but may be further customized via {@link #rsocketStrategies(Consumer)}.
*/
RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies);
@ -159,21 +160,20 @@ public interface RSocketRequester { @@ -159,21 +160,20 @@ public interface RSocketRequester {
* <p>By default this starts out with an empty builder, i.e.
* {@link RSocketStrategies#builder()}, but the strategies can also be
* set via {@link #rsocketStrategies(RSocketStrategies)}.
* @param configurer the configurer to apply
*/
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
/**
* Callback to configure the {@code ClientRSocketFactory} directly.
* <p>See {@link AnnotationClientResponderConfigurer} for configuring a
* client side responder.
* <p>See static factory method
* {@link RSocketMessageHandler#clientResponder(Object...)} for
* configuring a client side responder with annotated methods.
* <p><strong>Note:</strong> Do not set {@link #dataMimeType(MimeType)}
* and {@link #metadataMimeType(MimeType)} directly on the
* {@code ClientRSocketFactory}. Use the shortcuts on this builder
* instead since the created {@code RSocketRequester} needs to be aware
* of those settings.
* @param configurer consumer to customize the factory
* @see AnnotationClientResponderConfigurer
* @see RSocketMessageHandler#clientResponder(Object...)
*/
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);

113
spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java

@ -19,15 +19,22 @@ package org.springframework.messaging.rsocket; @@ -19,15 +19,22 @@ package org.springframework.messaging.rsocket;
import java.util.List;
import java.util.function.Consumer;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory.ClientRSocketFactory;
import io.rsocket.RSocketFactory.ServerRSocketFactory;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.MimeType;
import org.springframework.util.RouteMatcher;
import org.springframework.util.SimpleRouteMatcher;
/**
* Access to strategies for use by RSocket requester and responder components.
@ -86,10 +93,14 @@ public interface RSocketStrategies { @@ -86,10 +93,14 @@ public interface RSocketStrategies {
}
/**
* Return the configured
* {@link Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) reactiveAdapterRegistry}.
* Return the configured {@link Builder#routeMatcher(RouteMatcher)}.
*/
ReactiveAdapterRegistry reactiveAdapterRegistry();
RouteMatcher routeMatcher();
/**
* Return the configured {@link Builder#metadataExtractor(MetadataExtractor)}.
*/
MetadataExtractor metadataExtractor();
/**
* Return the configured
@ -97,13 +108,11 @@ public interface RSocketStrategies { @@ -97,13 +108,11 @@ public interface RSocketStrategies {
*/
DataBufferFactory dataBufferFactory();
/**
* Return a builder to build a new {@code RSocketStrategies} instance.
* Return the configured
* {@link Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) reactiveAdapterRegistry}.
*/
static Builder builder() {
return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder();
}
ReactiveAdapterRegistry reactiveAdapterRegistry();
/**
* Return a builder to create a new {@link RSocketStrategies} instance
@ -114,60 +123,98 @@ public interface RSocketStrategies { @@ -114,60 +123,98 @@ public interface RSocketStrategies {
}
/**
* Create an {@code RSocketStrategies} instance with default settings.
* Equivalent to {@code RSocketStrategies.builder().build()}.
*/
static RSocketStrategies create() {
return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder().build();
}
/**
* Return a builder to build a new {@code RSocketStrategies} instance.
* The builder applies default settings, see individual builder methods for
* details.
*/
static Builder builder() {
return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder();
}
/**
* The builder options for creating {@code RSocketStrategies}.
*/
interface Builder {
/**
* Add encoders to use for serializing Objects.
* <p>By default this is empty.
* Append to the list of encoders to use for serializing Objects to the
* data or metadata of a {@link Payload}.
* <p>By default this is initialized with encoders for {@code String},
* {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}.
*/
Builder encoder(Encoder<?>... encoder);
/**
* Access and manipulate the list of configured {@link #encoder encoders}.
* Apply the consumer to the list of configured encoders, immediately.
*/
Builder encoders(Consumer<List<Encoder<?>>> consumer);
/**
* Add decoders for de-serializing Objects.
* <p>By default this is empty.
* Append to the list of decoders to use for de-serializing Objects from
* the data or metadata of a {@link Payload}.
* <p>By default this is initialized with decoders for {@code String},
* {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}.
*/
Builder decoder(Decoder<?>... decoder);
/**
* Access and manipulate the list of configured {@link #encoder decoders}.
* Apply the consumer to the list of configured decoders, immediately.
*/
Builder decoders(Consumer<List<Decoder<?>>> consumer);
/**
* Configure a {@code RouteMatcher} for matching routes to message
* handlers based on route patterns. This option is applicable to
* client or server responders.
* <p>By default, {@link SimpleRouteMatcher} is used, backed by
* {@link AntPathMatcher} with "." as separator. For better
* efficiency consider using the {@code PathPatternRouteMatcher} from
* {@code spring-web} instead.
*/
Builder routeMatcher(RouteMatcher routeMatcher);
/**
* Configure a {@link MetadataExtractor} to extract the route along with
* other metadata. This option is applicable to client or server
* responders.
* <p>By default this is {@link DefaultMetadataExtractor} extracting a
* route from {@code "message/x.rsocket.routing.v0"} or
* {@code "text/plain"} metadata entries.
*/
Builder metadataExtractor(MetadataExtractor metadataExtractor);
/**
* Configure the registry for reactive type support. This can be used to
* to adapt to, and/or determine the semantics of a given
* {@link org.reactivestreams.Publisher Publisher}.
* <p>By default this {@link ReactiveAdapterRegistry#getSharedInstance()}.
* @param registry the registry to use
*/
Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry);
/**
* Configure the DataBufferFactory to use for allocating buffers, for
* example when preparing requests or when responding. The choice here
* must be aligned with the frame decoder configured in
* {@link io.rsocket.RSocketFactory}.
* <p>By default this property is an instance of
* {@link org.springframework.core.io.buffer.DefaultDataBufferFactory
* DefaultDataBufferFactory} matching to the default frame decoder in
* {@link io.rsocket.RSocketFactory} which copies the payload. This
* comes at cost to performance but does not require reference counting
* and eliminates possibility for memory leaks.
* <p>To switch to a zero-copy strategy,
* <a href="https://github.com/rsocket/rsocket-java#zero-copy">configure RSocket</a>
* accordingly, and then configure this property with an instance of
* {@link org.springframework.core.io.buffer.NettyDataBufferFactory
* NettyDataBufferFactory} with a pooled allocator such as
* {@link PooledByteBufAllocator#DEFAULT}.
* @param bufferFactory the DataBufferFactory to use
* Configure the DataBufferFactory to use for allocating buffers when
* preparing requests or creating responses.
* <p>By default this is set to {@link NettyDataBufferFactory} with
* pooled, allocated buffers for zero copy. RSocket must also be
* <a href="https://github.com/rsocket/rsocket-java#zero-copy">configured</a>
* for zero copy. For client setup, {@link RSocketRequester.Builder}
* adapts automatically to the {@code DataBufferFactory} configured
* here, and sets the frame decoder in {@link ClientRSocketFactory
* ClientRSocketFactory} accordingly. For server setup, the
* {@link ServerRSocketFactory ServerRSocketFactory} must be configured
* accordingly too for zero copy.
* <p>If using {@link DefaultDataBufferFactory} instead, there is no
* need for related config changes in RSocket.
*/
Builder dataBufferFactory(DataBufferFactory bufferFactory);

133
spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/AnnotationClientResponderConfigurer.java

@ -1,133 +0,0 @@ @@ -1,133 +0,0 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket.annotation.support;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import io.rsocket.RSocketFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
import org.springframework.util.RouteMatcher;
/**
* {@link ClientRSocketFactoryConfigurer} to configure and plug in a responder
* that handles requests via annotated handler methods. Effectively a thin layer over
* {@link RSocketMessageHandler} that provides a programmatic way to configure
* it and obtain a responder via {@link RSocketMessageHandler#clientResponder()
* clientResponder()}.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
public final class AnnotationClientResponderConfigurer implements ClientRSocketFactoryConfigurer {
private final List<Object> handlers = new ArrayList<>();
@Nullable
private RouteMatcher routeMatcher;
@Nullable
private MetadataExtractor extractor;
@Nullable
private RSocketStrategies strategies;
private AnnotationClientResponderConfigurer(List<Object> handlers) {
for (Object obj : handlers) {
this.handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class<?>) obj) : obj);
}
}
/**
* Configure the {@link RouteMatcher} to use. This is used to set
* {@link RSocketMessageHandler#setRouteMatcher(RouteMatcher)}.
*/
public AnnotationClientResponderConfigurer routeMatcher(RouteMatcher routeMatcher) {
this.routeMatcher = routeMatcher;
return this;
}
/**
* Configure the {@link MetadataExtractor} to use. This is used to set
* {@link RSocketMessageHandler#setMetadataExtractor(MetadataExtractor)}.
*/
public AnnotationClientResponderConfigurer metadataExtractor(MetadataExtractor extractor) {
this.extractor = extractor;
return this;
}
/**
* Configure handlers to detect {@code @MessasgeMapping} handler methods on.
* This is used to set {@link RSocketMessageHandler#setHandlers(List)}.
*/
public AnnotationClientResponderConfigurer handlers(Object... handlers) {
this.handlers.addAll(Arrays.asList(handlers));
return this;
}
// Implementation of ClientRSocketFactoryConfigurer
@Override
public void configureWithStrategies(RSocketStrategies strategies) {
this.strategies = strategies;
}
@Override
public void configure(RSocketFactory.ClientRSocketFactory factory) {
Assert.notEmpty(this.handlers, "No handlers");
RSocketMessageHandler messageHandler = new RSocketMessageHandler();
messageHandler.setHandlers(this.handlers);
if (this.routeMatcher != null) {
messageHandler.setRouteMatcher(this.routeMatcher);
}
if (this.extractor != null) {
messageHandler.setMetadataExtractor(this.extractor);
}
messageHandler.setRSocketStrategies(this.strategies);
messageHandler.afterPropertiesSet();
factory.acceptor(messageHandler.clientResponder());
}
// Static factory methods
/**
* Create an {@code AnnotationClientResponderConfigurer} with the given handlers
* to check for {@code @MessasgeMapping} handler methods.
*/
public static AnnotationClientResponderConfigurer withHandlers(Object... handlers) {
return new AnnotationClientResponderConfigurer(Arrays.asList(handlers));
}
/**
* Create an {@code AnnotationClientResponderConfigurer} to set up further.
*/
public static AnnotationClientResponderConfigurer create() {
return new AnnotationClientResponderConfigurer(Collections.emptyList());
}
}

88
spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java

@ -24,10 +24,12 @@ import java.util.function.Function; @@ -24,10 +24,12 @@ import java.util.function.Function;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.FrameType;
import reactor.core.publisher.Mono;
import org.springframework.beans.BeanUtils;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.Decoder;
@ -40,6 +42,7 @@ import org.springframework.messaging.handler.DestinationPatternsMessageCondition @@ -40,6 +42,7 @@ import org.springframework.messaging.handler.DestinationPatternsMessageCondition
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer;
import org.springframework.messaging.rsocket.DefaultMetadataExtractor;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
@ -114,10 +117,16 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -114,10 +117,16 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
/**
* Provide configuration in the form of {@link RSocketStrategies} instance
* which can also be re-used to initialize a client-side
* {@link RSocketRequester}. When this property is set, it also sets
* {@link #setDecoders(List) decoders}, {@link #setEncoders(List) encoders},
* and {@link #setReactiveAdapterRegistry(ReactiveAdapterRegistry)
* reactiveAdapterRegistry}.
* {@link RSocketRequester}.
* <p>When this is set, in turn it sets the following:
* <ul>
* <li>{@link #setDecoders(List)}
* <li>{@link #setEncoders(List)}
* <li>{@link #setRouteMatcher(RouteMatcher)}
* <li>{@link #setMetadataExtractor(MetadataExtractor)}
* <li>{@link #setReactiveAdapterRegistry(ReactiveAdapterRegistry)}
* </ul>
* <p>By default if this is not set, it is initialized from the above.
*/
public void setRSocketStrategies(@Nullable RSocketStrategies rsocketStrategies) {
this.rsocketStrategies = rsocketStrategies;
@ -138,12 +147,10 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -138,12 +147,10 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
}
/**
* Configure a {@link MetadataExtractor} to extract the route and possibly
* other metadata from the first payload of incoming requests.
* <p>By default this is a {@link DefaultMetadataExtractor} with the
* configured {@link RSocketStrategies} (and decoders), extracting a route
* from {@code "message/x.rsocket.routing.v0"} or {@code "text/plain"}
* metadata entries.
* Configure a {@link MetadataExtractor} to extract the route along with
* other metadata.
* <p>By default this is {@link DefaultMetadataExtractor} extracting a
* route from {@code "message/x.rsocket.routing.v0"} or {@code "text/plain"}.
* @param extractor the extractor to use
*/
public void setMetadataExtractor(MetadataExtractor extractor) {
@ -200,20 +207,25 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -200,20 +207,25 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
@Override
public void afterPropertiesSet() {
getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());
super.afterPropertiesSet();
if (getMetadataExtractor() == null) {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
setMetadataExtractor(extractor);
}
if (this.rsocketStrategies == null) {
this.rsocketStrategies = RSocketStrategies.builder()
.decoder(getDecoders().toArray(new Decoder<?>[0]))
.encoder(getEncoders().toArray(new Encoder<?>[0]))
.routeMatcher(getRouteMatcher())
.metadataExtractor(getMetadataExtractor())
.reactiveAdapterStrategy(getReactiveAdapterRegistry())
.build();
}
if (this.metadataExtractor == null) {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
this.metadataExtractor = extractor;
}
getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());
super.afterPropertiesSet();
}
@Override
@ -315,10 +327,50 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -315,10 +327,50 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
Assert.notNull(strategies, "No RSocketStrategies. Was afterPropertiesSet not called?");
RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, metadataMimeType, strategies);
Assert.notNull(this.metadataExtractor, () -> "No MetadataExtractor. Was afterPropertiesSet not called?");
Assert.state(this.metadataExtractor != null,
() -> "No MetadataExtractor. Was afterPropertiesSet not called?");
Assert.state(getRouteMatcher() != null,
() -> "No RouteMatcher. Was afterPropertiesSet not called?");
return new MessagingRSocket(dataMimeType, metadataMimeType, this.metadataExtractor, requester,
this, getRouteMatcher(), strategies);
}
public static ClientRSocketFactoryConfigurer clientResponder(Object... handlers) {
return new ResponderConfigurer(handlers);
}
private static final class ResponderConfigurer implements ClientRSocketFactoryConfigurer {
private final List<Object> handlers = new ArrayList<>();
@Nullable
private RSocketStrategies strategies;
private ResponderConfigurer(Object... handlers) {
Assert.notEmpty(handlers, "No handlers");
for (Object obj : handlers) {
this.handlers.add(obj instanceof Class ? BeanUtils.instantiateClass((Class<?>) obj) : obj);
}
}
@Override
public void configureWithStrategies(RSocketStrategies strategies) {
this.strategies = strategies;
}
@Override
public void configure(RSocketFactory.ClientRSocketFactory factory) {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setHandlers(this.handlers);
handler.setRSocketStrategies(this.strategies);
handler.afterPropertiesSet();
factory.acceptor(handler.clientResponder());
}
}
}

190
spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java

@ -17,11 +17,16 @@ @@ -17,11 +17,16 @@
package org.springframework.messaging.rsocket;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import org.junit.Before;
import org.junit.Test;
@ -29,13 +34,19 @@ import org.reactivestreams.Publisher; @@ -29,13 +34,19 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.BDDMockito.given;
@ -52,6 +63,8 @@ public class DefaultRSocketRequesterBuilderTests { @@ -52,6 +63,8 @@ public class DefaultRSocketRequesterBuilderTests {
private ClientTransport transport;
private final TestRSocketFactoryConfigurer rsocketFactoryConfigurer = new TestRSocketFactoryConfigurer();
@Before
public void setup() {
@ -63,53 +76,113 @@ public class DefaultRSocketRequesterBuilderTests { @@ -63,53 +76,113 @@ public class DefaultRSocketRequesterBuilderTests {
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizationsAtSubscription() {
ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class);
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketFactory(factoryConfigurer)
.rsocketFactory(this.rsocketFactoryConfigurer)
.rsocketStrategies(strategiesConfigurer)
.connect(this.transport);
verifyZeroInteractions(this.transport, factoryConfigurer, strategiesConfigurer);
verifyZeroInteractions(this.transport);
assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNull();
}
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizations() {
RSocketStrategies strategies = RSocketStrategies.builder()
.encoder(CharSequenceEncoder.allMimeTypes())
.decoder(StringDecoder.allMimeTypes())
.build();
ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class);
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
Consumer<RSocketStrategies.Builder> rsocketStrategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketStrategies(strategies)
.rsocketFactory(factoryConfigurer)
.rsocketStrategies(strategiesConfigurer)
.rsocketFactory(this.rsocketFactoryConfigurer)
.rsocketStrategies(rsocketStrategiesConfigurer)
.connect(this.transport)
.block();
// RSocketStrategies and RSocketFactory configurers should have been called
verify(this.transport).connect(anyInt());
verify(factoryConfigurer).configureWithStrategies(any(RSocketStrategies.class));
verify(factoryConfigurer).configure(any(RSocketFactory.ClientRSocketFactory.class));
verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
verify(rsocketStrategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
assertThat(this.rsocketFactoryConfigurer.rsocketStrategies()).isNotNull();
assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNotNull();
}
@Test
public void defaultDataMimeType() {
RSocketRequester requester = RSocketRequester.builder()
.connect(this.transport)
.block();
assertThat(requester.dataMimeType())
.as("Default data MimeType, based on the first configured Decoder")
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
}
@Test
public void dataMimeType() throws NoSuchFieldException {
public void defaultDataMimeTypeWithCustomDecoderRegitered() {
RSocketStrategies strategies = RSocketStrategies.builder()
.encoder(CharSequenceEncoder.allMimeTypes())
.decoder(StringDecoder.allMimeTypes())
.decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.connect(this.transport)
.block();
assertThat(requester.dataMimeType())
.as("Default data MimeType, based on the first configured, non-default Decoder")
.isEqualTo(MimeTypeUtils.APPLICATION_JSON);
}
@Test
public void defaultDataMimeTypeWithMultipleCustomDecoderRegitered() {
RSocketStrategies strategies = RSocketStrategies.builder()
.decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON))
.decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_XML))
.build();
assertThatThrownBy(() ->
RSocketRequester
.builder()
.rsocketStrategies(strategies)
.connect(this.transport)
.block())
.hasMessageContaining("Cannot select default data MimeType");
}
@Test
public void dataMimeTypeSet() {
RSocketRequester requester = RSocketRequester.builder()
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.connect(this.transport)
.block();
Field field = DefaultRSocketRequester.class.getDeclaredField("dataMimeType");
assertThat(requester.dataMimeType()).isEqualTo(MimeTypeUtils.APPLICATION_JSON);
}
@Test
public void frameDecoderMatchesDataBufferFactory() throws Exception {
testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY);
testFrameDecoder(new DefaultDataBufferFactory(), PayloadDecoder.DEFAULT);
}
private void testFrameDecoder(DataBufferFactory bufferFactory, PayloadDecoder frameDecoder)
throws NoSuchFieldException {
RSocketStrategies strategies = RSocketStrategies.builder()
.dataBufferFactory(bufferFactory)
.build();
RSocketRequester.builder()
.rsocketStrategies(strategies)
.rsocketFactory(this.rsocketFactoryConfigurer)
.connect(this.transport)
.block();
RSocketFactory.ClientRSocketFactory factory = this.rsocketFactoryConfigurer.rsocketFactory();
assertThat(factory).isNotNull();
Field field = RSocketFactory.ClientRSocketFactory.class.getDeclaredField("payloadDecoder");
ReflectionUtils.makeAccessible(field);
MimeType dataMimeType = (MimeType) ReflectionUtils.getField(field, requester);
assertThat(dataMimeType).isEqualTo(MimeTypeUtils.APPLICATION_JSON);
PayloadDecoder decoder = (PayloadDecoder) ReflectionUtils.getField(field, factory);
assertThat(decoder).isSameAs(frameDecoder);
}
@ -135,4 +208,75 @@ public class DefaultRSocketRequesterBuilderTests { @@ -135,4 +208,75 @@ public class DefaultRSocketRequesterBuilderTests {
}
}
static class TestRSocketFactoryConfigurer implements ClientRSocketFactoryConfigurer {
private RSocketStrategies strategies;
private RSocketFactory.ClientRSocketFactory rsocketFactory;
public RSocketStrategies rsocketStrategies() {
return this.strategies;
}
public RSocketFactory.ClientRSocketFactory rsocketFactory() {
return this.rsocketFactory;
}
@Override
public void configureWithStrategies(RSocketStrategies strategies) {
this.strategies = strategies;
}
@Override
public void configure(RSocketFactory.ClientRSocketFactory rsocketFactory) {
this.rsocketFactory = rsocketFactory;
}
}
static class TestJsonDecoder implements Decoder<Object> {
private final MimeType mimeType;
TestJsonDecoder(MimeType mimeType) {
this.mimeType = mimeType;
}
@Override
public List<MimeType> getDecodableMimeTypes() {
return Collections.singletonList(this.mimeType);
}
@Override
public boolean canDecode(ResolvableType elementType, MimeType mimeType) {
return false;
}
@Override
public Mono<Object> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Map<String, Object> hints) {
throw new UnsupportedOperationException();
}
@Override
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Map<String, Object> hints) {
throw new UnsupportedOperationException();
}
@Override
public Object decode(DataBuffer buffer, ResolvableType targetType, MimeType mimeType,
Map<String, Object> hints) throws DecodingException {
throw new UnsupportedOperationException();
}
}
}

15
spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java

@ -26,6 +26,7 @@ import io.netty.util.ReferenceCounted; @@ -26,6 +26,7 @@ import io.netty.util.ReferenceCounted;
import io.rsocket.AbstractRSocket;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.transport.netty.server.CloseableChannel;
@ -44,8 +45,6 @@ import reactor.test.StepVerifier; @@ -44,8 +45,6 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.Resource;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
@ -74,21 +73,21 @@ public class RSocketBufferLeakTests { @@ -74,21 +73,21 @@ public class RSocketBufferLeakTests {
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
context = new AnnotationConfigApplicationContext(ServerConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.serverResponder();
server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.addResponderPlugin(payloadInterceptor) // intercept responding
.acceptor(context.getBean(RSocketMessageHandler.class).serverResponder())
.acceptor(responder)
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.block();
requester = RSocketRequester.builder()
.rsocketFactory(factory -> {
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
factory.addRequesterPlugin(payloadInterceptor); // intercept outgoing requests
})
.rsocketFactory(factory -> factory.addRequesterPlugin(payloadInterceptor))
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", 7000)
.block();
@ -215,8 +214,6 @@ public class RSocketBufferLeakTests { @@ -215,8 +214,6 @@ public class RSocketBufferLeakTests {
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
.build();
}

17
spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java

@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket; @@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket;
import java.time.Duration;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
@ -34,9 +34,6 @@ import reactor.test.StepVerifier; @@ -34,9 +34,6 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
@ -63,18 +60,20 @@ public class RSocketClientToServerIntegrationTests { @@ -63,18 +60,20 @@ public class RSocketClientToServerIntegrationTests {
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
context = new AnnotationConfigApplicationContext(ServerConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.serverResponder();
server = RSocketFactory.receive()
.addResponderPlugin(interceptor)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(context.getBean(RSocketMessageHandler.class).serverResponder())
.acceptor(responder)
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.block();
requester = RSocketRequester.builder()
.rsocketFactory(factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY))
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", 7000)
.block();
@ -266,11 +265,7 @@ public class RSocketClientToServerIntegrationTests { @@ -266,11 +265,7 @@ public class RSocketClientToServerIntegrationTests {
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
.build();
return RSocketStrategies.create();
}
}

26
spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket; @@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket;
import java.time.Duration;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
@ -37,12 +37,8 @@ import reactor.test.StepVerifier; @@ -37,12 +37,8 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.stereotype.Controller;
@ -62,11 +58,14 @@ public class RSocketServerToClientIntegrationTests { @@ -62,11 +58,14 @@ public class RSocketServerToClientIntegrationTests {
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
context = new AnnotationConfigApplicationContext(RSocketConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.serverResponder();
server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(context.getBean(RSocketMessageHandler.class).serverResponder())
.acceptor(responder)
.transport(TcpServerTransport.create("localhost", 0))
.start()
.block();
@ -103,21 +102,16 @@ public class RSocketServerToClientIntegrationTests { @@ -103,21 +102,16 @@ public class RSocketServerToClientIntegrationTests {
ServerController serverController = context.getBean(ServerController.class);
serverController.reset();
RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
RSocketRequester requester = null;
try {
ClientRSocketFactoryConfigurer responderConfigurer =
AnnotationClientResponderConfigurer.withHandlers(new ClientHandler());
requester = RSocketRequester.builder()
.rsocketFactory(factory -> {
factory.metadataMimeType("text/plain");
factory.setupPayload(ByteBufPayload.create("", connectionRoute));
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
})
.rsocketFactory(responderConfigurer)
.rsocketStrategies(strategies)
.rsocketFactory(RSocketMessageHandler.clientResponder(new ClientHandler()))
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", server.address().getPort())
.block();
@ -268,11 +262,7 @@ public class RSocketServerToClientIntegrationTests { @@ -268,11 +262,7 @@ public class RSocketServerToClientIntegrationTests {
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
.build();
return RSocketStrategies.create();
}
}

Loading…
Cancel
Save