From 2c5958e191be3db2726d03fb0ceb0e39f17351a5 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Thu, 29 Aug 2019 16:07:22 +0200 Subject: [PATCH] Support back pressure in DataBufferUtils::readAsynchronousFileChannel This commit adds support for back pressure in the ReadCompletionHandler, as used by DataBufferUtils::readAsynchronousFileChannel. See gh-23518 --- .../core/io/buffer/DataBufferUtils.java | 67 ++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index b12f9434b2d..bc97362c989 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -139,10 +139,8 @@ public abstract class DataBufferUtils { channel -> Flux.create(sink -> { ReadCompletionHandler handler = new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize); - sink.onDispose(handler::dispose); - DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize); - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); - channel.read(byteBuffer, position, dataBuffer, handler); + sink.onCancel(handler::cancel); + sink.onRequest(handler::request); }), channel -> { // Do not close channel from here, rather wait for the current read callback @@ -506,7 +504,9 @@ public abstract class DataBufferUtils { private final AtomicLong position; - private final AtomicBoolean disposed = new AtomicBoolean(); + private final AtomicBoolean reading = new AtomicBoolean(); + + private final AtomicBoolean canceled = new AtomicBoolean(); public ReadCompletionHandler(AsynchronousFileChannel channel, FluxSink sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) { @@ -518,43 +518,62 @@ public abstract class DataBufferUtils { this.bufferSize = bufferSize; } + public void read() { + if (this.sink.requestedFromDownstream() > 0 && this.reading.compareAndSet(false, true)) { + DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); + ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); + this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); + } + } + @Override public void completed(Integer read, DataBuffer dataBuffer) { - if (read != -1 && !this.disposed.get()) { - long pos = this.position.addAndGet(read); - dataBuffer.writePosition(read); - this.sink.next(dataBuffer); - // onNext may have led to onCancel (e.g. downstream takeUntil) - if (this.disposed.get()) { - complete(); + this.reading.set(false); + if (!isCanceled()) { + if (read != -1) { + this.position.addAndGet(read); + dataBuffer.writePosition(read); + this.sink.next(dataBuffer); + read(); } else { - DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); - ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize); - this.channel.read(newByteBuffer, pos, newDataBuffer, this); + release(dataBuffer); + closeChannel(this.channel); + this.sink.complete(); } } else { release(dataBuffer); - complete(); + closeChannel(this.channel); } } - private void complete() { - this.sink.complete(); - closeChannel(this.channel); - } - @Override public void failed(Throwable exc, DataBuffer dataBuffer) { + this.reading.set(false); release(dataBuffer); - this.sink.error(exc); closeChannel(this.channel); + if (!isCanceled()) { + this.sink.error(exc); + } + } + + public void request(long n) { + read(); } - public void dispose() { - this.disposed.set(true); + public void cancel() { + if (this.canceled.compareAndSet(false, true)) { + if (!this.reading.get()) { + closeChannel(this.channel); + } + } } + + private boolean isCanceled() { + return this.canceled.get(); + } + }