diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
index 3243f6493db..c068324d284 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
@@ -75,7 +75,7 @@ public class ReactorNettyTcpConnection
implements TcpConnection
{
@Override
public void close() {
- // Ignore result: can't overflow, ok if not first or no one listens
+ // Ignore result: concurrent attempts to complete are ok
this.completionSink.tryEmitEmpty();
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java
index 4c1cc61766c..3bf659f20c7 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java
@@ -200,10 +200,12 @@ class RSocketServerToClientIntegrationTests {
private void runTest(Runnable testEcho) {
Mono.fromRunnable(testEcho)
- .doOnError(ex -> resultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST))
- .doOnSuccess(o -> resultSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))
.subscribeOn(Schedulers.boundedElastic()) // StepVerifier will block
- .subscribe();
+ .subscribe(
+ aVoid -> {},
+ ex -> resultSink.tryEmitError(ex), // Ignore result: signals cannot compete
+ () -> resultSink.tryEmitEmpty()
+ );
}
@MessageMapping("fnf")
@@ -218,7 +220,7 @@ class RSocketServerToClientIntegrationTests {
@MessageMapping("receive")
void receive(String payload) {
- this.fireForgetPayloads.tryEmitNext(payload);
+ this.fireForgetPayloads.emitNext(payload, Sinks.EmitFailureHandler.FAIL_FAST);
}
@MessageMapping("echo")
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java
index 7aa9b218937..3768bad4cd2 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java
@@ -341,8 +341,8 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message> message = createMessage("/app1/mono");
this.messageHandler.handleMessage(message);
- assertThat(controller.oneSink).isNotNull();
- controller.oneSink.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST);
+ assertThat(controller.sinkOne).isNotNull();
+ controller.sinkOne.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST);
verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
assertThat(this.payloadCaptor.getValue()).isEqualTo("foo");
}
@@ -356,7 +356,7 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message> message = createMessage("/app1/mono");
this.messageHandler.handleMessage(message);
- controller.oneSink.emitError(new IllegalStateException(), Sinks.EmitFailureHandler.FAIL_FAST);
+ controller.sinkOne.emitError(new IllegalStateException(), Sinks.EmitFailureHandler.FAIL_FAST);
assertThat(controller.exceptionCaught).isTrue();
}
@@ -369,8 +369,8 @@ public class SimpAnnotationMethodMessageHandlerTests {
Message> message = createMessage("/app1/flux");
this.messageHandler.handleMessage(message);
- assertThat(controller.manySink).isNotNull();
- controller.manySink.tryEmitNext("foo");
+ assertThat(controller.sinkMany).isNotNull();
+ controller.sinkMany.tryEmitNext("foo");
verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class));
}
@@ -584,22 +584,22 @@ public class SimpAnnotationMethodMessageHandlerTests {
@Controller
private static class ReactiveController {
- private Sinks.One oneSink;
+ private Sinks.One sinkOne;
- private Sinks.Many manySink;
+ private Sinks.Many sinkMany;
private boolean exceptionCaught = false;
@MessageMapping("mono")
public Mono handleMono() {
- this.oneSink = Sinks.one();
- return this.oneSink.asMono();
+ this.sinkOne = Sinks.one();
+ return this.sinkOne.asMono();
}
@MessageMapping("flux")
public Flux handleFlux() {
- this.manySink = Sinks.many().unicast().onBackpressureBuffer();
- return this.manySink.asFlux();
+ this.sinkMany = Sinks.many().unicast().onBackpressureBuffer();
+ return this.sinkMany.asFlux();
}
@MessageExceptionHandler(IllegalStateException.class)
diff --git a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java
index 6a2c588d0af..0c5dc89594b 100644
--- a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java
+++ b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java
@@ -65,11 +65,8 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
this.writeHandler = body -> {
// Avoid .then() that causes data buffers to be discarded and released
Sinks.Empty completion = Sinks.unsafe().empty();
- this.body = body
- .doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized
- .doOnError(completion::tryEmitError)
- .cache();
- this.body.subscribe();
+ this.body = body.cache();
+ this.body.subscribe(aVoid -> {}, completion::tryEmitError, completion::tryEmitEmpty); // Signals are serialized
return completion.asMono();
};
}
diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java
index 23b05bcb27d..2db4fdc7d94 100644
--- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java
+++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/HttpHandlerConnector.java
@@ -83,8 +83,9 @@ public class HttpHandlerConnector implements ClientHttpConnector {
private Mono doConnect(
HttpMethod httpMethod, URI uri, Function super ClientHttpRequest, Mono> requestCallback) {
- Sinks.Empty requestWriteCompletion = Sinks.empty();
- Sinks.Empty handlerCompletion = Sinks.empty();
+ // unsafe(): we're intercepting, already serialized Publisher signals
+ Sinks.Empty requestWriteSink = Sinks.unsafe().empty();
+ Sinks.Empty handlerSink = Sinks.unsafe().empty();
ClientHttpResponse[] savedResponse = new ClientHttpResponse[1];
MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);
@@ -96,8 +97,8 @@ public class HttpHandlerConnector implements ClientHttpConnector {
ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest);
this.handler.handle(mockServerRequest, responseToUse).subscribe(
aVoid -> {},
- handlerCompletion::tryEmitError, // Ignore error: cached + serialized
- handlerCompletion::tryEmitEmpty);
+ handlerSink::tryEmitError, // Ignore result: signals cannot compete
+ handlerSink::tryEmitEmpty);
return Mono.empty();
});
@@ -110,10 +111,10 @@ public class HttpHandlerConnector implements ClientHttpConnector {
log("Writing client request for ", httpMethod, uri);
requestCallback.apply(mockClientRequest).subscribe(
aVoid -> {},
- requestWriteCompletion::tryEmitError, // Ignore error: cached + serialized
- requestWriteCompletion::tryEmitEmpty);
+ requestWriteSink::tryEmitError, // Ignore result: signals cannot compete
+ requestWriteSink::tryEmitEmpty);
- return Mono.when(requestWriteCompletion.asMono(), handlerCompletion.asMono())
+ return Mono.when(requestWriteSink.asMono(), handlerSink.asMono())
.onErrorMap(ex -> {
ClientHttpResponse response = savedResponse[0];
return response != null ? new FailureAfterResponseCompletedException(response, ex) : ex;
diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java
index a492c6a03ec..bff52e24105 100644
--- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java
+++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java
@@ -168,7 +168,6 @@ class WiretapConnector implements ClientHttpConnector {
.doOnComplete(this::handleOnComplete) : null;
if (publisher == null && publisherNested == null) {
- // Ignore result: OK or not relevant
this.content.tryEmitEmpty();
}
}
@@ -206,14 +205,14 @@ class WiretapConnector implements ClientHttpConnector {
private void handleOnError(Throwable ex) {
- // Ignore result: OK or not relevant
+ // Ignore result: signals cannot compete
this.content.tryEmitError(ex);
}
private void handleOnComplete() {
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
- // Ignore result: OK or not relevant
+ // Ignore result: signals cannot compete
this.content.tryEmitValue(bytes);
}
}
diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java
index 6d6866fdb34..d7b1f79a8c8 100644
--- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java
+++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/MockServerHttpResponse.java
@@ -65,11 +65,8 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
this.writeHandler = body -> {
// Avoid .then() that causes data buffers to be discarded and released
Sinks.Empty completion = Sinks.unsafe().empty();
- this.body = body
- .doOnComplete(completion::tryEmitEmpty) // Ignore error: cached + serialized
- .doOnError(completion::tryEmitError)
- .cache();
- this.body.subscribe();
+ this.body = body.cache();
+ this.body.subscribe(aVoid -> {}, completion::tryEmitError, completion::tryEmitEmpty); // Signals are serialized
return completion.asMono();
};
}
diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java
index da756a45b8b..28abd49dfd9 100644
--- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java
+++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ErrorsMethodArgumentResolver.java
@@ -57,9 +57,10 @@ public class ErrorsMethodArgumentResolver extends HandlerMethodArgumentResolverS
Object errors = getErrors(parameter, context);
- // Initially Errors/BindingResult is a Mono in the model even if it cannot be declared
- // as an async argument. That way it can be resolved first while the Mono can complete
- // later at which point the model is also updated for further use.
+ // Initially ModelAttributeMethodArgumentResolver adds Errors/BindingResult as a
+ // Mono in the model even if it can't be declared as such on a controller method.
+ // This is done to enable early argument resolution here. When the Mono actually
+ // completes it is replaced in the model with the actual value.
if (Mono.class.isAssignableFrom(errors.getClass())) {
return ((Mono>) errors).cast(Object.class);
diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java
index ce9d92bd050..645ae8e19e4 100644
--- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java
+++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java
@@ -119,13 +119,13 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR
return valueMono.flatMap(value -> {
WebExchangeDataBinder binder = context.createDataBinder(exchange, value, name);
return bindRequestParameters(binder, exchange)
- .doOnError(ex -> bindingResultSink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST))
+ .doOnError(bindingResultSink::tryEmitError)
.doOnSuccess(aVoid -> {
validateIfApplicable(binder, parameter);
BindingResult bindingResult = binder.getBindingResult();
model.put(BindingResult.MODEL_KEY_PREFIX + name, bindingResult);
model.put(name, value);
- // serialized and buffered (should never fail)
+ // Ignore result: serialized and buffered (should never fail)
bindingResultSink.tryEmitValue(bindingResult);
})
.then(Mono.fromCallable(() -> {
diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
index 48e78ccedcd..d06f87533ba 100644
--- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
+++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.websocket.Session;
import org.apache.tomcat.websocket.WsSession;
+import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.HandshakeInfo;
@@ -46,7 +47,14 @@ public class TomcatWebSocketSession extends StandardWebSocketSession {
super(session, info, factory);
}
- @SuppressWarnings("deprecation")
+ public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
+ Sinks.Empty completionSink) {
+
+ super(session, info, factory, completionSink);
+ suspendReceiving();
+ }
+
+ @Deprecated
public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
reactor.core.publisher.MonoProcessor completionMono) {
diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java
index 7d26e096bad..0ce69227898 100644
--- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java
+++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java
@@ -112,10 +112,10 @@ public class StandardWebSocketClient implements WebSocketClient {
}
private StandardWebSocketHandlerAdapter createEndpoint(URI url, WebSocketHandler handler,
- Sinks.Empty completion, DefaultConfigurator configurator) {
+ Sinks.Empty completionSink, DefaultConfigurator configurator) {
return new StandardWebSocketHandlerAdapter(handler, session ->
- createWebSocketSession(session, createHandshakeInfo(url, configurator), completion));
+ createWebSocketSession(session, createHandshakeInfo(url, configurator), completionSink));
}
private HandshakeInfo createHandshakeInfo(URI url, DefaultConfigurator configurator) {
@@ -124,21 +124,13 @@ public class StandardWebSocketClient implements WebSocketClient {
return new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
}
- protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info,
- Sinks.Empty completionSink) {
+ protected StandardWebSocketSession createWebSocketSession(
+ Session session, HandshakeInfo info, Sinks.Empty completionSink) {
return new StandardWebSocketSession(
session, info, DefaultDataBufferFactory.sharedInstance, completionSink);
}
- @Deprecated
- protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info,
- reactor.core.publisher.MonoProcessor completionMono) {
-
- return new StandardWebSocketSession(
- session, info, DefaultDataBufferFactory.sharedInstance, completionMono);
- }
-
private ClientEndpointConfig createEndpointConfig(Configurator configurator, List subProtocols) {
return ClientEndpointConfig.Builder.create()
.configurator(configurator)
diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java
index 10e60cd2de8..8310268db4a 100644
--- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java
+++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/TomcatWebSocketClient.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.apache.tomcat.websocket.WsWebSocketContainer;
+import reactor.core.publisher.Sinks;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession;
@@ -44,11 +45,10 @@ public class TomcatWebSocketClient extends StandardWebSocketClient {
@Override
- @SuppressWarnings("deprecation")
- protected StandardWebSocketSession createWebSocketSession(Session session,
- HandshakeInfo info, reactor.core.publisher.MonoProcessor completionMono) {
+ protected StandardWebSocketSession createWebSocketSession(
+ Session session, HandshakeInfo info, Sinks.Empty completionSink) {
- return new TomcatWebSocketSession(session, info, bufferFactory(), completionMono);
+ return new TomcatWebSocketSession(session, info, bufferFactory(), completionSink);
}
}