Browse Source

Nullability refinements and related polishing

pull/22870/head
Juergen Hoeller 7 years ago
parent
commit
c4bd5abc3b
  1. 2
      spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java
  2. 14
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java
  3. 4
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java
  4. 5
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java
  5. 4
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java
  6. 29
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

2
spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java

@ -323,6 +323,7 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C @@ -323,6 +323,7 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
if (!CollectionUtils.isEmpty(patterns)) {
String pattern = patterns.iterator().next();
String destination = getDestination(message);
Assert.state(destination != null, "Missing destination header");
Map<String, String> vars = getPathMatcher().extractUriTemplateVariables(pattern, destination);
if (!CollectionUtils.isEmpty(vars)) {
MessageHeaderAccessor mha = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
@ -332,4 +333,5 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C @@ -332,4 +333,5 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
}
return super.handleMatch(mapping, handlerMethod, message);
}
}

14
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

@ -79,7 +79,7 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -79,7 +79,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
private static boolean isVoid(ResolvableType elementType) {
return Void.class.equals(elementType.resolve()) || void.class.equals(elementType.resolve());
return (Void.class.equals(elementType.resolve()) || void.class.equals(elementType.resolve()));
}
@ -87,12 +87,10 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -87,12 +87,10 @@ final class DefaultRSocketRequester implements RSocketRequester {
private final String route;
DefaultRequestSpec(String route) {
this.route = route;
}
@Override
public ResponseSpec data(Object data) {
Assert.notNull(data, "'data' must not be null");
@ -195,7 +193,6 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -195,7 +193,6 @@ final class DefaultRSocketRequester implements RSocketRequester {
@Nullable
private final Flux<Payload> payloadFlux;
DefaultResponseSpec(Mono<Payload> payloadMono) {
this.payloadMono = payloadMono;
this.payloadFlux = null;
@ -206,10 +203,9 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -206,10 +203,9 @@ final class DefaultRSocketRequester implements RSocketRequester {
this.payloadFlux = payloadFlux;
}
@Override
public Mono<Void> send() {
Assert.notNull(this.payloadMono, "No RSocket interaction model for one-way send with Flux.");
Assert.state(this.payloadMono != null, "No RSocket interaction model for one-way send with Flux");
return this.payloadMono.flatMap(rsocket::fireAndForget);
}
@ -235,9 +231,7 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -235,9 +231,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
@SuppressWarnings("unchecked")
private <T> Mono<T> retrieveMono(ResolvableType elementType) {
Assert.notNull(this.payloadMono,
"No RSocket interaction model for Flux request to Mono response.");
Assert.notNull(this.payloadMono, "No RSocket interaction model for Flux request to Mono response.");
Mono<Payload> payloadMono = this.payloadMono.flatMap(rsocket::requestResponse);
if (isVoid(elementType)) {
@ -251,7 +245,6 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -251,7 +245,6 @@ final class DefaultRSocketRequester implements RSocketRequester {
@SuppressWarnings("unchecked")
private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
Flux<Payload> payloadFlux = this.payloadMono != null ?
this.payloadMono.flatMapMany(rsocket::requestStream) :
rsocket.requestChannel(this.payloadFlux);
@ -261,7 +254,6 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -261,7 +254,6 @@ final class DefaultRSocketRequester implements RSocketRequester {
}
Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
return payloadFlux.map(this::retainDataAndReleasePayload).map(dataBuffer ->
(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
}

4
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

@ -27,7 +27,6 @@ import io.rsocket.transport.netty.client.TcpClientTransport; @@ -27,7 +27,6 @@ import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
import org.springframework.util.MimeType;
/**
@ -38,12 +37,11 @@ import org.springframework.util.MimeType; @@ -38,12 +37,11 @@ import org.springframework.util.MimeType;
*/
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
@Nullable
private List<Consumer<RSocketFactory.ClientRSocketFactory>> factoryConfigurers = new ArrayList<>();
@Nullable
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
@Override
public RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer) {
this.factoryConfigurers.add(configurer);

5
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java

@ -47,8 +47,7 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -47,8 +47,7 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
private final DataBufferFactory bufferFactory;
private DefaultRSocketStrategies(
List<Encoder<?>> encoders, List<Decoder<?>> decoders,
private DefaultRSocketStrategies(List<Encoder<?>> encoders, List<Decoder<?>> decoders,
ReactiveAdapterRegistry adapterRegistry, DataBufferFactory bufferFactory) {
this.encoders = Collections.unmodifiableList(encoders);
@ -93,7 +92,6 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -93,7 +92,6 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
@Nullable
private DataBufferFactory dataBufferFactory;
public DefaultRSocketStrategiesBuilder() {
}
@ -104,7 +102,6 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -104,7 +102,6 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
this.dataBufferFactory = other.dataBufferFactory();
}
@Override
public Builder encoder(Encoder<?>... encoders) {
this.encoders.addAll(Arrays.asList(encoders));

4
spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java

@ -59,6 +59,7 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler @@ -59,6 +59,7 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) {
MessagingRSocket rsocket = createRSocket(sendingRSocket);
// Allow handling of the ConnectionSetupPayload via @MessageMapping methods.
// However, if the handling is to make requests to the client, it's expected
// it will do so decoupled from the handling, e.g. via .subscribe().
@ -71,8 +72,7 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler @@ -71,8 +72,7 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler
}
private MessagingRSocket createRSocket(RSocket rsocket) {
return new MessagingRSocket(
this::handleMessage, rsocket, this.defaultDataMimeType, getRSocketStrategies());
return new MessagingRSocket(this::handleMessage, rsocket, this.defaultDataMimeType, getRSocketStrategies());
}
}

29
spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

@ -42,12 +42,24 @@ import org.springframework.util.MimeType; @@ -42,12 +42,24 @@ import org.springframework.util.MimeType;
*/
public interface RSocketRequester {
/**
* Return the underlying RSocket used to make requests.
*/
RSocket rsocket();
// For now we treat metadata as a simple string that is the route.
// This will change after the resolution of:
// https://github.com/rsocket/rsocket-java/issues/568
/**
* Entry point to prepare a new request to the given route.
* <p>For requestChannel interactions, i.e. Flux-to-Flux the metadata is
* attached to the first request payload.
* @param route the routing destination
* @return a spec for further defining and executing the reuqest
*/
RequestSpec route(String route);
/**
* Create a new {@code RSocketRequester} from the given {@link RSocket} and
@ -68,20 +80,6 @@ public interface RSocketRequester { @@ -68,20 +80,6 @@ public interface RSocketRequester {
return new DefaultRSocketRequesterBuilder();
}
// For now we treat metadata as a simple string that is the route.
// This will change after the resolution of:
// https://github.com/rsocket/rsocket-java/issues/568
/**
* Entry point to prepare a new request to the given route.
*
* <p>For requestChannel interactions, i.e. Flux-to-Flux the metadata is
* attached to the first request payload.
*
* @param route the routing destination
* @return a spec for further defining and executing the reuqest
*/
RequestSpec route(String route);
/**
* A mutable builder for creating a client {@link RSocketRequester}.
@ -129,7 +127,6 @@ public interface RSocketRequester { @@ -129,7 +127,6 @@ public interface RSocketRequester {
* @return a mono containing the connected {@code RSocketRequester}
*/
Mono<RSocketRequester> connectWebSocket(URI uri, MimeType dataMimeType);
}

Loading…
Cancel
Save