diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index 654f9d7f0..180778c93 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -29,10 +29,10 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; - import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import com.mongodb.reactivestreams.client.Success; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; /** @@ -56,34 +56,98 @@ class DataBufferPublisherAdapter { static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) { - State state = new State(inputStream, dataBufferFactory, bufferSize); + return Flux.usingWhen(Mono.just(new DelegatingAsyncInputStream(inputStream, dataBufferFactory, bufferSize)), + DataBufferPublisherAdapter::doRead, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close); + } + + /** + * Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}. + * + * @param inputStream the source stream. + * @return a {@link Flux} emitting data chunks one by one. + * @since 2.2.1 + */ + private static Flux doRead(DelegatingAsyncInputStream inputStream) { - return Flux.usingWhen(Mono.just(inputStream), it -> { + AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory, + inputStream.bufferSize); - return Flux. create((sink) -> { + return Flux.create((sink) -> { - sink.onDispose(state::close); - sink.onCancel(state::close); + sink.onDispose(streamHandler::close); + sink.onCancel(streamHandler::close); - sink.onRequest(n -> { - state.request(sink, n); - }); + sink.onRequest(n -> { + streamHandler.request(sink, n); }); - }, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close) // - .concatMap(Flux::just, 1); + }); + } + + /** + * An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading + * from it, delegating operations on the {@link AsyncInputStream} to the reference instance.
+ * Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas. + * + * @author Christoph Strobl + * @since 2.2.1 + */ + private static class DelegatingAsyncInputStream implements AsyncInputStream { + + private final AsyncInputStream inputStream; + private final DataBufferFactory dataBufferFactory; + private int bufferSize; + + /** + * @param inputStream the source input stream. + * @param dataBufferFactory + * @param bufferSize + */ + DelegatingAsyncInputStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) { + + this.inputStream = inputStream; + this.dataBufferFactory = dataBufferFactory; + this.bufferSize = bufferSize; + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) + */ + @Override + public Publisher read(ByteBuffer dst) { + return inputStream.read(dst); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long) + */ + @Override + public Publisher skip(long bytesToSkip) { + return inputStream.skip(bytesToSkip); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() + */ + @Override + public Publisher close() { + return inputStream.close(); + } } @RequiredArgsConstructor - static class State { + static class AsyncInputStreamHandler { - private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, - "demand"); + private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "demand"); - private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, - "state"); + private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "state"); - private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, - "read"); + private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "read"); private static final int STATE_OPEN = 0; private static final int STATE_CLOSED = 1; @@ -188,6 +252,7 @@ class DataBufferPublisherAdapter { @Override public void onSubscribe(Subscription s) { + this.subscription = s; s.request(1); } @@ -203,14 +268,8 @@ class DataBufferPublisherAdapter { if (bytes > 0) { - transport.flip(); - - DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining()); - dataBuffer.write(transport); - - transport.clear(); - sink.next(dataBuffer); - + DataBuffer buffer = readNextChunk(); + sink.next(buffer); decrementDemand(); } @@ -226,6 +285,18 @@ class DataBufferPublisherAdapter { subscription.request(1); } + private DataBuffer readNextChunk() { + + transport.flip(); + + DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining()); + dataBuffer.write(transport); + + transport.clear(); + + return dataBuffer; + } + @Override public void onError(Throwable t) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java index d67aaa3f1..fb40dad10 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java @@ -23,7 +23,6 @@ import java.io.InputStream; import java.util.function.IntFunction; import org.reactivestreams.Publisher; - import org.springframework.core.io.AbstractResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; @@ -36,10 +35,13 @@ import com.mongodb.client.gridfs.model.GridFSFile; * Reactive {@link GridFSFile} based {@link Resource} implementation. * * @author Mark Paluch + * @author Christoph Strobl * @since 2.2 */ public class ReactiveGridFsResource extends AbstractResource { + private static final Integer DEFAULT_CHUNK_SIZE = 256 * 1024; + private final @Nullable GridFSFile file; private final String filename; private final IntFunction> contentFunction; @@ -176,19 +178,23 @@ public class ReactiveGridFsResource extends AbstractResource { } /** - * Retrieve the download stream using the default chunk size of 256kb. + * Retrieve the download stream using the default chunk size of 256 kB. * - * @return + * @return a {@link Flux} emitting data chunks one by one. Please make sure to + * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all + * {@link DataBuffer buffers} when done. */ public Flux getDownloadStream() { - return getDownloadStream(256 * 1024); // 256kb buffers + return getDownloadStream(DEFAULT_CHUNK_SIZE); } /** * Retrieve the download stream. * * @param chunkSize chunk size in bytes to use. - * @return + * @return a {@link Flux} emitting data chunks one by one. Please make sure to + * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all + * {@link DataBuffer buffers} when done. * @since 2.2.1 */ public Flux getDownloadStream(int chunkSize) { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java index a95976b7d..9914443c8 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java @@ -33,7 +33,6 @@ import org.bson.types.ObjectId; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; @@ -101,13 +100,10 @@ public class ReactiveGridFsTemplateTests { @Test // DATAMONGO-1855 public void storesAndLoadsLargeFileCorrectly() { - ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000 * 1); // 1 mb - + ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000); // 1 mb int i = 0; while (buffer.remaining() != 0) { - byte b = (byte) (i++ % 16); - String string = HexUtils.toHex(new byte[] { b }); - buffer.put(string.getBytes()); + buffer.put(HexUtils.toHex(new byte[] { (byte) (i++ % 16) }).getBytes()); } buffer.flip();