|
|
|
|
@ -106,7 +106,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
@@ -106,7 +106,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
|
|
|
|
|
while (hasDemand()) { |
|
|
|
|
DataBuffer dataBuffer = read(); |
|
|
|
|
if (dataBuffer != null) { |
|
|
|
|
Operators.getAndSub(this.demand, 1L); |
|
|
|
|
getAndSub(this.demand, 1L); |
|
|
|
|
this.subscriber.onNext(dataBuffer); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
@ -116,6 +116,29 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
@@ -116,6 +116,29 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
|
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Concurrent substraction bound to 0 and Long.MAX_VALUE. |
|
|
|
|
* Any concurrent write will "happen" before this operation. |
|
|
|
|
* |
|
|
|
|
* @param sequence current atomic to update |
|
|
|
|
* @param toSub delta to sub |
|
|
|
|
* @return value before subscription, 0 or Long.MAX_VALUE |
|
|
|
|
*/ |
|
|
|
|
private static long getAndSub(AtomicLong sequence, long toSub) { |
|
|
|
|
long r; |
|
|
|
|
long u; |
|
|
|
|
do { |
|
|
|
|
r = sequence.get(); |
|
|
|
|
if (r == 0 || r == Long.MAX_VALUE) { |
|
|
|
|
return r; |
|
|
|
|
} |
|
|
|
|
u = Operators.subOrZero(r, toSub); |
|
|
|
|
} while (!sequence.compareAndSet(r, u)); |
|
|
|
|
|
|
|
|
|
return r; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected abstract void checkOnDataAvailable(); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|