From 098aae41b7e02c59c95278fa3e244c0c47eaca7e Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Mon, 7 Nov 2016 13:06:02 +0100 Subject: [PATCH] DATAMONGO-1444 - Update bulk insert operations. Accept Mono> instead of Publisher to get rid of hidden buffer call inside of MongoTemplate. --- .../mongodb/core/ReactiveMongoOperations.java | 52 +++++++++---------- .../mongodb/core/ReactiveMongoTemplate.java | 22 +++----- .../core/ReactiveMongoTemplateTests.java | 6 +-- 3 files changed, 35 insertions(+), 45 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java index de0febc5c..c7ff5066e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java @@ -168,7 +168,7 @@ public interface ReactiveMongoOperations { /** * A set of collection names. * - * @return list of collection names + * @return Flux of collection names */ Flux getCollectionNames(); @@ -221,21 +221,21 @@ public interface ReactiveMongoOperations { Mono dropCollection(String collectionName); /** - * Query for a list of objects of type T from the collection used by the entity class. + * Query for a {@link Flux} of objects of type T from the collection used by the entity class. *

* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless * configured otherwise, an instance of {@link MappingMongoConverter} will be used. *

* If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way * to map objects since the test for class type is done in the client and not on the server. + * @param entityClass the parametrized type of the returned {@link Flux}. * - * @param entityClass the parametrized type of the returned list * @return the converted collection */ Flux findAll(Class entityClass); /** - * Query for a list of objects of type T from the specified collection. + * Query for a {@link Flux} of objects of type T from the specified collection. *

* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless * configured otherwise, an instance of {@link MappingMongoConverter} will be used. @@ -243,7 +243,7 @@ public interface ReactiveMongoOperations { * If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way * to map objects since the test for class type is done in the client and not on the server. * - * @param entityClass the parametrized type of the returned list. + * @param entityClass the parametrized type of the returned {@link Flux}. * @param collectionName name of the collection to retrieve the objects from * @return the converted collection */ @@ -261,7 +261,7 @@ public interface ReactiveMongoOperations { * * @param query the query class that specifies the criteria used to find a record and also an optional fields * specification - * @param entityClass the parametrized type of the returned list. + * @param entityClass the parametrized type of the returned {@link Mono}. * @return the converted object */ Mono findOne(Query query, Class entityClass); @@ -278,7 +278,7 @@ public interface ReactiveMongoOperations { * * @param query the query class that specifies the criteria used to find a record and also an optional fields * specification - * @param entityClass the parametrized type of the returned list. + * @param entityClass the parametrized type of the returned {@link Mono}. * @param collectionName name of the collection to retrieve the objects from * @return the converted object */ @@ -313,7 +313,7 @@ public interface ReactiveMongoOperations { Mono exists(Query query, Class entityClass, String collectionName); /** - * Map the results of an ad-hoc query on the collection for the entity class to a List of the specified type. + * Map the results of an ad-hoc query on the collection for the entity class to a {@link Flux} of the specified type. *

* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless * configured otherwise, an instance of {@link MappingMongoConverter} will be used. @@ -323,13 +323,13 @@ public interface ReactiveMongoOperations { * * @param query the query class that specifies the criteria used to find a record and also an optional fields * specification - * @param entityClass the parametrized type of the returned list. - * @return the List of converted objects + * @param entityClass the parametrized type of the returned {@link Flux}. + * @return the {@link Flux} of converted objects */ Flux find(Query query, Class entityClass); /** - * Map the results of an ad-hoc query on the specified collection to a List of the specified type. + * Map the results of an ad-hoc query on the specified collection to a {@link Flux} of the specified type. *

* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless * configured otherwise, an instance of {@link MappingMongoConverter} will be used. @@ -339,9 +339,9 @@ public interface ReactiveMongoOperations { * * @param query the query class that specifies the criteria used to find a record and also an optional fields * specification - * @param entityClass the parametrized type of the returned list. + * @param entityClass the parametrized type of the returned {@link Flux}. * @param collectionName name of the collection to retrieve the objects from - * @return the List of converted objects + * @return the {@link Flux} of converted objects */ Flux find(Query query, Class entityClass, String collectionName); @@ -459,7 +459,7 @@ public interface ReactiveMongoOperations { * * @param query the query class that specifies the criteria used to find a record and also an optional fields * specification - * @param entityClass the parametrized type of the returned list. + * @param entityClass the parametrized type of the returned {@link Mono}. * @return the converted object */ Mono findAndRemove(Query query, Class entityClass); @@ -476,8 +476,8 @@ public interface ReactiveMongoOperations { * * @param query the query class that specifies the criteria used to find a record and also an optional fields * specification - * @param entityClass the parametrized type of the returned list. - * @param collectionName name of the collection to retrieve the objects from + * @param entityClass the parametrized type of the returned {@link Mono}. + * @param collectionName name of the collection to retrieve the objects from. * @return the converted object */ Mono findAndRemove(Query query, Class entityClass, String collectionName); @@ -550,14 +550,14 @@ public interface ReactiveMongoOperations { /** * Insert a Collection of objects into a collection in a single batch write to the database. * - * @param batchToSave the list of objects to save. + * @param batchToSave the batch of objects to save. * @param entityClass class that determines the collection to use * @return */ Flux insert(Collection batchToSave, Class entityClass); /** - * Insert a list of objects into the specified collection in a single batch write to the database. + * Insert a batch of objects into the specified collection in a single batch write to the database. * * @param batchToSave the list of objects to save. * @param collectionName name of the collection to store the object in @@ -600,16 +600,16 @@ public interface ReactiveMongoOperations { * @param entityClass class that determines the collection to use * @return */ - Flux insert(Publisher batchToSave, Class entityClass); + Flux insertAll(Mono> batchToSave, Class entityClass); /** - * Insert a list of objects into the specified collection in a single batch write to the database. + * Insert objects into the specified collection in a single batch write to the database. * * @param batchToSave the publisher which provides objects to save. * @param collectionName name of the collection to store the object in * @return */ - Flux insert(Publisher batchToSave, String collectionName); + Flux insertAll(Mono> batchToSave, String collectionName); /** * Insert a mixed Collection of objects into a database collection determining the collection name to use based on the @@ -618,7 +618,7 @@ public interface ReactiveMongoOperations { * @param objectsToSave the publisher which provides objects to save. * @return */ - Flux insertAll(Publisher objectsToSave); + Flux insertAll(Mono> objectsToSave); /** * Save the object to the collection for the entity type of the object to save. This will perform an insert if the @@ -910,8 +910,8 @@ public interface ReactiveMongoOperations { * * @param query the query class that specifies the criteria used to find a record and also an optional fields * specification - * @param entityClass the parametrized type of the returned list. - * @return the List of converted objects + * @param entityClass the parametrized type of the returned {@link Flux}. + * @return the {@link Flux} of converted objects */ Flux tail(Query query, Class entityClass); @@ -929,9 +929,9 @@ public interface ReactiveMongoOperations { * * @param query the query class that specifies the criteria used to find a record and also an optional fields * specification - * @param entityClass the parametrized type of the returned list. + * @param entityClass the parametrized type of the returned {@link Flux}. * @param collectionName name of the collection to retrieve the objects from - * @return the List of converted objects + * @return the {@link Flux} of converted objects */ Flux tail(Query query, Class entityClass, String collectionName); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 9b60168f7..19f03d0d0 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -160,7 +160,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati private final QueryMapper queryMapper; private final UpdateMapper updateMapper; - private int publisherBatchSize = 10; private WriteConcern writeConcern; private WriteConcernResolver writeConcernResolver = DefaultWriteConcernResolver.INSTANCE; private WriteResultChecking writeResultChecking = WriteResultChecking.NONE; @@ -257,15 +256,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati this.readPreference = readPreference; } - /** - * Used to set a batch size when working with batches of {@link Publisher} emitting items to insert. - * - * @param publisherBatchSize batch size - */ - public void setPublisherBatchSize(int publisherBatchSize) { - this.publisherBatchSize = publisherBatchSize; - } - /* * (non-Javadoc) * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) @@ -779,16 +769,16 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insert(org.reactivestreams.Publisher, java.lang.Class) */ @Override - public Flux insert(Publisher batchToSave, Class entityClass) { - return insert(batchToSave, determineCollectionName(entityClass)); + public Flux insertAll(Mono> batchToSave, Class entityClass) { + return insertAll(batchToSave, determineCollectionName(entityClass)); } /* (non-Javadoc) * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insert(org.reactivestreams.Publisher, java.lang.String) */ @Override - public Flux insert(Publisher batchToSave, String collectionName) { - return Flux.from(batchToSave).buffer(publisherBatchSize).flatMap(collection -> insert(collection, collectionName)); + public Flux insertAll(Mono> batchToSave, String collectionName) { + return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName)); } /* (non-Javadoc) @@ -857,8 +847,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#insertAll(org.reactivestreams.Publisher) */ @Override - public Flux insertAll(Publisher objectsToSave) { - return Flux.from(objectsToSave).buffer(publisherBatchSize).flatMap(this::insertAll); + public Flux insertAll(Mono> objectsToSave) { + return Flux.from(objectsToSave).flatMap(this::insertAll); } protected Flux doInsertAll(Collection listToSave, MongoWriter writer) { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java index bcb83548e..e0d26cdc0 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java @@ -68,6 +68,7 @@ import com.mongodb.WriteConcern; import lombok.Data; import reactor.core.Cancellation; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.TestSubscriber; /** @@ -404,8 +405,7 @@ public class ReactiveMongoTemplateTests { Query query = new Query( new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter"))); - template.insertAll(Flux.just(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16))) // - .collectList() // + template.insertAll(Mono.just(Arrays.asList(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16)))) // .flatMap(a -> template.updateMulti(query, new Update().set("firstName", "Walt"), Person.class)) // .flatMap(p -> template.find(new Query(where("firstName").is("Walt")), Person.class)) // .subscribeWith(TestSubscriber.create()) // @@ -422,7 +422,7 @@ public class ReactiveMongoTemplateTests { new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter"))); template - .insert(Flux.just(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16)), "people") // + .insertAll(Mono.just(Arrays.asList(new Person("Walter", 50), new Person("Skyler", 43), new Person("Walter Jr", 16))), "people") // .collectList() // .flatMap(a -> template.updateMulti(query, new Update().set("firstName", "Walt"), Person.class, "people")) // .flatMap(p -> template.find(new Query(where("firstName").is("Walt")), Person.class, "people")) //