|
|
|
|
@ -116,10 +116,13 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@@ -116,10 +116,13 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
|
|
|
|
* @return {@code true} if there is more demand; {@code false} otherwise |
|
|
|
|
*/ |
|
|
|
|
private boolean readAndPublish() throws IOException { |
|
|
|
|
while (hasDemand()) { |
|
|
|
|
long r; |
|
|
|
|
while ((r = demand) > 0) { |
|
|
|
|
T data = read(); |
|
|
|
|
if (data != null) { |
|
|
|
|
Operators.addAndGet(DEMAND_FIELD_UPDATER, this, -1L); |
|
|
|
|
if (r != Long.MAX_VALUE) { |
|
|
|
|
DEMAND_FIELD_UPDATER.addAndGet(this, -1L); |
|
|
|
|
} |
|
|
|
|
this.subscriber.onNext(data); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
@ -129,10 +132,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@@ -129,10 +132,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean hasDemand() { |
|
|
|
|
return (this.demand > 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean changeState(State oldState, State newState) { |
|
|
|
|
return this.state.compareAndSet(oldState, newState); |
|
|
|
|
} |
|
|
|
|
|