- * This adapter subscribes to the binary {@link Publisher} as soon as the first chunk is {@link #read(ByteBuffer)
- * requested}. Read requests are queued, and binary chunks are requested from the {@link Publisher}. As the
- * {@link Publisher} emits items, chunks are handed to the pending read request, which completes its
- * number-of-written-bytes {@link Publisher}.
- *
- * {@link AsyncInputStream} is intended to be used as a sequential callback API that is invoked until EOF is reached.
- * {@link #close()} is propagated as a cancellation signal to the binary {@link Publisher}.
- *
- * @author Mark Paluch
- * @author Christoph Strobl
- * @since 2.2
- */
-@RequiredArgsConstructor
-class AsyncInputStreamAdapter implements AsyncInputStream {
-
- private static final AtomicLongFieldUpdater<AsyncInputStreamAdapter> DEMAND = AtomicLongFieldUpdater
- .newUpdater(AsyncInputStreamAdapter.class, "demand");
-
- private static final AtomicIntegerFieldUpdater<AsyncInputStreamAdapter> SUBSCRIBED = AtomicIntegerFieldUpdater
- .newUpdater(AsyncInputStreamAdapter.class, "subscribed");
-
- private static final int SUBSCRIPTION_NOT_SUBSCRIBED = 0;
- private static final int SUBSCRIPTION_SUBSCRIBED = 1;
-
- private final Publisher<? extends DataBuffer> buffers;
- private final Context subscriberContext;
-
- private volatile Subscription subscription;
- private volatile boolean cancelled;
- private volatile boolean allDataBuffersReceived;
- private volatile Throwable error;
- private final Queue<ReadRequest> readRequests = Queues.<ReadRequest> small().get();
-
- private final Queue<DataBuffer> bufferQueue = Queues.<DataBuffer> small().get();
-
- // see DEMAND
- volatile long demand;
-
- // see SUBSCRIBED
- volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED;
-
- /*
- * (non-Javadoc)
- * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer)
- */
- @Override
- public Publisher<Integer> read(ByteBuffer dst) {
-
- return Flux.create(sink -> {
-
- readRequests.offer(new ReadRequest(sink, dst));
-
- sink.onCancel(this::terminatePendingReads);
- sink.onDispose(this::terminatePendingReads);
- sink.onRequest(this::request);
- });
- }
-
- void onError(FluxSink<Integer> sink, Throwable e) {
-
- readRequests.poll();
- sink.error(e);
- }
-
- void onComplete(FluxSink<Integer> sink, int writtenBytes) {
-
- readRequests.poll();
- DEMAND.decrementAndGet(this);
- sink.next(writtenBytes);
- sink.complete();
- }
-
- /*
- * (non-Javadoc)
- * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long)
- */
- @Override
- public Publisher<Long> skip(long bytesToSkip) {
- throw new UnsupportedOperationException("Skip is currently not implemented");
- }
-
- /*
- * (non-Javadoc)
- * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close()
- */
- @Override
- public Publisher<Success> close() {
-
- return Mono.create(sink -> {
-
- cancelled = true;
-
- if (error != null) {
- terminatePendingReads();
- sink.error(error);
- return;
- }
-
- terminatePendingReads();
- sink.success();
- });
- }
-
- protected void request(long n) {
-
- if (allDataBuffersReceived && bufferQueue.isEmpty()) {
-
- terminatePendingReads();
- return;
- }
-
- Operators.addCap(DEMAND, this, n);
-
- if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) {
-
- if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) {
- buffers.subscribe(new DataBufferCoreSubscriber());
- }
-
- } else {
-
- Subscription subscription = this.subscription;
-
- if (subscription != null) {
- requestFromSubscription(subscription);
- }
- }
-
- }
-
- void requestFromSubscription(Subscription subscription) {
-
- if (cancelled) {
- subscription.cancel();
- }
-
- drainLoop();
- }
-
- void drainLoop() {
-
- while (DEMAND.get(AsyncInputStreamAdapter.this) > 0) {
-
- DataBuffer wip = bufferQueue.peek();
-
- if (wip == null) {
- break;
- }
-
- if (wip.readableByteCount() == 0) {
- bufferQueue.poll();
- continue;
- }
-
- ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek();
- if (consumer == null) {
- break;
- }
-
- consumer.transferBytes(wip, wip.readableByteCount());
- }
-
- if (bufferQueue.isEmpty()) {
-
- if (allDataBuffersReceived) {
- terminatePendingReads();
- return;
- }
-
- if (demand > 0) {
- subscription.request(1);
- }
- }
- }
-
- /**
- * Terminates pending reads with empty buffers.
- */
- void terminatePendingReads() {
-
- ReadRequest readers;
-
- while ((readers = readRequests.poll()) != null) {
- readers.onComplete();
- }
- }
-
- private class DataBufferCoreSubscriber implements CoreSubscriber<DataBuffer> {
-
- @Override
- public Context currentContext() {
- return AsyncInputStreamAdapter.this.subscriberContext;
- }
-
- @Override
- public void onSubscribe(Subscription s) {
-
- AsyncInputStreamAdapter.this.subscription = s;
- s.request(1);
- }
-
- @Override
- public void onNext(DataBuffer dataBuffer) {
-
- if (cancelled || allDataBuffersReceived) {
- DataBufferUtils.release(dataBuffer);
- Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
- return;
- }
-
- ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek();
-
- if (readRequest == null) {
-
- DataBufferUtils.release(dataBuffer);
- Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
- subscription.cancel();
- return;
- }
-
- bufferQueue.offer(dataBuffer);
-
- drainLoop();
- }
-
- @Override
- public void onError(Throwable t) {
-
- if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) {
- Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext);
- return;
- }
-
- AsyncInputStreamAdapter.this.error = t;
- AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
- terminatePendingReads();
- }
-
- @Override
- public void onComplete() {
-
- AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
- if (bufferQueue.isEmpty()) {
- terminatePendingReads();
- }
- }
- }
-
- /**
- * Request to read bytes and transfer these to the associated {@link ByteBuffer}.
- */
- class ReadRequest {
-
- private final FluxSink<Integer> sink;
- private final ByteBuffer dst;
-
- private int writtenBytes;
-
- ReadRequest(FluxSink<Integer> sink, ByteBuffer dst) {
- this.sink = sink;
- this.dst = dst;
- this.writtenBytes = -1;
- }
-
- public void onComplete() {
-
- if (error != null) {
- AsyncInputStreamAdapter.this.onError(sink, error);
- return;
- }
-
- AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
- }
-
- public void transferBytes(DataBuffer db, int bytes) {
-
- try {
-
- if (error != null) {
- AsyncInputStreamAdapter.this.onError(sink, error);
- return;
- }
-
- ByteBuffer byteBuffer = db.asByteBuffer();
- int remaining = byteBuffer.remaining();
- int writeCapacity = Math.min(dst.remaining(), remaining);
- int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity());
- int toWrite = limit - byteBuffer.position();
-
- if (toWrite == 0) {
-
- AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
- return;
- }
-
- int oldPosition = byteBuffer.position();
-
- byteBuffer.limit(toWrite);
- dst.put(byteBuffer);
- byteBuffer.limit(byteBuffer.capacity());
- byteBuffer.position(oldPosition);
- db.readPosition(db.readPosition() + toWrite);
-
- if (writtenBytes == -1) {
- writtenBytes = bytes;
- } else {
- writtenBytes += bytes;
- }
-
- } catch (Exception e) {
- AsyncInputStreamAdapter.this.onError(sink, e);
- } finally {
-
- if (db.readableByteCount() == 0) {
- DataBufferUtils.release(db);
- }
- }
- }
- }
-}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java
deleted file mode 100644
index e793f84c1..000000000
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2019-2020 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.springframework.data.mongodb.gridfs;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import org.reactivestreams.Publisher;
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferFactory;
-import org.springframework.core.io.buffer.DataBufferUtils;
-
-import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
-
-/**
- * Utility methods to create adapters between a {@link Publisher} of {@link DataBuffer} and {@link AsyncInputStream}.
- *
- * @author Mark Paluch
- * @since 2.2
- */
-class BinaryStreamAdapters {
-
- /**
- * Creates a {@link Flux} emitting {@link DataBuffer} by reading binary chunks from {@link AsyncInputStream}.
- * Publisher termination (completion, error, cancellation) closes the {@link AsyncInputStream}.
- *
- * The resulting {@link org.reactivestreams.Publisher} filters empty binary chunks and uses {@link DataBufferFactory}
- * settings to determine the chunk size.
- *
- * @param inputStream must not be {@literal null}.
- * @param dataBufferFactory must not be {@literal null}.
- * @param bufferSize read {@code n} bytes per iteration.
- * @return {@link Flux} emitting {@link DataBuffer}s.
- * @see DataBufferFactory#allocateBuffer()
- */
- static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
- int bufferSize) {
-
- return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) //
- .filter(it -> {
-
- if (it.readableByteCount() == 0) {
- DataBufferUtils.release(it);
- return false;
- }
- return true;
- });
- }
-
- /**
- * Creates a {@link Mono} emitting a {@link AsyncInputStream} to consume a {@link Publisher} emitting
- * {@link DataBuffer} and exposing the binary stream through {@link AsyncInputStream}. {@link DataBuffer}s are
- * released by the adapter during consumption.
- *
- * This method returns a {@link Mono} to retain the {@link reactor.util.context.Context subscriber context}.
- *
- * @param dataBuffers must not be {@literal null}.
- * @return {@link Mono} emitting {@link AsyncInputStream}.
- * @see DataBufferUtils#release(DataBuffer)
- */
- static Mono<AsyncInputStream> toAsyncInputStream(Publisher<? extends DataBuffer> dataBuffers) {
-
- return Mono.create(sink -> {
- sink.success(new AsyncInputStreamAdapter(dataBuffers, sink.currentContext()));
- });
- }
-}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java
deleted file mode 100644
index 359d0fb05..000000000
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Copyright 2019-2020 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.springframework.data.mongodb.gridfs;
-
-import lombok.RequiredArgsConstructor;
-import reactor.core.CoreSubscriber;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.Operators;
-import reactor.util.context.Context;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscription;
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferFactory;
-
-import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
-
-/**
- * Utility to adapt an {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}.
- *
- * @author Mark Paluch
- * @author Christoph Strobl
- * @since 2.2
- */
-class DataBufferPublisherAdapter {
-
- /**
- * Creates a {@link Publisher} emitting {@link DataBuffer}s by reading binary chunks from {@link AsyncInputStream}.
- * Closes the {@link AsyncInputStream} once the {@link Publisher} terminates.
- *
- * @param inputStream must not be {@literal null}.
- * @param dataBufferFactory must not be {@literal null}.
- * @param bufferSize read {@code n} bytes per iteration.
- * @return the resulting {@link Publisher}.
- */
- static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
- int bufferSize) {
-
- return Flux.usingWhen(Mono.just(new DelegatingAsyncInputStream(inputStream, dataBufferFactory, bufferSize)),
- DataBufferPublisherAdapter::doRead, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close);
- }
-
- /**
- * Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}.
- *
- * @param inputStream the source stream.
- * @return a {@link Flux} emitting data chunks one by one.
- * @since 2.2.1
- */
- private static Flux<DataBuffer> doRead(DelegatingAsyncInputStream inputStream) {
-
- AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory,
- inputStream.bufferSize);
- return Flux.create((sink) -> {
-
- sink.onDispose(streamHandler::close);
- sink.onCancel(streamHandler::close);
-
- sink.onRequest(n -> {
- streamHandler.request(sink, n);
- });
- });
- }
-
- /**
- * An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading
- * from it, delegating operations on the {@link AsyncInputStream} to the reference instance.
- * Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas.
- *
- * @author Christoph Strobl
- * @since 2.2.1
- */
- private static class DelegatingAsyncInputStream implements AsyncInputStream {
-
- private final AsyncInputStream inputStream;
- private final DataBufferFactory dataBufferFactory;
- private int bufferSize;
-
- /**
- * @param inputStream the source input stream.
- * @param dataBufferFactory
- * @param bufferSize
- */
- DelegatingAsyncInputStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) {
-
- this.inputStream = inputStream;
- this.dataBufferFactory = dataBufferFactory;
- this.bufferSize = bufferSize;
- }
-
- /*
- * (non-Javadoc)
- * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer)
- */
- @Override
- public Publisher<Integer> read(ByteBuffer dst) {
- return inputStream.read(dst);
- }
-
- /*
- * (non-Javadoc)
- * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long)
- */
- @Override
- public Publisher<Long> skip(long bytesToSkip) {
- return inputStream.skip(bytesToSkip);
- }
-
- /*
- * (non-Javadoc)
- * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close()
- */
- @Override
- public Publisher<Success> close() {
- return inputStream.close();
- }
- }
-
- @RequiredArgsConstructor
- static class AsyncInputStreamHandler {
-
- private static final AtomicLongFieldUpdater<AsyncInputStreamHandler> DEMAND = AtomicLongFieldUpdater
- .newUpdater(AsyncInputStreamHandler.class, "demand");
-
- private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> STATE = AtomicIntegerFieldUpdater
- .newUpdater(AsyncInputStreamHandler.class, "state");
-
- private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> DRAIN = AtomicIntegerFieldUpdater
- .newUpdater(AsyncInputStreamHandler.class, "drain");
-
- private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> READ = AtomicIntegerFieldUpdater
- .newUpdater(AsyncInputStreamHandler.class, "read");
-
- private static final int STATE_OPEN = 0;
- private static final int STATE_CLOSED = 1;
-
- private static final int DRAIN_NONE = 0;
- private static final int DRAIN_COMPLETION = 1;
-
- private static final int READ_NONE = 0;
- private static final int READ_IN_PROGRESS = 1;
-
- final AsyncInputStream inputStream;
- final DataBufferFactory dataBufferFactory;
- final int bufferSize;
-
- // see DEMAND
- volatile long demand;
-
- // see STATE
- volatile int state = STATE_OPEN;
-
- // see DRAIN
- volatile int drain = DRAIN_NONE;
-
- // see READ_IN_PROGRESS
- volatile int read = READ_NONE;
-
- void request(FluxSink<DataBuffer> sink, long n) {
-
- Operators.addCap(DEMAND, this, n);
- drainLoop(sink);
- }
-
- /**
- * Loops while we have demand and while no read is in progress.
- *
- * @param sink
- */
- void drainLoop(FluxSink<DataBuffer> sink) {
- while (onShouldRead()) {
- emitNext(sink);
- }
- }
-
- boolean onShouldRead() {
- return !isClosed() && getDemand() > 0 && onWantRead();
- }
-
- boolean onWantRead() {
- return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS);
- }
-
- void onReadDone() {
- READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
- }
-
- long getDemand() {
- return DEMAND.get(this);
- }
-
- void decrementDemand() {
- DEMAND.decrementAndGet(this);
- }
-
- void close() {
- STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED);
- }
-
- boolean enterDrainLoop() {
- return DRAIN.compareAndSet(this, DRAIN_NONE, DRAIN_COMPLETION);
- }
-
- void leaveDrainLoop() {
- DRAIN.set(this, DRAIN_NONE);
- }
-
- boolean isClosed() {
- return STATE.get(this) == STATE_CLOSED;
- }
-
- /**
- * Emit the next {@link DataBuffer}.
- *
- * @param sink
- */
- private void emitNext(FluxSink<DataBuffer> sink) {
-
- ByteBuffer transport = ByteBuffer.allocate(bufferSize);
- BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport);
- try {
- inputStream.read(transport).subscribe(bufferCoreSubscriber);
- } catch (Throwable e) {
- sink.error(e);
- }
- }
-
- private class BufferCoreSubscriber implements CoreSubscriber<Integer> {
-
- private final FluxSink<DataBuffer> sink;
- private final DataBufferFactory factory;
- private final ByteBuffer transport;
- private volatile Subscription subscription;
-
- BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBufferFactory factory, ByteBuffer transport) {
-
- this.sink = sink;
- this.factory = factory;
- this.transport = transport;
- }
-
- @Override
- public Context currentContext() {
- return sink.currentContext();
- }
-
- @Override
- public void onSubscribe(Subscription s) {
-
- this.subscription = s;
- s.request(1);
- }
-
- @Override
- public void onNext(Integer bytes) {
-
- if (isClosed()) {
- return;
- }
-
- if (bytes > 0) {
-
- DataBuffer buffer = readNextChunk();
- sink.next(buffer);
- decrementDemand();
- }
-
- if (bytes == -1) {
- sink.complete();
- return;
- }
-
- subscription.request(1);
- }
-
- private DataBuffer readNextChunk() {
-
- transport.flip();
-
- DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining());
- dataBuffer.write(transport);
-
- transport.clear();
-
- return dataBuffer;
- }
-
- @Override
- public void onError(Throwable t) {
-
- if (isClosed()) {
-
- Operators.onErrorDropped(t, sink.currentContext());
- return;
- }
-
- close();
- sink.error(t);
- }
-
- @Override
- public void onComplete() {
-
- onReadDone();
-
- if (!isClosed()) {
-
- if (enterDrainLoop()) {
- try {
- drainLoop(sink);
- } finally {
- leaveDrainLoop();
- }
- }
-
- }
- }
- }
- }
-}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java
index 9170da7cc..f189ea9d8 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java
@@ -27,7 +27,6 @@ import org.springframework.data.mongodb.core.query.Query;
import org.springframework.lang.Nullable;
import com.mongodb.client.gridfs.model.GridFSFile;
-import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
/**
* Collection of operations to store and read files from MongoDB GridFS using reactive infrastructure.
@@ -101,20 +100,6 @@ public interface ReactiveGridFsOperations {
return store(content, filename, null, metadata);
}
- /**
- * Stores the given content into a file with the given name and content type using the given metadata. The metadata
- * object will be marshalled before writing.
- *
- * @param content must not be {@literal null}.
- * @param filename must not be {@literal null} or empty.
- * @param contentType can be {@literal null}.
- * @param metadata can be {@literal null}
- * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
- * created.
- */
- Mono<ObjectId> store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
- @Nullable Object metadata);
-
/**
* Stores the given content into a file with the given name and content type using the given metadata. The metadata
* object will be marshalled before writing.
@@ -142,19 +127,6 @@ public interface ReactiveGridFsOperations {
return store(content, filename, null, metadata);
}
- /**
- * Stores the given content into a file with the given name and content type using the given metadata.
- *
- * @param content must not be {@literal null}.
- * @param filename must not be {@literal null} or empty.
- * @param contentType can be {@literal null}.
- * @param metadata can be {@literal null}.
- * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
- * created.
- */
- Mono<ObjectId> store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
- @Nullable Document metadata);
-
/**
* Stores the given content into a file with the given name and content type using the given metadata.
*
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java
index 173ab2400..14be071eb 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java
@@ -16,20 +16,17 @@
package org.springframework.data.mongodb.gridfs;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.function.IntFunction;
-
-import org.reactivestreams.Publisher;
-import org.springframework.core.io.AbstractResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import com.mongodb.client.gridfs.model.GridFSFile;
+import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher;
/**
* Reactive {@link GridFSFile} based {@link Resource} implementation.
@@ -38,13 +35,10 @@ import com.mongodb.client.gridfs.model.GridFSFile;
* @author Christoph Strobl
* @since 2.2
*/
-public class ReactiveGridFsResource extends AbstractResource {
-
- private static final Integer DEFAULT_CHUNK_SIZE = 256 * 1024;
+public class ReactiveGridFsResource {
- private final @Nullable GridFSFile file;
+ private final GridFSDownloadPublisher content;
private final String filename;
- private final IntFunction<Flux<DataBuffer>> contentFunction;
/**
* Creates a new, absent {@link ReactiveGridFsResource}.
@@ -52,35 +46,10 @@ public class ReactiveGridFsResource extends AbstractResource {
* @param filename filename of the absent resource.
* @param content
*/
- private ReactiveGridFsResource(String filename, Publisher<DataBuffer> content) {
+ public ReactiveGridFsResource(String filename, @Nullable GridFSDownloadPublisher content) {
- this.file = null;
+ this.content = content;
this.filename = filename;
- this.contentFunction = any -> Flux.from(content);
- }
-
- /**
- * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}.
- *
- * @param file must not be {@literal null}.
- * @param content
- */
- public ReactiveGridFsResource(GridFSFile file, Publisher<DataBuffer> content) {
- this(file, (IntFunction<Flux<DataBuffer>>) any -> Flux.from(content));
- }
-
- /**
- * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}.
- *
- * @param file must not be {@literal null}.
- * @param contentFunction
- * @since 2.2.1
- */
- ReactiveGridFsResource(GridFSFile file, IntFunction<Flux<DataBuffer>> contentFunction) {
-
- this.file = file;
- this.filename = file.getFilename();
- this.contentFunction = contentFunction;
}
/**
@@ -93,123 +62,54 @@ public class ReactiveGridFsResource extends AbstractResource {
public static ReactiveGridFsResource absent(String filename) {
Assert.notNull(filename, "Filename must not be null");
-
- return new ReactiveGridFsResource(filename, Flux.empty());
+ return new ReactiveGridFsResource(filename, null);
}
- /*
- * (non-Javadoc)
- * @see org.springframework.core.io.InputStreamResource#getInputStream()
- */
- @Override
- public InputStream getInputStream() throws IllegalStateException {
- throw new UnsupportedOperationException();
- }
-
- /*
- * (non-Javadoc)
- * @see org.springframework.core.io.AbstractResource#contentLength()
- */
- @Override
- public long contentLength() throws IOException {
-
- verifyExists();
- return getGridFSFile().getLength();
- }
-
- /*
- * (non-Javadoc)
+ /**
* @see org.springframework.core.io.AbstractResource#getFilename()
*/
- @Override
public String getFilename() throws IllegalStateException {
return this.filename;
}
- /*
- * (non-Javadoc)
- * @see org.springframework.core.io.AbstractResource#exists()
- */
- @Override
- public boolean exists() {
- return this.file != null;
- }
-
- /*
- * (non-Javadoc)
- * @see org.springframework.core.io.AbstractResource#lastModified()
- */
- @Override
- public long lastModified() throws IOException {
-
- verifyExists();
- return getGridFSFile().getUploadDate().getTime();
- }
-
- /*
- * (non-Javadoc)
- * @see org.springframework.core.io.AbstractResource#getDescription()
- */
- @Override
- public String getDescription() {
- return String.format("GridFs resource [%s]", this.getFilename());
- }
-
- /**
- * Returns the {@link Resource}'s id.
- *
- * @return never {@literal null}.
- * @throws IllegalStateException if the file does not {@link #exists()}.
- */
- public Object getId() {
-
- Assert.state(exists(), () -> String.format("%s does not exist.", getDescription()));
-
- return getGridFSFile().getId();
- }
-
/**
* @return the underlying {@link GridFSFile}. Can be {@literal null} if absent.
* @since 2.2
*/
- @Nullable
- public GridFSFile getGridFSFile() {
- return file;
+ public Mono<GridFSFile> getGridFSFile() {
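+ // Resolve the file metadata lazily from the driver's download publisher instead of holding it eagerly.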
+ return content != null ? Mono.from(content.getGridFSFile()) : Mono.empty();
}
/**
- * Retrieve the download stream using the default chunk size of 256 kB.
- *
- * @return a {@link Flux} emitting data chunks one by one. Please make sure to
- * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all
- * {@link DataBuffer buffers} when done.
*/
public Flux<DataBuffer> getDownloadStream() {
- return getDownloadStream(DEFAULT_CHUNK_SIZE);
+
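+ // An absent resource yields an empty stream rather than an error signal.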
+ if (content == null) {
+ return Flux.empty();
+ }
+
+ return createDownloadStream(content);
}
/**
- * Retrieve the download stream.
- *
- * @param chunkSize chunk size in bytes to use.
- * @return a {@link Flux} emitting data chunks one by one. Please make sure to
- * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all
- * {@link DataBuffer buffers} when done.
- * @since 2.2.1
*/
public Flux<DataBuffer> getDownloadStream(int chunkSize) {
- if (!exists()) {
- return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription())));
+ if (content == null) {
+ return Flux.empty();
+
}
- return contentFunction.apply(chunkSize);
+ return createDownloadStream(content.bufferSizeBytes(chunkSize));
}
- private void verifyExists() throws FileNotFoundException {
+ private Flux<DataBuffer> createDownloadStream(GridFSDownloadPublisher publisher) {
- if (!exists()) {
- throw new FileNotFoundException(String.format("%s does not exist.", getDescription()));
- }
+ DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
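+ // Wrap each ByteBuffer emitted by the driver in a DataBuffer view; no bytes are copied here.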
+ return Flux.from(publisher).map(bufferFactory::wrap);
+ }
+
+ public boolean exists() {
+ return content != null;
}
}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
index 3537b146e..c7f3c8f31 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
@@ -21,6 +21,8 @@ import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.nio.ByteBuffer;
+
import org.bson.Document;
import org.bson.types.ObjectId;
import org.reactivestreams.Publisher;
@@ -37,12 +39,12 @@ import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.mongodb.client.gridfs.model.GridFSFile;
+import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import com.mongodb.reactivestreams.client.MongoDatabase;
-import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import com.mongodb.reactivestreams.client.gridfs.GridFSBucket;
import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets;
-import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadStream;
import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher;
+import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher;
/**
* {@link ReactiveGridFsOperations} implementation to store content into MongoDB GridFS. Uses by default
@@ -51,6 +53,7 @@ import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher;
* @author Mark Paluch
* @author Nick Stolwijk
* @author Denis Zavedeev
+ * @author Christoph Strobl
* @since 2.2
*/
public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations {
@@ -105,16 +108,6 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
this.bucket = bucket;
}
- /*
- * (non-Javadoc)
- * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, java.lang.Object)
- */
- @Override
- public Mono<ObjectId> store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
- @Nullable Object metadata) {
- return store(content, filename, contentType, toDocument(metadata));
- }
-
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, java.lang.Object)
@@ -125,18 +118,6 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
return store(content, filename, contentType, toDocument(metadata));
}
- /*
- * (non-Javadoc)
- * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, org.bson.Document)
- */
- @Override
- public Mono<ObjectId> store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
- @Nullable Document metadata) {
-
- Assert.notNull(content, "InputStream must not be null!");
- return Mono.from(getGridFs().uploadFromStream(filename, content, computeUploadOptionsFor(contentType, metadata)));
- }
-
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, org.bson.Document)
@@ -147,7 +128,12 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
Assert.notNull(content, "Content must not be null!");
- return BinaryStreamAdapters.toAsyncInputStream(content).flatMap(it -> store(it, filename, contentType, metadata));
+ GridFSUploadOptions uploadOptions = new GridFSUploadOptions();
+ uploadOptions.metadata(metadata);
+
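+ // Hand the adapted ByteBuffer stream to the driver's GridFSUploadPublisher, which emits
+ // the ObjectId of the stored file once the upload completes.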
+ GridFSUploadPublisher<ObjectId> publisher = getGridFs().uploadFromPublisher(filename,
+ Flux.from(content).map(this::dataBufferToByteBuffer), uploadOptions);
+ return Mono.from(publisher);
}
/*
@@ -223,12 +209,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
Assert.notNull(file, "GridFSFile must not be null!");
return Mono.fromSupplier(() -> {
-
- return new ReactiveGridFsResource(file, chunkSize -> {
-
- GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId());
- return BinaryStreamAdapters.toPublisher(stream, dataBufferFactory, chunkSize);
- });
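+ // downloadToPublisher is cold; the driver starts reading chunks only when subscribed.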
+ return new ReactiveGridFsResource(file.getFilename(), getGridFs().downloadToPublisher(file.getId()));
});
}
@@ -284,4 +265,14 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
MongoDatabase db = dbFactory.getMongoDatabase();
return bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket);
}
+
+ private ByteBuffer dataBufferToByteBuffer(DataBuffer buffer) {
+
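+ // Copy the readable bytes into a standalone ByteBuffer so the driver does not observe
+ // changes when the pooled DataBuffer backing this view is reused or released.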
+ ByteBuffer byteBuffer = buffer.asByteBuffer();
+ ByteBuffer copy = ByteBuffer.allocate(byteBuffer.remaining());
+ copy.put(byteBuffer);
+ copy.flip();
+
+ return copy;
+ }
}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedMongoQuery.java
index ff84452d8..1eb4f66c3 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedMongoQuery.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedMongoQuery.java
@@ -16,9 +16,9 @@
package org.springframework.data.mongodb.repository.query;
import org.bson.Document;
+import org.bson.codecs.configuration.CodecRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Query;
@@ -28,6 +28,9 @@ import org.springframework.data.repository.query.QueryMethodEvaluationContextPro
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.util.Assert;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoDatabase;
+
/**
* Query to use a plain JSON String to create the {@link Query} to actually execute.
*
@@ -40,11 +43,11 @@ public class StringBasedMongoQuery extends AbstractMongoQuery {
private static final String COUNT_EXISTS_AND_DELETE = "Manually defined query for %s cannot be a count and exists or delete query at the same time!";
private static final Logger LOG = LoggerFactory.getLogger(StringBasedMongoQuery.class);
- private static final ParameterBindingDocumentCodec CODEC = new ParameterBindingDocumentCodec();
private final String query;
private final String fieldSpec;
+ private final ParameterBindingDocumentCodec codec;
private final SpelExpressionParser expressionParser;
private final QueryMethodEvaluationContextProvider evaluationContextProvider;
@@ -106,6 +109,10 @@ public class StringBasedMongoQuery extends AbstractMongoQuery {
this.isExistsQuery = false;
this.isDeleteQuery = false;
}
+
+ CodecRegistry codecRegistry = mongoOperations.execute(MongoDatabase::getCodecRegistry);
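+ // Fall back to the driver's default registry when the template cannot provide one (e.g. mocked MongoOperations).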
+ this.codec = new ParameterBindingDocumentCodec(
+ codecRegistry != null ? codecRegistry : MongoClientSettings.getDefaultCodecRegistry());
}
/*
@@ -118,8 +125,8 @@ public class StringBasedMongoQuery extends AbstractMongoQuery {
ParameterBindingContext bindingContext = new ParameterBindingContext((accessor::getBindableValue), expressionParser,
() -> evaluationContextProvider.getEvaluationContext(getQueryMethod().getParameters(), accessor.getValues()));
- Document queryObject = CODEC.decode(this.query, bindingContext);
- Document fieldsObject = CODEC.decode(this.fieldSpec, bindingContext);
+ Document queryObject = codec.decode(this.query, bindingContext);
+ Document fieldsObject = codec.decode(this.fieldSpec, bindingContext);
Query query = new BasicQuery(queryObject, fieldsObject).with(accessor.getSort());
diff --git a/spring-data-mongodb/src/main/resources/org/springframework/data/mongodb/config/spring-mongo-3.0.xsd b/spring-data-mongodb/src/main/resources/org/springframework/data/mongodb/config/spring-mongo-3.0.xsd
index a6f5ceffd..e670b66cd 100644
--- a/spring-data-mongodb/src/main/resources/org/springframework/data/mongodb/config/spring-mongo-3.0.xsd
+++ b/spring-data-mongodb/src/main/resources/org/springframework/data/mongodb/config/spring-mongo-3.0.xsd
@@ -370,6 +370,16 @@ Reference to FactoryBean for com.mongodb.AutoEncryptionSettings - @since 2.2
@@ -471,6 +481,16 @@ The application name to use when connecting to MongoDB. Mainly used to identify
]]>
List<PropertyValue> values = definition.getPropertyValues().getPropertyValueList();
- values.forEach(System.out::println);
assertThat(values.get(2).getValue()).isInstanceOf(BeanDefinition.class);
BeanDefinition x = (BeanDefinition) values.get(2).getValue();
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoClientSettingsFactoryBeanUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoClientSettingsFactoryBeanUnitTests.java
index ee93abf13..98732af8d 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoClientSettingsFactoryBeanUnitTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoClientSettingsFactoryBeanUnitTests.java
@@ -17,11 +17,13 @@ package org.springframework.data.mongodb.core;
import static org.assertj.core.api.Assertions.*;
+import org.bson.UuidRepresentation;
import org.junit.Test;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.data.mongodb.config.ReadConcernPropertyEditor;
import org.springframework.data.mongodb.config.ReadPreferencePropertyEditor;
+import org.springframework.data.mongodb.config.UUidRepresentationPropertyEditor;
import org.springframework.test.util.ReflectionTestUtils;
import com.mongodb.ReadConcern;
@@ -63,4 +65,19 @@ public class MongoClientSettingsFactoryBeanUnitTests {
MongoClientSettingsFactoryBean bean = factory.getBean("&factory", MongoClientSettingsFactoryBean.class);
assertThat(ReflectionTestUtils.getField(bean, "readConcern")).isEqualTo(ReadConcern.MAJORITY);
}
+
+ @Test // DATAMONGO-2427
+ public void convertsUuidRepresentationCorrectly() {
+
+ RootBeanDefinition definition = new RootBeanDefinition(MongoClientSettingsFactoryBean.class);
+ definition.getPropertyValues().addPropertyValue("uUidRepresentation", "STANDARD");
+
+ DefaultListableBeanFactory factory = new DefaultListableBeanFactory();
+ factory.registerCustomEditor(UuidRepresentation.class, UUidRepresentationPropertyEditor.class);
+
+ factory.registerBeanDefinition("factory", definition);
+
+ MongoClientSettingsFactoryBean bean = factory.getBean("&factory", MongoClientSettingsFactoryBean.class);
+ assertThat(ReflectionTestUtils.getField(bean, "uUidRepresentation")).isEqualTo(UuidRepresentation.STANDARD);
+ }
}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTests.java
index 4ec1a1bdc..217a86a42 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTests.java
@@ -20,6 +20,7 @@ import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;
import static org.springframework.data.mongodb.core.query.Update.*;
+import com.mongodb.MongoClientSettings;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java
deleted file mode 100644
index 86609ca8c..000000000
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2019-2020 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.springframework.data.mongodb.gridfs;
-
-import static org.assertj.core.api.Assertions.*;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferUtils;
-import org.springframework.core.io.buffer.DefaultDataBufferFactory;
-import org.springframework.util.StreamUtils;
-
-import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
-import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
-
-/**
- * Unit tests for {@link BinaryStreamAdapters}.
- *
- * @author Mark Paluch
- */
-public class BinaryStreamAdaptersUnitTests {
-
- @Test // DATAMONGO-1855
- public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOException {
-
- ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml");
-
- byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
- AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
-
- Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256);
-
- DataBufferUtils.join(dataBuffers) //
- .as(StepVerifier::create) //
- .consumeNextWith(actual -> {
-
- byte[] actualContent = new byte[actual.readableByteCount()];
- actual.read(actualContent);
- assertThat(actualContent).isEqualTo(content);
- }) //
- .verifyComplete();
- }
-
- @Test // DATAMONGO-1855
- public void shouldAdaptBinaryPublisherToAsyncInputStream() throws IOException {
-
- ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml");
-
- byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
-
- Flux<DataBuffer> dataBuffers = DataBufferUtils.readInputStream(resource::getInputStream,
- new DefaultDataBufferFactory(), 10);
-
- AsyncInputStream inputStream = BinaryStreamAdapters.toAsyncInputStream(dataBuffers).block();
- ByteBuffer complete = readBuffer(inputStream);
-
- assertThat(complete).isEqualTo(ByteBuffer.wrap(content));
- }
-
- static ByteBuffer readBuffer(AsyncInputStream inputStream) {
-
- ByteBuffer complete = ByteBuffer.allocate(1024);
-
- boolean hasData = true;
- while (hasData) {
-
- ByteBuffer chunk = ByteBuffer.allocate(100);
-
- Integer bytesRead = Mono.from(inputStream.read(chunk)).block();
-
- chunk.flip();
- complete.put(chunk);
-
- hasData = bytesRead > -1;
- }
-
- complete.flip();
-
- return complete;
- }
-}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java
deleted file mode 100644
index e1336364f..000000000
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright 2019-2020 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.springframework.data.mongodb.gridfs;
-
-import static org.mockito.Mockito.*;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-import org.junit.Test;
-
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferFactory;
-import org.springframework.core.io.buffer.DefaultDataBufferFactory;
-
-/**
- * Unit tests for {@link DataBufferPublisherAdapter}.
- *
- * @author Mark Paluch
- */
-public class DataBufferPublisherAdapterUnitTests {
-
- DataBufferFactory factory = new DefaultDataBufferFactory();
-
- @Test // DATAMONGO-2230
- public void adapterShouldPropagateErrors() {
-
- AsyncInputStreamAdapter asyncInput = mock(AsyncInputStreamAdapter.class);
-
- when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException()));
- when(asyncInput.close()).thenReturn(Mono.empty());
-
- Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256);
-
- StepVerifier.create(binaryStream, 0) //
- .thenRequest(1) //
- .expectNextCount(1) //
- .thenRequest(1) //
- .verifyError(IllegalStateException.class);
- }
-}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java
index a444b99b5..b3d0e3983 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java
@@ -26,13 +26,16 @@ import reactor.test.StepVerifier;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.bson.BsonObjectId;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
@@ -54,8 +57,11 @@ import org.springframework.util.StreamUtils;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.internal.HexUtils;
-import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
-import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
+import com.mongodb.internal.connection.tlschannel.impl.ByteBufferUtil;
+import com.mongodb.reactivestreams.client.gridfs.GridFSBucket;
+import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets;
+import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher;
+import com.mongodb.reactivestreams.client.internal.Publishers;
/**
* Integration tests for {@link ReactiveGridFsTemplate}.
@@ -85,6 +91,7 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void storesAndFindsSimpleDocument() {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
@@ -102,6 +109,7 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void storesAndLoadsLargeFileCorrectly() {
ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000); // 1 mb
@@ -160,13 +168,15 @@ public class ReactiveGridFsTemplateTests {
// }
@Test // DATAMONGO-1855
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void writesMetadataCorrectly() throws IOException {
Document metadata = new Document("key", "value");
- AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
+ Flux<DataBuffer> source = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 256);
+ // AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
- ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block();
+ ObjectId reference = operations.store(source, "foo.xml", "binary/octet-stream", metadata).block();
operations.find(query(whereMetaData("key").is("value"))) //
.as(StepVerifier::create) //
@@ -177,14 +187,14 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void marshalsComplexMetadata() throws IOException {
Metadata metadata = new Metadata();
metadata.version = "1.0";
- AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
-
- ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block();
+ Flux<DataBuffer> source = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 256);
+ ObjectId reference = operations.store(source, "foo.xml", "binary/octet-stream", metadata).block();
operations.find(query(whereMetaData("version").is("1.0"))) //
.as(StepVerifier::create) //
@@ -196,12 +206,12 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void getResourceShouldRetrieveContentByIdentity() throws IOException {
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
- AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
-
- ObjectId reference = operations.store(upload, "foo.xml", null, null).block();
+ Flux<DataBuffer> source = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 256);
+ ObjectId reference = operations.store(source, "foo.xml", null, null).block();
operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
.flatMapMany(ReactiveGridFsResource::getDownloadStream) //
@@ -218,11 +228,12 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855, DATAMONGO-2240
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void shouldEmitFirstEntryWhenFindFirstRetrievesMoreThanOneResult() throws IOException {
- AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
- AsyncInputStream upload2 = AsyncStreamHelper
- .toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
+ Flux<DataBuffer> upload1 = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 256);
+ Flux<DataBuffer> upload2 = DataBufferUtils.read(new ClassPathResource("gridfs/another-resource.xml"),
+ new DefaultDataBufferFactory(), 256);
operations.store(upload1, "foo.xml", null, null).block();
operations.store(upload2, "foo2.xml", null, null).block();
@@ -244,16 +255,17 @@ public class ReactiveGridFsTemplateTests {
.assertNext(actual -> {
assertThat(actual.exists()).isFalse();
- assertThat(actual.getGridFSFile()).isNull();
+ assertThat(actual.getGridFSFile()).isEqualTo(Mono.empty());
}).verifyComplete();
}
@Test // DATAMONGO-1855
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void shouldEmitErrorWhenFindOneRetrievesMoreThanOneResult() throws IOException {
- AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
- AsyncInputStream upload2 = AsyncStreamHelper
- .toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
+ Flux<DataBuffer> upload1 = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 256);
+ Flux<DataBuffer> upload2 = DataBufferUtils.read(new ClassPathResource("gridfs/another-resource.xml"),
+ new DefaultDataBufferFactory(), 256);
operations.store(upload1, "foo.xml", null, null).block();
operations.store(upload2, "foo2.xml", null, null).block();
@@ -265,10 +277,11 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void getResourcesByPattern() throws IOException {
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
- AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
+ Flux<DataBuffer> upload = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 256);
operations.store(upload, "foo.xml", null, null).block();
@@ -287,6 +300,7 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-765
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void considersSkipLimitWhenQueryingFiles() {
DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
@@ -311,4 +325,57 @@ public class ReactiveGridFsTemplateTests {
static class Metadata {
String version;
}
+
+ @Test //
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
+ public void xxx() {
+
+ GridFSBucket buckets = GridFSBuckets.create(dbFactory.getMongoDatabase());
+
+ DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
+ DefaultDataBuffer first = factory.wrap("first".getBytes());
+ // DefaultDataBuffer second = factory.wrap("second".getBytes());
+
+ Flux<DataBuffer> source = Flux.just(first);
+
+ // GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml",
+ // Mono.just(ByteBuffer.wrap("hello".getBytes())));
+ GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml",
+ source.map(DataBuffer::asByteBuffer));
+ Mono<ObjectId> idPublisher = Mono.from(objectIdGridFSUploadPublisher);
+ idPublisher.as(StepVerifier::create).expectNextCount(1).verifyComplete();
+ }
+
+ @Test
+ @Ignore("https://jira.mongodb.org/browse/JAVARS-224")
+ public void xxx2() {
+
+ GridFSBucket buckets = GridFSBuckets.create(dbFactory.getMongoDatabase());
+
+ Flux<ByteBuffer> source = Flux.just(ByteBuffer.wrap("first".getBytes()), ByteBuffer.wrap("second".getBytes()));
+ Publisher<ByteBuffer> rawSource = toPublisher(ByteBuffer.wrap("first".getBytes()),
+ ByteBuffer.wrap("second".getBytes()));
+
+ // GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml",
+ // Mono.just(ByteBuffer.wrap("hello".getBytes())));
+ // GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml", source);
+ GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml", rawSource);
+ Mono.from(objectIdGridFSUploadPublisher).as(StepVerifier::create).expectNextCount(1).verifyComplete();
+
+ // idPublisher;
+ }
+
+ private static Publisher<ByteBuffer> toPublisher(final ByteBuffer... byteBuffers) {
+ return Publishers.publishAndFlatten(callback -> callback.onResult(Arrays.asList(byteBuffers), null));
+ }
+
+ private ByteBuffer hack(DataBuffer buffer) {
+
+ ByteBuffer byteBuffer = buffer.asByteBuffer();
+ ByteBuffer copy = ByteBuffer.allocate(byteBuffer.remaining());
+ ByteBufferUtil.copy(byteBuffer, copy, byteBuffer.arrayOffset());
+ copy.flip();
+
+ return copy;
+ }
}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/StringBasedMongoQueryUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/StringBasedMongoQueryUnitTests.java
index 4261c56a1..3a8a31ac8 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/StringBasedMongoQueryUnitTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/StringBasedMongoQueryUnitTests.java
@@ -30,13 +30,15 @@ import java.util.UUID;
import org.bson.BsonBinarySubType;
import org.bson.Document;
+import org.bson.UuidRepresentation;
+import org.bson.codecs.configuration.CodecRegistry;
import org.bson.types.ObjectId;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.data.mongodb.core.DbCallback;
import org.springframework.data.mongodb.core.DocumentTestUtils;
import org.springframework.data.mongodb.core.ExecutableFindOperation.ExecutableFind;
import org.springframework.data.mongodb.core.MongoOperations;
@@ -57,6 +59,9 @@ import org.springframework.data.repository.query.QueryMethodEvaluationContextPro
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.util.Base64Utils;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.reactivestreams.client.MongoClients;
+
/**
* Unit tests for {@link StringBasedMongoQuery}.
*
@@ -350,7 +355,18 @@ public class StringBasedMongoQueryUnitTests {
org.springframework.data.mongodb.core.query.Query reference = new BasicQuery(
"{'lastname' : { $binary:\"5PHq4zvkTYa5WbZAgvtjNg==\", $type: \"03\"}}");
- assertThat(query.getQueryObject().toJson()).isEqualTo(reference.getQueryObject().toJson());
+
+ // TODO: use OverridableUuidRepresentationCodecRegistry instead to save resources
+ CodecRegistry registry = MongoClients
+ .create(MongoClientSettings.builder().uuidRepresentation(UuidRepresentation.JAVA_LEGACY).build())
+ .getDatabase("database").getCodecRegistry();
+
+ assertThat(query.getQueryObject().toJson(registry.get(Document.class)))
+ .isEqualTo(reference.getQueryObject().toJson());
}
@Test // DATAMONGO-2029
@@ -367,7 +383,36 @@ public class StringBasedMongoQueryUnitTests {
org.springframework.data.mongodb.core.query.Query reference = new BasicQuery(
"{'lastname' : { $in: [{ $binary : \"5PHq4zvkTYa5WbZAgvtjNg==\", $type : \"03\" }, { $binary : \"5PH+yjvkTYa5WbZAgvtjNg==\", $type : \"03\" }]}}");
- assertThat(query.getQueryObject().toJson()).isEqualTo(reference.getQueryObject().toJson());
+ // TODO: use OverridableUuidRepresentationCodecRegistry instead to save resources
+ CodecRegistry registry = MongoClients
+ .create(MongoClientSettings.builder().uuidRepresentation(UuidRepresentation.JAVA_LEGACY).build())
+ .getDatabase("database").getCodecRegistry();
+ assertThat(query.getQueryObject().toJson(registry.get(Document.class)))
+ .isEqualTo(reference.getQueryObject().toJson());
+ }
+
+ @Test // DATAMONGO-2427
+ public void shouldSupportNonQuotedUUIDCollectionReplacementWhenUsingNonLegacyUUIDCodec() {
+
+ // TODO: use OverridableUuidRepresentationCodecRegistry instead to save resources
+ CodecRegistry registry = MongoClients
+ .create(MongoClientSettings.builder().uuidRepresentation(UuidRepresentation.STANDARD).build())
+ .getDatabase("database").getCodecRegistry();
+ when(operations.execute(any(DbCallback.class))).thenReturn(registry);
+
+ UUID uuid1 = UUID.fromString("864de43b-e3ea-f1e4-3663-fb8240b659b9");
+ UUID uuid2 = UUID.fromString("864de43b-cafe-f1e4-3663-fb8240b659b9");
+
+ ConvertingParameterAccessor accessor = StubParameterAccessor.getAccessor(converter,
+ (Object) Arrays.asList(uuid1, uuid2));
+ StringBasedMongoQuery mongoQuery = createQueryForMethod("findByLastnameAsUUIDIn", List.class);
+
+ org.springframework.data.mongodb.core.query.Query query = mongoQuery.createQuery(accessor);
+ org.springframework.data.mongodb.core.query.Query reference = new BasicQuery(
+ "{'lastname' : { $in: [{ $binary : \"hk3kO+Pq8eQ2Y/uCQLZZuQ==\", $type : \"04\" }, { $binary : \"hk3kO8r+8eQ2Y/uCQLZZuQ==\", $type : \"04\" }]}}");
+
+ assertThat(query.getQueryObject().toJson(registry.get(Document.class)))
+ .isEqualTo(reference.getQueryObject().toJson());
}
@Test // DATAMONGO-1911
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/MongoTestUtils.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/MongoTestUtils.java
index 3924b50ab..7bb71cff8 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/MongoTestUtils.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/MongoTestUtils.java
@@ -22,8 +22,10 @@ import java.time.Duration;
import java.util.List;
import org.bson.Document;
+import org.springframework.data.mongodb.SpringDataMongoDB;
import org.springframework.data.util.Version;
+import com.mongodb.ConnectionString;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoClient;
@@ -55,7 +57,9 @@ public class MongoTestUtils {
}
public static MongoClient client(String host, int port) {
- return com.mongodb.client.MongoClients.create(String.format(CONNECTION_STRING_PATTERN, host, port));
+
+ ConnectionString connectionString = new ConnectionString(String.format(CONNECTION_STRING_PATTERN, host, port));
+ return com.mongodb.client.MongoClients.create(connectionString, SpringDataMongoDB.driverInformation());
}
/**
@@ -68,7 +72,9 @@ public class MongoTestUtils {
}
public static com.mongodb.reactivestreams.client.MongoClient reactiveClient(String host, int port) {
- return MongoClients.create(String.format(CONNECTION_STRING_PATTERN, host, port));
+
+ ConnectionString connectionString = new ConnectionString(String.format(CONNECTION_STRING_PATTERN, host, port));
+ return MongoClients.create(connectionString, SpringDataMongoDB.driverInformation());
}
/**
diff --git a/spring-data-mongodb/src/test/resources/org/springframework/data/mongodb/config/MongoClientNamespaceTests-context.xml b/spring-data-mongodb/src/test/resources/org/springframework/data/mongodb/config/MongoClientNamespaceTests-context.xml
index 912d318ba..1bd3aa2a0 100644
--- a/spring-data-mongodb/src/test/resources/org/springframework/data/mongodb/config/MongoClientNamespaceTests-context.xml
+++ b/spring-data-mongodb/src/test/resources/org/springframework/data/mongodb/config/MongoClientNamespaceTests-context.xml
@@ -37,4 +37,8 @@
connection-pool-max-connection-idle-time="30"
connection-pool-max-wait-time="15" />
+
+
+
+
diff --git a/src/main/asciidoc/upgrading.adoc b/src/main/asciidoc/upgrading.adoc
index 507790132..5de900855 100644
--- a/src/main/asciidoc/upgrading.adoc
+++ b/src/main/asciidoc/upgrading.adoc
@@ -13,6 +13,9 @@ Instead of the single artifact uber-jar `mongo-java-driver`, imports are now spl
* `org.mongodb:mongodb-driver-sync` (optional)
* `org.mongodb:mongodb-driver-reactivestreams` (optional)
+Depending on the application, one of the `mongodb-driver-sync` or `mongodb-driver-reactivestreams` artifacts is required in addition to the mandatory `mongodb-driver-core`.
+It is possible to combine the sync and reactive drivers in one application if needed.
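+
+For example, both clients can be created side by side when referenced by their fully qualified names. A minimal sketch (default connection settings assumed):
+
+====
+[source,java]
+----
+com.mongodb.client.MongoClient syncClient = com.mongodb.client.MongoClients.create();
+com.mongodb.reactivestreams.client.MongoClient reactiveClient = com.mongodb.reactivestreams.client.MongoClients.create();
+----
+====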
+
== Java Configuration
.Java API changes
@@ -140,3 +143,26 @@ Element | Comment
|===
+== Other Changes
+
+=== UUID Types
+
+The UUID representation used by MongoDB can now be configured with different formats.
+This is done via `MongoClientSettings`, as shown in the snippet below.
+
+.UUID Codec Configuration
+====
+[source,java]
+----
+
+static class Config extends AbstractMongoClientConfiguration {
+
+ @Override
+ public void configureClientSettings(MongoClientSettings.Builder builder) {
+ builder.uuidRepresentation(UuidRepresentation.STANDARD);
+ }
+
+ // ...
+}
+----
+====
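+
+Outside of `AbstractMongoClientConfiguration`, the same setting can be applied when building a client directly. The following is a minimal sketch using the plain driver API:
+
+.Direct Client Configuration
+====
+[source,java]
+----
+MongoClientSettings settings = MongoClientSettings.builder()
+ .uuidRepresentation(UuidRepresentation.STANDARD)
+ .build();
+
+com.mongodb.client.MongoClient client = com.mongodb.client.MongoClients.create(settings);
+----
+====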