@ -683,9 +683,7 @@ public abstract class DataBufferUtils {
private final AtomicLong position ;
private final AtomicLong position ;
private final AtomicBoolean reading = new AtomicBoolean ( ) ;
private final AtomicReference < State > state = new AtomicReference < > ( State . IDLE ) ;
private final AtomicBoolean disposed = new AtomicBoolean ( ) ;
public ReadCompletionHandler ( AsynchronousFileChannel channel ,
public ReadCompletionHandler ( AsynchronousFileChannel channel ,
FluxSink < DataBuffer > sink , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
FluxSink < DataBuffer > sink , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
@ -697,39 +695,68 @@ public abstract class DataBufferUtils {
this . bufferSize = bufferSize ;
this . bufferSize = bufferSize ;
}
}
public void read ( ) {
/ * *
if ( this . sink . requestedFromDownstream ( ) > 0 & &
* Invoked when Reactive Streams consumer signals demand .
isNotDisposed ( ) & &
* /
this . reading . compareAndSet ( false , true ) ) {
public void request ( long n ) {
DataBuffer dataBuffer = this . dataBufferFactory . allocateBuffer ( this . bufferSize ) ;
tryRead ( ) ;
ByteBuffer byteBuffer = dataBuffer . asByteBuffer ( 0 , this . bufferSize ) ;
}
this . channel . read ( byteBuffer , this . position . get ( ) , dataBuffer , this ) ;
/ * *
* Invoked when Reactive Streams consumer cancels .
* /
public void cancel ( ) {
this . state . getAndSet ( State . DISPOSED ) ;
// According java.nio.channels.AsynchronousChannel "if an I/O operation is outstanding
// on the channel and the channel's close method is invoked, then the I/O operation
// fails with the exception AsynchronousCloseException". That should invoke the failed
// callback below which and the current DataBuffer should be released.
closeChannel ( this . channel ) ;
}
private void tryRead ( ) {
if ( this . sink . requestedFromDownstream ( ) > 0 & & this . state . compareAndSet ( State . IDLE , State . READING ) ) {
read ( ) ;
}
}
}
}
private void read ( ) {
DataBuffer dataBuffer = this . dataBufferFactory . allocateBuffer ( this . bufferSize ) ;
ByteBuffer byteBuffer = dataBuffer . asByteBuffer ( 0 , this . bufferSize ) ;
this . channel . read ( byteBuffer , this . position . get ( ) , dataBuffer , this ) ;
}
@Override
@Override
public void completed ( Integer read , DataBuffer dataBuffer ) {
public void completed ( Integer read , DataBuffer dataBuffer ) {
if ( isNotDisposed ( ) ) {
if ( this . state . get ( ) . equals ( State . DISPOSED ) ) {
if ( read ! = - 1 ) {
release ( dataBuffer ) ;
this . position . addAndGet ( read ) ;
closeChannel ( this . channel ) ;
dataBuffer . writePosition ( read ) ;
return ;
this . sink . next ( dataBuffer ) ;
this . reading . set ( false ) ;
read ( ) ;
}
else {
release ( dataBuffer ) ;
closeChannel ( this . channel ) ;
if ( this . disposed . compareAndSet ( false , true ) ) {
this . sink . complete ( ) ;
}
this . reading . set ( false ) ;
}
}
}
else {
if ( read = = - 1 ) {
release ( dataBuffer ) ;
release ( dataBuffer ) ;
closeChannel ( this . channel ) ;
closeChannel ( this . channel ) ;
this . reading . set ( false ) ;
this . state . set ( State . DISPOSED ) ;
this . sink . complete ( ) ;
return ;
}
this . position . addAndGet ( read ) ;
dataBuffer . writePosition ( read ) ;
this . sink . next ( dataBuffer ) ;
// Stay in READING mode if there is demand
if ( this . sink . requestedFromDownstream ( ) > 0 ) {
read ( ) ;
return ;
}
// Release READING mode and then try again in case of concurrent "request"
if ( this . state . compareAndSet ( State . READING , State . IDLE ) ) {
tryRead ( ) ;
}
}
}
}
@ -737,26 +764,12 @@ public abstract class DataBufferUtils {
public void failed ( Throwable exc , DataBuffer dataBuffer ) {
public void failed ( Throwable exc , DataBuffer dataBuffer ) {
release ( dataBuffer ) ;
release ( dataBuffer ) ;
closeChannel ( this . channel ) ;
closeChannel ( this . channel ) ;
if ( this . disposed . compareAndSet ( false , true ) ) {
this . state . set ( State . DISPOSED ) ;
this . sink . error ( exc ) ;
this . sink . error ( exc ) ;
}
this . reading . set ( false ) ;
}
public void request ( long n ) {
read ( ) ;
}
public void cancel ( ) {
if ( this . disposed . compareAndSet ( false , true ) ) {
if ( ! this . reading . get ( ) ) {
closeChannel ( this . channel ) ;
}
}
}
}
private boolean isNotDisposed ( ) {
private enum State {
return ! this . disposed . get ( ) ;
IDLE , READING , DISPOSED
}
}
}
}