|
|
|
|
@ -683,9 +683,7 @@ public abstract class DataBufferUtils {
@@ -683,9 +683,7 @@ public abstract class DataBufferUtils {
|
|
|
|
|
|
|
|
|
|
private final AtomicLong position; |
|
|
|
|
|
|
|
|
|
private final AtomicBoolean reading = new AtomicBoolean(); |
|
|
|
|
|
|
|
|
|
private final AtomicBoolean disposed = new AtomicBoolean(); |
|
|
|
|
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE); |
|
|
|
|
|
|
|
|
|
public ReadCompletionHandler(AsynchronousFileChannel channel, |
|
|
|
|
FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) { |
|
|
|
|
@ -697,39 +695,68 @@ public abstract class DataBufferUtils {
@@ -697,39 +695,68 @@ public abstract class DataBufferUtils {
|
|
|
|
|
this.bufferSize = bufferSize; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void read() { |
|
|
|
|
if (this.sink.requestedFromDownstream() > 0 && |
|
|
|
|
isNotDisposed() && |
|
|
|
|
this.reading.compareAndSet(false, true)) { |
|
|
|
|
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); |
|
|
|
|
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); |
|
|
|
|
this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); |
|
|
|
|
/** |
|
|
|
|
* Invoked when Reactive Streams consumer signals demand. |
|
|
|
|
*/ |
|
|
|
|
public void request(long n) { |
|
|
|
|
tryRead(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Invoked when Reactive Streams consumer cancels. |
|
|
|
|
*/ |
|
|
|
|
public void cancel() { |
|
|
|
|
this.state.getAndSet(State.DISPOSED); |
|
|
|
|
|
|
|
|
|
// According java.nio.channels.AsynchronousChannel "if an I/O operation is outstanding
|
|
|
|
|
// on the channel and the channel's close method is invoked, then the I/O operation
|
|
|
|
|
// fails with the exception AsynchronousCloseException". That should invoke the failed
|
|
|
|
|
// callback below and the current DataBuffer should be released.
|
|
|
|
|
|
|
|
|
|
closeChannel(this.channel); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void tryRead() { |
|
|
|
|
if (this.sink.requestedFromDownstream() > 0 && this.state.compareAndSet(State.IDLE, State.READING)) { |
|
|
|
|
read(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void read() { |
|
|
|
|
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); |
|
|
|
|
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); |
|
|
|
|
this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void completed(Integer read, DataBuffer dataBuffer) { |
|
|
|
|
if (isNotDisposed()) { |
|
|
|
|
if (read != -1) { |
|
|
|
|
this.position.addAndGet(read); |
|
|
|
|
dataBuffer.writePosition(read); |
|
|
|
|
this.sink.next(dataBuffer); |
|
|
|
|
this.reading.set(false); |
|
|
|
|
read(); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
release(dataBuffer); |
|
|
|
|
closeChannel(this.channel); |
|
|
|
|
if (this.disposed.compareAndSet(false, true)) { |
|
|
|
|
this.sink.complete(); |
|
|
|
|
} |
|
|
|
|
this.reading.set(false); |
|
|
|
|
} |
|
|
|
|
if (this.state.get().equals(State.DISPOSED)) { |
|
|
|
|
release(dataBuffer); |
|
|
|
|
closeChannel(this.channel); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
|
|
|
|
|
if (read == -1) { |
|
|
|
|
release(dataBuffer); |
|
|
|
|
closeChannel(this.channel); |
|
|
|
|
this.reading.set(false); |
|
|
|
|
this.state.set(State.DISPOSED); |
|
|
|
|
this.sink.complete(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.position.addAndGet(read); |
|
|
|
|
dataBuffer.writePosition(read); |
|
|
|
|
this.sink.next(dataBuffer); |
|
|
|
|
|
|
|
|
|
// Stay in READING mode if there is demand
|
|
|
|
|
if (this.sink.requestedFromDownstream() > 0) { |
|
|
|
|
read(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Release READING mode and then try again in case of concurrent "request"
|
|
|
|
|
if (this.state.compareAndSet(State.READING, State.IDLE)) { |
|
|
|
|
tryRead(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -737,26 +764,12 @@ public abstract class DataBufferUtils {
@@ -737,26 +764,12 @@ public abstract class DataBufferUtils {
|
|
|
|
|
public void failed(Throwable exc, DataBuffer dataBuffer) { |
|
|
|
|
release(dataBuffer); |
|
|
|
|
closeChannel(this.channel); |
|
|
|
|
if (this.disposed.compareAndSet(false, true)) { |
|
|
|
|
this.sink.error(exc); |
|
|
|
|
} |
|
|
|
|
this.reading.set(false); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void request(long n) { |
|
|
|
|
read(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void cancel() { |
|
|
|
|
if (this.disposed.compareAndSet(false, true)) { |
|
|
|
|
if (!this.reading.get()) { |
|
|
|
|
closeChannel(this.channel); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
this.state.set(State.DISPOSED); |
|
|
|
|
this.sink.error(exc); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean isNotDisposed() { |
|
|
|
|
return !this.disposed.get(); |
|
|
|
|
private enum State { |
|
|
|
|
IDLE, READING, DISPOSED |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|