From 923134bbdc8d024c7416e2af2ae7fed52e1b2be1 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 11 Nov 2019 12:02:16 +0100 Subject: [PATCH] DATAMONGO-2414 - Guard drain loop in AsyncInputStreamHandler with state switch. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We now use a non-blocking state switch to determine whether to invoke drainLoop(…) from Subscriber completion. Previously, we relied on same thread identification assuming if the subscription thread and the completion thread were the same, that we're already running inside the drain loop. It turns out that a I/O thread could also run in event-loop mode where subscription and completion happens on the same thread but in between there's some processing and so the the call to completion is a delayed signal and not being called on the same stack as drainLoop(…). The same-thread assumption was in place to avoid StackOverflow caused by infinite recursions. We now use a state lock to enter the drain loop. Any concurrent attempts to re-enter the drain loop in Subscriber completion is now prevented to make sure that we continue draining while not causing stack recursions. Original Pull Request: #807 --- .../gridfs/DataBufferPublisherAdapter.java | 51 +++++++++++++------ 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index 180778c93..de364c927 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -62,7 +62,7 @@ class DataBufferPublisherAdapter { /** * Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}. - * + * * @param inputStream the source stream. * @return a {@link Flux} emitting data chunks one by one. * @since 2.2.1 @@ -71,7 +71,6 @@ class DataBufferPublisherAdapter { AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory, inputStream.bufferSize); - return Flux.create((sink) -> { sink.onDispose(streamHandler::close); @@ -87,7 +86,7 @@ class DataBufferPublisherAdapter { * An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading * from it, delegating operations on the {@link AsyncInputStream} to the reference instance.
* Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas. - * + * * @author Christoph Strobl * @since 2.2.1 */ @@ -146,12 +145,18 @@ class DataBufferPublisherAdapter { private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater .newUpdater(AsyncInputStreamHandler.class, "state"); + private static final AtomicIntegerFieldUpdater DRAIN = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "drain"); + private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater .newUpdater(AsyncInputStreamHandler.class, "read"); private static final int STATE_OPEN = 0; private static final int STATE_CLOSED = 1; + private static final int DRAIN_NONE = 0; + private static final int DRAIN_COMPLETION = 1; + private static final int READ_NONE = 0; private static final int READ_IN_PROGRESS = 1; @@ -165,6 +170,9 @@ class DataBufferPublisherAdapter { // see STATE volatile int state = STATE_OPEN; + // see DRAIN + volatile int drain = DRAIN_NONE; + // see READ_IN_PROGRESS volatile int read = READ_NONE; @@ -209,6 +217,14 @@ class DataBufferPublisherAdapter { STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED); } + boolean enterDrainLoop() { + return DRAIN.compareAndSet(this, DRAIN_NONE, DRAIN_COMPLETION); + } + + void leaveDrainLoop() { + DRAIN.set(this, DRAIN_NONE); + } + boolean isClosed() { return STATE.get(this) == STATE_CLOSED; } @@ -235,7 +251,6 @@ class DataBufferPublisherAdapter { private final FluxSink sink; private final DataBufferFactory factory; private final ByteBuffer transport; - private final Thread subscribeThread = Thread.currentThread(); private volatile Subscription subscription; BufferCoreSubscriber(FluxSink sink, DataBufferFactory factory, ByteBuffer transport) { @@ -261,8 +276,6 @@ class DataBufferPublisherAdapter { public void onNext(Integer bytes) { if (isClosed()) { - - onReadDone(); return; } @@ -273,13 +286,9 @@ class DataBufferPublisherAdapter { decrementDemand(); } - try { - if (bytes == -1) { - sink.complete(); - return; - } - } finally { - onReadDone(); + if (bytes == -1) { + sink.complete(); + return; } subscription.request(1); @@ -306,15 +315,25 @@ class DataBufferPublisherAdapter { return; } - onReadDone(); + close(); sink.error(t); } @Override public void onComplete() { - if (subscribeThread != Thread.currentThread()) { - drainLoop(sink); + onReadDone(); + + if (!isClosed()) { + + if (enterDrainLoop()) { + try { + drainLoop(sink); + } finally { + leaveDrainLoop(); + } + } + } } }