From c6592b01b0c99f2e8967a5a3f11749ea9398e10a Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 21 Oct 2019 09:07:30 +0200 Subject: [PATCH] DATAMONGO-2393 - Polishing. Extract read requests into inner class. Original Pull Request: #799 --- .../gridfs/AsyncInputStreamAdapter.java | 134 +++++++++++------- 1 file changed, 79 insertions(+), 55 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java index f9e3dc602..ff2a33bf5 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java @@ -27,9 +27,7 @@ import reactor.util.context.Context; import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.function.BiConsumer; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -74,8 +72,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream { private volatile boolean cancelled; private volatile boolean allDataBuffersReceived; private volatile Throwable error; - private final Queue> readRequests = Queues.> small() - .get(); + private final Queue readRequests = Queues. small().get(); private final Queue bufferQueue = Queues. small().get(); @@ -94,52 +91,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream { return Flux.create(sink -> { - AtomicLong written = new AtomicLong(); - readRequests.offer((db, bytecount) -> { - - try { - - if (error != null) { - onError(sink, error); - return; - } - - if (bytecount == -1) { - - onComplete(sink, written.get() > 0 ? written.intValue() : -1); - return; - } - - ByteBuffer byteBuffer = db.asByteBuffer(); - int remaining = byteBuffer.remaining(); - int writeCapacity = Math.min(dst.remaining(), remaining); - int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity()); - int toWrite = limit - byteBuffer.position(); - - if (toWrite == 0) { - - onComplete(sink, written.intValue()); - return; - } - - int oldPosition = byteBuffer.position(); - - byteBuffer.limit(toWrite); - dst.put(byteBuffer); - byteBuffer.limit(byteBuffer.capacity()); - byteBuffer.position(oldPosition); - db.readPosition(db.readPosition() + toWrite); - written.addAndGet(toWrite); - - } catch (Exception e) { - onError(sink, e); - } finally { - - if (db != null && db.readableByteCount() == 0) { - DataBufferUtils.release(db); - } - } - }); + readRequests.offer(new ReadRequest(sink, dst)); sink.onCancel(this::terminatePendingReads); sink.onDispose(this::terminatePendingReads); @@ -243,12 +195,12 @@ class AsyncInputStreamAdapter implements AsyncInputStream { continue; } - BiConsumer consumer = AsyncInputStreamAdapter.this.readRequests.peek(); + ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek(); if (consumer == null) { break; } - consumer.accept(wip, wip.readableByteCount()); + consumer.transferBytes(wip, wip.readableByteCount()); } if (bufferQueue.isEmpty()) { @@ -269,10 +221,10 @@ class AsyncInputStreamAdapter implements AsyncInputStream { */ void terminatePendingReads() { - BiConsumer readers; + ReadRequest readers; while ((readers = readRequests.poll()) != null) { - readers.accept(null, -1); + readers.onComplete(); } } @@ -299,7 +251,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream { return; } - BiConsumer readRequest = AsyncInputStreamAdapter.this.readRequests.peek(); + ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek(); if (readRequest == null) { @@ -336,4 +288,76 @@ class AsyncInputStreamAdapter implements AsyncInputStream { } } } + + /** + * Request to read bytes and transfer these to the associated {@link ByteBuffer}. + */ + class ReadRequest { + + private final FluxSink sink; + private final ByteBuffer dst; + + private int writtenBytes; + + ReadRequest(FluxSink sink, ByteBuffer dst) { + this.sink = sink; + this.dst = dst; + this.writtenBytes = -1; + } + + public void onComplete() { + + if (error != null) { + AsyncInputStreamAdapter.this.onError(sink, error); + return; + } + + AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes); + } + + public void transferBytes(DataBuffer db, int bytes) { + + try { + + if (error != null) { + AsyncInputStreamAdapter.this.onError(sink, error); + return; + } + + ByteBuffer byteBuffer = db.asByteBuffer(); + int remaining = byteBuffer.remaining(); + int writeCapacity = Math.min(dst.remaining(), remaining); + int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity()); + int toWrite = limit - byteBuffer.position(); + + if (toWrite == 0) { + + AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes); + return; + } + + int oldPosition = byteBuffer.position(); + + byteBuffer.limit(toWrite); + dst.put(byteBuffer); + byteBuffer.limit(byteBuffer.capacity()); + byteBuffer.position(oldPosition); + db.readPosition(db.readPosition() + toWrite); + + if (writtenBytes == -1) { + writtenBytes = bytes; + } else { + writtenBytes += bytes; + } + + } catch (Exception e) { + AsyncInputStreamAdapter.this.onError(sink, e); + } finally { + + if (db.readableByteCount() == 0) { + DataBufferUtils.release(db); + } + } + } + } }