diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java index 05932f7e0..d673fd98c 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java @@ -31,6 +31,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; * * @author Mark Paluch * @author Christoph Strobl + * @author Mathieu Ouellet * @since 2.0 */ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider { @@ -41,7 +42,7 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider { * @return * @throws DataAccessException */ - MongoDatabase getMongoDatabase() throws DataAccessException; + Mono getMongoDatabase() throws DataAccessException; /** * Obtain a {@link MongoDatabase} instance to access the database with the given name. @@ -50,7 +51,7 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider { * @return * @throws DataAccessException */ - MongoDatabase getMongoDatabase(String dbName) throws DataAccessException; + Mono getMongoDatabase(String dbName) throws DataAccessException; /** * Exposes a shared {@link MongoExceptionTranslator}. @@ -64,10 +65,7 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider { * * @return never {@literal null}. */ - @Override - default CodecRegistry getCodecRegistry() { - return getMongoDatabase().getCodecRegistry(); - } + CodecRegistry getCodecRegistry(); /** * Obtain a {@link Mono} emitting a {@link ClientSession} for given {@link ClientSessionOptions options}. diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtils.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtils.java index 37a5b3d0f..6138d71a5 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtils.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtils.java @@ -41,6 +41,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; * * @author Mark Paluch * @author Christoph Strobl + * @author Mathieu Ouellet * @since 2.2 */ public class ReactiveMongoDatabaseUtils { @@ -142,14 +143,13 @@ public class ReactiveMongoDatabaseUtils { .flatMap(synchronizationManager -> { return doGetSession(synchronizationManager, factory, sessionSynchronization) // - .map(it -> getMongoDatabaseOrDefault(dbName, factory.withSession(it))); - }) - .onErrorResume(NoTransactionException.class, - e -> Mono.fromSupplier(() -> getMongoDatabaseOrDefault(dbName, factory))) - .defaultIfEmpty(getMongoDatabaseOrDefault(dbName, factory)); + .flatMap(it -> getMongoDatabaseOrDefault(dbName, factory.withSession(it))); + }) // + .onErrorResume(NoTransactionException.class, e -> getMongoDatabaseOrDefault(dbName, factory)) + .switchIfEmpty(getMongoDatabaseOrDefault(dbName, factory)); } - private static MongoDatabase getMongoDatabaseOrDefault(@Nullable String dbName, + private static Mono getMongoDatabaseOrDefault(@Nullable String dbName, ReactiveMongoDatabaseFactory factory) { return StringUtils.hasText(dbName) ? factory.getMongoDatabase(dbName) : factory.getMongoDatabase(); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java index 1eb5df3d7..58d1fa115 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java @@ -66,6 +66,7 @@ import com.mongodb.reactivestreams.client.MongoCollection; * * @author Mark Paluch * @author Christoph Strobl + * @author Mathieu Ouellet * @since 2.0 * @see Flux * @see Mono @@ -298,7 +299,7 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { * @param collectionName name of the collection. * @return an existing collection or one created on first server interaction. */ - MongoCollection getCollection(String collectionName); + Mono> getCollection(String collectionName); /** * Check to see if a collection with a name indicated by the entity class exists. diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index baddbb2b5..67eec6594 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -156,6 +156,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; * @author Mark Paluch * @author Christoph Strobl * @author Roman Puchkovskiy + * @author Mathieu Ouellet * @since 2.0 */ public class ReactiveMongoTemplate implements ReactiveMongoOperations, ApplicationContextAware { @@ -718,15 +719,11 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati * (non-Javadoc) * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#getCollection(java.lang.String) */ - public MongoCollection getCollection(String collectionName) { + public Mono> getCollection(String collectionName) { Assert.notNull(collectionName, "Collection name must not be null!"); - try { - return this.mongoDatabaseFactory.getMongoDatabase().getCollection(collectionName); - } catch (RuntimeException e) { - throw potentiallyConvertRuntimeException(e, exceptionTranslator); - } + return createMono(db -> Mono.just(db.getCollection(collectionName))); } /* @@ -777,7 +774,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return createFlux(MongoDatabase::listCollectionNames); } - public MongoDatabase getMongoDatabase() { + public Mono getMongoDatabase() { return mongoDatabaseFactory.getMongoDatabase(); } @@ -2074,24 +2071,25 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP; - MongoDatabase db = StringUtils.hasText(database) ? mongoDatabaseFactory.getMongoDatabase(database) - : getMongoDatabase(); - - ChangeStreamPublisher publisher; - if (StringUtils.hasText(collectionName)) { - publisher = filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class) - : db.getCollection(collectionName).watch(filter, Document.class); + return ReactiveMongoDatabaseUtils.getDatabase(database, mongoDatabaseFactory) // + .map(db -> { + ChangeStreamPublisher publisher; + if (StringUtils.hasText(collectionName)) { + publisher = filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class) + : db.getCollection(collectionName).watch(filter, Document.class); - } else { - publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class); - } - - publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); - publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher); - publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); - publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument)); + } else { + publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class); + } - return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, targetType, getConverter())); + publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); + publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation) + .orElse(publisher); + publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); + return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument)); + }) // + .flatMapMany(publisher -> Flux.from(publisher) + .map(document -> new ChangeStreamEvent<>(document, targetType, getConverter()))); } List prepareFilter(ChangeStreamOptions options) { @@ -2337,7 +2335,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati LOGGER.debug("Created collection [{}]", collectionName); } - }).thenReturn(getCollection(collectionName)); + }).then(getCollection(collectionName)); } /** @@ -3350,7 +3348,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati * @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#getCollection(java.lang.String) */ @Override - public MongoCollection getCollection(String collectionName) { + public Mono> getCollection(String collectionName) { // native MongoDB objects that offer methods with ClientSession must not be proxied. return delegate.getCollection(collectionName); @@ -3361,7 +3359,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati * @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#getMongoDatabase() */ @Override - public MongoDatabase getMongoDatabase() { + public Mono getMongoDatabase() { // native MongoDB objects that offer methods with ClientSession must not be proxied. return delegate.getMongoDatabase(); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java index 4d91dcd35..545357ea9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java @@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core; import lombok.Value; import reactor.core.publisher.Mono; +import org.bson.codecs.configuration.CodecRegistry; import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.dao.DataAccessException; @@ -41,6 +42,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; * * @author Mark Paluch * @author Christoph Strobl + * @author Mathieu Ouellet * @since 2.0 */ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, ReactiveMongoDatabaseFactory { @@ -99,7 +101,7 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React * (non-Javadoc) * @see org.springframework.data.mongodb.ReactiveMongoDbFactory#getMongoDatabase() */ - public MongoDatabase getMongoDatabase() throws DataAccessException { + public Mono getMongoDatabase() throws DataAccessException { return getMongoDatabase(databaseName); } @@ -107,12 +109,12 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React * (non-Javadoc) * @see org.springframework.data.mongodb.ReactiveMongoDbFactory#getMongoDatabase(java.lang.String) */ - public MongoDatabase getMongoDatabase(String dbName) throws DataAccessException { + public Mono getMongoDatabase(String dbName) throws DataAccessException { Assert.hasText(dbName, "Database name must not be empty."); - MongoDatabase db = mongo.getDatabase(dbName); - return writeConcern != null ? db.withWriteConcern(writeConcern) : db; + return Mono.just(mongo.getDatabase(dbName)) + .map(db -> writeConcern != null ? db.withWriteConcern(writeConcern) : db); } /** @@ -135,6 +137,15 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React return this.exceptionTranslator; } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getCodecRegistry() + */ + @Override + public CodecRegistry getCodecRegistry() { + return this.mongo.getDatabase(databaseName).getCodecRegistry(); + } + /* * (non-Javadoc) * @see org.springframework.data.mongodb.ReactiveMongoDbFactory#getSession(com.mongodb.ClientSessionOptions) @@ -171,8 +182,8 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getMongoDatabase() */ @Override - public MongoDatabase getMongoDatabase() throws DataAccessException { - return decorateDatabase(delegate.getMongoDatabase()); + public Mono getMongoDatabase() throws DataAccessException { + return delegate.getMongoDatabase().map(this::decorateDatabase); } /* @@ -180,8 +191,8 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getMongoDatabase(java.lang.String) */ @Override - public MongoDatabase getMongoDatabase(String dbName) throws DataAccessException { - return decorateDatabase(delegate.getMongoDatabase(dbName)); + public Mono getMongoDatabase(String dbName) throws DataAccessException { + return delegate.getMongoDatabase(dbName).map(this::decorateDatabase); } /* @@ -193,6 +204,15 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React return delegate.getExceptionTranslator(); } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getCodecRegistry() + */ + @Override + public CodecRegistry getCodecRegistry() { + return delegate.getCodecRegistry(); + } + /* * (non-Javadoc) * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getSession(com.mongodb.ClientSessionOptions) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java index 524de7b9b..ceea4a3e3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -18,9 +18,13 @@ package org.springframework.data.mongodb.gridfs; import static org.springframework.data.mongodb.core.query.Query.*; import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; +import lombok.RequiredArgsConstructor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; + +import org.bson.BsonValue; import org.bson.Document; import org.bson.types.ObjectId; import org.reactivestreams.Publisher; @@ -39,7 +43,6 @@ import org.springframework.util.StringUtils; import com.mongodb.client.gridfs.model.GridFSFile; import com.mongodb.client.gridfs.model.GridFSUploadOptions; -import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets; import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher; @@ -53,6 +56,7 @@ import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher; * @author Nick Stolwijk * @author Denis Zavedeev * @author Christoph Strobl + * @author Mathieu Ouellet * @since 2.2 */ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations { @@ -130,18 +134,15 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R uploadOptions.chunkSizeBytes(upload.getOptions().getChunkSize()); } - if (upload.getFileId() == null) { - GridFSUploadPublisher publisher = getGridFs().uploadFromPublisher(upload.getFilename(), - Flux.from(upload.getContent()).map(DataBuffer::asByteBuffer), uploadOptions); - - return (Mono) Mono.from(publisher); + String filename = upload.getFilename(); + Flux source = Flux.from(upload.getContent()).map(DataBuffer::asByteBuffer); + T fileId = upload.getFileId(); + if (fileId == null) { + return (Mono) createMono(new AutoIdCreatingUploadCallback(filename, source, uploadOptions)); } - GridFSUploadPublisher publisher = getGridFs().uploadFromPublisher( - BsonUtils.simpleToBsonValue(upload.getFileId()), upload.getFilename(), - Flux.from(upload.getContent()).map(DataBuffer::asByteBuffer), uploadOptions); - - return Mono.from(publisher).then(Mono.just(upload.getFileId())); + UploadCallback callback = new UploadCallback(BsonUtils.simpleToBsonValue(fileId), filename, source, uploadOptions); + return createMono(callback).then(Mono.just(fileId)); } /* @@ -150,7 +151,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R */ @Override public Flux find(Query query) { - return Flux.from(prepareQuery(query)); + + Document queryObject = getMappedQuery(query.getQueryObject()); + Document sortObject = getMappedQuery(query.getSortObject()); + + return createFlux(new FindCallback(query, queryObject, sortObject)); } /* @@ -160,7 +165,10 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @Override public Mono findOne(Query query) { - return Flux.from(prepareQuery(query).limit(2)) // + Document queryObject = getMappedQuery(query.getQueryObject()); + Document sortObject = getMappedQuery(query.getSortObject()); + + return createFlux(new FindLimitCallback(query, queryObject, sortObject, 2)) // .collectList() // .flatMap(it -> { if (it.isEmpty()) { @@ -182,7 +190,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R */ @Override public Mono findFirst(Query query) { - return Flux.from(prepareQuery(query).limit(1)).next(); + + Document queryObject = getMappedQuery(query.getQueryObject()); + Document sortObject = getMappedQuery(query.getSortObject()); + + return createFlux(new FindLimitCallback(query, queryObject, sortObject, 1)).next(); } /* @@ -191,7 +203,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R */ @Override public Mono delete(Query query) { - return find(query).flatMap(it -> getGridFs().delete(it.getId())).then(); + return find(query).flatMap(it -> createMono(new DeleteCallback(it.getId()))).then(); } /* @@ -216,9 +228,8 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R Assert.notNull(file, "GridFSFile must not be null!"); - return Mono.fromSupplier(() -> { - return new ReactiveGridFsResource(file, getGridFs().downloadToPublisher(file.getId()), dataBufferFactory); - }); + return this.doGetBucket() + .map(it -> new ReactiveGridFsResource(file, it.downloadToPublisher(file.getId()), dataBufferFactory)); } /* @@ -243,34 +254,117 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R return getResource(locationPattern).flux(); } - protected GridFSFindPublisher prepareQuery(Query query) { + /** + * Create a reusable Mono for a {@link ReactiveBucketCallback}. It's up to the developer to choose to obtain a new + * {@link Flux} or to reuse the {@link Flux}. + * + * @param callback must not be {@literal null} + * @return a {@link Mono} wrapping the {@link ReactiveBucketCallback}. + */ + public Mono createMono(ReactiveBucketCallback callback) { - Assert.notNull(query, "Query must not be null!"); + Assert.notNull(callback, "ReactiveBucketCallback must not be null!"); - Document queryObject = getMappedQuery(query.getQueryObject()); - Document sortObject = getMappedQuery(query.getSortObject()); + return Mono.defer(this::doGetBucket).flatMap(bucket -> Mono.from(callback.doInBucket(bucket))); + } - GridFSFindPublisher publisherToUse = getGridFs().find(queryObject).sort(sortObject); + /** + * Create a reusable Flux for a {@link ReactiveBucketCallback}. It's up to the developer to choose to obtain a new + * {@link Flux} or to reuse the {@link Flux}. + * + * @param callback must not be {@literal null} + * @return a {@link Flux} wrapping the {@link ReactiveBucketCallback}. + */ + public Flux createFlux(ReactiveBucketCallback callback) { - if (query.getLimit() > 0) { - publisherToUse = publisherToUse.limit(query.getLimit()); + Assert.notNull(callback, "ReactiveBucketCallback must not be null!"); + + return Mono.defer(this::doGetBucket).flatMapMany(callback::doInBucket); + } + + protected Mono doGetBucket() { + return dbFactory.getMongoDatabase() + .map(db -> bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket)); + } + + interface ReactiveBucketCallback { + Publisher doInBucket(GridFSBucket bucket); + } + + @RequiredArgsConstructor + private static class FindCallback implements ReactiveBucketCallback { + + private final Query query; + private final Document queryObject; + private final Document sortObject; + + public GridFSFindPublisher doInBucket(GridFSBucket bucket) { + GridFSFindPublisher findPublisher = bucket.find(queryObject).sort(sortObject); + if (query.getLimit() > 0) { + findPublisher = findPublisher.limit(query.getLimit()); + } + if (query.getSkip() > 0) { + findPublisher = findPublisher.skip(Math.toIntExact(query.getSkip())); + } + Integer cursorBatchSize = query.getMeta().getCursorBatchSize(); + if (cursorBatchSize != null) { + findPublisher = findPublisher.batchSize(cursorBatchSize); + } + return findPublisher; + } + } + + private static class FindLimitCallback extends FindCallback { + + private final int limit; + + public FindLimitCallback(Query query, Document queryObject, Document sortObject, int limit) { + super(query, queryObject, sortObject); + this.limit = limit; } - if (query.getSkip() > 0) { - publisherToUse = publisherToUse.skip(Math.toIntExact(query.getSkip())); + @Override + public GridFSFindPublisher doInBucket(GridFSBucket bucket) { + return super.doInBucket(bucket).limit(limit); } + } + + @RequiredArgsConstructor + private static class UploadCallback implements ReactiveBucketCallback { - Integer cursorBatchSize = query.getMeta().getCursorBatchSize(); - if (cursorBatchSize != null) { - publisherToUse = publisherToUse.batchSize(cursorBatchSize); + private final BsonValue fileId; + private final String filename; + private final Publisher source; + private final GridFSUploadOptions uploadOptions; + + @Override + public GridFSUploadPublisher doInBucket(GridFSBucket bucket) { + return bucket.uploadFromPublisher(fileId, filename, source, uploadOptions); } + } + + @RequiredArgsConstructor + private static class AutoIdCreatingUploadCallback implements ReactiveBucketCallback { - return publisherToUse; + private final String filename; + private final Publisher source; + private final GridFSUploadOptions uploadOptions; + + @Override + public GridFSUploadPublisher doInBucket(GridFSBucket bucket) { + return bucket.uploadFromPublisher(filename, source, uploadOptions); + } } - protected GridFSBucket getGridFs() { + @RequiredArgsConstructor + private static class DeleteCallback implements ReactiveBucketCallback { + + private final BsonValue id; - MongoDatabase db = dbFactory.getMongoDatabase(); - return bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket); + @Override + public Publisher doInBucket(GridFSBucket bucket) { + return bucket.delete(id); + } } + } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtilsUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtilsUnitTests.java index f05e1c676..39b9c4a90 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtilsUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtilsUnitTests.java @@ -40,6 +40,7 @@ import com.mongodb.session.ServerSession; * * @author Mark Paluch * @author Christoph Strobl + * @author Mathieu Ouellet */ @ExtendWith(MockitoExtension.class) class ReactiveMongoDatabaseUtilsUnitTests { @@ -90,7 +91,7 @@ class ReactiveMongoDatabaseUtilsUnitTests { @Test // DATAMONGO-2265 void shouldNotStartSessionWhenNoTransactionOngoing() { - when(databaseFactory.getMongoDatabase()).thenReturn(db); + when(databaseFactory.getMongoDatabase()).thenReturn(Mono.just(db)); ReactiveMongoDatabaseUtils.getDatabase(databaseFactory, SessionSynchronization.ON_ACTUAL_TRANSACTION) // .as(StepVerifier::create) // diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoTransactionManagerUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoTransactionManagerUnitTests.java index d04db2529..8563d56a8 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoTransactionManagerUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoTransactionManagerUnitTests.java @@ -40,6 +40,7 @@ import com.mongodb.session.ServerSession; * * @author Mark Paluch * @author Christoph Strobl + * @author Mathieu Ouellet */ @ExtendWith(MockitoExtension.class) class ReactiveMongoTransactionManagerUnitTests { @@ -56,7 +57,7 @@ class ReactiveMongoTransactionManagerUnitTests { void setUp() { when(databaseFactory.getSession(any())).thenReturn(Mono.just(session), Mono.just(session2)); when(databaseFactory.withSession(session)).thenReturn(databaseFactory); - when(databaseFactory.getMongoDatabase()).thenReturn(db); + when(databaseFactory.getMongoDatabase()).thenReturn(Mono.just(db)); when(session.getServerSession()).thenReturn(serverSession); } @@ -181,7 +182,7 @@ class ReactiveMongoTransactionManagerUnitTests { void suspendTransactionWhilePropagationRequiresNew() { when(databaseFactory.withSession(session2)).thenReturn(databaseFactory2); - when(databaseFactory2.getMongoDatabase()).thenReturn(db2); + when(databaseFactory2.getMongoDatabase()).thenReturn(Mono.just(db2)); when(session2.getServerSession()).thenReturn(serverSession); ReactiveMongoTransactionManager txManager = new ReactiveMongoTransactionManager(databaseFactory); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsTests.java index 4fe2888be..7c1041483 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsTests.java @@ -44,6 +44,7 @@ import com.mongodb.reactivestreams.client.MongoCollection; /** * @author Christoph Strobl * @author Mark Paluch + * @author Mathieu Ouellet */ @ExtendWith(MongoTemplateExtension.class) public class DefaultReactiveIndexOperationsTests { @@ -53,13 +54,14 @@ public class DefaultReactiveIndexOperationsTests { String collectionName = template.getCollectionName(DefaultIndexOperationsIntegrationTestsSample.class); - MongoCollection collection = template.getCollection(collectionName); DefaultReactiveIndexOperations indexOps = new DefaultReactiveIndexOperations(template, collectionName, new QueryMapper(template.getConverter())); @BeforeEach public void setUp() { - StepVerifier.create(this.collection.dropIndexes()).verifyComplete(); + template.getCollection(collectionName).flatMapMany(MongoCollection::dropIndexes) // + .as(StepVerifier::create) // + .verifyComplete(); } @Test // DATAMONGO-1518 diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsUnitTests.java index 2121f6f30..e358689d5 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsUnitTests.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; import lombok.Data; +import reactor.core.publisher.Mono; import org.bson.Document; import org.junit.jupiter.api.BeforeEach; @@ -45,6 +46,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; /** * @author Christoph Strobl + * @author Mathieu Ouellet */ @ExtendWith(MockitoExtension.class) public class DefaultReactiveIndexOperationsUnitTests { @@ -63,7 +65,7 @@ public class DefaultReactiveIndexOperationsUnitTests { @BeforeEach void setUp() { - when(factory.getMongoDatabase()).thenReturn(db); + when(factory.getMongoDatabase()).thenReturn(Mono.just(db)); when(factory.getExceptionTranslator()).thenReturn(exceptionTranslator); when(db.getCollection(any(), any(Class.class))).thenReturn(collection); when(collection.createIndex(any(), any(IndexOptions.class))).thenReturn(publisher); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateIndexTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateIndexTests.java index 363f167a0..636dd05b1 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateIndexTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateIndexTests.java @@ -48,14 +48,15 @@ import org.springframework.data.mongodb.test.util.MongoClientExtension; import org.springframework.data.mongodb.test.util.MongoTestUtils; import com.mongodb.client.model.IndexOptions; -import com.mongodb.reactivestreams.client.ListIndexesPublisher; import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoCollection; /** * Integration test for index creation via {@link ReactiveMongoTemplate}. * * @author Mark Paluch * @author Christoph Strobl + * @author Mathieu Ouellet */ @ExtendWith(MongoClientExtension.class) public class ReactiveMongoTemplateIndexTests { @@ -73,10 +74,9 @@ public class ReactiveMongoTemplateIndexTests { mappingContext.setAutoIndexCreation(true); template = new ReactiveMongoTemplate(factory, new MappingMongoConverter(NoOpDbRefResolver.INSTANCE, mappingContext)); - - MongoTestUtils.dropCollectionNow(template.getMongoDatabase().getName(), "person", client); - MongoTestUtils.dropCollectionNow(template.getMongoDatabase().getName(), "indexfail", client); - MongoTestUtils.dropCollectionNow(template.getMongoDatabase().getName(), "indexedSample", client); + MongoTestUtils.dropCollectionNow("reactive-template-index-tests", "person", client); + MongoTestUtils.dropCollectionNow("reactive-template-index-tests", "indexfail", client); + MongoTestUtils.dropCollectionNow("reactive-template-index-tests", "indexedSample", client); } @AfterEach @@ -99,7 +99,8 @@ public class ReactiveMongoTemplateIndexTests { .expectNextCount(1) // .verifyComplete(); - Flux.from(template.getCollection(template.getCollectionName(Person.class)).listIndexes()).collectList() // + template.getCollection(template.getCollectionName(Person.class)).flatMapMany(MongoCollection::listIndexes) + .collectList() // .as(StepVerifier::create) // .consumeNextWith(indexInfo -> { @@ -161,16 +162,15 @@ public class ReactiveMongoTemplateIndexTests { .as(StepVerifier::create) // .verifyComplete(); - Flux.from(factory.getMongoDatabase().getCollection(template.getCollectionName(Person.class)) - .createIndex(new Document("age", -1), new IndexOptions().unique(true).sparse(true))) // + factory.getMongoDatabase() // + .flatMapMany(db -> db.getCollection(template.getCollectionName(Person.class)) + .createIndex(new Document("age", -1), new IndexOptions().unique(true).sparse(true))) .as(StepVerifier::create) // .expectNextCount(1) // .verifyComplete(); - ListIndexesPublisher listIndexesPublisher = template - .getCollection(template.getCollectionName(Person.class)).listIndexes(); - - Flux.from(listIndexesPublisher).collectList() // + template.getCollection(template.getCollectionName(Person.class)).flatMapMany(MongoCollection::listIndexes) + .collectList() // .as(StepVerifier::create) // .consumeNextWith(indexInfos -> { @@ -205,7 +205,9 @@ public class ReactiveMongoTemplateIndexTests { @RepeatFailedTest(3) void shouldCreateIndexOnAccess() { - StepVerifier.create(template.getCollection("indexedSample").listIndexes(Document.class)).expectNextCount(0) + template.getCollection("indexedSample").flatMapMany(it -> it.listIndexes(Document.class)) // + .as(StepVerifier::create) // + .expectNextCount(0) // .verifyComplete(); template.findAll(IndexedSample.class).defaultIfEmpty(new IndexedSample()) // @@ -214,7 +216,9 @@ public class ReactiveMongoTemplateIndexTests { .as(StepVerifier::create) // .verifyComplete(); - StepVerifier.create(template.getCollection("indexedSample").listIndexes(Document.class)).expectNextCount(2) + template.getCollection("indexedSample").flatMapMany(it -> it.listIndexes(Document.class)) // + .as(StepVerifier::create) // + .expectNextCount(2) // .verifyComplete(); } @@ -222,8 +226,9 @@ public class ReactiveMongoTemplateIndexTests { @RepeatFailedTest(3) void indexCreationShouldFail() throws InterruptedException { - Flux.from(factory.getMongoDatabase().getCollection("indexfail") // - .createIndex(new Document("field", 1), new IndexOptions().name("foo").unique(true).sparse(true))) + factory.getMongoDatabase() // + .flatMapMany(db -> db.getCollection("indexfail") // + .createIndex(new Document("field", 1), new IndexOptions().name("foo").unique(true).sparse(true))) .as(StepVerifier::create) // .expectNextCount(1) // .verifyComplete(); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTransactionTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTransactionTests.java index a92675dad..86ba2bb64 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTransactionTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTransactionTests.java @@ -47,6 +47,7 @@ import com.mongodb.reactivestreams.client.MongoClient; * * @author Christoph Strobl * @author Mark Paluch + * @author Mathieu Ouellet * @currentRead The Core - Peter V. Brett */ @ExtendWith(MongoClientExtension.class) @@ -186,10 +187,10 @@ public class ReactiveMongoTemplateTransactionTests { public void changesNotVisibleOutsideTransaction() { template.inTransaction().execute(action -> { - return action.remove(ID_QUERY, Document.class, COLLECTION_NAME).flatMap(val -> { + return action.remove(ID_QUERY, Document.class, COLLECTION_NAME).flatMapMany(val -> { // once we use the collection directly we're no longer participating in the tx - return Mono.from(template.getCollection(COLLECTION_NAME).find(ID_QUERY.getQueryObject())); + return template.getCollection(COLLECTION_NAME).flatMapMany(it -> it.find(ID_QUERY.getQueryObject())); }); }).as(StepVerifier::create).expectNext(DOCUMENT).verifyComplete(); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index 506a4e9e2..1dac146e1 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -116,6 +116,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; * @author Mark Paluch * @author Christoph Strobl * @author Roman Puchkovskiy + * @author Mathieu Ouellet */ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -146,7 +147,7 @@ public class ReactiveMongoTemplateUnitTests { when(factory.getExceptionTranslator()).thenReturn(exceptionTranslator); when(factory.getCodecRegistry()).thenReturn(MongoClientSettings.getDefaultCodecRegistry()); - when(factory.getMongoDatabase()).thenReturn(db); + when(factory.getMongoDatabase()).thenReturn(Mono.just(db)); when(db.getCollection(any())).thenReturn(collection); when(db.getCollection(any(), any())).thenReturn(collection); when(db.runCommand(any(), any(Class.class))).thenReturn(runCommandPublisher); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveSessionBoundMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveSessionBoundMongoTemplateUnitTests.java index 183772b43..e8e3966d7 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveSessionBoundMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveSessionBoundMongoTemplateUnitTests.java @@ -69,6 +69,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; * * @author Christoph Strobl * @author Mark Paluch + * @author Mathieu Ouellet */ @SuppressWarnings("unchecked") @RunWith(MockitoJUnitRunner.Silent.class) @@ -302,12 +303,13 @@ public class ReactiveSessionBoundMongoTemplateUnitTests { @Test // DATAMONGO-1880 public void getCollectionShouldShouldJustReturnTheCollection/*No ClientSession binding*/() { - assertThat(template.getCollection(COLLECTION_NAME)).isNotInstanceOf(Proxy.class); + assertThat(template.getCollection(COLLECTION_NAME).block()).isNotInstanceOf(Proxy.class) + .isInstanceOf(MongoCollection.class); } @Test // DATAMONGO-1880 public void getDbShouldJustReturnTheDatabase/*No ClientSession binding*/() { - assertThat(template.getMongoDatabase()).isNotInstanceOf(Proxy.class); + assertThat(template.getMongoDatabase().block()).isNotInstanceOf(Proxy.class).isInstanceOf(MongoDatabase.class); } @Test // DATAMONGO-1880 diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactoryUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactoryUnitTests.java index feaa9890c..ff9aca1d0 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactoryUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactoryUnitTests.java @@ -38,6 +38,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; * Unit tests for {@link SimpleReactiveMongoDatabaseFactory}. * * @author Mark Paluch + * @author Mathieu Ouellet */ @ExtendWith(MockitoExtension.class) class SimpleReactiveMongoDatabaseFactoryUnitTests { @@ -54,7 +55,7 @@ class SimpleReactiveMongoDatabaseFactoryUnitTests { ReactiveMongoDatabaseFactory factory = new SimpleReactiveMongoDatabaseFactory(mongoClient, "foo"); ReactiveMongoDatabaseFactory wrapped = factory.withSession(clientSession).withSession(clientSession); - InvocationHandler invocationHandler = Proxy.getInvocationHandler(wrapped.getMongoDatabase()); + InvocationHandler invocationHandler = Proxy.getInvocationHandler(wrapped.getMongoDatabase().block()); Object singletonTarget = AopProxyUtils .getSingletonTarget(ReflectionTestUtils.getField(invocationHandler, "advised")); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/ReactiveMongoPersistentEntityIndexCreatorUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/ReactiveMongoPersistentEntityIndexCreatorUnitTests.java index 1a4e12b29..71c52002c 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/ReactiveMongoPersistentEntityIndexCreatorUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/ReactiveMongoPersistentEntityIndexCreatorUnitTests.java @@ -50,6 +50,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; * Unit tests for {@link ReactiveMongoPersistentEntityIndexCreator}. * * @author Mark Paluch + * @author Mathieu Ouellet */ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -70,7 +71,7 @@ public class ReactiveMongoPersistentEntityIndexCreatorUnitTests { void setUp() { when(factory.getExceptionTranslator()).thenReturn(new MongoExceptionTranslator()); - when(factory.getMongoDatabase()).thenReturn(db); + when(factory.getMongoDatabase()).thenReturn(Mono.just(db)); when(db.getCollection(any(), any(Class.class))).thenReturn(collection); indexOperations = new ReactiveMongoTemplate(factory).indexOps("foo"); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/mapreduce/ReactiveMapReduceTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/mapreduce/ReactiveMapReduceTests.java index 8f02df5d0..c89bc7eb4 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/mapreduce/ReactiveMapReduceTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/mapreduce/ReactiveMapReduceTests.java @@ -40,10 +40,12 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import com.mongodb.reactivestreams.client.MongoCollection; +import com.mongodb.reactivestreams.client.MongoDatabase; /** * @author Christoph Strobl * @author Mark Paluch + * @author Mathieu Ouellet * @currentRead Beyond the Shadows - Brent Weeks */ @RunWith(SpringRunner.class) @@ -62,7 +64,8 @@ public class ReactiveMapReduceTests { template.dropCollection(ValueObject.class) // .mergeWith(template.dropCollection("jmr1")) // .mergeWith(template.dropCollection("jmr1_out")) // - .mergeWith(Mono.from(factory.getMongoDatabase("reactive-jrm1-out-db").drop()).then()).as(StepVerifier::create) // + .mergeWith(factory.getMongoDatabase("reactive-jrm1-out-db").map(MongoDatabase::drop).then()) // + .as(StepVerifier::create) // .verifyComplete(); } @@ -144,7 +147,7 @@ public class ReactiveMapReduceTests { MapReduceOptions.options().outputDatabase("reactive-jrm1-out-db").outputCollection("jmr1_out")) .as(StepVerifier::create).expectNextCount(4).verifyComplete(); - Flux.from(factory.getMongoDatabase("reactive-jrm1-out-db").listCollectionNames()).buffer(10) + factory.getMongoDatabase("reactive-jrm1-out-db").flatMapMany(MongoDatabase::listCollectionNames).buffer(10) .map(list -> list.contains("jmr1_out")).as(StepVerifier::create).expectNext(true).verifyComplete(); } @@ -190,11 +193,11 @@ public class ReactiveMapReduceTests { private void createMapReduceData() { - MongoCollection collection = factory.getMongoDatabase().getCollection("jmr1", Document.class); - - StepVerifier - .create(collection.insertMany(Arrays.asList(new Document("x", Arrays.asList("a", "b")), - new Document("x", Arrays.asList("b", "c")), new Document("x", Arrays.asList("c", "d"))))) + factory.getMongoDatabase() + .flatMapMany(db -> db.getCollection("jmr1", Document.class) + .insertMany(Arrays.asList(new Document("x", Arrays.asList("a", "b")), + new Document("x", Arrays.asList("b", "c")), new Document("x", Arrays.asList("c", "d"))))) + .as(StepVerifier::create) // .expectNextCount(1) // .verifyComplete(); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/ReactiveMongoTestTemplate.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/ReactiveMongoTestTemplate.java index 290b78c2d..05240b20d 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/ReactiveMongoTestTemplate.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/ReactiveMongoTestTemplate.java @@ -31,11 +31,14 @@ import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.ReactiveMongoTemplate; import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoCollection; +import com.mongodb.reactivestreams.client.MongoDatabase; /** * A {@link ReactiveMongoTemplate} with configuration hooks and extension suitable for tests. * * @author Christoph Strobl + * @author Mathieu Ouellet * @since 3.0 */ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate { @@ -96,7 +99,7 @@ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate { } public Mono flushDatabase() { - return flush(getMongoDatabase().listCollectionNames()); + return flush(getMongoDatabase().flatMapMany(MongoDatabase::listCollectionNames)); } public Mono flush(Class... entities) { @@ -110,8 +113,8 @@ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate { public Mono flush(Publisher collectionNames) { return Flux.from(collectionNames) - .flatMap(collection -> Mono.from(getCollection(collection).deleteMany(new Document())).then() - .onErrorResume(it -> Mono.from(getCollection(collection).drop()).then())) + .flatMap(collection -> getCollection(collection).flatMapMany(it -> it.deleteMany(new Document())).then() + .onErrorResume(it -> getCollection(collection).flatMapMany(MongoCollection::drop).then())) .then(); } @@ -130,18 +133,15 @@ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate { } public Mono dropDatabase() { - return Mono.from(getMongoDatabase().drop()).then(); + return getMongoDatabase().map(MongoDatabase::drop).then(); } - public void dropIndexes(String... collections) { - for (String collection : collections) { - getCollection(collection).dropIndexes(); - } + public Mono dropIndexes(String... collections) { + return Flux.fromArray(collections).flatMap(it -> getCollection(it).map(MongoCollection::dropIndexes).then()).then(); } - public void dropIndexes(Class... entities) { - for (Class entity : entities) { - getCollection(getCollectionName(entity)).dropIndexes(); - } + public Mono dropIndexes(Class... entities) { + return Flux.fromArray(entities) + .flatMap(it -> getCollection(getCollectionName(it)).map(MongoCollection::dropIndexes).then()).then(); } }