Browse Source

DATAMONGO-1444 - Update bulk insert operations.

Accept Mono<Collection<T>> instead of Publisher<T> to get rid of hidden buffer call inside of MongoTemplate.
pull/411/merge
Christoph Strobl 9 years ago committed by Oliver Gierke
parent
commit
098aae41b7
  1. 52
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
  2. 22
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  3. 6
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

52
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java

@ -168,7 +168,7 @@ public interface ReactiveMongoOperations {
/** /**
* A set of collection names. * A set of collection names.
* *
* @return list of collection names * @return Flux of collection names
*/ */
Flux<String> getCollectionNames(); Flux<String> getCollectionNames();
@ -221,21 +221,21 @@ public interface ReactiveMongoOperations {
Mono<Void> dropCollection(String collectionName); Mono<Void> dropCollection(String collectionName);
/** /**
* Query for a list of objects of type T from the collection used by the entity class. * Query for a {@link Flux} of objects of type T from the collection used by the entity class.
* <p/> * <p/>
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless * The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless
* configured otherwise, an instance of {@link MappingMongoConverter} will be used. * configured otherwise, an instance of {@link MappingMongoConverter} will be used.
* <p/> * <p/>
* If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way * If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way
* to map objects since the test for class type is done in the client and not on the server. * to map objects since the test for class type is done in the client and not on the server.
* @param entityClass the parametrized type of the returned {@link Flux}.
* *
* @param entityClass the parametrized type of the returned list
* @return the converted collection * @return the converted collection
*/ */
<T> Flux<T> findAll(Class<T> entityClass); <T> Flux<T> findAll(Class<T> entityClass);
/** /**
* Query for a list of objects of type T from the specified collection. * Query for a {@link Flux} of objects of type T from the specified collection.
* <p/> * <p/>
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless * The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless
* configured otherwise, an instance of {@link MappingMongoConverter} will be used. * configured otherwise, an instance of {@link MappingMongoConverter} will be used.
@ -243,7 +243,7 @@ public interface ReactiveMongoOperations {
* If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way * If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way
* to map objects since the test for class type is done in the client and not on the server. * to map objects since the test for class type is done in the client and not on the server.
* *
* @param entityClass the parametrized type of the returned list. * @param entityClass the parametrized type of the returned {@link Flux}.
* @param collectionName name of the collection to retrieve the objects from * @param collectionName name of the collection to retrieve the objects from
* @return the converted collection * @return the converted collection
*/ */
@ -261,7 +261,7 @@ public interface ReactiveMongoOperations {
* *
* @param query the query class that specifies the criteria used to find a record and also an optional fields * @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification * specification
* @param entityClass the parametrized type of the returned list. * @param entityClass the parametrized type of the returned {@link Mono}.
* @return the converted object * @return the converted object
*/ */
<T> Mono<T> findOne(Query query, Class<T> entityClass); <T> Mono<T> findOne(Query query, Class<T> entityClass);
@ -278,7 +278,7 @@ public interface ReactiveMongoOperations {
* *
* @param query the query class that specifies the criteria used to find a record and also an optional fields * @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification * specification
* @param entityClass the parametrized type of the returned list. * @param entityClass the parametrized type of the returned {@link Mono}.
* @param collectionName name of the collection to retrieve the objects from * @param collectionName name of the collection to retrieve the objects from
* @return the converted object * @return the converted object
*/ */
@ -313,7 +313,7 @@ public interface ReactiveMongoOperations {
Mono<Boolean> exists(Query query, Class<?> entityClass, String collectionName); Mono<Boolean> exists(Query query, Class<?> entityClass, String collectionName);
/** /**
* Map the results of an ad-hoc query on the collection for the entity class to a List of the specified type. * Map the results of an ad-hoc query on the collection for the entity class to a {@link Flux} of the specified type.
* <p/> * <p/>
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless * The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless
* configured otherwise, an instance of {@link MappingMongoConverter} will be used. * configured otherwise, an instance of {@link MappingMongoConverter} will be used.
@ -323,13 +323,13 @@ public interface ReactiveMongoOperations {
* *
* @param query the query class that specifies the criteria used to find a record and also an optional fields * @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification * specification
* @param entityClass the parametrized type of the returned list. * @param entityClass the parametrized type of the returned {@link Flux}.
* @return the List of converted objects * @return the {@link Flux} of converted objects
*/ */
<T> Flux<T> find(Query query, Class<T> entityClass); <T> Flux<T> find(Query query, Class<T> entityClass);
/** /**
* Map the results of an ad-hoc query on the specified collection to a List of the specified type. * Map the results of an ad-hoc query on the specified collection to a {@link Flux} of the specified type.
* <p/> * <p/>
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless * The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless
* configured otherwise, an instance of {@link MappingMongoConverter} will be used. * configured otherwise, an instance of {@link MappingMongoConverter} will be used.
@ -339,9 +339,9 @@ public interface ReactiveMongoOperations {
* *
* @param query the query class that specifies the criteria used to find a record and also an optional fields * @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification * specification
* @param entityClass the parametrized type of the returned list. * @param entityClass the parametrized type of the returned {@link Flux}.
* @param collectionName name of the collection to retrieve the objects from * @param collectionName name of the collection to retrieve the objects from
* @return the List of converted objects * @return the {@link Flux} of converted objects
*/ */
<T> Flux<T> find(Query query, Class<T> entityClass, String collectionName); <T> Flux<T> find(Query query, Class<T> entityClass, String collectionName);
@ -459,7 +459,7 @@ public interface ReactiveMongoOperations {
* *
* @param query the query class that specifies the criteria used to find a record and also an optional fields * @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification * specification
* @param entityClass the parametrized type of the returned list. * @param entityClass the parametrized type of the returned {@link Mono}.
* @return the converted object * @return the converted object
*/ */
<T> Mono<T> findAndRemove(Query query, Class<T> entityClass); <T> Mono<T> findAndRemove(Query query, Class<T> entityClass);
@ -476,8 +476,8 @@ public interface ReactiveMongoOperations {
* *
* @param query the query class that specifies the criteria used to find a record and also an optional fields * @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification * specification
* @param entityClass the parametrized type of the returned list. * @param entityClass the parametrized type of the returned {@link Mono}.
* @param collectionName name of the collection to retrieve the objects from * @param collectionName name of the collection to retrieve the objects from.
* @return the converted object * @return the converted object
*/ */
<T> Mono<T> findAndRemove(Query query, Class<T> entityClass, String collectionName); <T> Mono<T> findAndRemove(Query query, Class<T> entityClass, String collectionName);
@ -550,14 +550,14 @@ public interface ReactiveMongoOperations {
/** /**
* Insert a Collection of objects into a collection in a single batch write to the database. * Insert a Collection of objects into a collection in a single batch write to the database.
* *
* @param batchToSave the list of objects to save. * @param batchToSave the batch of objects to save.
* @param entityClass class that determines the collection to use * @param entityClass class that determines the collection to use
* @return * @return
*/ */
<T> Flux<T> insert(Collection<? extends T> batchToSave, Class<?> entityClass); <T> Flux<T> insert(Collection<? extends T> batchToSave, Class<?> entityClass);
/** /**
* Insert a list of objects into the specified collection in a single batch write to the database. * Insert a batch of objects into the specified collection in a single batch write to the database.
* *
* @param batchToSave the list of objects to save. * @param batchToSave the list of objects to save.
* @param collectionName name of the collection to store the object in * @param collectionName name of the collection to store the object in
@ -600,16 +600,16 @@ public interface ReactiveMongoOperations {
* @param entityClass class that determines the collection to use * @param entityClass class that determines the collection to use
* @return * @return
*/ */
<T> Flux<T> insert(Publisher<? extends T> batchToSave, Class<?> entityClass); <T> Flux<T> insertAll(Mono<Collection<? extends T>> batchToSave, Class<?> entityClass);
/** /**
* Insert a list of objects into the specified collection in a single batch write to the database. * Insert objects into the specified collection in a single batch write to the database.
* *
* @param batchToSave the publisher which provides objects to save. * @param batchToSave the publisher which provides objects to save.
* @param collectionName name of the collection to store the object in * @param collectionName name of the collection to store the object in
* @return * @return
*/ */
<T> Flux<T> insert(Publisher<? extends T> batchToSave, String collectionName); <T> Flux<T> insertAll(Mono<Collection<? extends T>> batchToSave, String collectionName);
/** /**
* Insert a mixed Collection of objects into a database collection determining the collection name to use based on the * Insert a mixed Collection of objects into a database collection determining the collection name to use based on the
@ -618,7 +618,7 @@ public interface ReactiveMongoOperations {
* @param objectsToSave the publisher which provides objects to save. * @param objectsToSave the publisher which provides objects to save.
* @return * @return
*/ */
<T> Flux<T> insertAll(Publisher<? extends T> objectsToSave); <T> Flux<T> insertAll(Mono<Collection<? extends T>> objectsToSave);
/** /**
* Save the object to the collection for the entity type of the object to save. This will perform an insert if the * Save the object to the collection for the entity type of the object to save. This will perform an insert if the
@ -910,8 +910,8 @@ public interface ReactiveMongoOperations {
* *
* @param query the query class that specifies the criteria used to find a record and also an optional fields * @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification * specification
* @param entityClass the parametrized type of the returned list. * @param entityClass the parametrized type of the returned {@link Flux}.
* @return the List of converted objects * @return the {@link Flux} of converted objects
*/ */
<T> Flux<T> tail(Query query, Class<T> entityClass); <T> Flux<T> tail(Query query, Class<T> entityClass);
@ -929,9 +929,9 @@ public interface ReactiveMongoOperations {
* *
* @param query the query class that specifies the criteria used to find a record and also an optional fields * @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification * specification
* @param entityClass the parametrized type of the returned list. * @param entityClass the parametrized type of the returned {@link Flux}.
* @param collectionName name of the collection to retrieve the objects from * @param collectionName name of the collection to retrieve the objects from
* @return the List of converted objects * @return the {@link Flux} of converted objects
*/ */
<T> Flux<T> tail(Query query, Class<T> entityClass, String collectionName); <T> Flux<T> tail(Query query, Class<T> entityClass, String collectionName);

22
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -160,7 +160,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
private final QueryMapper queryMapper; private final QueryMapper queryMapper;
private final UpdateMapper updateMapper; private final UpdateMapper updateMapper;
private int publisherBatchSize = 10;
private WriteConcern writeConcern; private WriteConcern writeConcern;
private WriteConcernResolver writeConcernResolver = DefaultWriteConcernResolver.INSTANCE; private WriteConcernResolver writeConcernResolver = DefaultWriteConcernResolver.INSTANCE;
private WriteResultChecking writeResultChecking = WriteResultChecking.NONE; private WriteResultChecking writeResultChecking = WriteResultChecking.NONE;
@ -257,15 +256,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
this.readPreference = readPreference; this.readPreference = readPreference;
} }
/**
* Used to set a batch size when working with batches of {@link Publisher} emitting items to insert.
*
* @param publisherBatchSize batch size
*/
public void setPublisherBatchSize(int publisherBatchSize) {
this.publisherBatchSize = publisherBatchSize;
}
/* /*
* (non-Javadoc) * (non-Javadoc)
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
@ -779,16 +769,16 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insert(org.reactivestreams.Publisher, java.lang.Class) * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insert(org.reactivestreams.Publisher, java.lang.Class)
*/ */
@Override @Override
public <T> Flux<T> insert(Publisher<? extends T> batchToSave, Class<?> entityClass) { public <T> Flux<T> insertAll(Mono<Collection<? extends T>> batchToSave, Class<?> entityClass) {
return insert(batchToSave, determineCollectionName(entityClass)); return insertAll(batchToSave, determineCollectionName(entityClass));
} }
/* (non-Javadoc) /* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insert(org.reactivestreams.Publisher, java.lang.String) * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insert(org.reactivestreams.Publisher, java.lang.String)
*/ */
@Override @Override
public <T> Flux<T> insert(Publisher<? extends T> batchToSave, String collectionName) { public <T> Flux<T> insertAll(Mono<Collection<? extends T>> batchToSave, String collectionName) {
return Flux.from(batchToSave).buffer(publisherBatchSize).flatMap(collection -> insert(collection, collectionName)); return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName));
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -857,8 +847,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insertAll(org.reactivestreams.Publisher) * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insertAll(org.reactivestreams.Publisher)
*/ */
@Override @Override
public <T> Flux<T> insertAll(Publisher<? extends T> objectsToSave) { public <T> Flux<T> insertAll(Mono<Collection<? extends T>> objectsToSave) {
return Flux.from(objectsToSave).buffer(publisherBatchSize).flatMap(this::insertAll); return Flux.from(objectsToSave).flatMap(this::insertAll);
} }
protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) { protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {

6
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

@ -68,6 +68,7 @@ import com.mongodb.WriteConcern;
import lombok.Data; import lombok.Data;
import reactor.core.Cancellation; import reactor.core.Cancellation;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.TestSubscriber; import reactor.test.TestSubscriber;
/** /**
@ -404,8 +405,7 @@ public class ReactiveMongoTemplateTests {
Query query = new Query( Query query = new Query(
new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter"))); new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter")));
template.insertAll(Flux.just(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16))) // template.insertAll(Mono.just(Arrays.asList(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16)))) //
.collectList() //
.flatMap(a -> template.updateMulti(query, new Update().set("firstName", "Walt"), Person.class)) // .flatMap(a -> template.updateMulti(query, new Update().set("firstName", "Walt"), Person.class)) //
.flatMap(p -> template.find(new Query(where("firstName").is("Walt")), Person.class)) // .flatMap(p -> template.find(new Query(where("firstName").is("Walt")), Person.class)) //
.subscribeWith(TestSubscriber.create()) // .subscribeWith(TestSubscriber.create()) //
@ -422,7 +422,7 @@ public class ReactiveMongoTemplateTests {
new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter"))); new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter")));
template template
.insert(Flux.just(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16)), "people") // .insertAll(Mono.just(Arrays.asList(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16))), "people") //
.collectList() // .collectList() //
.flatMap(a -> template.updateMulti(query, new Update().set("firstName", "Walt"), Person.class, "people")) // .flatMap(a -> template.updateMulti(query, new Update().set("firstName", "Walt"), Person.class, "people")) //
.flatMap(p -> template.find(new Query(where("firstName").is("Walt")), Person.class, "people")) // .flatMap(p -> template.find(new Query(where("firstName").is("Walt")), Person.class, "people")) //

Loading…
Cancel
Save