diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index 180778c93..de364c927 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -62,7 +62,7 @@ class DataBufferPublisherAdapter { /** * Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}. - * + * * @param inputStream the source stream. * @return a {@link Flux} emitting data chunks one by one. * @since 2.2.1 @@ -71,7 +71,6 @@ class DataBufferPublisherAdapter { AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory, inputStream.bufferSize); - return Flux.create((sink) -> { sink.onDispose(streamHandler::close); @@ -87,7 +86,7 @@ class DataBufferPublisherAdapter { * An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading * from it, delegating operations on the {@link AsyncInputStream} to the reference instance.
* Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas. - * + * * @author Christoph Strobl * @since 2.2.1 */ @@ -146,12 +145,18 @@ class DataBufferPublisherAdapter { private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater .newUpdater(AsyncInputStreamHandler.class, "state"); + private static final AtomicIntegerFieldUpdater DRAIN = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "drain"); + private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater .newUpdater(AsyncInputStreamHandler.class, "read"); private static final int STATE_OPEN = 0; private static final int STATE_CLOSED = 1; + private static final int DRAIN_NONE = 0; + private static final int DRAIN_COMPLETION = 1; + private static final int READ_NONE = 0; private static final int READ_IN_PROGRESS = 1; @@ -165,6 +170,9 @@ class DataBufferPublisherAdapter { // see STATE volatile int state = STATE_OPEN; + // see DRAIN + volatile int drain = DRAIN_NONE; + // see READ_IN_PROGRESS volatile int read = READ_NONE; @@ -209,6 +217,14 @@ class DataBufferPublisherAdapter { STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED); } + boolean enterDrainLoop() { + return DRAIN.compareAndSet(this, DRAIN_NONE, DRAIN_COMPLETION); + } + + void leaveDrainLoop() { + DRAIN.set(this, DRAIN_NONE); + } + boolean isClosed() { return STATE.get(this) == STATE_CLOSED; } @@ -235,7 +251,6 @@ class DataBufferPublisherAdapter { private final FluxSink sink; private final DataBufferFactory factory; private final ByteBuffer transport; - private final Thread subscribeThread = Thread.currentThread(); private volatile Subscription subscription; BufferCoreSubscriber(FluxSink sink, DataBufferFactory factory, ByteBuffer transport) { @@ -261,8 +276,6 @@ class DataBufferPublisherAdapter { public void onNext(Integer bytes) { if (isClosed()) { - - onReadDone(); return; } @@ -273,13 +286,9 @@ class DataBufferPublisherAdapter { decrementDemand(); } - try { - if (bytes == -1) { - sink.complete(); - return; - } - } finally { - onReadDone(); + if (bytes == -1) { + sink.complete(); + return; } subscription.request(1); @@ -306,15 +315,25 @@ class DataBufferPublisherAdapter { return; } - onReadDone(); + close(); sink.error(t); } @Override public void onComplete() { - if (subscribeThread != Thread.currentThread()) { - drainLoop(sink); + onReadDone(); + + if (!isClosed()) { + + if (enterDrainLoop()) { + try { + drainLoop(sink); + } finally { + leaveDrainLoop(); + } + } + } } }