diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java index f929f45391a..719362ef68f 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java @@ -106,7 +106,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { while (hasDemand()) { DataBuffer dataBuffer = read(); if (dataBuffer != null) { - Operators.getAndSub(this.demand, 1L); + getAndSub(this.demand, 1L); this.subscriber.onNext(dataBuffer); } else { @@ -116,6 +116,29 @@ abstract class AbstractRequestBodyPublisher implements Publisher { return false; } + /** + * Concurrent substraction bound to 0 and Long.MAX_VALUE. + * Any concurrent write will "happen" before this operation. + * + * @param sequence current atomic to update + * @param toSub delta to sub + * @return value before subscription, 0 or Long.MAX_VALUE + */ + private static long getAndSub(AtomicLong sequence, long toSub) { + long r; + long u; + do { + r = sequence.get(); + if (r == 0 || r == Long.MAX_VALUE) { + return r; + } + u = Operators.subOrZero(r, toSub); + } while (!sequence.compareAndSet(r, u)); + + return r; + } + + protected abstract void checkOnDataAvailable(); /**