Browse Source

DATAMONGO-2505 - Deferred Database retrieval from ReactiveMongoDatabaseFactory.

Change ReactiveMongoDatabaseFactory#getMongoDatabase() methods return Mono to enable access to the subscriber context.

Original pull request: #854.
pull/862/head
Mathieu Ouellet 6 years ago committed by Mark Paluch
parent
commit
8cfbd39c7e
No known key found for this signature in database
GPG Key ID: 51A00FA751B91849
  1. 10
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java
  2. 12
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtils.java
  3. 3
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
  4. 50
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  5. 36
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java
  6. 162
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
  7. 3
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtilsUnitTests.java
  8. 5
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoTransactionManagerUnitTests.java
  9. 6
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsTests.java
  10. 4
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsUnitTests.java
  11. 37
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateIndexTests.java
  12. 5
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTransactionTests.java
  13. 3
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
  14. 6
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveSessionBoundMongoTemplateUnitTests.java
  15. 3
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactoryUnitTests.java
  16. 3
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/ReactiveMongoPersistentEntityIndexCreatorUnitTests.java
  17. 17
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/mapreduce/ReactiveMapReduceTests.java
  18. 24
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/ReactiveMongoTestTemplate.java

10
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java

@ -31,6 +31,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; @@ -31,6 +31,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Mathieu Ouellet
* @since 2.0
*/
public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider {
@ -41,7 +42,7 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider { @@ -41,7 +42,7 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider {
* @return
* @throws DataAccessException
*/
MongoDatabase getMongoDatabase() throws DataAccessException;
Mono<MongoDatabase> getMongoDatabase() throws DataAccessException;
/**
* Obtain a {@link MongoDatabase} instance to access the database with the given name.
@ -50,7 +51,7 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider { @@ -50,7 +51,7 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider {
* @return
* @throws DataAccessException
*/
MongoDatabase getMongoDatabase(String dbName) throws DataAccessException;
Mono<MongoDatabase> getMongoDatabase(String dbName) throws DataAccessException;
/**
* Exposes a shared {@link MongoExceptionTranslator}.
@ -64,10 +65,7 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider { @@ -64,10 +65,7 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider {
*
* @return never {@literal null}.
*/
@Override
default CodecRegistry getCodecRegistry() {
return getMongoDatabase().getCodecRegistry();
}
CodecRegistry getCodecRegistry();
/**
* Obtain a {@link Mono} emitting a {@link ClientSession} for given {@link ClientSessionOptions options}.

12
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtils.java

@ -41,6 +41,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; @@ -41,6 +41,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Mathieu Ouellet
* @since 2.2
*/
public class ReactiveMongoDatabaseUtils {
@ -142,14 +143,13 @@ public class ReactiveMongoDatabaseUtils { @@ -142,14 +143,13 @@ public class ReactiveMongoDatabaseUtils {
.flatMap(synchronizationManager -> {
return doGetSession(synchronizationManager, factory, sessionSynchronization) //
.map(it -> getMongoDatabaseOrDefault(dbName, factory.withSession(it)));
})
.onErrorResume(NoTransactionException.class,
e -> Mono.fromSupplier(() -> getMongoDatabaseOrDefault(dbName, factory)))
.defaultIfEmpty(getMongoDatabaseOrDefault(dbName, factory));
.flatMap(it -> getMongoDatabaseOrDefault(dbName, factory.withSession(it)));
}) //
.onErrorResume(NoTransactionException.class, e -> getMongoDatabaseOrDefault(dbName, factory))
.switchIfEmpty(getMongoDatabaseOrDefault(dbName, factory));
}
private static MongoDatabase getMongoDatabaseOrDefault(@Nullable String dbName,
private static Mono<MongoDatabase> getMongoDatabaseOrDefault(@Nullable String dbName,
ReactiveMongoDatabaseFactory factory) {
return StringUtils.hasText(dbName) ? factory.getMongoDatabase(dbName) : factory.getMongoDatabase();
}

3
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java

@ -66,6 +66,7 @@ import com.mongodb.reactivestreams.client.MongoCollection; @@ -66,6 +66,7 @@ import com.mongodb.reactivestreams.client.MongoCollection;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Mathieu Ouellet
* @since 2.0
* @see Flux
* @see Mono
@ -298,7 +299,7 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { @@ -298,7 +299,7 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
* @param collectionName name of the collection.
* @return an existing collection or one created on first server interaction.
*/
MongoCollection<Document> getCollection(String collectionName);
Mono<MongoCollection<Document>> getCollection(String collectionName);
/**
* Check to see if a collection with a name indicated by the entity class exists.

50
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -156,6 +156,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; @@ -156,6 +156,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
* @author Mark Paluch
* @author Christoph Strobl
* @author Roman Puchkovskiy
* @author Mathieu Ouellet
* @since 2.0
*/
public class ReactiveMongoTemplate implements ReactiveMongoOperations, ApplicationContextAware {
@ -718,15 +719,11 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -718,15 +719,11 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#getCollection(java.lang.String)
*/
public MongoCollection<Document> getCollection(String collectionName) {
public Mono<MongoCollection<Document>> getCollection(String collectionName) {
Assert.notNull(collectionName, "Collection name must not be null!");
try {
return this.mongoDatabaseFactory.getMongoDatabase().getCollection(collectionName);
} catch (RuntimeException e) {
throw potentiallyConvertRuntimeException(e, exceptionTranslator);
}
return createMono(db -> Mono.just(db.getCollection(collectionName)));
}
/*
@ -777,7 +774,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -777,7 +774,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return createFlux(MongoDatabase::listCollectionNames);
}
public MongoDatabase getMongoDatabase() {
public Mono<MongoDatabase> getMongoDatabase() {
return mongoDatabaseFactory.getMongoDatabase();
}
@ -2074,24 +2071,25 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -2074,24 +2071,25 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP;
MongoDatabase db = StringUtils.hasText(database) ? mongoDatabaseFactory.getMongoDatabase(database)
: getMongoDatabase();
ChangeStreamPublisher<Document> publisher;
if (StringUtils.hasText(collectionName)) {
publisher = filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class)
: db.getCollection(collectionName).watch(filter, Document.class);
return ReactiveMongoDatabaseUtils.getDatabase(database, mongoDatabaseFactory) //
.map(db -> {
ChangeStreamPublisher<Document> publisher;
if (StringUtils.hasText(collectionName)) {
publisher = filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class)
: db.getCollection(collectionName).watch(filter, Document.class);
} else {
publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
}
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
} else {
publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
}
return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, targetType, getConverter()));
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
.orElse(publisher);
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
}) //
.flatMapMany(publisher -> Flux.from(publisher)
.map(document -> new ChangeStreamEvent<>(document, targetType, getConverter())));
}
List<Document> prepareFilter(ChangeStreamOptions options) {
@ -2337,7 +2335,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -2337,7 +2335,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
LOGGER.debug("Created collection [{}]", collectionName);
}
}).thenReturn(getCollection(collectionName));
}).then(getCollection(collectionName));
}
/**
@ -3350,7 +3348,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -3350,7 +3348,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#getCollection(java.lang.String)
*/
@Override
public MongoCollection<Document> getCollection(String collectionName) {
public Mono<MongoCollection<Document>> getCollection(String collectionName) {
// native MongoDB objects that offer methods with ClientSession must not be proxied.
return delegate.getCollection(collectionName);
@ -3361,7 +3359,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -3361,7 +3359,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#getMongoDatabase()
*/
@Override
public MongoDatabase getMongoDatabase() {
public Mono<MongoDatabase> getMongoDatabase() {
// native MongoDB objects that offer methods with ClientSession must not be proxied.
return delegate.getMongoDatabase();

36
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java

@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core; @@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core;
import lombok.Value;
import reactor.core.publisher.Mono;
import org.bson.codecs.configuration.CodecRegistry;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.dao.DataAccessException;
@ -41,6 +42,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; @@ -41,6 +42,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Mathieu Ouellet
* @since 2.0
*/
public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, ReactiveMongoDatabaseFactory {
@ -99,7 +101,7 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React @@ -99,7 +101,7 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React
* (non-Javadoc)
* @see org.springframework.data.mongodb.ReactiveMongoDbFactory#getMongoDatabase()
*/
public MongoDatabase getMongoDatabase() throws DataAccessException {
public Mono<MongoDatabase> getMongoDatabase() throws DataAccessException {
return getMongoDatabase(databaseName);
}
@ -107,12 +109,12 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React @@ -107,12 +109,12 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React
* (non-Javadoc)
* @see org.springframework.data.mongodb.ReactiveMongoDbFactory#getMongoDatabase(java.lang.String)
*/
public MongoDatabase getMongoDatabase(String dbName) throws DataAccessException {
public Mono<MongoDatabase> getMongoDatabase(String dbName) throws DataAccessException {
Assert.hasText(dbName, "Database name must not be empty.");
MongoDatabase db = mongo.getDatabase(dbName);
return writeConcern != null ? db.withWriteConcern(writeConcern) : db;
return Mono.just(mongo.getDatabase(dbName))
.map(db -> writeConcern != null ? db.withWriteConcern(writeConcern) : db);
}
/**
@ -135,6 +137,15 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React @@ -135,6 +137,15 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React
return this.exceptionTranslator;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getCodecRegistry()
*/
@Override
public CodecRegistry getCodecRegistry() {
return this.mongo.getDatabase(databaseName).getCodecRegistry();
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.ReactiveMongoDbFactory#getSession(com.mongodb.ClientSessionOptions)
@ -171,8 +182,8 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React @@ -171,8 +182,8 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React
* @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getMongoDatabase()
*/
@Override
public MongoDatabase getMongoDatabase() throws DataAccessException {
return decorateDatabase(delegate.getMongoDatabase());
public Mono<MongoDatabase> getMongoDatabase() throws DataAccessException {
return delegate.getMongoDatabase().map(this::decorateDatabase);
}
/*
@ -180,8 +191,8 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React @@ -180,8 +191,8 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React
* @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getMongoDatabase(java.lang.String)
*/
@Override
public MongoDatabase getMongoDatabase(String dbName) throws DataAccessException {
return decorateDatabase(delegate.getMongoDatabase(dbName));
public Mono<MongoDatabase> getMongoDatabase(String dbName) throws DataAccessException {
return delegate.getMongoDatabase(dbName).map(this::decorateDatabase);
}
/*
@ -193,6 +204,15 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React @@ -193,6 +204,15 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React
return delegate.getExceptionTranslator();
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getCodecRegistry()
*/
@Override
public CodecRegistry getCodecRegistry() {
return delegate.getCodecRegistry();
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getSession(com.mongodb.ClientSessionOptions)

162
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java

@ -18,9 +18,13 @@ package org.springframework.data.mongodb.gridfs; @@ -18,9 +18,13 @@ package org.springframework.data.mongodb.gridfs;
import static org.springframework.data.mongodb.core.query.Query.*;
import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.reactivestreams.Publisher;
@ -39,7 +43,6 @@ import org.springframework.util.StringUtils; @@ -39,7 +43,6 @@ import org.springframework.util.StringUtils;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.reactivestreams.client.gridfs.GridFSBucket;
import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets;
import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher;
@ -53,6 +56,7 @@ import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher; @@ -53,6 +56,7 @@ import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher;
* @author Nick Stolwijk
* @author Denis Zavedeev
* @author Christoph Strobl
* @author Mathieu Ouellet
* @since 2.2
*/
public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations {
@ -130,18 +134,15 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -130,18 +134,15 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
uploadOptions.chunkSizeBytes(upload.getOptions().getChunkSize());
}
if (upload.getFileId() == null) {
GridFSUploadPublisher<ObjectId> publisher = getGridFs().uploadFromPublisher(upload.getFilename(),
Flux.from(upload.getContent()).map(DataBuffer::asByteBuffer), uploadOptions);
return (Mono<T>) Mono.from(publisher);
String filename = upload.getFilename();
Flux<ByteBuffer> source = Flux.from(upload.getContent()).map(DataBuffer::asByteBuffer);
T fileId = upload.getFileId();
if (fileId == null) {
return (Mono<T>) createMono(new AutoIdCreatingUploadCallback(filename, source, uploadOptions));
}
GridFSUploadPublisher<Void> publisher = getGridFs().uploadFromPublisher(
BsonUtils.simpleToBsonValue(upload.getFileId()), upload.getFilename(),
Flux.from(upload.getContent()).map(DataBuffer::asByteBuffer), uploadOptions);
return Mono.from(publisher).then(Mono.just(upload.getFileId()));
UploadCallback callback = new UploadCallback(BsonUtils.simpleToBsonValue(fileId), filename, source, uploadOptions);
return createMono(callback).then(Mono.just(fileId));
}
/*
@ -150,7 +151,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -150,7 +151,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
*/
@Override
public Flux<GridFSFile> find(Query query) {
return Flux.from(prepareQuery(query));
Document queryObject = getMappedQuery(query.getQueryObject());
Document sortObject = getMappedQuery(query.getSortObject());
return createFlux(new FindCallback(query, queryObject, sortObject));
}
/*
@ -160,7 +165,10 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -160,7 +165,10 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
@Override
public Mono<GridFSFile> findOne(Query query) {
return Flux.from(prepareQuery(query).limit(2)) //
Document queryObject = getMappedQuery(query.getQueryObject());
Document sortObject = getMappedQuery(query.getSortObject());
return createFlux(new FindLimitCallback(query, queryObject, sortObject, 2)) //
.collectList() //
.flatMap(it -> {
if (it.isEmpty()) {
@ -182,7 +190,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -182,7 +190,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
*/
@Override
public Mono<GridFSFile> findFirst(Query query) {
return Flux.from(prepareQuery(query).limit(1)).next();
Document queryObject = getMappedQuery(query.getQueryObject());
Document sortObject = getMappedQuery(query.getSortObject());
return createFlux(new FindLimitCallback(query, queryObject, sortObject, 1)).next();
}
/*
@ -191,7 +203,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -191,7 +203,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
*/
@Override
public Mono<Void> delete(Query query) {
return find(query).flatMap(it -> getGridFs().delete(it.getId())).then();
return find(query).flatMap(it -> createMono(new DeleteCallback(it.getId()))).then();
}
/*
@ -216,9 +228,8 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -216,9 +228,8 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
Assert.notNull(file, "GridFSFile must not be null!");
return Mono.fromSupplier(() -> {
return new ReactiveGridFsResource(file, getGridFs().downloadToPublisher(file.getId()), dataBufferFactory);
});
return this.doGetBucket()
.map(it -> new ReactiveGridFsResource(file, it.downloadToPublisher(file.getId()), dataBufferFactory));
}
/*
@ -243,34 +254,117 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -243,34 +254,117 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
return getResource(locationPattern).flux();
}
protected GridFSFindPublisher prepareQuery(Query query) {
/**
* Create a reusable Mono for a {@link ReactiveBucketCallback}. It's up to the developer to choose to obtain a new
* {@link Flux} or to reuse the {@link Flux}.
*
* @param callback must not be {@literal null}
* @return a {@link Mono} wrapping the {@link ReactiveBucketCallback}.
*/
public <T> Mono<T> createMono(ReactiveBucketCallback<T> callback) {
Assert.notNull(query, "Query must not be null!");
Assert.notNull(callback, "ReactiveBucketCallback must not be null!");
Document queryObject = getMappedQuery(query.getQueryObject());
Document sortObject = getMappedQuery(query.getSortObject());
return Mono.defer(this::doGetBucket).flatMap(bucket -> Mono.from(callback.doInBucket(bucket)));
}
GridFSFindPublisher publisherToUse = getGridFs().find(queryObject).sort(sortObject);
/**
* Create a reusable Flux for a {@link ReactiveBucketCallback}. It's up to the developer to choose to obtain a new
* {@link Flux} or to reuse the {@link Flux}.
*
* @param callback must not be {@literal null}
* @return a {@link Flux} wrapping the {@link ReactiveBucketCallback}.
*/
public <T> Flux<T> createFlux(ReactiveBucketCallback<T> callback) {
if (query.getLimit() > 0) {
publisherToUse = publisherToUse.limit(query.getLimit());
Assert.notNull(callback, "ReactiveBucketCallback must not be null!");
return Mono.defer(this::doGetBucket).flatMapMany(callback::doInBucket);
}
protected Mono<GridFSBucket> doGetBucket() {
return dbFactory.getMongoDatabase()
.map(db -> bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket));
}
interface ReactiveBucketCallback<T> {
Publisher<T> doInBucket(GridFSBucket bucket);
}
@RequiredArgsConstructor
private static class FindCallback implements ReactiveBucketCallback<GridFSFile> {
private final Query query;
private final Document queryObject;
private final Document sortObject;
public GridFSFindPublisher doInBucket(GridFSBucket bucket) {
GridFSFindPublisher findPublisher = bucket.find(queryObject).sort(sortObject);
if (query.getLimit() > 0) {
findPublisher = findPublisher.limit(query.getLimit());
}
if (query.getSkip() > 0) {
findPublisher = findPublisher.skip(Math.toIntExact(query.getSkip()));
}
Integer cursorBatchSize = query.getMeta().getCursorBatchSize();
if (cursorBatchSize != null) {
findPublisher = findPublisher.batchSize(cursorBatchSize);
}
return findPublisher;
}
}
private static class FindLimitCallback extends FindCallback {
private final int limit;
public FindLimitCallback(Query query, Document queryObject, Document sortObject, int limit) {
super(query, queryObject, sortObject);
this.limit = limit;
}
if (query.getSkip() > 0) {
publisherToUse = publisherToUse.skip(Math.toIntExact(query.getSkip()));
@Override
public GridFSFindPublisher doInBucket(GridFSBucket bucket) {
return super.doInBucket(bucket).limit(limit);
}
}
@RequiredArgsConstructor
private static class UploadCallback implements ReactiveBucketCallback<Void> {
Integer cursorBatchSize = query.getMeta().getCursorBatchSize();
if (cursorBatchSize != null) {
publisherToUse = publisherToUse.batchSize(cursorBatchSize);
private final BsonValue fileId;
private final String filename;
private final Publisher<ByteBuffer> source;
private final GridFSUploadOptions uploadOptions;
@Override
public GridFSUploadPublisher<Void> doInBucket(GridFSBucket bucket) {
return bucket.uploadFromPublisher(fileId, filename, source, uploadOptions);
}
}
@RequiredArgsConstructor
private static class AutoIdCreatingUploadCallback implements ReactiveBucketCallback<ObjectId> {
return publisherToUse;
private final String filename;
private final Publisher<ByteBuffer> source;
private final GridFSUploadOptions uploadOptions;
@Override
public GridFSUploadPublisher<ObjectId> doInBucket(GridFSBucket bucket) {
return bucket.uploadFromPublisher(filename, source, uploadOptions);
}
}
protected GridFSBucket getGridFs() {
@RequiredArgsConstructor
private static class DeleteCallback implements ReactiveBucketCallback<Void> {
private final BsonValue id;
MongoDatabase db = dbFactory.getMongoDatabase();
return bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket);
@Override
public Publisher<Void> doInBucket(GridFSBucket bucket) {
return bucket.delete(id);
}
}
}

3
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoDatabaseUtilsUnitTests.java

@ -40,6 +40,7 @@ import com.mongodb.session.ServerSession; @@ -40,6 +40,7 @@ import com.mongodb.session.ServerSession;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Mathieu Ouellet
*/
@ExtendWith(MockitoExtension.class)
class ReactiveMongoDatabaseUtilsUnitTests {
@ -90,7 +91,7 @@ class ReactiveMongoDatabaseUtilsUnitTests { @@ -90,7 +91,7 @@ class ReactiveMongoDatabaseUtilsUnitTests {
@Test // DATAMONGO-2265
void shouldNotStartSessionWhenNoTransactionOngoing() {
when(databaseFactory.getMongoDatabase()).thenReturn(db);
when(databaseFactory.getMongoDatabase()).thenReturn(Mono.just(db));
ReactiveMongoDatabaseUtils.getDatabase(databaseFactory, SessionSynchronization.ON_ACTUAL_TRANSACTION) //
.as(StepVerifier::create) //

5
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveMongoTransactionManagerUnitTests.java

@ -40,6 +40,7 @@ import com.mongodb.session.ServerSession; @@ -40,6 +40,7 @@ import com.mongodb.session.ServerSession;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Mathieu Ouellet
*/
@ExtendWith(MockitoExtension.class)
class ReactiveMongoTransactionManagerUnitTests {
@ -56,7 +57,7 @@ class ReactiveMongoTransactionManagerUnitTests { @@ -56,7 +57,7 @@ class ReactiveMongoTransactionManagerUnitTests {
void setUp() {
when(databaseFactory.getSession(any())).thenReturn(Mono.just(session), Mono.just(session2));
when(databaseFactory.withSession(session)).thenReturn(databaseFactory);
when(databaseFactory.getMongoDatabase()).thenReturn(db);
when(databaseFactory.getMongoDatabase()).thenReturn(Mono.just(db));
when(session.getServerSession()).thenReturn(serverSession);
}
@ -181,7 +182,7 @@ class ReactiveMongoTransactionManagerUnitTests { @@ -181,7 +182,7 @@ class ReactiveMongoTransactionManagerUnitTests {
void suspendTransactionWhilePropagationRequiresNew() {
when(databaseFactory.withSession(session2)).thenReturn(databaseFactory2);
when(databaseFactory2.getMongoDatabase()).thenReturn(db2);
when(databaseFactory2.getMongoDatabase()).thenReturn(Mono.just(db2));
when(session2.getServerSession()).thenReturn(serverSession);
ReactiveMongoTransactionManager txManager = new ReactiveMongoTransactionManager(databaseFactory);

6
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsTests.java

@ -44,6 +44,7 @@ import com.mongodb.reactivestreams.client.MongoCollection; @@ -44,6 +44,7 @@ import com.mongodb.reactivestreams.client.MongoCollection;
/**
* @author Christoph Strobl
* @author Mark Paluch
* @author Mathieu Ouellet
*/
@ExtendWith(MongoTemplateExtension.class)
public class DefaultReactiveIndexOperationsTests {
@ -53,13 +54,14 @@ public class DefaultReactiveIndexOperationsTests { @@ -53,13 +54,14 @@ public class DefaultReactiveIndexOperationsTests {
String collectionName = template.getCollectionName(DefaultIndexOperationsIntegrationTestsSample.class);
MongoCollection<Document> collection = template.getCollection(collectionName);
DefaultReactiveIndexOperations indexOps = new DefaultReactiveIndexOperations(template, collectionName,
new QueryMapper(template.getConverter()));
@BeforeEach
public void setUp() {
StepVerifier.create(this.collection.dropIndexes()).verifyComplete();
template.getCollection(collectionName).flatMapMany(MongoCollection::dropIndexes) //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAMONGO-1518

4
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveIndexOperationsUnitTests.java

@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.*; @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import lombok.Data;
import reactor.core.publisher.Mono;
import org.bson.Document;
import org.junit.jupiter.api.BeforeEach;
@ -45,6 +46,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; @@ -45,6 +46,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
/**
* @author Christoph Strobl
* @author Mathieu Ouellet
*/
@ExtendWith(MockitoExtension.class)
public class DefaultReactiveIndexOperationsUnitTests {
@ -63,7 +65,7 @@ public class DefaultReactiveIndexOperationsUnitTests { @@ -63,7 +65,7 @@ public class DefaultReactiveIndexOperationsUnitTests {
@BeforeEach
void setUp() {
when(factory.getMongoDatabase()).thenReturn(db);
when(factory.getMongoDatabase()).thenReturn(Mono.just(db));
when(factory.getExceptionTranslator()).thenReturn(exceptionTranslator);
when(db.getCollection(any(), any(Class.class))).thenReturn(collection);
when(collection.createIndex(any(), any(IndexOptions.class))).thenReturn(publisher);

37
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateIndexTests.java

@ -48,14 +48,15 @@ import org.springframework.data.mongodb.test.util.MongoClientExtension; @@ -48,14 +48,15 @@ import org.springframework.data.mongodb.test.util.MongoClientExtension;
import org.springframework.data.mongodb.test.util.MongoTestUtils;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.reactivestreams.client.ListIndexesPublisher;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
/**
* Integration test for index creation via {@link ReactiveMongoTemplate}.
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Mathieu Ouellet
*/
@ExtendWith(MongoClientExtension.class)
public class ReactiveMongoTemplateIndexTests {
@ -73,10 +74,9 @@ public class ReactiveMongoTemplateIndexTests { @@ -73,10 +74,9 @@ public class ReactiveMongoTemplateIndexTests {
mappingContext.setAutoIndexCreation(true);
template = new ReactiveMongoTemplate(factory, new MappingMongoConverter(NoOpDbRefResolver.INSTANCE, mappingContext));
MongoTestUtils.dropCollectionNow(template.getMongoDatabase().getName(), "person", client);
MongoTestUtils.dropCollectionNow(template.getMongoDatabase().getName(), "indexfail", client);
MongoTestUtils.dropCollectionNow(template.getMongoDatabase().getName(), "indexedSample", client);
MongoTestUtils.dropCollectionNow("reactive-template-index-tests", "person", client);
MongoTestUtils.dropCollectionNow("reactive-template-index-tests", "indexfail", client);
MongoTestUtils.dropCollectionNow("reactive-template-index-tests", "indexedSample", client);
}
@AfterEach
@ -99,7 +99,8 @@ public class ReactiveMongoTemplateIndexTests { @@ -99,7 +99,8 @@ public class ReactiveMongoTemplateIndexTests {
.expectNextCount(1) //
.verifyComplete();
Flux.from(template.getCollection(template.getCollectionName(Person.class)).listIndexes()).collectList() //
template.getCollection(template.getCollectionName(Person.class)).flatMapMany(MongoCollection::listIndexes)
.collectList() //
.as(StepVerifier::create) //
.consumeNextWith(indexInfo -> {
@ -161,16 +162,15 @@ public class ReactiveMongoTemplateIndexTests { @@ -161,16 +162,15 @@ public class ReactiveMongoTemplateIndexTests {
.as(StepVerifier::create) //
.verifyComplete();
Flux.from(factory.getMongoDatabase().getCollection(template.getCollectionName(Person.class))
.createIndex(new Document("age", -1), new IndexOptions().unique(true).sparse(true))) //
factory.getMongoDatabase() //
.flatMapMany(db -> db.getCollection(template.getCollectionName(Person.class))
.createIndex(new Document("age", -1), new IndexOptions().unique(true).sparse(true)))
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
ListIndexesPublisher<Document> listIndexesPublisher = template
.getCollection(template.getCollectionName(Person.class)).listIndexes();
Flux.from(listIndexesPublisher).collectList() //
template.getCollection(template.getCollectionName(Person.class)).flatMapMany(MongoCollection::listIndexes)
.collectList() //
.as(StepVerifier::create) //
.consumeNextWith(indexInfos -> {
@ -205,7 +205,9 @@ public class ReactiveMongoTemplateIndexTests { @@ -205,7 +205,9 @@ public class ReactiveMongoTemplateIndexTests {
@RepeatFailedTest(3)
void shouldCreateIndexOnAccess() {
StepVerifier.create(template.getCollection("indexedSample").listIndexes(Document.class)).expectNextCount(0)
template.getCollection("indexedSample").flatMapMany(it -> it.listIndexes(Document.class)) //
.as(StepVerifier::create) //
.expectNextCount(0) //
.verifyComplete();
template.findAll(IndexedSample.class).defaultIfEmpty(new IndexedSample()) //
@ -214,7 +216,9 @@ public class ReactiveMongoTemplateIndexTests { @@ -214,7 +216,9 @@ public class ReactiveMongoTemplateIndexTests {
.as(StepVerifier::create) //
.verifyComplete();
StepVerifier.create(template.getCollection("indexedSample").listIndexes(Document.class)).expectNextCount(2)
template.getCollection("indexedSample").flatMapMany(it -> it.listIndexes(Document.class)) //
.as(StepVerifier::create) //
.expectNextCount(2) //
.verifyComplete();
}
@ -222,8 +226,9 @@ public class ReactiveMongoTemplateIndexTests { @@ -222,8 +226,9 @@ public class ReactiveMongoTemplateIndexTests {
@RepeatFailedTest(3)
void indexCreationShouldFail() throws InterruptedException {
Flux.from(factory.getMongoDatabase().getCollection("indexfail") //
.createIndex(new Document("field", 1), new IndexOptions().name("foo").unique(true).sparse(true)))
factory.getMongoDatabase() //
.flatMapMany(db -> db.getCollection("indexfail") //
.createIndex(new Document("field", 1), new IndexOptions().name("foo").unique(true).sparse(true)))
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();

5
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTransactionTests.java

@ -47,6 +47,7 @@ import com.mongodb.reactivestreams.client.MongoClient; @@ -47,6 +47,7 @@ import com.mongodb.reactivestreams.client.MongoClient;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Mathieu Ouellet
* @currentRead The Core - Peter V. Brett
*/
@ExtendWith(MongoClientExtension.class)
@ -186,10 +187,10 @@ public class ReactiveMongoTemplateTransactionTests { @@ -186,10 +187,10 @@ public class ReactiveMongoTemplateTransactionTests {
public void changesNotVisibleOutsideTransaction() {
template.inTransaction().execute(action -> {
return action.remove(ID_QUERY, Document.class, COLLECTION_NAME).flatMap(val -> {
return action.remove(ID_QUERY, Document.class, COLLECTION_NAME).flatMapMany(val -> {
// once we use the collection directly we're no longer participating in the tx
return Mono.from(template.getCollection(COLLECTION_NAME).find(ID_QUERY.getQueryObject()));
return template.getCollection(COLLECTION_NAME).flatMapMany(it -> it.find(ID_QUERY.getQueryObject()));
});
}).as(StepVerifier::create).expectNext(DOCUMENT).verifyComplete();

3
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

@ -116,6 +116,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; @@ -116,6 +116,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
* @author Mark Paluch
* @author Christoph Strobl
* @author Roman Puchkovskiy
* @author Mathieu Ouellet
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@ -146,7 +147,7 @@ public class ReactiveMongoTemplateUnitTests { @@ -146,7 +147,7 @@ public class ReactiveMongoTemplateUnitTests {
when(factory.getExceptionTranslator()).thenReturn(exceptionTranslator);
when(factory.getCodecRegistry()).thenReturn(MongoClientSettings.getDefaultCodecRegistry());
when(factory.getMongoDatabase()).thenReturn(db);
when(factory.getMongoDatabase()).thenReturn(Mono.just(db));
when(db.getCollection(any())).thenReturn(collection);
when(db.getCollection(any(), any())).thenReturn(collection);
when(db.runCommand(any(), any(Class.class))).thenReturn(runCommandPublisher);

6
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveSessionBoundMongoTemplateUnitTests.java

@ -69,6 +69,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; @@ -69,6 +69,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Mathieu Ouellet
*/
@SuppressWarnings("unchecked")
@RunWith(MockitoJUnitRunner.Silent.class)
@ -302,12 +303,13 @@ public class ReactiveSessionBoundMongoTemplateUnitTests { @@ -302,12 +303,13 @@ public class ReactiveSessionBoundMongoTemplateUnitTests {
@Test // DATAMONGO-1880
public void getCollectionShouldShouldJustReturnTheCollection/*No ClientSession binding*/() {
assertThat(template.getCollection(COLLECTION_NAME)).isNotInstanceOf(Proxy.class);
assertThat(template.getCollection(COLLECTION_NAME).block()).isNotInstanceOf(Proxy.class)
.isInstanceOf(MongoCollection.class);
}
@Test // DATAMONGO-1880
public void getDbShouldJustReturnTheDatabase/*No ClientSession binding*/() {
assertThat(template.getMongoDatabase()).isNotInstanceOf(Proxy.class);
assertThat(template.getMongoDatabase().block()).isNotInstanceOf(Proxy.class).isInstanceOf(MongoDatabase.class);
}
@Test // DATAMONGO-1880

3
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactoryUnitTests.java

@ -38,6 +38,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; @@ -38,6 +38,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
* Unit tests for {@link SimpleReactiveMongoDatabaseFactory}.
*
* @author Mark Paluch
* @author Mathieu Ouellet
*/
@ExtendWith(MockitoExtension.class)
class SimpleReactiveMongoDatabaseFactoryUnitTests {
@ -54,7 +55,7 @@ class SimpleReactiveMongoDatabaseFactoryUnitTests { @@ -54,7 +55,7 @@ class SimpleReactiveMongoDatabaseFactoryUnitTests {
ReactiveMongoDatabaseFactory factory = new SimpleReactiveMongoDatabaseFactory(mongoClient, "foo");
ReactiveMongoDatabaseFactory wrapped = factory.withSession(clientSession).withSession(clientSession);
InvocationHandler invocationHandler = Proxy.getInvocationHandler(wrapped.getMongoDatabase());
InvocationHandler invocationHandler = Proxy.getInvocationHandler(wrapped.getMongoDatabase().block());
Object singletonTarget = AopProxyUtils
.getSingletonTarget(ReflectionTestUtils.getField(invocationHandler, "advised"));

3
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/ReactiveMongoPersistentEntityIndexCreatorUnitTests.java

@ -50,6 +50,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase; @@ -50,6 +50,7 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
* Unit tests for {@link ReactiveMongoPersistentEntityIndexCreator}.
*
* @author Mark Paluch
* @author Mathieu Ouellet
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@ -70,7 +71,7 @@ public class ReactiveMongoPersistentEntityIndexCreatorUnitTests { @@ -70,7 +71,7 @@ public class ReactiveMongoPersistentEntityIndexCreatorUnitTests {
void setUp() {
when(factory.getExceptionTranslator()).thenReturn(new MongoExceptionTranslator());
when(factory.getMongoDatabase()).thenReturn(db);
when(factory.getMongoDatabase()).thenReturn(Mono.just(db));
when(db.getCollection(any(), any(Class.class))).thenReturn(collection);
indexOperations = new ReactiveMongoTemplate(factory).indexOps("foo");

17
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/mapreduce/ReactiveMapReduceTests.java

@ -40,10 +40,12 @@ import org.springframework.test.context.ContextConfiguration; @@ -40,10 +40,12 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
/**
* @author Christoph Strobl
* @author Mark Paluch
* @author Mathieu Ouellet
* @currentRead Beyond the Shadows - Brent Weeks
*/
@RunWith(SpringRunner.class)
@ -62,7 +64,8 @@ public class ReactiveMapReduceTests { @@ -62,7 +64,8 @@ public class ReactiveMapReduceTests {
template.dropCollection(ValueObject.class) //
.mergeWith(template.dropCollection("jmr1")) //
.mergeWith(template.dropCollection("jmr1_out")) //
.mergeWith(Mono.from(factory.getMongoDatabase("reactive-jrm1-out-db").drop()).then()).as(StepVerifier::create) //
.mergeWith(factory.getMongoDatabase("reactive-jrm1-out-db").map(MongoDatabase::drop).then()) //
.as(StepVerifier::create) //
.verifyComplete();
}
@ -144,7 +147,7 @@ public class ReactiveMapReduceTests { @@ -144,7 +147,7 @@ public class ReactiveMapReduceTests {
MapReduceOptions.options().outputDatabase("reactive-jrm1-out-db").outputCollection("jmr1_out"))
.as(StepVerifier::create).expectNextCount(4).verifyComplete();
Flux.from(factory.getMongoDatabase("reactive-jrm1-out-db").listCollectionNames()).buffer(10)
factory.getMongoDatabase("reactive-jrm1-out-db").flatMapMany(MongoDatabase::listCollectionNames).buffer(10)
.map(list -> list.contains("jmr1_out")).as(StepVerifier::create).expectNext(true).verifyComplete();
}
@ -190,11 +193,11 @@ public class ReactiveMapReduceTests { @@ -190,11 +193,11 @@ public class ReactiveMapReduceTests {
private void createMapReduceData() {
MongoCollection<Document> collection = factory.getMongoDatabase().getCollection("jmr1", Document.class);
StepVerifier
.create(collection.insertMany(Arrays.asList(new Document("x", Arrays.asList("a", "b")),
new Document("x", Arrays.asList("b", "c")), new Document("x", Arrays.asList("c", "d")))))
factory.getMongoDatabase()
.flatMapMany(db -> db.getCollection("jmr1", Document.class)
.insertMany(Arrays.asList(new Document("x", Arrays.asList("a", "b")),
new Document("x", Arrays.asList("b", "c")), new Document("x", Arrays.asList("c", "d")))))
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
}

24
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/ReactiveMongoTestTemplate.java

@ -31,11 +31,14 @@ import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; @@ -31,11 +31,14 @@ import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
/**
* A {@link ReactiveMongoTemplate} with configuration hooks and extension suitable for tests.
*
* @author Christoph Strobl
* @author Mathieu Ouellet
* @since 3.0
*/
public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate {
@ -96,7 +99,7 @@ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate { @@ -96,7 +99,7 @@ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate {
}
public Mono<Void> flushDatabase() {
return flush(getMongoDatabase().listCollectionNames());
return flush(getMongoDatabase().flatMapMany(MongoDatabase::listCollectionNames));
}
public Mono<Void> flush(Class<?>... entities) {
@ -110,8 +113,8 @@ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate { @@ -110,8 +113,8 @@ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate {
public Mono<Void> flush(Publisher<String> collectionNames) {
return Flux.from(collectionNames)
.flatMap(collection -> Mono.from(getCollection(collection).deleteMany(new Document())).then()
.onErrorResume(it -> Mono.from(getCollection(collection).drop()).then()))
.flatMap(collection -> getCollection(collection).flatMapMany(it -> it.deleteMany(new Document())).then()
.onErrorResume(it -> getCollection(collection).flatMapMany(MongoCollection::drop).then()))
.then();
}
@ -130,18 +133,15 @@ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate { @@ -130,18 +133,15 @@ public class ReactiveMongoTestTemplate extends ReactiveMongoTemplate {
}
public Mono<Void> dropDatabase() {
return Mono.from(getMongoDatabase().drop()).then();
return getMongoDatabase().map(MongoDatabase::drop).then();
}
public void dropIndexes(String... collections) {
for (String collection : collections) {
getCollection(collection).dropIndexes();
}
public Mono<Void> dropIndexes(String... collections) {
return Flux.fromArray(collections).flatMap(it -> getCollection(it).map(MongoCollection::dropIndexes).then()).then();
}
public void dropIndexes(Class<?>... entities) {
for (Class<?> entity : entities) {
getCollection(getCollectionName(entity)).dropIndexes();
}
public Mono<Void> dropIndexes(Class<?>... entities) {
return Flux.fromArray(entities)
.flatMap(it -> getCollection(getCollectionName(it)).map(MongoCollection::dropIndexes).then()).then();
}
}

Loading…
Cancel
Save