@ -32,7 +32,6 @@ import org.reactivestreams.Subscription;
@@ -32,7 +32,6 @@ import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer ;
import org.springframework.core.io.buffer.DataBufferFactory ;
import org.springframework.core.io.buffer.DataBufferUtils ;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream ;
@ -51,11 +50,13 @@ class DataBufferPublisherAdapter {
@@ -51,11 +50,13 @@ class DataBufferPublisherAdapter {
*
* @param inputStream must not be { @literal null } .
* @param dataBufferFactory must not be { @literal null } .
* @param bufferSize read { @code n } bytes per iteration .
* @return the resulting { @link Publisher } .
* /
static Flux < DataBuffer > createBinaryStream ( AsyncInputStream inputStream , DataBufferFactory dataBufferFactory ) {
static Flux < DataBuffer > createBinaryStream ( AsyncInputStream inputStream , DataBufferFactory dataBufferFactory ,
int bufferSize ) {
State state = new State ( inputStream , dataBufferFactory ) ;
State state = new State ( inputStream , dataBufferFactory , bufferSize ) ;
return Flux . usingWhen ( Mono . just ( inputStream ) , it - > {
@ -92,6 +93,7 @@ class DataBufferPublisherAdapter {
@@ -92,6 +93,7 @@ class DataBufferPublisherAdapter {
final AsyncInputStream inputStream ;
final DataBufferFactory dataBufferFactory ;
final int bufferSize ;
// see DEMAND
volatile long demand ;
@ -105,8 +107,16 @@ class DataBufferPublisherAdapter {
@@ -105,8 +107,16 @@ class DataBufferPublisherAdapter {
void request ( FluxSink < DataBuffer > sink , long n ) {
Operators . addCap ( DEMAND , this , n ) ;
drainLoop ( sink ) ;
}
if ( onShouldRead ( ) ) {
/ * *
* Loops while we have demand and while no read is in progress .
*
* @param sink
* /
void drainLoop ( FluxSink < DataBuffer > sink ) {
while ( onShouldRead ( ) ) {
emitNext ( sink ) ;
}
}
@ -119,16 +129,16 @@ class DataBufferPublisherAdapter {
@@ -119,16 +129,16 @@ class DataBufferPublisherAdapter {
return READ . compareAndSet ( this , READ_NONE , READ_IN_PROGRESS ) ;
}
boolean onReadDone ( ) {
return READ . compareAndSet ( this , READ_IN_PROGRESS , READ_NONE ) ;
void onReadDone ( ) {
READ . compareAndSet ( this , READ_IN_PROGRESS , READ_NONE ) ;
}
long getDemand ( ) {
return DEMAND . get ( this ) ;
}
boolean decrementDemand ( ) {
return DEMAND . decrementAndGet ( this ) > 0 ;
void decrementDemand ( ) {
DEMAND . decrementAndGet ( this ) ;
}
void close ( ) {
@ -143,15 +153,15 @@ class DataBufferPublisherAdapter {
@@ -143,15 +153,15 @@ class DataBufferPublisherAdapter {
* Emit the next { @link DataBuffer } .
*
* @param sink
* @return
* /
void emitNext ( FluxSink < DataBuffer > sink ) {
DataBuffer dataBuffer = dataBufferFactory . allocateBuffer ( ) ;
ByteBuffer intermediate = ByteBuffer . allocate ( dataBuffer . capacity ( ) ) ;
private void emitNext ( FluxSink < DataBuffer > sink ) {
ByteBuffer transport = ByteBuffer . allocate ( bufferSize ) ;
BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber ( sink , dataBufferFactory , transport ) ;
try {
Mono . from ( inputStream . read ( intermediate ) ) . subscribe ( new BufferCoreSubscriber ( sink , dataBuffer , intermediate ) ) ;
} catch ( Exception e ) {
inputStream . read ( transport ) . subscribe ( bufferCoreSubscriber ) ;
} catch ( Throwable e ) {
sink . error ( e ) ;
}
}
@ -159,14 +169,16 @@ class DataBufferPublisherAdapter {
@@ -159,14 +169,16 @@ class DataBufferPublisherAdapter {
private class BufferCoreSubscriber implements CoreSubscriber < Integer > {
private final FluxSink < DataBuffer > sink ;
private final DataBuffer dataBuffer ;
private final ByteBuffer intermediate ;
private final DataBufferFactory factory ;
private final ByteBuffer transport ;
private final Thread subscribeThread = Thread . currentThread ( ) ;
private volatile Subscription subscription ;
BufferCoreSubscriber ( FluxSink < DataBuffer > sink , DataBuffer dataBuffer , ByteBuffer intermediate ) {
BufferCoreSubscriber ( FluxSink < DataBuffer > sink , DataBufferFactory factory , ByteBuffer transpor t ) {
this . sink = sink ;
this . dataBuffer = dataBuffer ;
this . in te rmedi ate = in te rmedi ate ;
this . factory = factory ;
this . transpor t = transpor t ;
}
@Override
@ -176,6 +188,7 @@ class DataBufferPublisherAdapter {
@@ -176,6 +188,7 @@ class DataBufferPublisherAdapter {
@Override
public void onSubscribe ( Subscription s ) {
this . subscription = s ;
s . request ( 1 ) ;
}
@ -185,24 +198,32 @@ class DataBufferPublisherAdapter {
@@ -185,24 +198,32 @@ class DataBufferPublisherAdapter {
if ( isClosed ( ) ) {
onReadDone ( ) ;
DataBufferUtils . release ( dataBuffer ) ;
Operators . onNextDropped ( dataBuffer , sink . currentContext ( ) ) ;
return ;
}
intermediate . flip ( ) ;
dataBuffer . write ( intermediate ) ;
if ( bytes > 0 ) {
sink . next ( dataBuffer ) ;
decrementDemand ( ) ;
transport . flip ( ) ;
DataBuffer dataBuffer = factory . allocateBuffer ( transport . remaining ( ) ) ;
dataBuffer . write ( transport ) ;
transport . clear ( ) ;
sink . next ( dataBuffer ) ;
decrementDemand ( ) ;
}
try {
if ( bytes = = - 1 ) {
sink . complete ( ) ;
return ;
}
} finally {
onReadDone ( ) ;
}
subscription . request ( 1 ) ;
}
@Override
@ -215,16 +236,14 @@ class DataBufferPublisherAdapter {
@@ -215,16 +236,14 @@ class DataBufferPublisherAdapter {
}
onReadDone ( ) ;
DataBufferUtils . release ( dataBuffer ) ;
Operators . onNextDropped ( dataBuffer , sink . currentContext ( ) ) ;
sink . error ( t ) ;
}
@Override
public void onComplete ( ) {
if ( onShouldR ead( ) ) {
emitNext ( sink ) ;
if ( subscribeThread ! = Thread . currentThr ead( ) ) {
drainLoop ( sink ) ;
}
}
}