|
|
|
@ -20,6 +20,7 @@ import java.net.URI; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.List; |
|
|
|
import java.util.List; |
|
|
|
import java.util.function.Consumer; |
|
|
|
import java.util.function.Consumer; |
|
|
|
|
|
|
|
import java.util.stream.Stream; |
|
|
|
|
|
|
|
|
|
|
|
import io.rsocket.RSocketFactory; |
|
|
|
import io.rsocket.RSocketFactory; |
|
|
|
import io.rsocket.transport.ClientTransport; |
|
|
|
import io.rsocket.transport.ClientTransport; |
|
|
|
@ -28,6 +29,7 @@ import io.rsocket.transport.netty.client.WebsocketClientTransport; |
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.lang.Nullable; |
|
|
|
import org.springframework.lang.Nullable; |
|
|
|
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
import org.springframework.util.MimeType; |
|
|
|
import org.springframework.util.MimeType; |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
@ -39,6 +41,9 @@ import org.springframework.util.MimeType; |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { |
|
|
|
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Nullable |
|
|
|
|
|
|
|
private MimeType dataMimeType; |
|
|
|
|
|
|
|
|
|
|
|
private List<Consumer<RSocketFactory.ClientRSocketFactory>> factoryConfigurers = new ArrayList<>(); |
|
|
|
private List<Consumer<RSocketFactory.ClientRSocketFactory>> factoryConfigurers = new ArrayList<>(); |
|
|
|
|
|
|
|
|
|
|
|
@Nullable |
|
|
|
@Nullable |
|
|
|
@ -47,6 +52,12 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { |
|
|
|
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>(); |
|
|
|
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public RSocketRequester.Builder dataMimeType(MimeType mimeType) { |
|
|
|
|
|
|
|
this.dataMimeType = mimeType; |
|
|
|
|
|
|
|
return this; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer) { |
|
|
|
public RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer) { |
|
|
|
this.factoryConfigurers.add(configurer); |
|
|
|
this.factoryConfigurers.add(configurer); |
|
|
|
@ -81,17 +92,15 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private Mono<RSocketRequester> doConnect(ClientTransport transport) { |
|
|
|
private Mono<RSocketRequester> doConnect(ClientTransport transport) { |
|
|
|
RSocketStrategies rsocketStrategies = getRSocketStrategies(); |
|
|
|
|
|
|
|
RSocketFactory.ClientRSocketFactory rsocketFactory = RSocketFactory.connect(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 1. Apply default settings
|
|
|
|
RSocketStrategies rsocketStrategies = getRSocketStrategies(); |
|
|
|
MimeType dataMimeType = getDefaultDataMimeType(rsocketStrategies); |
|
|
|
Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders"); |
|
|
|
if (dataMimeType != null) { |
|
|
|
Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders"); |
|
|
|
rsocketFactory.dataMimeType(dataMimeType.toString()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 2. Application customizations
|
|
|
|
RSocketFactory.ClientRSocketFactory rsocketFactory = RSocketFactory.connect(); |
|
|
|
this.factoryConfigurers.forEach(c -> c.accept(rsocketFactory)); |
|
|
|
MimeType dataMimeType = getDataMimeType(rsocketStrategies); |
|
|
|
|
|
|
|
rsocketFactory.dataMimeType(dataMimeType.toString()); |
|
|
|
|
|
|
|
this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory)); |
|
|
|
|
|
|
|
|
|
|
|
return rsocketFactory.transport(transport).start() |
|
|
|
return rsocketFactory.transport(transport).start() |
|
|
|
.map(rsocket -> new DefaultRSocketRequester(rsocket, dataMimeType, rsocketStrategies)); |
|
|
|
.map(rsocket -> new DefaultRSocketRequester(rsocket, dataMimeType, rsocketStrategies)); |
|
|
|
@ -109,18 +118,20 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Nullable |
|
|
|
private MimeType getDataMimeType(RSocketStrategies strategies) { |
|
|
|
private MimeType getDefaultDataMimeType(RSocketStrategies strategies) { |
|
|
|
if (this.dataMimeType != null) { |
|
|
|
return strategies.encoders().stream() |
|
|
|
return this.dataMimeType; |
|
|
|
.flatMap(encoder -> encoder.getEncodableMimeTypes().stream()) |
|
|
|
} |
|
|
|
.filter(MimeType::isConcrete) |
|
|
|
return Stream |
|
|
|
.findFirst() |
|
|
|
.concat( |
|
|
|
.orElseGet(() -> |
|
|
|
strategies.encoders().stream() |
|
|
|
|
|
|
|
.flatMap(encoder -> encoder.getEncodableMimeTypes().stream()), |
|
|
|
strategies.decoders().stream() |
|
|
|
strategies.decoders().stream() |
|
|
|
.flatMap(encoder -> encoder.getDecodableMimeTypes().stream()) |
|
|
|
.flatMap(encoder -> encoder.getDecodableMimeTypes().stream()) |
|
|
|
.filter(MimeType::isConcrete) |
|
|
|
) |
|
|
|
.findFirst() |
|
|
|
.filter(MimeType::isConcrete) |
|
|
|
.orElse(null)); |
|
|
|
.findFirst() |
|
|
|
|
|
|
|
.orElseThrow(() -> new IllegalArgumentException("Failed to select data MimeType to use.")); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|