diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index dfeb1411d..654f9d7f0 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -32,7 +32,6 @@ import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; @@ -51,11 +50,13 @@ class DataBufferPublisherAdapter { * * @param inputStream must not be {@literal null}. * @param dataBufferFactory must not be {@literal null}. + * @param bufferSize read {@code n} bytes per iteration. * @return the resulting {@link Publisher}. */ - static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, + int bufferSize) { - State state = new State(inputStream, dataBufferFactory); + State state = new State(inputStream, dataBufferFactory, bufferSize); return Flux.usingWhen(Mono.just(inputStream), it -> { @@ -92,6 +93,7 @@ class DataBufferPublisherAdapter { final AsyncInputStream inputStream; final DataBufferFactory dataBufferFactory; + final int bufferSize; // see DEMAND volatile long demand; @@ -105,8 +107,16 @@ class DataBufferPublisherAdapter { void request(FluxSink sink, long n) { Operators.addCap(DEMAND, this, n); + drainLoop(sink); + } - if (onShouldRead()) { + /** + * Loops while we have demand and while no read is in progress. + * + * @param sink + */ + void drainLoop(FluxSink sink) { + while (onShouldRead()) { emitNext(sink); } } @@ -119,16 +129,16 @@ class DataBufferPublisherAdapter { return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS); } - boolean onReadDone() { - return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); + void onReadDone() { + READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); } long getDemand() { return DEMAND.get(this); } - boolean decrementDemand() { - return DEMAND.decrementAndGet(this) > 0; + void decrementDemand() { + DEMAND.decrementAndGet(this); } void close() { @@ -143,15 +153,15 @@ class DataBufferPublisherAdapter { * Emit the next {@link DataBuffer}. * * @param sink + * @return */ - void emitNext(FluxSink sink) { - - DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); - ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); + private void emitNext(FluxSink sink) { + ByteBuffer transport = ByteBuffer.allocate(bufferSize); + BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport); try { - Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate)); - } catch (Exception e) { + inputStream.read(transport).subscribe(bufferCoreSubscriber); + } catch (Throwable e) { sink.error(e); } } @@ -159,14 +169,16 @@ class DataBufferPublisherAdapter { private class BufferCoreSubscriber implements CoreSubscriber { private final FluxSink sink; - private final DataBuffer dataBuffer; - private final ByteBuffer intermediate; + private final DataBufferFactory factory; + private final ByteBuffer transport; + private final Thread subscribeThread = Thread.currentThread(); + private volatile Subscription subscription; - BufferCoreSubscriber(FluxSink sink, DataBuffer dataBuffer, ByteBuffer intermediate) { + BufferCoreSubscriber(FluxSink sink, DataBufferFactory factory, ByteBuffer transport) { this.sink = sink; - this.dataBuffer = dataBuffer; - this.intermediate = intermediate; + this.factory = factory; + this.transport = transport; } @Override @@ -176,6 +188,7 @@ class DataBufferPublisherAdapter { @Override public void onSubscribe(Subscription s) { + this.subscription = s; s.request(1); } @@ -185,24 +198,32 @@ class DataBufferPublisherAdapter { if (isClosed()) { onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); return; } - intermediate.flip(); - dataBuffer.write(intermediate); + if (bytes > 0) { - sink.next(dataBuffer); - decrementDemand(); + transport.flip(); + + DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining()); + dataBuffer.write(transport); + + transport.clear(); + sink.next(dataBuffer); + + decrementDemand(); + } try { if (bytes == -1) { sink.complete(); + return; } } finally { onReadDone(); } + + subscription.request(1); } @Override @@ -215,16 +236,14 @@ class DataBufferPublisherAdapter { } onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); sink.error(t); } @Override public void onComplete() { - if (onShouldRead()) { - emitNext(sink); + if (subscribeThread != Thread.currentThread()) { + drainLoop(sink); } } }