diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java index 76fc45bea51..a091e56c96b 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java @@ -50,7 +50,9 @@ public class ChannelSendOperator extends Mono implements Scannable { private final Flux source; - public ChannelSendOperator(Publisher source, Function, Publisher> writeFunction) { + public ChannelSendOperator(Publisher source, Function, + Publisher> writeFunction) { + this.source = Flux.from(source); this.writeFunction = writeFunction; } @@ -79,8 +81,10 @@ public class ChannelSendOperator extends Mono implements Scannable { private final class WriteWithBarrier implements Publisher, CoreSubscriber, Subscription { - private final CoreSubscriber subscriber; + /* Downstream write completion subscriber */ + private final CoreSubscriber completionSubscriber; + /* Upstream write source subscription */ @Nullable private Subscription subscription; @@ -109,10 +113,12 @@ public class ChannelSendOperator extends Mono implements Scannable { @Nullable private Subscriber writeSubscriber; - WriteWithBarrier(CoreSubscriber subscriber) { - this.subscriber = subscriber; + + WriteWithBarrier(CoreSubscriber completionSubscriber) { + this.completionSubscriber = completionSubscriber; } + @Override public void cancel() { Subscription s = this.subscription; @@ -124,23 +130,23 @@ public class ChannelSendOperator extends Mono implements Scannable { @Override public Context currentContext() { - return subscriber.currentContext(); + return completionSubscriber.currentContext(); } @Override public final void onComplete() { if (this.readyToWrite) { - obtainWriteSubscriber().onComplete(); + requiredWriteSubscriber().onComplete(); return; } synchronized (this) { if (this.readyToWrite) { - obtainWriteSubscriber().onComplete(); + requiredWriteSubscriber().onComplete(); } else if (this.beforeFirstEmission) { this.completed = true; this.beforeFirstEmission = false; - writeFunction.apply(this).subscribe(new DownstreamBridge(subscriber)); + writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber)); } else { this.completed = true; @@ -151,16 +157,16 @@ public class ChannelSendOperator extends Mono implements Scannable { @Override public final void onError(Throwable ex) { if (this.readyToWrite) { - obtainWriteSubscriber().onError(ex); + requiredWriteSubscriber().onError(ex); return; } synchronized (this) { if (this.readyToWrite) { - obtainWriteSubscriber().onError(ex); + requiredWriteSubscriber().onError(ex); } else if (this.beforeFirstEmission) { this.beforeFirstEmission = false; - subscriber.onError(ex); + this.completionSubscriber.onError(ex); } else { this.error = ex; @@ -171,24 +177,24 @@ public class ChannelSendOperator extends Mono implements Scannable { @Override public final void onNext(T item) { if (this.readyToWrite) { - obtainWriteSubscriber().onNext(item); + requiredWriteSubscriber().onNext(item); return; } //FIXME revisit in case of reentrant sync deadlock synchronized (this) { if (this.readyToWrite) { - obtainWriteSubscriber().onNext(item); + requiredWriteSubscriber().onNext(item); } else if (this.beforeFirstEmission) { this.item = item; this.beforeFirstEmission = false; - writeFunction.apply(this).subscribe(new DownstreamBridge(subscriber)); + writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber)); } else { if (this.subscription != null) { this.subscription.cancel(); } - subscriber.onError(new IllegalStateException("Unexpected item.")); + this.completionSubscriber.onError(new IllegalStateException("Unexpected item.")); } } } @@ -197,8 +203,8 @@ public class ChannelSendOperator extends Mono implements Scannable { public final void onSubscribe(Subscription s) { if (Operators.validate(this.subscription, s)) { this.subscription = s; - this.subscriber.onSubscribe(this); - s.request(1); // bypass doRequest + this.completionSubscriber.onSubscribe(this); + s.request(1); } } @@ -223,14 +229,14 @@ public class ChannelSendOperator extends Mono implements Scannable { */ private boolean emitCachedSignals() { if (this.item != null) { - obtainWriteSubscriber().onNext(this.item); + requiredWriteSubscriber().onNext(this.item); } if (this.error != null) { - obtainWriteSubscriber().onError(this.error); + requiredWriteSubscriber().onError(this.error); return true; } if (this.completed) { - obtainWriteSubscriber().onComplete(); + requiredWriteSubscriber().onComplete(); return true; } return false; @@ -242,13 +248,13 @@ public class ChannelSendOperator extends Mono implements Scannable { if (s == null) { return; } - if (readyToWrite) { + if (this.readyToWrite) { s.request(n); return; } synchronized (this) { if (this.writeSubscriber != null) { - readyToWrite = true; + this.readyToWrite = true; if (emitCachedSignals()) { return; } @@ -261,7 +267,7 @@ public class ChannelSendOperator extends Mono implements Scannable { s.request(n); } - private Subscriber obtainWriteSubscriber() { + private Subscriber requiredWriteSubscriber() { Assert.state(this.writeSubscriber != null, "No write subscriber"); return this.writeSubscriber; } @@ -297,7 +303,7 @@ public class ChannelSendOperator extends Mono implements Scannable { @Override public Context currentContext() { - return downstream.currentContext(); + return this.downstream.currentContext(); } }