From 0facdcfa9859f780b2d700bce7350dd5f176f712 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 21 Oct 2019 09:14:29 +0200 Subject: [PATCH] DATAMONGO-2393 - Use drain loop for same-thread processing in GridFS download stream. We now rely on an outer drain-loop when GridFS reads complete on the same thread instead of using recursive subscriptions to avoid StackOverflow. Previously, we recursively invoked subscriptions that lead to an increased stack size. Original Pull Request: #799 --- .../gridfs/DataBufferPublisherAdapter.java | 77 ++++++++++++------- 1 file changed, 48 insertions(+), 29 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index dfeb1411d..654f9d7f0 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -32,7 +32,6 @@ import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; @@ -51,11 +50,13 @@ class DataBufferPublisherAdapter { * * @param inputStream must not be {@literal null}. * @param dataBufferFactory must not be {@literal null}. + * @param bufferSize read {@code n} bytes per iteration. * @return the resulting {@link Publisher}. */ - static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, + int bufferSize) { - State state = new State(inputStream, dataBufferFactory); + State state = new State(inputStream, dataBufferFactory, bufferSize); return Flux.usingWhen(Mono.just(inputStream), it -> { @@ -92,6 +93,7 @@ class DataBufferPublisherAdapter { final AsyncInputStream inputStream; final DataBufferFactory dataBufferFactory; + final int bufferSize; // see DEMAND volatile long demand; @@ -105,8 +107,16 @@ class DataBufferPublisherAdapter { void request(FluxSink sink, long n) { Operators.addCap(DEMAND, this, n); + drainLoop(sink); + } - if (onShouldRead()) { + /** + * Loops while we have demand and while no read is in progress. + * + * @param sink + */ + void drainLoop(FluxSink sink) { + while (onShouldRead()) { emitNext(sink); } } @@ -119,16 +129,16 @@ class DataBufferPublisherAdapter { return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS); } - boolean onReadDone() { - return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); + void onReadDone() { + READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); } long getDemand() { return DEMAND.get(this); } - boolean decrementDemand() { - return DEMAND.decrementAndGet(this) > 0; + void decrementDemand() { + DEMAND.decrementAndGet(this); } void close() { @@ -143,15 +153,15 @@ class DataBufferPublisherAdapter { * Emit the next {@link DataBuffer}. * * @param sink + * @return */ - void emitNext(FluxSink sink) { - - DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); - ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); + private void emitNext(FluxSink sink) { + ByteBuffer transport = ByteBuffer.allocate(bufferSize); + BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport); try { - Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate)); - } catch (Exception e) { + inputStream.read(transport).subscribe(bufferCoreSubscriber); + } catch (Throwable e) { sink.error(e); } } @@ -159,14 +169,16 @@ class DataBufferPublisherAdapter { private class BufferCoreSubscriber implements CoreSubscriber { private final FluxSink sink; - private final DataBuffer dataBuffer; - private final ByteBuffer intermediate; + private final DataBufferFactory factory; + private final ByteBuffer transport; + private final Thread subscribeThread = Thread.currentThread(); + private volatile Subscription subscription; - BufferCoreSubscriber(FluxSink sink, DataBuffer dataBuffer, ByteBuffer intermediate) { + BufferCoreSubscriber(FluxSink sink, DataBufferFactory factory, ByteBuffer transport) { this.sink = sink; - this.dataBuffer = dataBuffer; - this.intermediate = intermediate; + this.factory = factory; + this.transport = transport; } @Override @@ -176,6 +188,7 @@ class DataBufferPublisherAdapter { @Override public void onSubscribe(Subscription s) { + this.subscription = s; s.request(1); } @@ -185,24 +198,32 @@ class DataBufferPublisherAdapter { if (isClosed()) { onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); return; } - intermediate.flip(); - dataBuffer.write(intermediate); + if (bytes > 0) { - sink.next(dataBuffer); - decrementDemand(); + transport.flip(); + + DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining()); + dataBuffer.write(transport); + + transport.clear(); + sink.next(dataBuffer); + + decrementDemand(); + } try { if (bytes == -1) { sink.complete(); + return; } } finally { onReadDone(); } + + subscription.request(1); } @Override @@ -215,16 +236,14 @@ class DataBufferPublisherAdapter { } onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); sink.error(t); } @Override public void onComplete() { - if (onShouldRead()) { - emitNext(sink); + if (subscribeThread != Thread.currentThread()) { + drainLoop(sink); } } }