From beae1fbb12e064f7b989a2ed4cbd567c7b91f053 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 28 Mar 2019 17:18:40 -0400 Subject: [PATCH] Uncomment buffer leak tests in DataBufferUtils Following a response on https://github.com/reactor/reactor-core/issues/1634 apparently FluxSink is respecting doOnDiscard but there is a race condition in DataBufferUtils with file reading. This commit makes two changes: 1) Add more checks for onDispose to detect cancellation signals as well as to deal with potentially concurrent such signal. 2) Do not close the channel through the Flux.using callback but rather allow the current I/O callback to take place and only then close the channel or else the buffer is left hanging. Despite this tests still can fail due to a suspected issue in Reactor itself with the doOnDiscard callback for FluxSink. That's tracked under the same issue https://github.com/reactor/reactor-core/issues/1634 and for now the use of DefaultDataBufferFactory is enforced as a workaround until the issue is resolved. See gh-22107 --- .../core/io/buffer/DataBufferUtils.java | 39 +++++++++++++++---- .../codec/ResourceRegionEncoderTests.java | 11 ++---- .../AbstractDataBufferAllocatingTestCase.java | 1 + .../core/io/buffer/DataBufferUtilsTests.java | 2 - .../io/buffer/LeakAwareDataBufferFactory.java | 1 + 5 files changed, 38 insertions(+), 16 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 8ed89edff36..35f0e9c102d 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -57,6 +57,12 @@ public abstract class DataBufferUtils { private static final Consumer RELEASE_CONSUMER = DataBufferUtils::release; + /** + * Workaround to disable use of pooled buffers: + * https://github.com/reactor/reactor-core/issues/1634 + */ + private static final DataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory(); + //--------------------------------------------------------------------- // Reading @@ -132,16 +138,21 @@ public abstract class DataBufferUtils { Assert.isTrue(position >= 0, "'position' must be >= 0"); Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); + DataBufferFactory bufferFactoryToUse = defaultDataBufferFactory; + Flux flux = Flux.using(channelSupplier, channel -> Flux.create(sink -> { ReadCompletionHandler handler = - new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize); - DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize); + new ReadCompletionHandler(channel, sink, position, bufferFactoryToUse, bufferSize); + DataBuffer dataBuffer = bufferFactoryToUse.allocateBuffer(bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); channel.read(byteBuffer, position, dataBuffer, handler); sink.onDispose(handler::dispose); }), - DataBufferUtils::closeChannel); + channel -> { + // Do not close channel from here, rather wait for the current read callback + // and then complete after releasing the DataBuffer. + }); return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } @@ -505,26 +516,40 @@ public abstract class DataBufferUtils { @Override public void completed(Integer read, DataBuffer dataBuffer) { - if (read != -1) { + if (read != -1 && !this.disposed.get()) { long pos = this.position.addAndGet(read); dataBuffer.writePosition(read); this.sink.next(dataBuffer); - if (!this.disposed.get()) { + // It's possible for cancellation to happen right before the push into the sink + if (this.disposed.get()) { + // TODO: + // This is not ideal since we already passed the buffer into the sink and + // releasing may cause something reading to fail. Maybe we won't have to + // do this after https://github.com/reactor/reactor-core/issues/1634 + complete(dataBuffer); + } + else { DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize); this.channel.read(newByteBuffer, pos, newDataBuffer, this); } } else { - release(dataBuffer); - this.sink.complete(); + complete(dataBuffer); } } + private void complete(DataBuffer dataBuffer) { + release(dataBuffer); + this.sink.complete(); + closeChannel(this.channel); + } + @Override public void failed(Throwable exc, DataBuffer dataBuffer) { release(dataBuffer); this.sink.error(exc); + closeChannel(this.channel); } public void dispose() { diff --git a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java index 112fd6a7c5c..d96b96b522d 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java @@ -80,8 +80,7 @@ public class ResourceRegionEncoderTests { .expectComplete() .verify(); - // TODO: https://github.com/reactor/reactor-core/issues/1634 - // this.bufferFactory.checkForLeaks(); + this.bufferFactory.checkForLeaks(); } @Test @@ -122,11 +121,10 @@ public class ResourceRegionEncoderTests { .expectComplete() .verify(); - // TODO: https://github.com/reactor/reactor-core/issues/1634 - // this.bufferFactory.checkForLeaks(); + this.bufferFactory.checkForLeaks(); } - @Test // gh- + @Test // gh-22107 public void cancelWithoutDemandForMultipleResourceRegions() { Resource resource = new ClassPathResource("ResourceRegionEncoderTests.txt", getClass()); Flux regions = Flux.just( @@ -173,8 +171,7 @@ public class ResourceRegionEncoderTests { .expectError(EncodingException.class) .verify(); - // TODO: https://github.com/reactor/reactor-core/issues/1634 - // this.bufferFactory.checkForLeaks(); + this.bufferFactory.checkForLeaks(); } protected Consumer stringConsumer(String expected) { diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java b/spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java index a7e21feefc4..9ed9b0b6a5a 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java @@ -135,6 +135,7 @@ public abstract class AbstractDataBufferAllocatingTestCase { catch (InterruptedException ex) { // ignore } + continue; } assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total); } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 314788bdcd4..8e2ece6e58e 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -187,8 +187,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { .verify(); } - // TODO: Remove ignore after https://github.com/reactor/reactor-core/issues/1634 - @Ignore @Test // gh-22107 public void readAsynchronousFileChannelCancelWithoutDemand() throws Exception { URI uri = this.resource.getURI(); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java index e270533af8c..e79da102758 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java @@ -85,6 +85,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { catch (InterruptedException ex) { // ignore } + continue; } List errors = this.created.stream() .filter(LeakAwareDataBuffer::isAllocated)