diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java index 108d1492990..ddcf7e709c2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java @@ -281,6 +281,10 @@ class ChannelSendOperator extends Mono implements Scannable { return; } synchronized (this) { + if (this.state == State.READY_TO_WRITE) { + s.request(n); + return; + } if (this.writeSubscriber != null) { if (this.state == State.EMITTING_CACHED_SIGNALS) { this.demandBeforeReadyToWrite = n; diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java index da46443edc2..6b5e4095955 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java @@ -273,6 +273,10 @@ public class ChannelSendOperator extends Mono implements Scannable { return; } synchronized (this) { + if (this.state == State.READY_TO_WRITE) { + s.request(n); + return; + } if (this.writeSubscriber != null) { if (this.state == State.EMITTING_CACHED_SIGNALS) { this.demandBeforeReadyToWrite = n;