From 752a0199d6eedba9f5a8d5c6729e5ef92b128a40 Mon Sep 17 00:00:00 2001 From: l2yuPa Date: Fri, 20 Feb 2026 22:29:47 +0900 Subject: [PATCH] Avoid invoking writeFunction under lock in ChannelSendOperator Signed-off-by: l2yuPa --- .../reactive/ChannelSendOperator.java | 55 ++++++++++++------- .../server/reactive/ChannelSendOperator.java | 55 ++++++++++++------- 2 files changed, 72 insertions(+), 38 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java index 36d1c4e7c11..627b3276c6e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java @@ -176,7 +176,9 @@ class ChannelSendOperator extends Mono implements Scannable { requiredWriteSubscriber().onNext(item); return; } - // FIXME revisit in case of reentrant sync deadlock + + boolean invokeWriteFunction = false; + synchronized (this) { if (this.state == State.READY_TO_WRITE) { requiredWriteSubscriber().onNext(item); @@ -184,15 +186,7 @@ class ChannelSendOperator extends Mono implements Scannable { else if (this.state == State.NEW) { this.item = item; this.state = State.FIRST_SIGNAL_RECEIVED; - Publisher result; - try { - result = writeFunction.apply(this); - } - catch (Throwable ex) { - this.writeCompletionBarrier.onError(ex); - return; - } - result.subscribe(this.writeCompletionBarrier); + invokeWriteFunction = true; } else { if (this.subscription != null) { @@ -201,6 +195,20 @@ class ChannelSendOperator extends Mono implements Scannable { this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item.")); } } + + if (!invokeWriteFunction) { + return; + } + + Publisher result; + try { + result = writeFunction.apply(this); + } + catch (Throwable ex) { + this.writeCompletionBarrier.onError(ex); + return; + } + result.subscribe(this.writeCompletionBarrier); } private Subscriber requiredWriteSubscriber() { @@ -234,6 +242,9 @@ class ChannelSendOperator extends Mono implements Scannable { requiredWriteSubscriber().onComplete(); return; } + + boolean invokeWriteFunction = false; + synchronized (this) { if (this.state == State.READY_TO_WRITE) { requiredWriteSubscriber().onComplete(); @@ -241,20 +252,26 @@ class ChannelSendOperator extends Mono implements Scannable { else if (this.state == State.NEW) { this.completed = true; this.state = State.FIRST_SIGNAL_RECEIVED; - Publisher result; - try { - result = writeFunction.apply(this); - } - catch (Throwable ex) { - this.writeCompletionBarrier.onError(ex); - return; - } - result.subscribe(this.writeCompletionBarrier); + invokeWriteFunction = true; } else { this.completed = true; } } + + if (!invokeWriteFunction) { + return; + } + + Publisher result; + try { + result = writeFunction.apply(this); + } + catch (Throwable ex) { + this.writeCompletionBarrier.onError(ex); + return; + } + result.subscribe(this.writeCompletionBarrier); } @Override diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java index 1baf031b20a..8c5f8893b64 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java @@ -168,7 +168,9 @@ public class ChannelSendOperator extends Mono implements Scannable { requiredWriteSubscriber().onNext(item); return; } - // FIXME revisit in case of reentrant sync deadlock + + boolean invokeWriteFunction = false; + synchronized (this) { if (this.state == State.READY_TO_WRITE) { requiredWriteSubscriber().onNext(item); @@ -176,15 +178,7 @@ public class ChannelSendOperator extends Mono implements Scannable { else if (this.state == State.NEW) { this.item = item; this.state = State.FIRST_SIGNAL_RECEIVED; - Publisher result; - try { - result = writeFunction.apply(this); - } - catch (Throwable ex) { - this.writeCompletionBarrier.onError(ex); - return; - } - result.subscribe(this.writeCompletionBarrier); + invokeWriteFunction = true; } else { if (this.subscription != null) { @@ -193,6 +187,20 @@ public class ChannelSendOperator extends Mono implements Scannable { this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item.")); } } + + if (!invokeWriteFunction) { + return; + } + + Publisher result; + try { + result = writeFunction.apply(this); + } + catch (Throwable ex) { + this.writeCompletionBarrier.onError(ex); + return; + } + result.subscribe(this.writeCompletionBarrier); } private Subscriber requiredWriteSubscriber() { @@ -226,6 +234,9 @@ public class ChannelSendOperator extends Mono implements Scannable { requiredWriteSubscriber().onComplete(); return; } + + boolean invokeWriteFunction = false; + synchronized (this) { if (this.state == State.READY_TO_WRITE) { requiredWriteSubscriber().onComplete(); @@ -233,20 +244,26 @@ public class ChannelSendOperator extends Mono implements Scannable { else if (this.state == State.NEW) { this.completed = true; this.state = State.FIRST_SIGNAL_RECEIVED; - Publisher result; - try { - result = writeFunction.apply(this); - } - catch (Throwable ex) { - this.writeCompletionBarrier.onError(ex); - return; - } - result.subscribe(this.writeCompletionBarrier); + invokeWriteFunction = true; } else { this.completed = true; } } + + if (!invokeWriteFunction) { + return; + } + + Publisher result; + try { + result = writeFunction.apply(this); + } + catch (Throwable ex) { + this.writeCompletionBarrier.onError(ex); + return; + } + result.subscribe(this.writeCompletionBarrier); } @Override