From 5b1b20c8c0f3d763a7fc7ea934d6b261ee8f82ae Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 12 Oct 2020 11:32:48 +0100 Subject: [PATCH] Polishing and minor refactoring See gh-25884 --- .../reactor/ReactorNettyTcpConnection.java | 2 +- ...RSocketServerToClientIntegrationTests.java | 10 +++++---- ...mpAnnotationMethodMessageHandlerTests.java | 22 +++++++++---------- .../reactive/MockServerHttpResponse.java | 7 ++---- .../reactive/server/HttpHandlerConnector.java | 15 +++++++------ .../web/reactive/server/WiretapConnector.java | 5 ++--- .../reactive/MockServerHttpResponse.java | 7 ++---- .../ErrorsMethodArgumentResolver.java | 7 +++--- .../ModelAttributeMethodArgumentResolver.java | 4 ++-- .../adapter/TomcatWebSocketSession.java | 10 ++++++++- .../client/StandardWebSocketClient.java | 16 ++++---------- .../socket/client/TomcatWebSocketClient.java | 10 ++++----- 12 files changed, 56 insertions(+), 59 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 3243f6493db..c068324d284 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -75,7 +75,7 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ @Override public void close() { - // Ignore result: can't overflow, ok if not first or no one listens + // Ignore result: concurrent attempts to complete are ok this.completionSink.tryEmitEmpty(); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 4c1cc61766c..3bf659f20c7 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -200,10 +200,12 @@ class RSocketServerToClientIntegrationTests { private void runTest(Runnable testEcho) { Mono.fromRunnable(testEcho) - .doOnError(ex -> resultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST)) - .doOnSuccess(o -> resultSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST)) .subscribeOn(Schedulers.boundedElastic()) // StepVerifier will block - .subscribe(); + .subscribe( + aVoid -> {}, + ex -> resultSink.tryEmitError(ex), // Ignore result: signals cannot compete + () -> resultSink.tryEmitEmpty() + ); } @MessageMapping("fnf") @@ -218,7 +220,7 @@ class RSocketServerToClientIntegrationTests { @MessageMapping("receive") void receive(String payload) { - this.fireForgetPayloads.tryEmitNext(payload); + this.fireForgetPayloads.emitNext(payload, Sinks.EmitFailureHandler.FAIL_FAST); } @MessageMapping("echo") diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java index 7aa9b218937..3768bad4cd2 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java @@ -341,8 +341,8 @@ public class SimpAnnotationMethodMessageHandlerTests { Message message = createMessage("/app1/mono"); this.messageHandler.handleMessage(message); - assertThat(controller.oneSink).isNotNull(); - controller.oneSink.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST); + assertThat(controller.sinkOne).isNotNull(); + controller.sinkOne.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST); verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class)); assertThat(this.payloadCaptor.getValue()).isEqualTo("foo"); } @@ -356,7 +356,7 @@ public class SimpAnnotationMethodMessageHandlerTests { Message message = createMessage("/app1/mono"); this.messageHandler.handleMessage(message); - controller.oneSink.emitError(new IllegalStateException(), Sinks.EmitFailureHandler.FAIL_FAST); + controller.sinkOne.emitError(new IllegalStateException(), Sinks.EmitFailureHandler.FAIL_FAST); assertThat(controller.exceptionCaught).isTrue(); } @@ -369,8 +369,8 @@ public class SimpAnnotationMethodMessageHandlerTests { Message message = createMessage("/app1/flux"); this.messageHandler.handleMessage(message); - assertThat(controller.manySink).isNotNull(); - controller.manySink.tryEmitNext("foo"); + assertThat(controller.sinkMany).isNotNull(); + controller.sinkMany.tryEmitNext("foo"); verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class)); } @@ -584,22 +584,22 @@ public class SimpAnnotationMethodMessageHandlerTests { @Controller private static class ReactiveController { - private Sinks.One oneSink; + private Sinks.One sinkOne; - private Sinks.Many manySink; + private Sinks.Many sinkMany; private boolean exceptionCaught = false; @MessageMapping("mono") public Mono handleMono() { - this.oneSink = Sinks.one(); - return this.oneSink.asMono(); + this.sinkOne = Sinks.one(); + return this.sinkOne.asMono(); } @MessageMapping("flux") public Flux handleFlux() { - this.manySink = Sinks.many().unicast().onBackpressureBuffer(); - return this.manySink.asFlux(); + this.sinkMany = Sinks.many().unicast().onBackpressureBuffer(); + return this.sinkMany.asFlux(); } @MessageExceptionHandler(IllegalStateException.class) diff --git a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java index 6a2c588d0af..0c5dc89594b 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java @@ -65,11 +65,8 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { this.writeHandler = body -> { // Avoid .then() that causes data buffers to be discarded and released Sinks.Empty completion = Sinks.unsafe().empty(); - this.body = body - .doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized - .doOnError(completion::tryEmitError) - .cache(); - this.body.subscribe(); + this.body = body.cache(); + this.body.subscribe(aVoid -> {}, completion::tryEmitError, completion::tryEmitEmpty); // Signals are serialized return completion.asMono(); }; } diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java index 23b05bcb27d..2db4fdc7d94 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java @@ -83,8 +83,9 @@ public class HttpHandlerConnector implements ClientHttpConnector { private Mono doConnect( HttpMethod httpMethod, URI uri, Function> requestCallback) { - Sinks.Empty requestWriteCompletion = Sinks.empty(); - Sinks.Empty handlerCompletion = Sinks.empty(); + // unsafe(): we're intercepting, already serialized Publisher signals + Sinks.Empty requestWriteSink = Sinks.unsafe().empty(); + Sinks.Empty handlerSink = Sinks.unsafe().empty(); ClientHttpResponse[] savedResponse = new ClientHttpResponse[1]; MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri); @@ -96,8 +97,8 @@ public class HttpHandlerConnector implements ClientHttpConnector { ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest); this.handler.handle(mockServerRequest, responseToUse).subscribe( aVoid -> {}, - handlerCompletion::tryEmitError, // Ignore error: cached + serialized - handlerCompletion::tryEmitEmpty); + handlerSink::tryEmitError, // Ignore result: signals cannot compete + handlerSink::tryEmitEmpty); return Mono.empty(); }); @@ -110,10 +111,10 @@ public class HttpHandlerConnector implements ClientHttpConnector { log("Writing client request for ", httpMethod, uri); requestCallback.apply(mockClientRequest).subscribe( aVoid -> {}, - requestWriteCompletion::tryEmitError, // Ignore error: cached + serialized - requestWriteCompletion::tryEmitEmpty); + requestWriteSink::tryEmitError, // Ignore result: signals cannot compete + requestWriteSink::tryEmitEmpty); - return Mono.when(requestWriteCompletion.asMono(), handlerCompletion.asMono()) + return Mono.when(requestWriteSink.asMono(), handlerSink.asMono()) .onErrorMap(ex -> { ClientHttpResponse response = savedResponse[0]; return response != null ? new FailureAfterResponseCompletedException(response, ex) : ex; diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java index a492c6a03ec..bff52e24105 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java @@ -168,7 +168,6 @@ class WiretapConnector implements ClientHttpConnector { .doOnComplete(this::handleOnComplete) : null; if (publisher == null && publisherNested == null) { - // Ignore result: OK or not relevant this.content.tryEmitEmpty(); } } @@ -206,14 +205,14 @@ class WiretapConnector implements ClientHttpConnector { private void handleOnError(Throwable ex) { - // Ignore result: OK or not relevant + // Ignore result: signals cannot compete this.content.tryEmitError(ex); } private void handleOnComplete() { byte[] bytes = new byte[this.buffer.readableByteCount()]; this.buffer.read(bytes); - // Ignore result: OK or not relevant + // Ignore result: signals cannot compete this.content.tryEmitValue(bytes); } } diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java index 6d6866fdb34..d7b1f79a8c8 100644 --- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java @@ -65,11 +65,8 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { this.writeHandler = body -> { // Avoid .then() that causes data buffers to be discarded and released Sinks.Empty completion = Sinks.unsafe().empty(); - this.body = body - .doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized - .doOnError(completion::tryEmitError) - .cache(); - this.body.subscribe(); + this.body = body.cache(); + this.body.subscribe(aVoid -> {}, completion::tryEmitError, completion::tryEmitEmpty); // Signals are serialized return completion.asMono(); }; } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java index da756a45b8b..28abd49dfd9 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java @@ -57,9 +57,10 @@ public class ErrorsMethodArgumentResolver extends HandlerMethodArgumentResolverS Object errors = getErrors(parameter, context); - // Initially Errors/BindingResult is a Mono in the model even if it cannot be declared - // as an async argument. That way it can be resolved first while the Mono can complete - // later at which point the model is also updated for further use. + // Initially ModelAttributeMethodArgumentResolver adds Errors/BindingResult as a + // Mono in the model even if it can't be declared as such on a controller method. + // This is done to enable early argument resolution here. When the Mono actually + // completes it is replaced in the model with the actual value. if (Mono.class.isAssignableFrom(errors.getClass())) { return ((Mono) errors).cast(Object.class); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java index ce9d92bd050..645ae8e19e4 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java @@ -119,13 +119,13 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR return valueMono.flatMap(value -> { WebExchangeDataBinder binder = context.createDataBinder(exchange, value, name); return bindRequestParameters(binder, exchange) - .doOnError(ex -> bindingResultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST)) + .doOnError(bindingResultSink::tryEmitError) .doOnSuccess(aVoid -> { validateIfApplicable(binder, parameter); BindingResult bindingResult = binder.getBindingResult(); model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResult); model.put(name, value); - // serialized and buffered (should never fail) + // Ignore result: serialized and buffered (should never fail) bindingResultSink.tryEmitValue(bindingResult); }) .then(Mono.fromCallable(() -> { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index 48e78ccedcd..d06f87533ba 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.websocket.Session; import org.apache.tomcat.websocket.WsSession; +import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.HandshakeInfo; @@ -46,7 +47,14 @@ public class TomcatWebSocketSession extends StandardWebSocketSession { super(session, info, factory); } - @SuppressWarnings("deprecation") + public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, + Sinks.Empty completionSink) { + + super(session, info, factory, completionSink); + suspendReceiving(); + } + + @Deprecated public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, reactor.core.publisher.MonoProcessor completionMono) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java index 7d26e096bad..0ce69227898 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java @@ -112,10 +112,10 @@ public class StandardWebSocketClient implements WebSocketClient { } private StandardWebSocketHandlerAdapter createEndpoint(URI url, WebSocketHandler handler, - Sinks.Empty completion, DefaultConfigurator configurator) { + Sinks.Empty completionSink, DefaultConfigurator configurator) { return new StandardWebSocketHandlerAdapter(handler, session -> - createWebSocketSession(session, createHandshakeInfo(url, configurator), completion)); + createWebSocketSession(session, createHandshakeInfo(url, configurator), completionSink)); } private HandshakeInfo createHandshakeInfo(URI url, DefaultConfigurator configurator) { @@ -124,21 +124,13 @@ public class StandardWebSocketClient implements WebSocketClient { return new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol); } - protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info, - Sinks.Empty completionSink) { + protected StandardWebSocketSession createWebSocketSession( + Session session, HandshakeInfo info, Sinks.Empty completionSink) { return new StandardWebSocketSession( session, info, DefaultDataBufferFactory.sharedInstance, completionSink); } - @Deprecated - protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info, - reactor.core.publisher.MonoProcessor completionMono) { - - return new StandardWebSocketSession( - session, info, DefaultDataBufferFactory.sharedInstance, completionMono); - } - private ClientEndpointConfig createEndpointConfig(Configurator configurator, List subProtocols) { return ClientEndpointConfig.Builder.create() .configurator(configurator) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java index 10e60cd2de8..8310268db4a 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import javax.websocket.Session; import javax.websocket.WebSocketContainer; import org.apache.tomcat.websocket.WsWebSocketContainer; +import reactor.core.publisher.Sinks; import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession; @@ -44,11 +45,10 @@ public class TomcatWebSocketClient extends StandardWebSocketClient { @Override - @SuppressWarnings("deprecation") - protected StandardWebSocketSession createWebSocketSession(Session session, - HandshakeInfo info, reactor.core.publisher.MonoProcessor completionMono) { + protected StandardWebSocketSession createWebSocketSession( + Session session, HandshakeInfo info, Sinks.Empty completionSink) { - return new TomcatWebSocketSession(session, info, bufferFactory(), completionMono); + return new TomcatWebSocketSession(session, info, bufferFactory(), completionSink); } }