From 35cf4f173b1320e0a2684a97e2cac23168c22158 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Thu, 1 Sep 2016 11:16:01 +0200 Subject: [PATCH] Copied getAndSub() over from Reactor Operators.getAndSub was removed in Reactor 3.0.1, this commit copies the implementation over to AbstractRequestBodyPublisher, which used it. --- .../AbstractRequestBodyPublisher.java | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java index f929f45391a..719362ef68f 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java @@ -106,7 +106,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { while (hasDemand()) { DataBuffer dataBuffer = read(); if (dataBuffer != null) { - Operators.getAndSub(this.demand, 1L); + getAndSub(this.demand, 1L); this.subscriber.onNext(dataBuffer); } else { @@ -116,6 +116,29 @@ abstract class AbstractRequestBodyPublisher implements Publisher { return false; } + /** + * Concurrent substraction bound to 0 and Long.MAX_VALUE. + * Any concurrent write will "happen" before this operation. + * + * @param sequence current atomic to update + * @param toSub delta to sub + * @return value before subscription, 0 or Long.MAX_VALUE + */ + private static long getAndSub(AtomicLong sequence, long toSub) { + long r; + long u; + do { + r = sequence.get(); + if (r == 0 || r == Long.MAX_VALUE) { + return r; + } + u = Operators.subOrZero(r, toSub); + } while (!sequence.compareAndSet(r, u)); + + return r; + } + + protected abstract void checkOnDataAvailable(); /**