diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java index e29f344ce..d57268388 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java @@ -49,6 +49,7 @@ import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; * {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}. * * @author Mark Paluch + * @author Christoph Strobl * @since 2.2 */ @RequiredArgsConstructor @@ -75,10 +76,10 @@ class AsyncInputStreamAdapter implements AsyncInputStream { .get(); // see DEMAND - private volatile long demand; + volatile long demand; // see SUBSCRIBED - private volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED; + volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED; /* * (non-Javadoc) @@ -94,20 +95,23 @@ class AsyncInputStreamAdapter implements AsyncInputStream { try { if (error != null) { + sink.error(error); return; } if (bytecount == -1) { + sink.success(-1); return; } ByteBuffer byteBuffer = db.asByteBuffer(); int toWrite = byteBuffer.remaining(); - dst.put(byteBuffer); + dst.put(byteBuffer); sink.success(toWrite); + } catch (Exception e) { sink.error(e); } finally { @@ -127,6 +131,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream { public Publisher close() { return Mono.create(sink -> { + cancelled = true; if (error != null) { @@ -141,6 +146,7 @@ class AsyncInputStreamAdapter implements AsyncInputStream { protected void request(int n) { if (complete) { + terminatePendingReads(); return; } @@ -150,67 +156,9 @@ class AsyncInputStreamAdapter implements AsyncInputStream { if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) { if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) { - - buffers.subscribe(new CoreSubscriber() { - - @Override - public Context currentContext() { - return subscriberContext; - } - - @Override - public void onSubscribe(Subscription s) { - subscription = s; - - Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1); - s.request(1); - } - - @Override - public void onNext(DataBuffer dataBuffer) { - - if (cancelled || complete) { - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, subscriberContext); - return; - } - - BiConsumer poll = readRequests.poll(); - - if (poll == null) { - - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, subscriberContext); - subscription.cancel(); - return; - } - - poll.accept(dataBuffer, dataBuffer.readableByteCount()); - - requestFromSubscription(subscription); - } - - @Override - public void onError(Throwable t) { - - if (cancelled || complete) { - Operators.onErrorDropped(t, subscriberContext); - return; - } - - error = t; - complete = true; - terminatePendingReads(); - } - - @Override - public void onComplete() { - - complete = true; - terminatePendingReads(); - } - }); + buffers.subscribe(new DataBufferCoreSubscriber()); } + } else { Subscription subscription = this.subscription; @@ -245,4 +193,65 @@ class AsyncInputStreamAdapter implements AsyncInputStream { readers.accept(factory.wrap(new byte[0]), -1); } } + + private class DataBufferCoreSubscriber implements CoreSubscriber { + + @Override + public Context currentContext() { + return AsyncInputStreamAdapter.this.subscriberContext; + } + + @Override + public void onSubscribe(Subscription s) { + + AsyncInputStreamAdapter.this.subscription = s; + + Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1); + s.request(1); + } + + @Override + public void onNext(DataBuffer dataBuffer) { + + if (cancelled || complete) { + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); + return; + } + + BiConsumer poll = AsyncInputStreamAdapter.this.readRequests.poll(); + + if (poll == null) { + + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); + subscription.cancel(); + return; + } + + poll.accept(dataBuffer, dataBuffer.readableByteCount()); + + requestFromSubscription(subscription); + } + + @Override + public void onError(Throwable t) { + + if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) { + Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext); + return; + } + + AsyncInputStreamAdapter.this.error = t; + AsyncInputStreamAdapter.this.complete = true; + terminatePendingReads(); + } + + @Override + public void onComplete() { + + AsyncInputStreamAdapter.this.complete = true; + terminatePendingReads(); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index d55489bc7..df98508fe 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -39,6 +39,7 @@ import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; * Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}. * * @author Mark Paluch + * @author Christoph Strobl * @since 2.2 */ class DataBufferPublisherAdapter { @@ -51,7 +52,7 @@ class DataBufferPublisherAdapter { * @param dataBufferFactory must not be {@literal null}. * @return the resulting {@link Publisher}. */ - public static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { State state = new State(inputStream, dataBufferFactory); @@ -73,17 +74,17 @@ class DataBufferPublisherAdapter { @RequiredArgsConstructor static class State { - static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand"); + private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand"); - static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state"); + private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state"); - static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read"); + private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read"); - static final int STATE_OPEN = 0; - static final int STATE_CLOSED = 1; + private static final int STATE_OPEN = 0; + private static final int STATE_CLOSED = 1; - static final int READ_NONE = 0; - static final int READ_IN_PROGRESS = 1; + private static final int READ_NONE = 0; + private static final int READ_IN_PROGRESS = 1; final AsyncInputStream inputStream; final DataBufferFactory dataBufferFactory; @@ -140,66 +141,79 @@ class DataBufferPublisherAdapter { DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); - Mono.from(inputStream.read(intermediate)).subscribe(new CoreSubscriber() { + Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate)); + } - @Override - public Context currentContext() { - return sink.currentContext(); - } + private class BufferCoreSubscriber implements CoreSubscriber { - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } + private final FluxSink sink; + private final DataBuffer dataBuffer; + private final ByteBuffer intermediate; - @Override - public void onNext(Integer bytes) { + BufferCoreSubscriber(FluxSink sink, DataBuffer dataBuffer, ByteBuffer intermediate) { - if (isClosed()) { + this.sink = sink; + this.dataBuffer = dataBuffer; + this.intermediate = intermediate; + } - onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); - return; - } + @Override + public Context currentContext() { + return sink.currentContext(); + } - intermediate.flip(); - dataBuffer.write(intermediate); + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } - sink.next(dataBuffer); + @Override + public void onNext(Integer bytes) { - try { - if (bytes == -1) { - sink.complete(); - } - } finally { - onReadDone(); - } + if (isClosed()) { + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + return; } - @Override - public void onError(Throwable t) { + intermediate.flip(); + dataBuffer.write(intermediate); - if (isClosed()) { + sink.next(dataBuffer); - Operators.onErrorDropped(t, sink.currentContext()); - return; + try { + if (bytes == -1) { + sink.complete(); } - + } finally { onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); - sink.error(t); } + } - @Override - public void onComplete() { + @Override + public void onError(Throwable t) { - if (onShouldRead()) { - emitNext(sink); - } + if (isClosed()) { + + Operators.onErrorDropped(t, sink.currentContext()); + return; } - }); + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + sink.error(t); + } + + @Override + public void onComplete() { + + if (onShouldRead()) { + emitNext(sink); + } + } } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java new file mode 100644 index 000000000..8d4bd9f5a --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java @@ -0,0 +1,104 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import java.util.Optional; + +import org.bson.Document; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import com.mongodb.client.gridfs.model.GridFSUploadOptions; + +/** + * Base class offering common tasks like query mapping and {@link GridFSUploadOptions} computation to be shared across + * imperative and reactive implementations. + * + * @author Christoph Strobl + * @since 2.2 + */ +class GridFsOperationsSupport { + + private final QueryMapper queryMapper; + private final MongoConverter converter; + + /** + * @param converter must not be {@literal null}. + */ + GridFsOperationsSupport(MongoConverter converter) { + + Assert.notNull(converter, "MongoConverter must not be null!"); + + this.converter = converter; + this.queryMapper = new QueryMapper(converter); + } + + /** + * @param query pass the given query though a {@link QueryMapper} to apply type conversion. + * @return never {@literal null}. + */ + protected Document getMappedQuery(Document query) { + return queryMapper.getMappedObject(query, Optional.empty()); + } + + /** + * Compute the {@link GridFSUploadOptions} to be used from the given {@literal contentType} and {@literal metadata} + * {@link Document}. + * + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null} + * @return never {@literal null}. + */ + protected GridFSUploadOptions computeUploadOptionsFor(@Nullable String contentType, @Nullable Document metadata) { + + Document targetMetadata = new Document(); + + if (StringUtils.hasText(contentType)) { + targetMetadata.put(GridFsResource.CONTENT_TYPE_FIELD, contentType); + } + + if (metadata != null) { + targetMetadata.putAll(metadata); + } + + GridFSUploadOptions options = new GridFSUploadOptions(); + options.metadata(targetMetadata); + + return options; + } + + /** + * Convert a given {@literal value} into a {@link Document}. + * + * @param value can be {@literal null}. + * @return an empty {@link Document} if the source value is {@literal null}. + */ + protected Document toDocument(@Nullable Object value) { + + if (value instanceof Document) { + return (Document) value; + } + + Document document = new Document(); + if (value != null) { + converter.write(value, document); + } + return document; + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java index 3b87d2a78..6c278ac7c 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java @@ -29,7 +29,6 @@ import org.bson.types.ObjectId; import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.convert.MongoConverter; -import org.springframework.data.mongodb.core.convert.QueryMapper; import org.springframework.data.mongodb.core.query.Query; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -40,7 +39,6 @@ import com.mongodb.client.gridfs.GridFSBucket; import com.mongodb.client.gridfs.GridFSBuckets; import com.mongodb.client.gridfs.GridFSFindIterable; import com.mongodb.client.gridfs.model.GridFSFile; -import com.mongodb.client.gridfs.model.GridFSUploadOptions; /** * {@link GridFsOperations} implementation to store content into MongoDB GridFS. @@ -54,13 +52,11 @@ import com.mongodb.client.gridfs.model.GridFSUploadOptions; * @author Hartmut Lang * @author Niklas Helge Hanft */ -public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver { +public class GridFsTemplate extends GridFsOperationsSupport implements GridFsOperations, ResourcePatternResolver { private final MongoDbFactory dbFactory; private final @Nullable String bucket; - private final MongoConverter converter; - private final QueryMapper queryMapper; /** * Creates a new {@link GridFsTemplate} using the given {@link MongoDbFactory} and {@link MongoConverter}. @@ -81,14 +77,12 @@ public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver */ public GridFsTemplate(MongoDbFactory dbFactory, MongoConverter converter, @Nullable String bucket) { + super(converter); + Assert.notNull(dbFactory, "MongoDbFactory must not be null!"); - Assert.notNull(converter, "MongoConverter must not be null!"); this.dbFactory = dbFactory; - this.converter = converter; this.bucket = bucket; - - this.queryMapper = new QueryMapper(converter); } /* @@ -137,16 +131,9 @@ public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver * (non-Javadoc) * @see org.springframework.data.mongodb.gridfs.GridFsOperations#store(java.io.InputStream, java.lang.String, java.lang.String, java.lang.Object) */ - public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Object metadata) { - - Document document = null; - - if (metadata != null) { - document = new Document(); - converter.write(metadata, document); - } - - return store(content, filename, contentType, document); + public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata) { + return store(content, filename, contentType, toDocument(metadata)); } /* @@ -161,25 +148,11 @@ public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver * (non-Javadoc) * @see org.springframework.data.mongodb.gridfs.GridFsOperations#store(java.io.InputStream, java.lang.String, com.mongodb.Document) */ - public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Document metadata) { + public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata) { Assert.notNull(content, "InputStream must not be null!"); - - GridFSUploadOptions options = new GridFSUploadOptions(); - - Document mData = new Document(); - - if (StringUtils.hasText(contentType)) { - mData.put(GridFsResource.CONTENT_TYPE_FIELD, contentType); - } - - if (metadata != null) { - mData.putAll(metadata); - } - - options.metadata(mData); - - return getGridFs().uploadFromStream(filename, content, options); + return getGridFs().uploadFromStream(filename, content, computeUploadOptionsFor(contentType, metadata)); } /* @@ -210,8 +183,8 @@ public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver */ public void delete(Query query) { - for (GridFSFile x : find(query)) { - getGridFs().delete(((BsonObjectId) x.getId()).getValue()); + for (GridFSFile gridFSFile : find(query)) { + getGridFs().delete(((BsonObjectId) gridFSFile.getId()).getValue()); } } @@ -246,9 +219,9 @@ public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver } /* - * (non-Javadoc) - * @see org.springframework.core.io.support.ResourcePatternResolver#getResources(java.lang.String) - */ + * (non-Javadoc) + * @see org.springframework.core.io.support.ResourcePatternResolver#getResources(java.lang.String) + */ public GridFsResource[] getResources(String locationPattern) { if (!StringUtils.hasText(locationPattern)) { @@ -272,10 +245,6 @@ public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver return new GridFsResource[] { getResource(locationPattern) }; } - private Document getMappedQuery(Document query) { - return queryMapper.getMappedObject(query, Optional.empty()); - } - private GridFSBucket getGridFs() { MongoDatabase db = dbFactory.getDb(); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java index c4fc184e6..c7f3a578d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java @@ -22,12 +22,10 @@ import org.bson.Document; import org.bson.types.ObjectId; import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.query.Query; import org.springframework.lang.Nullable; -import com.mongodb.client.gridfs.GridFSFindIterable; import com.mongodb.client.gridfs.model.GridFSFile; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; @@ -35,6 +33,7 @@ import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; * Collection of operations to store and read files from MongoDB GridFS using reactive infrastructure. * * @author Mark Paluch + * @author Christoph Strobl * @since 2.2 */ public interface ReactiveGridFsOperations { @@ -44,29 +43,32 @@ public interface ReactiveGridFsOperations { * * @param content must not be {@literal null}. * @param filename must not be {@literal null} or empty. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, String filename) { return store(content, filename, (Object) null); } /** - * Stores the given content into a file with the given name. + * Stores the given content into a file applying the given metadata. * * @param content must not be {@literal null}. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable Object metadata) { return store(content, null, metadata); } /** - * Stores the given content into a file with the given name. + * Stores the given content into a file applying the given metadata. * * @param content must not be {@literal null}. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable Document metadata) { return store(content, null, metadata); @@ -78,7 +80,8 @@ public interface ReactiveGridFsOperations { * @param content must not be {@literal null}. * @param filename must not be {@literal null} or empty. * @param contentType can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable String filename, @Nullable String contentType) { return store(content, filename, contentType, (Object) null); @@ -91,7 +94,8 @@ public interface ReactiveGridFsOperations { * @param content must not be {@literal null}. * @param filename can be {@literal null} or empty. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable String filename, @Nullable Object metadata) { return store(content, filename, null, metadata); @@ -105,7 +109,8 @@ public interface ReactiveGridFsOperations { * @param filename must not be {@literal null} or empty. * @param contentType can be {@literal null}. * @param metadata can be {@literal null} - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Object metadata); @@ -118,7 +123,8 @@ public interface ReactiveGridFsOperations { * @param filename must not be {@literal null} or empty. * @param contentType can be {@literal null}. * @param metadata can be {@literal null} - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, @Nullable Object metadata); @@ -129,7 +135,8 @@ public interface ReactiveGridFsOperations { * @param content must not be {@literal null}. * @param filename must not be {@literal null} or empty. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable String filename, @Nullable Document metadata) { return store(content, filename, null, metadata); @@ -142,63 +149,85 @@ public interface ReactiveGridFsOperations { * @param filename must not be {@literal null} or empty. * @param contentType can be {@literal null}. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Document metadata); - Mono store(Publisher content, String filename, String contentType, Document metadata); + /** + * Stores the given content into a file with the given name and content type using the given metadata. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null}. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata); /** - * Returns all files matching the given query. Note, that currently {@link Sort} criterias defined at the - * {@link Query} will not be regarded as MongoDB does not support ordering for GridFS file access. + * Returns a {@link Flux} emitting all files matching the given query.
+ * Note: Currently {@link Sort} criteria defined at the {@link Query} will not be regarded as MongoDB + * does not support ordering for GridFS file access. * * @see MongoDB Jira: JAVA-431 * @param query must not be {@literal null}. - * @return {@link GridFSFindIterable} to obtain results from. Eg. by calling - * {@link GridFSFindIterable#into(java.util.Collection)}. + * @return {@link Flux#empty()} if no mach found. */ Flux find(Query query); /** - * Returns a single {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given query or {@literal null} in - * case no file matches. + * Returns a {@link Mono} emitting a single {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given + * query or {@link Mono#empty()} in case no file matches.
+ * NOTE If more than one file matches the given query the resulting {@link Mono} emits an error. If + * you want to obtain the first found file use {@link #findFirst(Query)}. * * @param query must not be {@literal null}. - * @return + * @return {@link Mono#empty()} if not match found. */ Mono findOne(Query query); + /** + * Returns a {@link Mono} emitting the frist {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given + * query or {@link Mono#empty()} in case no file matches. + * + * @param query must not be {@literal null}. + * @return {@link Mono#empty()} if not match found. + */ + Mono findFirst(Query query); + /** * Deletes all files matching the given {@link Query}. * * @param query must not be {@literal null}. + * @return a {@link Mono} signalling operation completion. */ Mono delete(Query query); /** - * Returns the {@link GridFsResource} with the given file name. + * Returns a {@link Mono} emitting the {@link ReactiveGridFsResource} with the given file name. * * @param filename must not be {@literal null}. - * @return the resource. Use {@link org.springframework.core.io.Resource#exists()} to check if the returned - * {@link GridFsResource} is actually present. - * @see ResourcePatternResolver#getResource(String) + * @return {@link Mono#empty()} if no match found. */ Mono getResource(String filename); /** - * Returns the {@link GridFsResource} for a {@link GridFSFile}. + * Returns a {@link Mono} emitting the {@link ReactiveGridFsResource} for a {@link GridFSFile}. * * @param file must not be {@literal null}. - * @return the resource for the file. + * @return {@link Mono#empty()} if no match found. */ Mono getResource(GridFSFile file); /** - * Returns all {@link GridFsResource}s matching the given file name pattern. + * Returns a {@link Flux} emitting all {@link ReactiveGridFsResource}s matching the given file name pattern. * * @param filenamePattern must not be {@literal null}. - * @return + * @return {@link Flux#empty()} if no match found. */ Flux getResources(String filenamePattern); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java index 4f13ded27..2c182c687 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -21,24 +21,22 @@ import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.Optional; - import org.bson.Document; import org.bson.types.ObjectId; import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.dao.IncorrectResultSizeDataAccessException; import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.convert.MongoConverter; -import org.springframework.data.mongodb.core.convert.QueryMapper; import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.SerializationUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import com.mongodb.client.gridfs.model.GridFSFile; -import com.mongodb.client.gridfs.model.GridFSUploadOptions; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; @@ -53,13 +51,11 @@ import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher; * @author Mark Paluch * @since 2.2 */ -public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { +public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations { - private final DataBufferFactory dataBufferFactory; private final ReactiveMongoDatabaseFactory dbFactory; + private final DataBufferFactory dataBufferFactory; private final @Nullable String bucket; - private final MongoConverter converter; - private final QueryMapper queryMapper; /** * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and @@ -97,16 +93,14 @@ public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { public ReactiveGridFsTemplate(DataBufferFactory dataBufferFactory, ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter, @Nullable String bucket) { + super(converter); + Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null!"); Assert.notNull(dbFactory, "ReactiveMongoDatabaseFactory must not be null!"); - Assert.notNull(converter, "MongoConverter must not be null!"); this.dataBufferFactory = dataBufferFactory; this.dbFactory = dbFactory; - this.converter = converter; this.bucket = bucket; - - this.queryMapper = new QueryMapper(converter); } /* @@ -138,22 +132,7 @@ public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { @Nullable Document metadata) { Assert.notNull(content, "InputStream must not be null!"); - - GridFSUploadOptions options = new GridFSUploadOptions(); - - Document mData = new Document(); - - if (StringUtils.hasText(contentType)) { - mData.put(GridFsResource.CONTENT_TYPE_FIELD, contentType); - } - - if (metadata != null) { - mData.putAll(metadata); - } - - options.metadata(mData); - - return Mono.from(getGridFs().uploadFromStream(filename, content, options)); + return Mono.from(getGridFs().uploadFromStream(filename, content, computeUploadOptionsFor(contentType, metadata))); } /* @@ -175,10 +154,7 @@ public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { */ @Override public Flux find(Query query) { - - GridFSFindPublisher publisherToUse = prepareQuery(query); - - return Flux.from(publisherToUse); + return Flux.from(prepareQuery(query)); } /* @@ -188,9 +164,29 @@ public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { @Override public Mono findOne(Query query) { - GridFSFindPublisher publisherToUse = prepareQuery(query); + return Flux.from(prepareQuery(query).limit(2)) // + .collectList() // + .flatMap(it -> { + if (it.isEmpty()) { + return Mono.empty(); + } - return Flux.from(publisherToUse.limit(1)).next(); + if (it.size() > 1) { + return Mono.error(new IncorrectResultSizeDataAccessException( + "Query " + SerializationUtils.serializeToJsonSafely(query) + " returned non unique result.", 1)); + } + + return Mono.just(it.get(0)); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#findFirst(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Mono findFirst(Query query) { + return Flux.from(prepareQuery(query).limit(1)).next(); } /* @@ -199,9 +195,7 @@ public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { */ @Override public Mono delete(Query query) { - - GridFSBucket gridFs = getGridFs(); - return find(query).flatMap(it -> gridFs.delete(it.getId())).then(); + return find(query).flatMap(it -> getGridFs().delete(it.getId())).then(); } /* @@ -273,25 +267,9 @@ public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { return publisherToUse; } - private Document getMappedQuery(Document query) { - return queryMapper.getMappedObject(query, Optional.empty()); - } - protected GridFSBucket getGridFs() { MongoDatabase db = dbFactory.getMongoDatabase(); return bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket); } - - @Nullable - private Document toDocument(@Nullable Object metadata) { - - Document document = null; - - if (metadata != null) { - document = new Document(); - converter.write(metadata, document); - } - return document; - } } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java similarity index 65% rename from spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java rename to spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java index dea663eb5..879bbdf1a 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java @@ -16,10 +16,11 @@ package org.springframework.data.mongodb.gridfs; import static org.assertj.core.api.Assertions.*; -import static org.springframework.data.mongodb.core.query.Criteria.*; +import static org.springframework.data.mongodb.core.query.Criteria.where; import static org.springframework.data.mongodb.core.query.Query.*; import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; +import org.springframework.dao.IncorrectResultSizeDataAccessException; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; @@ -46,11 +47,14 @@ import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; /** + * Integration tests for {@link ReactiveGridFsTemplate}. + * * @author Mark Paluch + * @author Christoph Strobl */ @RunWith(SpringRunner.class) @ContextConfiguration("classpath:gridfs/reactive-gridfs.xml") -public class ReactiveGridFsTemplateIntegrationTests { +public class ReactiveGridFsTemplateTests { Resource resource = new ClassPathResource("gridfs/gridfs.xml"); @@ -58,6 +62,7 @@ public class ReactiveGridFsTemplateIntegrationTests { @Before public void setUp() { + operations.delete(new Query()) // .as(StepVerifier::create) // .verifyComplete(); @@ -76,7 +81,8 @@ public class ReactiveGridFsTemplateIntegrationTests { .as(StepVerifier::create) // .assertNext(actual -> { assertThat(((BsonObjectId) actual.getId()).getValue()).isEqualTo(reference); - }).verifyComplete(); + }) // + .verifyComplete(); } @Test // DATAMONGO-1855 @@ -92,7 +98,7 @@ public class ReactiveGridFsTemplateIntegrationTests { .as(StepVerifier::create) // .consumeNextWith(actual -> { assertThat(actual.getObjectId()).isEqualTo(reference); - })// + }) // .verifyComplete(); } @@ -110,7 +116,8 @@ public class ReactiveGridFsTemplateIntegrationTests { .as(StepVerifier::create) // .consumeNextWith(actual -> { assertThat(actual.getObjectId()).isEqualTo(reference); - })// + assertThat(actual.getMetadata()).containsEntry("version", "1.0"); + }) // .verifyComplete(); } @@ -124,14 +131,69 @@ public class ReactiveGridFsTemplateIntegrationTests { operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) .flatMapMany(ReactiveGridFsResource::getDownloadStream) // - .transform(DataBufferUtils::join).as(StepVerifier::create) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + byte[] actual = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(actual); + + assertThat(actual).isEqualTo(content); + }) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void shouldEmitFirstEntryWhenFindFirstRetrievesMoreThanOneResult() throws IOException { + + AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream()); + + operations.store(upload1, "foo.xml", null, null).block(); + operations.store(upload2, "foo2.xml", null, null).block(); + + operations.findFirst(query(where("filename").regex("foo*"))) // + .flatMap(operations::getResource) // + .as(StepVerifier::create) // + .expectNextCount(1) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void shouldEmitErrorWhenFindOneRetrievesMoreThanOneResult() throws IOException { + + AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream()); + + operations.store(upload1, "foo.xml", null, null).block(); + operations.store(upload2, "foo2.xml", null, null).block(); + + operations.findOne(query(where("filename").regex("foo*"))) // + .as(StepVerifier::create) // + .expectError(IncorrectResultSizeDataAccessException.class) // + .verify(); + } + + @Test // DATAMONGO-1855 + public void getResourcesByPattern() throws IOException { + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + operations.store(upload, "foo.xml", null, null).block(); + + operations.getResources("foo*") // + .flatMap(ReactiveGridFsResource::getDownloadStream) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // .consumeNextWith(dataBuffer -> { byte[] actual = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(actual); assertThat(actual).isEqualTo(content); - }).verifyComplete(); + }) // + .verifyComplete(); } static class Metadata {