From fbcd554f428c8c5233db550463d0aeaded98fc91 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 12 Jul 2017 12:29:38 +0200 Subject: [PATCH] Fix method order in ChannelSendOperator 1. Group by contract 2. Follow lifecycle -- Subscriber 1st, Subscription 2nd, Publisher last 3. Order of declaration in implemented contracts --- .../server/reactive/ChannelSendOperator.java | 134 +++++++++--------- 1 file changed, 69 insertions(+), 65 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java index a091e56c96b..dec69f86675 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java @@ -119,41 +119,47 @@ public class ChannelSendOperator extends Mono implements Scannable { } - @Override - public void cancel() { - Subscription s = this.subscription; - if (s != null) { - this.subscription = null; - s.cancel(); - } - } + // CoreSubscriber methods (we're the subscriber to the write source).. @Override - public Context currentContext() { - return completionSubscriber.currentContext(); + public final void onSubscribe(Subscription s) { + if (Operators.validate(this.subscription, s)) { + this.subscription = s; + this.completionSubscriber.onSubscribe(this); + s.request(1); + } } @Override - public final void onComplete() { + public final void onNext(T item) { if (this.readyToWrite) { - requiredWriteSubscriber().onComplete(); + requiredWriteSubscriber().onNext(item); return; } + //FIXME revisit in case of reentrant sync deadlock synchronized (this) { if (this.readyToWrite) { - requiredWriteSubscriber().onComplete(); + requiredWriteSubscriber().onNext(item); } else if (this.beforeFirstEmission) { - this.completed = true; + this.item = item; this.beforeFirstEmission = false; writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber)); } else { - this.completed = true; + if (this.subscription != null) { + this.subscription.cancel(); + } + this.completionSubscriber.onError(new IllegalStateException("Unexpected item.")); } } } + private Subscriber requiredWriteSubscriber() { + Assert.state(this.writeSubscriber != null, "No write subscriber"); + return this.writeSubscriber; + } + @Override public final void onError(Throwable ex) { if (this.readyToWrite) { @@ -175,72 +181,33 @@ public class ChannelSendOperator extends Mono implements Scannable { } @Override - public final void onNext(T item) { + public final void onComplete() { if (this.readyToWrite) { - requiredWriteSubscriber().onNext(item); + requiredWriteSubscriber().onComplete(); return; } - //FIXME revisit in case of reentrant sync deadlock synchronized (this) { if (this.readyToWrite) { - requiredWriteSubscriber().onNext(item); + requiredWriteSubscriber().onComplete(); } else if (this.beforeFirstEmission) { - this.item = item; + this.completed = true; this.beforeFirstEmission = false; writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber)); } else { - if (this.subscription != null) { - this.subscription.cancel(); - } - this.completionSubscriber.onError(new IllegalStateException("Unexpected item.")); + this.completed = true; } } } @Override - public final void onSubscribe(Subscription s) { - if (Operators.validate(this.subscription, s)) { - this.subscription = s; - this.completionSubscriber.onSubscribe(this); - s.request(1); - } + public Context currentContext() { + return this.completionSubscriber.currentContext(); } - @Override - public void subscribe(Subscriber writeSubscriber) { - synchronized (this) { - Assert.state(this.writeSubscriber == null, "Only one write subscriber supported"); - this.writeSubscriber = writeSubscriber; - if (this.error != null || this.completed) { - this.writeSubscriber.onSubscribe(Operators.emptySubscription()); - emitCachedSignals(); - } - else { - this.writeSubscriber.onSubscribe(this); - } - } - } - /** - * Emit cached signals to the write subscriber. - * @return true if no more signals expected - */ - private boolean emitCachedSignals() { - if (this.item != null) { - requiredWriteSubscriber().onNext(this.item); - } - if (this.error != null) { - requiredWriteSubscriber().onError(this.error); - return true; - } - if (this.completed) { - requiredWriteSubscriber().onComplete(); - return true; - } - return false; - } + // Subscription methods (we're the subscription to completion~ and writeSubscriber).. @Override public void request(long n) { @@ -267,9 +234,46 @@ public class ChannelSendOperator extends Mono implements Scannable { s.request(n); } - private Subscriber requiredWriteSubscriber() { - Assert.state(this.writeSubscriber != null, "No write subscriber"); - return this.writeSubscriber; + private boolean emitCachedSignals() { + if (this.item != null) { + requiredWriteSubscriber().onNext(this.item); + } + if (this.error != null) { + requiredWriteSubscriber().onError(this.error); + return true; + } + if (this.completed) { + requiredWriteSubscriber().onComplete(); + return true; + } + return false; + } + + @Override + public void cancel() { + Subscription s = this.subscription; + if (s != null) { + this.subscription = null; + s.cancel(); + } + } + + + // Publisher methods (we're the Publisher to the write subscriber)... + + @Override + public void subscribe(Subscriber writeSubscriber) { + synchronized (this) { + Assert.state(this.writeSubscriber == null, "Only one write subscriber supported"); + this.writeSubscriber = writeSubscriber; + if (this.error != null || this.completed) { + this.writeSubscriber.onSubscribe(Operators.emptySubscription()); + emitCachedSignals(); + } + else { + this.writeSubscriber.onSubscribe(this); + } + } } }