|
|
|
@ -75,7 +75,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public AbstractListenerWriteFlushProcessor(String logPrefix) { |
|
|
|
public AbstractListenerWriteFlushProcessor(String logPrefix) { |
|
|
|
this.logPrefix = logPrefix; |
|
|
|
this.logPrefix = logPrefix; |
|
|
|
this.resultPublisher = new WriteResultPublisher(logPrefix); |
|
|
|
this.resultPublisher = new WriteResultPublisher(logPrefix + "[WFP] "); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -98,7 +98,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public final void onNext(Publisher<? extends T> publisher) { |
|
|
|
public final void onNext(Publisher<? extends T> publisher) { |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
rsWriteFlushLogger.trace(getLogPrefix() + "Received onNext publisher"); |
|
|
|
rsWriteFlushLogger.trace(getLogPrefix() + "onNext: \"write\" Publisher"); |
|
|
|
} |
|
|
|
} |
|
|
|
this.state.get().onNext(this, publisher); |
|
|
|
this.state.get().onNext(this, publisher); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -109,10 +109,11 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public final void onError(Throwable ex) { |
|
|
|
public final void onError(Throwable ex) { |
|
|
|
|
|
|
|
State state = this.state.get(); |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
rsWriteFlushLogger.trace(getLogPrefix() + "Received onError: " + ex); |
|
|
|
rsWriteFlushLogger.trace(getLogPrefix() + "onError: " + ex + " [" + state + "]"); |
|
|
|
} |
|
|
|
} |
|
|
|
this.state.get().onError(this, ex); |
|
|
|
state.onError(this, ex); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
@ -121,10 +122,11 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public final void onComplete() { |
|
|
|
public final void onComplete() { |
|
|
|
|
|
|
|
State state = this.state.get(); |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
rsWriteFlushLogger.trace(getLogPrefix() + "Received onComplete"); |
|
|
|
rsWriteFlushLogger.trace(getLogPrefix() + "onComplete [" + state + "]"); |
|
|
|
} |
|
|
|
} |
|
|
|
this.state.get().onComplete(this); |
|
|
|
state.onComplete(this); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
@ -142,7 +144,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
protected void cancel() { |
|
|
|
protected void cancel() { |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
rsWriteFlushLogger.trace(getLogPrefix() + "Received request to cancel"); |
|
|
|
rsWriteFlushLogger.trace(getLogPrefix() + "cancel [" + this.state + "]"); |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.subscription != null) { |
|
|
|
if (this.subscription != null) { |
|
|
|
this.subscription.cancel(); |
|
|
|
this.subscription.cancel(); |
|
|
|
@ -294,7 +296,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo |
|
|
|
} |
|
|
|
} |
|
|
|
if (processor.changeState(this, REQUESTED)) { |
|
|
|
if (processor.changeState(this, REQUESTED)) { |
|
|
|
if (processor.sourceCompleted) { |
|
|
|
if (processor.sourceCompleted) { |
|
|
|
handleSubscriberCompleted(processor); |
|
|
|
handleSourceCompleted(processor); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
Assert.state(processor.subscription != null, "No subscription"); |
|
|
|
Assert.state(processor.subscription != null, "No subscription"); |
|
|
|
@ -307,11 +309,11 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo |
|
|
|
processor.sourceCompleted = true; |
|
|
|
processor.sourceCompleted = true; |
|
|
|
// A competing write might have completed very quickly
|
|
|
|
// A competing write might have completed very quickly
|
|
|
|
if (processor.state.get().equals(State.REQUESTED)) { |
|
|
|
if (processor.state.get().equals(State.REQUESTED)) { |
|
|
|
handleSubscriberCompleted(processor); |
|
|
|
handleSourceCompleted(processor); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private <T> void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor<T> processor) { |
|
|
|
private <T> void handleSourceCompleted(AbstractListenerWriteFlushProcessor<T> processor) { |
|
|
|
if (processor.isFlushPending()) { |
|
|
|
if (processor.isFlushPending()) { |
|
|
|
// Ensure the final flush
|
|
|
|
// Ensure the final flush
|
|
|
|
processor.changeState(State.REQUESTED, State.FLUSHING); |
|
|
|
processor.changeState(State.REQUESTED, State.FLUSHING); |
|
|
|
@ -423,6 +425,10 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void onError(Throwable ex) { |
|
|
|
public void onError(Throwable ex) { |
|
|
|
|
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
|
|
|
|
rsWriteFlushLogger.trace( |
|
|
|
|
|
|
|
this.processor.getLogPrefix() + "current \"write\" Publisher failed: " + ex); |
|
|
|
|
|
|
|
} |
|
|
|
this.processor.cancel(); |
|
|
|
this.processor.cancel(); |
|
|
|
this.processor.onError(ex); |
|
|
|
this.processor.onError(ex); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -430,10 +436,16 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void onComplete() { |
|
|
|
public void onComplete() { |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
if (rsWriteFlushLogger.isTraceEnabled()) { |
|
|
|
rsWriteFlushLogger.trace(this.processor.getLogPrefix() + this.processor.state + " writeComplete"); |
|
|
|
rsWriteFlushLogger.trace( |
|
|
|
|
|
|
|
this.processor.getLogPrefix() + "current \"write\" Publisher completed"); |
|
|
|
} |
|
|
|
} |
|
|
|
this.processor.state.get().writeComplete(this.processor); |
|
|
|
this.processor.state.get().writeComplete(this.processor); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public String toString() { |
|
|
|
|
|
|
|
return this.processor.getClass().getSimpleName() + "-WriteResultSubscriber"; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|