Browse Source

Fix method order in ChannelSendOperator

1. Group by contract
2. Follow lifecycle -- Subscriber 1st, Subscription 2nd, Publisher last
3. Order of declaration in implemented contracts
pull/1473/head
Rossen Stoyanchev 9 years ago
parent
commit
fbcd554f42
  1. 134
      spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java

134
spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java

@ -119,41 +119,47 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
} }
@Override // CoreSubscriber<T> methods (we're the subscriber to the write source)..
public void cancel() {
Subscription s = this.subscription;
if (s != null) {
this.subscription = null;
s.cancel();
}
}
@Override @Override
public Context currentContext() { public final void onSubscribe(Subscription s) {
return completionSubscriber.currentContext(); if (Operators.validate(this.subscription, s)) {
this.subscription = s;
this.completionSubscriber.onSubscribe(this);
s.request(1);
}
} }
@Override @Override
public final void onComplete() { public final void onNext(T item) {
if (this.readyToWrite) { if (this.readyToWrite) {
requiredWriteSubscriber().onComplete(); requiredWriteSubscriber().onNext(item);
return; return;
} }
//FIXME revisit in case of reentrant sync deadlock
synchronized (this) { synchronized (this) {
if (this.readyToWrite) { if (this.readyToWrite) {
requiredWriteSubscriber().onComplete(); requiredWriteSubscriber().onNext(item);
} }
else if (this.beforeFirstEmission) { else if (this.beforeFirstEmission) {
this.completed = true; this.item = item;
this.beforeFirstEmission = false; this.beforeFirstEmission = false;
writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber)); writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber));
} }
else { else {
this.completed = true; if (this.subscription != null) {
this.subscription.cancel();
}
this.completionSubscriber.onError(new IllegalStateException("Unexpected item."));
} }
} }
} }
private Subscriber<? super T> requiredWriteSubscriber() {
Assert.state(this.writeSubscriber != null, "No write subscriber");
return this.writeSubscriber;
}
@Override @Override
public final void onError(Throwable ex) { public final void onError(Throwable ex) {
if (this.readyToWrite) { if (this.readyToWrite) {
@ -175,72 +181,33 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
} }
@Override @Override
public final void onNext(T item) { public final void onComplete() {
if (this.readyToWrite) { if (this.readyToWrite) {
requiredWriteSubscriber().onNext(item); requiredWriteSubscriber().onComplete();
return; return;
} }
//FIXME revisit in case of reentrant sync deadlock
synchronized (this) { synchronized (this) {
if (this.readyToWrite) { if (this.readyToWrite) {
requiredWriteSubscriber().onNext(item); requiredWriteSubscriber().onComplete();
} }
else if (this.beforeFirstEmission) { else if (this.beforeFirstEmission) {
this.item = item; this.completed = true;
this.beforeFirstEmission = false; this.beforeFirstEmission = false;
writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber)); writeFunction.apply(this).subscribe(new DownstreamBridge(this.completionSubscriber));
} }
else { else {
if (this.subscription != null) { this.completed = true;
this.subscription.cancel();
}
this.completionSubscriber.onError(new IllegalStateException("Unexpected item."));
} }
} }
} }
@Override @Override
public final void onSubscribe(Subscription s) { public Context currentContext() {
if (Operators.validate(this.subscription, s)) { return this.completionSubscriber.currentContext();
this.subscription = s;
this.completionSubscriber.onSubscribe(this);
s.request(1);
}
} }
@Override
public void subscribe(Subscriber<? super T> writeSubscriber) {
synchronized (this) {
Assert.state(this.writeSubscriber == null, "Only one write subscriber supported");
this.writeSubscriber = writeSubscriber;
if (this.error != null || this.completed) {
this.writeSubscriber.onSubscribe(Operators.emptySubscription());
emitCachedSignals();
}
else {
this.writeSubscriber.onSubscribe(this);
}
}
}
/** // Subscription methods (we're the subscription to completion~ and writeSubscriber)..
* Emit cached signals to the write subscriber.
* @return true if no more signals expected
*/
private boolean emitCachedSignals() {
if (this.item != null) {
requiredWriteSubscriber().onNext(this.item);
}
if (this.error != null) {
requiredWriteSubscriber().onError(this.error);
return true;
}
if (this.completed) {
requiredWriteSubscriber().onComplete();
return true;
}
return false;
}
@Override @Override
public void request(long n) { public void request(long n) {
@ -267,9 +234,46 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
s.request(n); s.request(n);
} }
private Subscriber<? super T> requiredWriteSubscriber() { private boolean emitCachedSignals() {
Assert.state(this.writeSubscriber != null, "No write subscriber"); if (this.item != null) {
return this.writeSubscriber; requiredWriteSubscriber().onNext(this.item);
}
if (this.error != null) {
requiredWriteSubscriber().onError(this.error);
return true;
}
if (this.completed) {
requiredWriteSubscriber().onComplete();
return true;
}
return false;
}
@Override
public void cancel() {
Subscription s = this.subscription;
if (s != null) {
this.subscription = null;
s.cancel();
}
}
// Publisher<T> methods (we're the Publisher to the write subscriber)...
@Override
public void subscribe(Subscriber<? super T> writeSubscriber) {
synchronized (this) {
Assert.state(this.writeSubscriber == null, "Only one write subscriber supported");
this.writeSubscriber = writeSubscriber;
if (this.error != null || this.completed) {
this.writeSubscriber.onSubscribe(Operators.emptySubscription());
emitCachedSignals();
}
else {
this.writeSubscriber.onSubscribe(this);
}
}
} }
} }

Loading…
Cancel
Save