Browse Source

takeUntilByteCount actually uses takeUntil

Issue: SPR-17188
pull/1916/head
Rossen Stoyanchev 8 years ago
parent
commit
6562e3047f
  1. 9
      spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java
  2. 25
      spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

9
spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

@ -396,14 +396,11 @@ public abstract class DataBufferUtils { @@ -396,14 +396,11 @@ public abstract class DataBufferUtils {
AtomicLong countDown = new AtomicLong(maxByteCount);
return Flux.from(publisher)
.takeWhile(buffer -> {
int delta = -buffer.readableByteCount();
return countDown.getAndAdd(delta) >= 0;
})
.map(buffer -> {
long count = countDown.get();
long count = countDown.addAndGet(-buffer.readableByteCount());
return count >= 0 ? buffer : buffer.slice(0, buffer.readableByteCount() + (int) count);
});
})
.takeUntil(buffer -> countDown.get() <= 0);
}
/**

25
spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

@ -226,19 +226,32 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { @@ -226,19 +226,32 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
@Test
public void takeUntilByteCount() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(flux, 5L);
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(
Flux.just(stringBuffer("foo"), stringBuffer("bar")), 5L);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("ba"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
@Test
public void takeUntilByteCountExact() {
DataBuffer extraBuffer = stringBuffer("baz");
Flux<DataBuffer> result = DataBufferUtils.takeUntilByteCount(
Flux.just(stringBuffer("foo"), stringBuffer("bar"), extraBuffer), 6L);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer("bar"))
.expectComplete()
.verify(Duration.ofSeconds(5));
release(baz);
release(extraBuffer);
}
@Test

Loading…
Cancel
Save