@ -62,7 +62,7 @@ class DataBufferPublisherAdapter {
/ * *
/ * *
* Use an { @link AsyncInputStreamHandler } to read data from the given { @link AsyncInputStream } .
* Use an { @link AsyncInputStreamHandler } to read data from the given { @link AsyncInputStream } .
*
*
* @param inputStream the source stream .
* @param inputStream the source stream .
* @return a { @link Flux } emitting data chunks one by one .
* @return a { @link Flux } emitting data chunks one by one .
* @since 2 . 2 . 1
* @since 2 . 2 . 1
@ -71,7 +71,6 @@ class DataBufferPublisherAdapter {
AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler ( inputStream , inputStream . dataBufferFactory ,
AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler ( inputStream , inputStream . dataBufferFactory ,
inputStream . bufferSize ) ;
inputStream . bufferSize ) ;
return Flux . create ( ( sink ) - > {
return Flux . create ( ( sink ) - > {
sink . onDispose ( streamHandler : : close ) ;
sink . onDispose ( streamHandler : : close ) ;
@ -87,7 +86,7 @@ class DataBufferPublisherAdapter {
* An { @link AsyncInputStream } also holding a { @link DataBufferFactory } and default { @literal bufferSize } for reading
* An { @link AsyncInputStream } also holding a { @link DataBufferFactory } and default { @literal bufferSize } for reading
* from it , delegating operations on the { @link AsyncInputStream } to the reference instance . < br / >
* from it , delegating operations on the { @link AsyncInputStream } to the reference instance . < br / >
* Used to pass on the { @link AsyncInputStream } and parameters to avoid capturing lambdas .
* Used to pass on the { @link AsyncInputStream } and parameters to avoid capturing lambdas .
*
*
* @author Christoph Strobl
* @author Christoph Strobl
* @since 2 . 2 . 1
* @since 2 . 2 . 1
* /
* /
@ -146,12 +145,18 @@ class DataBufferPublisherAdapter {
private static final AtomicIntegerFieldUpdater < AsyncInputStreamHandler > STATE = AtomicIntegerFieldUpdater
private static final AtomicIntegerFieldUpdater < AsyncInputStreamHandler > STATE = AtomicIntegerFieldUpdater
. newUpdater ( AsyncInputStreamHandler . class , "state" ) ;
. newUpdater ( AsyncInputStreamHandler . class , "state" ) ;
private static final AtomicIntegerFieldUpdater < AsyncInputStreamHandler > DRAIN = AtomicIntegerFieldUpdater
. newUpdater ( AsyncInputStreamHandler . class , "drain" ) ;
private static final AtomicIntegerFieldUpdater < AsyncInputStreamHandler > READ = AtomicIntegerFieldUpdater
private static final AtomicIntegerFieldUpdater < AsyncInputStreamHandler > READ = AtomicIntegerFieldUpdater
. newUpdater ( AsyncInputStreamHandler . class , "read" ) ;
. newUpdater ( AsyncInputStreamHandler . class , "read" ) ;
private static final int STATE_OPEN = 0 ;
private static final int STATE_OPEN = 0 ;
private static final int STATE_CLOSED = 1 ;
private static final int STATE_CLOSED = 1 ;
private static final int DRAIN_NONE = 0 ;
private static final int DRAIN_COMPLETION = 1 ;
private static final int READ_NONE = 0 ;
private static final int READ_NONE = 0 ;
private static final int READ_IN_PROGRESS = 1 ;
private static final int READ_IN_PROGRESS = 1 ;
@ -165,6 +170,9 @@ class DataBufferPublisherAdapter {
// see STATE
// see STATE
volatile int state = STATE_OPEN ;
volatile int state = STATE_OPEN ;
// see DRAIN
volatile int drain = DRAIN_NONE ;
// see READ_IN_PROGRESS
// see READ_IN_PROGRESS
volatile int read = READ_NONE ;
volatile int read = READ_NONE ;
@ -209,6 +217,14 @@ class DataBufferPublisherAdapter {
STATE . compareAndSet ( this , STATE_OPEN , STATE_CLOSED ) ;
STATE . compareAndSet ( this , STATE_OPEN , STATE_CLOSED ) ;
}
}
boolean enterDrainLoop ( ) {
return DRAIN . compareAndSet ( this , DRAIN_NONE , DRAIN_COMPLETION ) ;
}
void leaveDrainLoop ( ) {
DRAIN . set ( this , DRAIN_NONE ) ;
}
boolean isClosed ( ) {
boolean isClosed ( ) {
return STATE . get ( this ) = = STATE_CLOSED ;
return STATE . get ( this ) = = STATE_CLOSED ;
}
}
@ -235,7 +251,6 @@ class DataBufferPublisherAdapter {
private final FluxSink < DataBuffer > sink ;
private final FluxSink < DataBuffer > sink ;
private final DataBufferFactory factory ;
private final DataBufferFactory factory ;
private final ByteBuffer transport ;
private final ByteBuffer transport ;
private final Thread subscribeThread = Thread . currentThread ( ) ;
private volatile Subscription subscription ;
private volatile Subscription subscription ;
BufferCoreSubscriber ( FluxSink < DataBuffer > sink , DataBufferFactory factory , ByteBuffer transport ) {
BufferCoreSubscriber ( FluxSink < DataBuffer > sink , DataBufferFactory factory , ByteBuffer transport ) {
@ -261,8 +276,6 @@ class DataBufferPublisherAdapter {
public void onNext ( Integer bytes ) {
public void onNext ( Integer bytes ) {
if ( isClosed ( ) ) {
if ( isClosed ( ) ) {
onReadDone ( ) ;
return ;
return ;
}
}
@ -273,13 +286,9 @@ class DataBufferPublisherAdapter {
decrementDemand ( ) ;
decrementDemand ( ) ;
}
}
try {
if ( bytes = = - 1 ) {
if ( bytes = = - 1 ) {
sink . complete ( ) ;
sink . complete ( ) ;
return ;
return ;
}
} finally {
onReadDone ( ) ;
}
}
subscription . request ( 1 ) ;
subscription . request ( 1 ) ;
@ -306,15 +315,25 @@ class DataBufferPublisherAdapter {
return ;
return ;
}
}
onReadDon e( ) ;
clos e( ) ;
sink . error ( t ) ;
sink . error ( t ) ;
}
}
@Override
@Override
public void onComplete ( ) {
public void onComplete ( ) {
if ( subscribeThread ! = Thread . currentThread ( ) ) {
onReadDone ( ) ;
drainLoop ( sink ) ;
if ( ! isClosed ( ) ) {
if ( enterDrainLoop ( ) ) {
try {
drainLoop ( sink ) ;
} finally {
leaveDrainLoop ( ) ;
}
}
}
}
}
}
}
}