diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 8ed89edff36..35f0e9c102d 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -57,6 +57,12 @@ public abstract class DataBufferUtils { private static final Consumer RELEASE_CONSUMER = DataBufferUtils::release; + /** + * Workaround to disable use of pooled buffers: + * https://github.com/reactor/reactor-core/issues/1634 + */ + private static final DataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory(); + //--------------------------------------------------------------------- // Reading @@ -132,16 +138,21 @@ public abstract class DataBufferUtils { Assert.isTrue(position >= 0, "'position' must be >= 0"); Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); + DataBufferFactory bufferFactoryToUse = defaultDataBufferFactory; + Flux flux = Flux.using(channelSupplier, channel -> Flux.create(sink -> { ReadCompletionHandler handler = - new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize); - DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize); + new ReadCompletionHandler(channel, sink, position, bufferFactoryToUse, bufferSize); + DataBuffer dataBuffer = bufferFactoryToUse.allocateBuffer(bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); channel.read(byteBuffer, position, dataBuffer, handler); sink.onDispose(handler::dispose); }), - DataBufferUtils::closeChannel); + channel -> { + // Do not close channel from here, rather wait for the current read callback + // and then complete after releasing the DataBuffer. + }); return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } @@ -505,26 +516,40 @@ public abstract class DataBufferUtils { @Override public void completed(Integer read, DataBuffer dataBuffer) { - if (read != -1) { + if (read != -1 && !this.disposed.get()) { long pos = this.position.addAndGet(read); dataBuffer.writePosition(read); this.sink.next(dataBuffer); - if (!this.disposed.get()) { + // It's possible for cancellation to happen right before the push into the sink + if (this.disposed.get()) { + // TODO: + // This is not ideal since we already passed the buffer into the sink and + // releasing may cause something reading to fail. Maybe we won't have to + // do this after https://github.com/reactor/reactor-core/issues/1634 + complete(dataBuffer); + } + else { DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize); this.channel.read(newByteBuffer, pos, newDataBuffer, this); } } else { - release(dataBuffer); - this.sink.complete(); + complete(dataBuffer); } } + private void complete(DataBuffer dataBuffer) { + release(dataBuffer); + this.sink.complete(); + closeChannel(this.channel); + } + @Override public void failed(Throwable exc, DataBuffer dataBuffer) { release(dataBuffer); this.sink.error(exc); + closeChannel(this.channel); } public void dispose() { diff --git a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java index 112fd6a7c5c..d96b96b522d 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java @@ -80,8 +80,7 @@ public class ResourceRegionEncoderTests { .expectComplete() .verify(); - // TODO: https://github.com/reactor/reactor-core/issues/1634 - // this.bufferFactory.checkForLeaks(); + this.bufferFactory.checkForLeaks(); } @Test @@ -122,11 +121,10 @@ public class ResourceRegionEncoderTests { .expectComplete() .verify(); - // TODO: https://github.com/reactor/reactor-core/issues/1634 - // this.bufferFactory.checkForLeaks(); + this.bufferFactory.checkForLeaks(); } - @Test // gh- + @Test // gh-22107 public void cancelWithoutDemandForMultipleResourceRegions() { Resource resource = new ClassPathResource("ResourceRegionEncoderTests.txt", getClass()); Flux regions = Flux.just( @@ -173,8 +171,7 @@ public class ResourceRegionEncoderTests { .expectError(EncodingException.class) .verify(); - // TODO: https://github.com/reactor/reactor-core/issues/1634 - // this.bufferFactory.checkForLeaks(); + this.bufferFactory.checkForLeaks(); } protected Consumer stringConsumer(String expected) { diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java b/spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java index 5409e71489b..3904bdc6b1b 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java @@ -137,6 +137,7 @@ public abstract class AbstractDataBufferAllocatingTestCase { catch (InterruptedException ex) { // ignore } + continue; } assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total); } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 314788bdcd4..8e2ece6e58e 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -187,8 +187,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { .verify(); } - // TODO: Remove ignore after https://github.com/reactor/reactor-core/issues/1634 - @Ignore @Test // gh-22107 public void readAsynchronousFileChannelCancelWithoutDemand() throws Exception { URI uri = this.resource.getURI(); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java index e270533af8c..e79da102758 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java @@ -85,6 +85,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { catch (InterruptedException ex) { // ignore } + continue; } List errors = this.created.stream() .filter(LeakAwareDataBuffer::isAllocated)