|
|
|
@ -161,20 +161,14 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public RSocketRequester transport(ClientTransport transport) { |
|
|
|
public RSocketRequester transport(ClientTransport transport) { |
|
|
|
RSocketStrategies strategies = getRSocketStrategies(); |
|
|
|
RSocketStrategies strategies = getRSocketStrategies(); |
|
|
|
Assert.isTrue(!strategies.encoders().isEmpty(), "No encoders"); |
|
|
|
MimeType metaMimeType = getMetadataMimeType(); |
|
|
|
Assert.isTrue(!strategies.decoders().isEmpty(), "No decoders"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MimeType metaMimeType = (this.metadataMimeType != null ? this.metadataMimeType : |
|
|
|
|
|
|
|
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MimeType dataMimeType = getDataMimeType(strategies); |
|
|
|
MimeType dataMimeType = getDataMimeType(strategies); |
|
|
|
Mono<Payload> setupPayload = getSetupPayload(dataMimeType, metaMimeType, strategies); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
RSocketConnector connector = initConnector( |
|
|
|
RSocketConnector connector = initConnector( |
|
|
|
this.rsocketConnectorConfigurers, metaMimeType, dataMimeType, setupPayload, strategies); |
|
|
|
this.rsocketConnectorConfigurers, metaMimeType, dataMimeType, strategies); |
|
|
|
|
|
|
|
|
|
|
|
return new DefaultRSocketRequester( |
|
|
|
RSocketClient client = RSocketClient.from(connector.connect(transport)); |
|
|
|
RSocketClient.from(connector.connect(transport)), null, dataMimeType, metaMimeType, strategies); |
|
|
|
return new DefaultRSocketRequester(client, null, dataMimeType, metaMimeType, strategies); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
@ -192,35 +186,36 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
@SuppressWarnings("deprecation") |
|
|
|
@SuppressWarnings("deprecation") |
|
|
|
public Mono<RSocketRequester> connect(ClientTransport transport) { |
|
|
|
public Mono<RSocketRequester> connect(ClientTransport transport) { |
|
|
|
|
|
|
|
|
|
|
|
RSocketStrategies rsocketStrategies = getRSocketStrategies(); |
|
|
|
RSocketStrategies rsocketStrategies = getRSocketStrategies(); |
|
|
|
Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders"); |
|
|
|
MimeType metaMimeType = getMetadataMimeType(); |
|
|
|
Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MimeType metaMimeType = (this.metadataMimeType != null ? this.metadataMimeType : |
|
|
|
|
|
|
|
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MimeType dataMimeType = getDataMimeType(rsocketStrategies); |
|
|
|
MimeType dataMimeType = getDataMimeType(rsocketStrategies); |
|
|
|
Mono<Payload> setupPayload = getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
RSocketConnector connector = initConnector( |
|
|
|
RSocketConnector connector = initConnector( |
|
|
|
this.rsocketConnectorConfigurers, |
|
|
|
this.rsocketConnectorConfigurers, metaMimeType, dataMimeType, rsocketStrategies); |
|
|
|
metaMimeType, dataMimeType, setupPayload, rsocketStrategies); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return connector.connect(transport).map(rsocket -> |
|
|
|
return connector.connect(transport).map(rsocket -> |
|
|
|
new DefaultRSocketRequester(null, rsocket, dataMimeType, metaMimeType, rsocketStrategies)); |
|
|
|
new DefaultRSocketRequester(null, rsocket, dataMimeType, metaMimeType, rsocketStrategies)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public MimeType getMetadataMimeType() { |
|
|
|
|
|
|
|
return this.metadataMimeType != null ? this.metadataMimeType : |
|
|
|
|
|
|
|
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private RSocketStrategies getRSocketStrategies() { |
|
|
|
private RSocketStrategies getRSocketStrategies() { |
|
|
|
|
|
|
|
RSocketStrategies result; |
|
|
|
if (!this.strategiesConfigurers.isEmpty()) { |
|
|
|
if (!this.strategiesConfigurers.isEmpty()) { |
|
|
|
RSocketStrategies.Builder builder = |
|
|
|
RSocketStrategies.Builder builder = |
|
|
|
this.strategies != null ? this.strategies.mutate() : RSocketStrategies.builder(); |
|
|
|
this.strategies != null ? this.strategies.mutate() : RSocketStrategies.builder(); |
|
|
|
this.strategiesConfigurers.forEach(c -> c.accept(builder)); |
|
|
|
this.strategiesConfigurers.forEach(c -> c.accept(builder)); |
|
|
|
return builder.build(); |
|
|
|
result = builder.build(); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
return this.strategies != null ? this.strategies : RSocketStrategies.builder().build(); |
|
|
|
result = this.strategies != null ? this.strategies : RSocketStrategies.builder().build(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
Assert.isTrue(!result.encoders().isEmpty(), "No encoders"); |
|
|
|
|
|
|
|
Assert.isTrue(!result.decoders().isEmpty(), "No decoders"); |
|
|
|
|
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private MimeType getDataMimeType(RSocketStrategies strategies) { |
|
|
|
private MimeType getDataMimeType(RSocketStrategies strategies) { |
|
|
|
@ -294,8 +289,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { |
|
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("deprecation") |
|
|
|
@SuppressWarnings("deprecation") |
|
|
|
private RSocketConnector initConnector(List<RSocketConnectorConfigurer> connectorConfigurers, |
|
|
|
private RSocketConnector initConnector(List<RSocketConnectorConfigurer> connectorConfigurers, |
|
|
|
MimeType metaMimeType, MimeType dataMimeType, Mono<Payload> setupPayloadMono, |
|
|
|
MimeType metaMimeType, MimeType dataMimeType, RSocketStrategies rsocketStrategies) { |
|
|
|
RSocketStrategies rsocketStrategies) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
RSocketConnector connector = RSocketConnector.create(); |
|
|
|
RSocketConnector connector = RSocketConnector.create(); |
|
|
|
connectorConfigurers.forEach(c -> c.configure(connector)); |
|
|
|
connectorConfigurers.forEach(c -> c.configure(connector)); |
|
|
|
@ -307,9 +301,11 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { |
|
|
|
connector.metadataMimeType(metaMimeType.toString()); |
|
|
|
connector.metadataMimeType(metaMimeType.toString()); |
|
|
|
connector.dataMimeType(dataMimeType.toString()); |
|
|
|
connector.dataMimeType(dataMimeType.toString()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Mono<Payload> setupPayloadMono = getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies); |
|
|
|
if (setupPayloadMono != EMPTY_SETUP_PAYLOAD) { |
|
|
|
if (setupPayloadMono != EMPTY_SETUP_PAYLOAD) { |
|
|
|
connector.setupPayload(setupPayloadMono); |
|
|
|
connector.setupPayload(setupPayloadMono); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return connector; |
|
|
|
return connector; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|