From eacc531cf76594747e3541decdd89f16bc46cf73 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 29 May 2019 17:05:25 -0400 Subject: [PATCH] Add dataMimeType to RSocketRequestBuilder Closes gh-23012 --- spring-messaging/spring-messaging.gradle | 2 +- .../DefaultRSocketRequesterBuilder.java | 49 ++++++++++++------- .../messaging/rsocket/RSocketRequester.java | 15 ++++-- .../DefaultRSocketRequesterBuilderTests.java | 33 ++++++++++++- 4 files changed, 75 insertions(+), 24 deletions(-) diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle index ba6b40b5fa8..df5a1f13165 100644 --- a/spring-messaging/spring-messaging.gradle +++ b/spring-messaging/spring-messaging.gradle @@ -7,7 +7,7 @@ dependencyManagement { } } -def rsocketVersion = "0.12.2-RC3-SNAPSHOT" +def rsocketVersion = "0.12.2-RC4-SNAPSHOT" dependencies { compile(project(":spring-beans")) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 5156db03e50..7213e65ff35 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; +import java.util.stream.Stream; import io.rsocket.RSocketFactory; import io.rsocket.transport.ClientTransport; @@ -28,6 +29,7 @@ import io.rsocket.transport.netty.client.WebsocketClientTransport; import reactor.core.publisher.Mono; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; import org.springframework.util.MimeType; /** @@ -39,6 +41,9 @@ import org.springframework.util.MimeType; */ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { + @Nullable + private MimeType dataMimeType; + private List> factoryConfigurers = new ArrayList<>(); @Nullable @@ -47,6 +52,12 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { private List> strategiesConfigurers = new ArrayList<>(); + @Override + public RSocketRequester.Builder dataMimeType(MimeType mimeType) { + this.dataMimeType = mimeType; + return this; + } + @Override public RSocketRequester.Builder rsocketFactory(Consumer configurer) { this.factoryConfigurers.add(configurer); @@ -81,17 +92,15 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { } private Mono doConnect(ClientTransport transport) { - RSocketStrategies rsocketStrategies = getRSocketStrategies(); - RSocketFactory.ClientRSocketFactory rsocketFactory = RSocketFactory.connect(); - // 1. Apply default settings - MimeType dataMimeType = getDefaultDataMimeType(rsocketStrategies); - if (dataMimeType != null) { - rsocketFactory.dataMimeType(dataMimeType.toString()); - } + RSocketStrategies rsocketStrategies = getRSocketStrategies(); + Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders"); + Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders"); - // 2. Application customizations - this.factoryConfigurers.forEach(c -> c.accept(rsocketFactory)); + RSocketFactory.ClientRSocketFactory rsocketFactory = RSocketFactory.connect(); + MimeType dataMimeType = getDataMimeType(rsocketStrategies); + rsocketFactory.dataMimeType(dataMimeType.toString()); + this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory)); return rsocketFactory.transport(transport).start() .map(rsocket -> new DefaultRSocketRequester(rsocket, dataMimeType, rsocketStrategies)); @@ -109,18 +118,20 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { } } - @Nullable - private MimeType getDefaultDataMimeType(RSocketStrategies strategies) { - return strategies.encoders().stream() - .flatMap(encoder -> encoder.getEncodableMimeTypes().stream()) - .filter(MimeType::isConcrete) - .findFirst() - .orElseGet(() -> + private MimeType getDataMimeType(RSocketStrategies strategies) { + if (this.dataMimeType != null) { + return this.dataMimeType; + } + return Stream + .concat( + strategies.encoders().stream() + .flatMap(encoder -> encoder.getEncodableMimeTypes().stream()), strategies.decoders().stream() .flatMap(encoder -> encoder.getDecodableMimeTypes().stream()) - .filter(MimeType::isConcrete) - .findFirst() - .orElse(null)); + ) + .filter(MimeType::isConcrete) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Failed to select data MimeType to use.")); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 0b8c64dbf32..f64e1d751ec 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -106,11 +106,20 @@ public interface RSocketRequester { */ interface Builder { + /** + * Configure the MimeType to use for payload data. This is set on the + * {@code SETUP} frame for the whole connection. + *

By default this is set to the first concrete MimeType supported + * by the configured encoders and decoders. + * @param mimeType the data MimeType to use + */ + RSocketRequester.Builder dataMimeType(MimeType mimeType); + /** * Configure the {@code ClientRSocketFactory}. - *

Note there is typically no need to set a data MimeType explicitly. - * By default a data MimeType is picked by taking the first concrete - * MimeType supported by the configured encoders and decoders. + *

Note: Please, do not set the {@code dataMimeType} + * directly on the underlying {@code RSocketFactory.ClientRSocketFactory}, + * and use {@link #dataMimeType(MimeType)} instead. * @param configurer the configurer to apply */ RSocketRequester.Builder rsocketFactory(Consumer configurer); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java index 062aaf20bff..6a18c48a78f 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java @@ -16,6 +16,7 @@ package org.springframework.messaging.rsocket; +import java.lang.reflect.Field; import java.util.function.Consumer; import io.netty.buffer.ByteBuf; @@ -28,6 +29,13 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.codec.CharSequenceEncoder; +import org.springframework.core.codec.StringDecoder; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.ReflectionUtils; + +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.BDDMockito.given; @@ -67,9 +75,14 @@ public class DefaultRSocketRequesterBuilderTests { @Test @SuppressWarnings("unchecked") public void shouldApplyCustomizations() { + RSocketStrategies strategies = RSocketStrategies.builder() + .encoder(CharSequenceEncoder.allMimeTypes()) + .decoder(StringDecoder.allMimeTypes()) + .build(); Consumer factoryConfigurer = mock(Consumer.class); Consumer strategiesConfigurer = mock(Consumer.class); RSocketRequester.builder() + .rsocketStrategies(strategies) .rsocketFactory(factoryConfigurer) .rsocketStrategies(strategiesConfigurer) .connect(this.transport) @@ -79,6 +92,25 @@ public class DefaultRSocketRequesterBuilderTests { verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); } + @Test + public void dataMimeType() throws NoSuchFieldException { + RSocketStrategies strategies = RSocketStrategies.builder() + .encoder(CharSequenceEncoder.allMimeTypes()) + .decoder(StringDecoder.allMimeTypes()) + .build(); + + RSocketRequester requester = RSocketRequester.builder() + .rsocketStrategies(strategies) + .dataMimeType(MimeTypeUtils.APPLICATION_JSON) + .connect(this.transport) + .block(); + + Field field = DefaultRSocketRequester.class.getDeclaredField("dataMimeType"); + ReflectionUtils.makeAccessible(field); + MimeType dataMimeType = (MimeType) ReflectionUtils.getField(field, requester); + assertThat(dataMimeType).isEqualTo(MimeTypeUtils.APPLICATION_JSON); + } + static class MockConnection implements DuplexConnection { @@ -99,7 +131,6 @@ public class DefaultRSocketRequesterBuilderTests { @Override public void dispose() { - } }