From 6562e3047f0850140eb045b9327d3eba6abbef75 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 2 Aug 2018 15:00:56 +0300 Subject: [PATCH] takeUntilByteCount actually uses takeUntil Issue: SPR-17188 --- .../core/io/buffer/DataBufferUtils.java | 9 +++---- .../core/io/buffer/DataBufferUtilsTests.java | 25 ++++++++++++++----- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index d2e0d57a07a..0fd760899c3 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -396,14 +396,11 @@ public abstract class DataBufferUtils { AtomicLong countDown = new AtomicLong(maxByteCount); return Flux.from(publisher) - .takeWhile(buffer -> { - int delta = -buffer.readableByteCount(); - return countDown.getAndAdd(delta) >= 0; - }) .map(buffer -> { - long count = countDown.get(); + long count = countDown.addAndGet(-buffer.readableByteCount()); return count >= 0 ? buffer : buffer.slice(0, buffer.readableByteCount() + (int) count); - }); + }) + .takeUntil(buffer -> countDown.get() <= 0); } /** diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index e6b1ff55382..f2a6a4c5b8f 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -226,19 +226,32 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { @Test public void takeUntilByteCount() { - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - Flux flux = Flux.just(foo, bar, baz); - Flux result = DataBufferUtils.takeUntilByteCount(flux, 5L); + + Flux result = DataBufferUtils.takeUntilByteCount( + Flux.just(stringBuffer("foo"), stringBuffer("bar")), 5L); StepVerifier.create(result) .consumeNextWith(stringConsumer("foo")) .consumeNextWith(stringConsumer("ba")) .expectComplete() .verify(Duration.ofSeconds(5)); + } + + @Test + public void takeUntilByteCountExact() { + + DataBuffer extraBuffer = stringBuffer("baz"); + + Flux result = DataBufferUtils.takeUntilByteCount( + Flux.just(stringBuffer("foo"), stringBuffer("bar"), extraBuffer), 6L); + + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .expectComplete() + .verify(Duration.ofSeconds(5)); - release(baz); + release(extraBuffer); } @Test