|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
/* |
|
|
|
* Copyright 2002-2018 the original author or authors. |
|
|
|
* Copyright 2002-2019 the original author or authors. |
|
|
|
* |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -283,19 +283,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
publisher.subscriber = subscriber; |
|
|
|
publisher.subscriber = subscriber; |
|
|
|
subscriber.onSubscribe(subscription); |
|
|
|
subscriber.onSubscribe(subscription); |
|
|
|
publisher.changeState(SUBSCRIBING, NO_DEMAND); |
|
|
|
publisher.changeState(SUBSCRIBING, NO_DEMAND); |
|
|
|
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
|
|
|
|
handleCompletionOrErrorBeforeDemand(publisher); |
|
|
|
String logPrefix = publisher.getLogPrefix(); |
|
|
|
|
|
|
|
if (publisher.completionBeforeDemand) { |
|
|
|
|
|
|
|
rsReadLogger.trace(logPrefix + "Completed before demand"); |
|
|
|
|
|
|
|
publisher.state.get().onAllDataRead(publisher); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
Throwable ex = publisher.errorBeforeDemand; |
|
|
|
|
|
|
|
if (ex != null) { |
|
|
|
|
|
|
|
if (rsReadLogger.isTraceEnabled()) { |
|
|
|
|
|
|
|
rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
publisher.state.get().onError(publisher, ex); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " + |
|
|
|
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " + |
|
|
|
@ -306,11 +294,30 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) { |
|
|
|
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) { |
|
|
|
publisher.completionBeforeDemand = true; |
|
|
|
publisher.completionBeforeDemand = true; |
|
|
|
|
|
|
|
handleCompletionOrErrorBeforeDemand(publisher); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) { |
|
|
|
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) { |
|
|
|
publisher.errorBeforeDemand = ex; |
|
|
|
publisher.errorBeforeDemand = ex; |
|
|
|
|
|
|
|
handleCompletionOrErrorBeforeDemand(publisher); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private <T> void handleCompletionOrErrorBeforeDemand(AbstractListenerReadPublisher<T> publisher) { |
|
|
|
|
|
|
|
if (publisher.state.get().equals(NO_DEMAND)) { |
|
|
|
|
|
|
|
if (publisher.completionBeforeDemand) { |
|
|
|
|
|
|
|
rsReadLogger.trace(publisher.getLogPrefix() + "Completed before demand"); |
|
|
|
|
|
|
|
publisher.state.get().onAllDataRead(publisher); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
Throwable ex = publisher.errorBeforeDemand; |
|
|
|
|
|
|
|
if (ex != null) { |
|
|
|
|
|
|
|
if (rsReadLogger.isTraceEnabled()) { |
|
|
|
|
|
|
|
String prefix = publisher.getLogPrefix(); |
|
|
|
|
|
|
|
rsReadLogger.trace(prefix + "Completed with error before demand: " + ex); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
publisher.state.get().onError(publisher, ex); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
}, |
|
|
|
|
|
|
|
|
|
|
|
|