|
|
|
@ -69,9 +69,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void subscribe(Subscriber<? super T> subscriber) { |
|
|
|
public void subscribe(Subscriber<? super T> subscriber) { |
|
|
|
if (this.logger.isTraceEnabled()) { |
|
|
|
|
|
|
|
this.logger.trace(this.state + " subscribe: " + subscriber); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
this.state.get().subscribe(this, subscriber); |
|
|
|
this.state.get().subscribe(this, subscriber); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -79,24 +76,20 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
// Methods for sub-classes to delegate to, when async I/O events occur...
|
|
|
|
// Methods for sub-classes to delegate to, when async I/O events occur...
|
|
|
|
|
|
|
|
|
|
|
|
public final void onDataAvailable() { |
|
|
|
public final void onDataAvailable() { |
|
|
|
if (this.logger.isTraceEnabled()) { |
|
|
|
this.logger.trace("I/O event onDataAvailable"); |
|
|
|
this.logger.trace(this.state + " onDataAvailable"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
this.state.get().onDataAvailable(this); |
|
|
|
this.state.get().onDataAvailable(this); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void onAllDataRead() { |
|
|
|
public void onAllDataRead() { |
|
|
|
if (this.logger.isTraceEnabled()) { |
|
|
|
this.logger.trace("I/O event onAllDataRead"); |
|
|
|
this.logger.trace(this.state + " onAllDataRead"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
this.state.get().onAllDataRead(this); |
|
|
|
this.state.get().onAllDataRead(this); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public final void onError(Throwable t) { |
|
|
|
public final void onError(Throwable ex) { |
|
|
|
if (this.logger.isTraceEnabled()) { |
|
|
|
if (this.logger.isTraceEnabled()) { |
|
|
|
this.logger.trace(this.state + " onError: " + t); |
|
|
|
this.logger.trace("I/O event onError: " + ex); |
|
|
|
} |
|
|
|
} |
|
|
|
this.state.get().onError(this, t); |
|
|
|
this.state.get().onError(this, ex); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -142,10 +135,17 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
if (r != Long.MAX_VALUE) { |
|
|
|
if (r != Long.MAX_VALUE) { |
|
|
|
DEMAND_FIELD_UPDATER.addAndGet(this, -1L); |
|
|
|
DEMAND_FIELD_UPDATER.addAndGet(this, -1L); |
|
|
|
} |
|
|
|
} |
|
|
|
Assert.state(this.subscriber != null, "No subscriber"); |
|
|
|
Subscriber<? super T> subscriber = this.subscriber; |
|
|
|
this.subscriber.onNext(data); |
|
|
|
Assert.state(subscriber != null, "No subscriber"); |
|
|
|
|
|
|
|
if (logger.isTraceEnabled()) { |
|
|
|
|
|
|
|
logger.trace("Data item read, publishing.."); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
subscriber.onNext(data); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
|
|
|
|
if (logger.isTraceEnabled()) { |
|
|
|
|
|
|
|
logger.trace("No more data to read"); |
|
|
|
|
|
|
|
} |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -153,7 +153,17 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private boolean changeState(State oldState, State newState) { |
|
|
|
private boolean changeState(State oldState, State newState) { |
|
|
|
return this.state.compareAndSet(oldState, newState); |
|
|
|
boolean result = this.state.compareAndSet(oldState, newState); |
|
|
|
|
|
|
|
if (result && logger.isTraceEnabled()) { |
|
|
|
|
|
|
|
logger.trace(oldState + " -> " + newState); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return result; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void changeToDemandState(State oldState) { |
|
|
|
|
|
|
|
if (changeState(oldState, State.DEMAND)) { |
|
|
|
|
|
|
|
checkOnDataAvailable(); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private Subscription createSubscription() { |
|
|
|
private Subscription createSubscription() { |
|
|
|
@ -170,7 +180,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public final void request(long n) { |
|
|
|
public final void request(long n) { |
|
|
|
if (logger.isTraceEnabled()) { |
|
|
|
if (logger.isTraceEnabled()) { |
|
|
|
logger.trace(state + " request: " + n); |
|
|
|
logger.trace("Signal request(" + n + ")"); |
|
|
|
} |
|
|
|
} |
|
|
|
state.get().request(AbstractListenerReadPublisher.this, n); |
|
|
|
state.get().request(AbstractListenerReadPublisher.this, n); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -178,7 +188,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public final void cancel() { |
|
|
|
public final void cancel() { |
|
|
|
if (logger.isTraceEnabled()) { |
|
|
|
if (logger.isTraceEnabled()) { |
|
|
|
logger.trace(state + " cancel"); |
|
|
|
logger.trace("Signal cancel()"); |
|
|
|
} |
|
|
|
} |
|
|
|
state.get().cancel(AbstractListenerReadPublisher.this); |
|
|
|
state.get().cancel(AbstractListenerReadPublisher.this); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -217,10 +227,14 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
publisher.changeState(SUBSCRIBING, NO_DEMAND); |
|
|
|
publisher.changeState(SUBSCRIBING, NO_DEMAND); |
|
|
|
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
|
|
|
|
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
|
|
|
|
if (publisher.completionBeforeDemand) { |
|
|
|
if (publisher.completionBeforeDemand) { |
|
|
|
|
|
|
|
publisher.logger.trace("Completed before demand"); |
|
|
|
publisher.state.get().onAllDataRead(publisher); |
|
|
|
publisher.state.get().onAllDataRead(publisher); |
|
|
|
} |
|
|
|
} |
|
|
|
Throwable ex = publisher.errorBeforeDemand; |
|
|
|
Throwable ex = publisher.errorBeforeDemand; |
|
|
|
if (ex != null) { |
|
|
|
if (ex != null) { |
|
|
|
|
|
|
|
if (publisher.logger.isTraceEnabled()) { |
|
|
|
|
|
|
|
publisher.logger.trace("Completed with error before demand: " + ex); |
|
|
|
|
|
|
|
} |
|
|
|
publisher.state.get().onError(publisher, ex); |
|
|
|
publisher.state.get().onError(publisher, ex); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -249,9 +263,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) { |
|
|
|
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) { |
|
|
|
if (Operators.validate(n)) { |
|
|
|
if (Operators.validate(n)) { |
|
|
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); |
|
|
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); |
|
|
|
if (publisher.changeState(this, DEMAND)) { |
|
|
|
publisher.changeToDemandState(this); |
|
|
|
publisher.checkOnDataAvailable(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -271,10 +283,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) { |
|
|
|
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) { |
|
|
|
if (Operators.validate(n)) { |
|
|
|
if (Operators.validate(n)) { |
|
|
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); |
|
|
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); |
|
|
|
if (publisher.changeState(this, DEMAND)) { |
|
|
|
publisher.changeToDemandState(this); |
|
|
|
publisher.checkOnDataAvailable(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// or else we completed at the same time...
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
}, |
|
|
|
@ -285,9 +294,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
if (Operators.validate(n)) { |
|
|
|
if (Operators.validate(n)) { |
|
|
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); |
|
|
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); |
|
|
|
// Did a concurrent read transition to NO_DEMAND just before us?
|
|
|
|
// Did a concurrent read transition to NO_DEMAND just before us?
|
|
|
|
if (publisher.changeState(NO_DEMAND, this)) { |
|
|
|
publisher.changeToDemandState(NO_DEMAND); |
|
|
|
publisher.checkOnDataAvailable(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -297,17 +304,15 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
try { |
|
|
|
try { |
|
|
|
boolean demandAvailable = publisher.readAndPublish(); |
|
|
|
boolean demandAvailable = publisher.readAndPublish(); |
|
|
|
if (demandAvailable) { |
|
|
|
if (demandAvailable) { |
|
|
|
if (publisher.changeState(READING, DEMAND)) { |
|
|
|
publisher.changeToDemandState(READING); |
|
|
|
publisher.checkOnDataAvailable(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
publisher.readingPaused(); |
|
|
|
publisher.readingPaused(); |
|
|
|
if (publisher.changeState(READING, NO_DEMAND)) { |
|
|
|
if (publisher.changeState(READING, NO_DEMAND)) { |
|
|
|
// Demand may have arrived since readAndPublish returned
|
|
|
|
// Demand may have arrived since readAndPublish returned
|
|
|
|
long r = publisher.demand; |
|
|
|
long r = publisher.demand; |
|
|
|
if (r > 0 && publisher.changeState(NO_DEMAND, this)) { |
|
|
|
if (r > 0) { |
|
|
|
publisher.checkOnDataAvailable(); |
|
|
|
publisher.changeToDemandState(NO_DEMAND); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -326,9 +331,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
if (Operators.validate(n)) { |
|
|
|
if (Operators.validate(n)) { |
|
|
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); |
|
|
|
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); |
|
|
|
// Did a concurrent read transition to NO_DEMAND just before us?
|
|
|
|
// Did a concurrent read transition to NO_DEMAND just before us?
|
|
|
|
if (publisher.changeState(NO_DEMAND, DEMAND)) { |
|
|
|
publisher.changeToDemandState(NO_DEMAND); |
|
|
|
publisher.checkOnDataAvailable(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
}, |
|
|
|
@ -372,8 +375,9 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
|
|
|
|
|
|
|
|
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) { |
|
|
|
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) { |
|
|
|
if (publisher.changeState(this, COMPLETED)) { |
|
|
|
if (publisher.changeState(this, COMPLETED)) { |
|
|
|
if (publisher.subscriber != null) { |
|
|
|
Subscriber<? super T> s = publisher.subscriber; |
|
|
|
publisher.subscriber.onComplete(); |
|
|
|
if (s != null) { |
|
|
|
|
|
|
|
s.onComplete(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
@ -383,8 +387,9 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
|
|
|
|
|
|
|
|
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) { |
|
|
|
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) { |
|
|
|
if (publisher.changeState(this, COMPLETED)) { |
|
|
|
if (publisher.changeState(this, COMPLETED)) { |
|
|
|
if (publisher.subscriber != null) { |
|
|
|
Subscriber<? super T> s = publisher.subscriber; |
|
|
|
publisher.subscriber.onError(t); |
|
|
|
if (s != null) { |
|
|
|
|
|
|
|
s.onError(t); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
|