From e94de2f9ad7a417db3f52926cd22d7195cb6002a Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Tue, 24 Feb 2026 10:57:51 +0100 Subject: [PATCH] Route single and multi collection bulk writes. --- .../data/mongodb/core/BulkWriteSupport.java | 73 ++++- .../data/mongodb/core/BulkWriter.java | 270 +++++++++++++--- .../data/mongodb/core/MongoTemplate.java | 4 +- .../data/mongodb/core/ReactiveBulkWriter.java | 303 ++++++++++++++---- .../mongodb/core/ReactiveMongoTemplate.java | 4 +- .../core/bulk/BulkOperationResult.java | 46 +++ .../core/MongoTemplateBulkUnitTests.java | 75 ++++- .../ReactiveMongoTemplateBulkUnitTests.java | 130 +++++--- 8 files changed, 735 insertions(+), 170 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriteSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriteSupport.java index 350e491ee..4b35186f3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriteSupport.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriteSupport.java @@ -17,15 +17,23 @@ package org.springframework.data.mongodb.core; import java.util.List; -import com.mongodb.client.model.bulk.ClientDeleteOneOptions; -import com.mongodb.client.model.bulk.ClientReplaceOneOptions; import org.bson.Document; +import org.springframework.util.ClassUtils; import com.mongodb.MongoNamespace; +import com.mongodb.client.model.DeleteManyModel; +import com.mongodb.client.model.DeleteOneModel; import com.mongodb.client.model.DeleteOptions; +import com.mongodb.client.model.ReplaceOneModel; +import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.client.model.UpdateManyModel; +import com.mongodb.client.model.UpdateOneModel; import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.WriteModel; import com.mongodb.client.model.bulk.ClientDeleteManyOptions; +import com.mongodb.client.model.bulk.ClientDeleteOneOptions; import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; +import com.mongodb.client.model.bulk.ClientReplaceOneOptions; import com.mongodb.client.model.bulk.ClientUpdateManyOptions; import com.mongodb.client.model.bulk.ClientUpdateOneOptions; @@ -34,6 +42,50 @@ import com.mongodb.client.model.bulk.ClientUpdateOneOptions; */ abstract class BulkWriteSupport { + static WriteModel updateMany(Document query, Object update, UpdateOptions updateOptions) { + + if (update instanceof List pipeline) { + return new UpdateManyModel<>(query, (List) pipeline, updateOptions); + } else if (update instanceof Document updateDocument) { + return new UpdateManyModel<>(query, updateDocument, updateOptions); + } else { + throw new IllegalArgumentException( + "Update needs to be either a List or a Document, but was [%s]".formatted(ClassUtils.getUserClass(update))); + } + } + + static WriteModel updateOne(Document query, Object update, UpdateOptions updateOptions) { + + if (update instanceof List pipeline) { + return new UpdateOneModel<>(query, (List) pipeline, updateOptions); + } else if (update instanceof Document updateDocument) { + return new UpdateOneModel<>(query, updateDocument, updateOptions); + } else { + throw new IllegalArgumentException( + "Update needs to be either a List or a Document, but was [%s]".formatted(ClassUtils.getUserClass(update))); + } + } + + static WriteModel removeMany(Document query, DeleteOptions deleteOptions) { + return new DeleteManyModel<>(query, deleteOptions); + } + + static WriteModel removeOne(Document query, DeleteOptions deleteOptions) { + return new DeleteOneModel<>(query, deleteOptions); + } + + static WriteModel replaceOne(Document query, Document replacement, UpdateOptions updateOptions) { + + ReplaceOptions replaceOptions = new ReplaceOptions(); + replaceOptions.collation(updateOptions.getCollation()); + replaceOptions.upsert(updateOptions.isUpsert()); + replaceOptions.sort(updateOptions.getSort()); + replaceOptions.hint(updateOptions.getHint()); + replaceOptions.hintString(updateOptions.getHintString()); + + return new ReplaceOneModel<>(query, replacement, replaceOptions); + } + static ClientNamespacedWriteModel updateMany(MongoNamespace namespace, Document query, Object update, UpdateOptions updateOptions) { @@ -46,8 +98,11 @@ abstract class BulkWriteSupport { if (update instanceof List pipeline) { return ClientNamespacedWriteModel.updateMany(namespace, query, (List) pipeline, updateManyOptions); + } else if (update instanceof Document updateDocument) { + return ClientNamespacedWriteModel.updateMany(namespace, query, updateDocument, updateManyOptions); } else { - return ClientNamespacedWriteModel.updateMany(namespace, query, (Document) update, updateManyOptions); + throw new IllegalArgumentException( + "Update needs to be either a List or a Document, but was [%s]".formatted(ClassUtils.getUserClass(update))); } } @@ -64,8 +119,11 @@ abstract class BulkWriteSupport { if (update instanceof List pipeline) { return ClientNamespacedWriteModel.updateOne(namespace, query, (List) pipeline, updateOneOptions); + } else if (update instanceof Document updateDocument) { + return ClientNamespacedWriteModel.updateOne(namespace, query, updateDocument, updateOneOptions); } else { - return ClientNamespacedWriteModel.updateOne(namespace, query, (Document) update, updateOneOptions); + throw new IllegalArgumentException( + "Update needs to be either a List or a Document, but was [%s]".formatted(ClassUtils.getUserClass(update))); } } @@ -87,11 +145,11 @@ abstract class BulkWriteSupport { clientDeleteOneOptions.hint(deleteOptions.getHint()); clientDeleteOneOptions.hintString(deleteOptions.getHintString()); - return ClientNamespacedWriteModel.deleteOne(namespace, query, clientDeleteOneOptions); } - static ClientNamespacedWriteModel replaceOne(MongoNamespace namespace, Document query, Document replacement, UpdateOptions updateOptions) { + static ClientNamespacedWriteModel replaceOne(MongoNamespace namespace, Document query, Document replacement, + UpdateOptions updateOptions) { ClientReplaceOneOptions replaceOptions = ClientReplaceOneOptions.clientReplaceOneOptions(); replaceOptions.sort(updateOptions.getSort()); @@ -100,7 +158,6 @@ abstract class BulkWriteSupport { replaceOptions.hintString(updateOptions.getHintString()); replaceOptions.collation(updateOptions.getCollation()); - return ClientNamespacedWriteModel.replaceOne(namespace, query, - replacement, replaceOptions); + return ClientNamespacedWriteModel.replaceOne(namespace, query, replacement, replaceOptions); } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriter.java index 91a147fe2..9d88e1479 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriter.java @@ -17,6 +17,9 @@ package org.springframework.data.mongodb.core; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import org.bson.Document; import org.springframework.dao.DataAccessException; @@ -24,7 +27,6 @@ import org.springframework.data.mongodb.core.MongoTemplate.SourceAwareDocument; import org.springframework.data.mongodb.core.QueryOperations.DeleteContext; import org.springframework.data.mongodb.core.QueryOperations.UpdateContext; import org.springframework.data.mongodb.core.bulk.Bulk; -import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; import org.springframework.data.mongodb.core.bulk.BulkOperation; import org.springframework.data.mongodb.core.bulk.BulkOperation.Insert; import org.springframework.data.mongodb.core.bulk.BulkOperation.Remove; @@ -32,22 +34,29 @@ import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst; import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace; import org.springframework.data.mongodb.core.bulk.BulkOperation.Update; import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst; +import org.springframework.data.mongodb.core.bulk.BulkOperationContext.TypedNamespace; +import org.springframework.data.mongodb.core.bulk.BulkOperationResult; +import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent; +import org.springframework.util.StringUtils; import com.mongodb.MongoBulkWriteException; import com.mongodb.MongoNamespace; +import com.mongodb.bulk.BulkWriteResult; import com.mongodb.client.model.DeleteOptions; +import com.mongodb.client.model.InsertOneModel; import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.WriteModel; import com.mongodb.client.model.bulk.ClientBulkWriteOptions; import com.mongodb.client.model.bulk.ClientBulkWriteResult; import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; /** * Internal API wrapping a {@link MongoTemplate} to encapsulate {@link Bulk} handling. - * + * * @author Christoph Strobl - * @since 2026/02 + * @since 5.1 */ class BulkWriter { @@ -57,24 +66,84 @@ class BulkWriter { this.template = template; } - public ClientBulkWriteResult write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) { + public BulkOperationResult write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) { + + Set namespaces = bulk.operations().stream().map(it -> it.context().namespace()) + .collect(Collectors.toSet()); + if (namespaces.size() == 1) { + return writeToSingleCollection(defaultDatabase, bulk, options, namespaces.iterator().next()); + } + return writeToMultipleCollections(defaultDatabase, bulk, options); + } + + private BulkOperationResult writeToSingleCollection(String defaultDatabase, Bulk bulk, + BulkWriteOptions options, TypedNamespace namespace) { - List writeModels = new ArrayList<>(); - List> afterSaveCallables = new ArrayList<>(); + MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, + StringUtils.hasText(namespace.collection()) ? namespace.collection() + : template.getCollectionName(namespace.type())); + + SingleCollectionCollector collector = new SingleCollectionCollector(mongoNamespace); + buildWriteModels(bulk, collector); + + try { + BulkWriteResult bulkWriteResult = template.execute(collector.getNamespace().getCollectionName(), + collection -> collection.bulkWrite(collector.getWriteModels(), new com.mongodb.client.model.BulkWriteOptions() + .ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED)))); + + collector.getAfterSaveCallables().forEach(callable -> { + template + .maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName())); + template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName()); + }); + return BulkOperationResult.from(bulkWriteResult); + } catch (MongoBulkWriteException e) { + DataAccessException dataAccessException = template.getExceptionTranslator().translateExceptionIfPossible(e); + if (dataAccessException != null) { + throw dataAccessException; + } + throw e; + } + } + + private BulkOperationResult writeToMultipleCollections(String defaultDatabase, Bulk bulk, + BulkWriteOptions options) { + + MultiCollectionCollector collector = new MultiCollectionCollector(defaultDatabase); + buildWriteModels(bulk, collector); + + try { + + ClientBulkWriteResult clientBulkWriteResult = template + .doWithClient(client -> client.bulkWrite(collector.getWriteModels(), ClientBulkWriteOptions + .clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED)))); + + collector.getAfterSaveCallables().forEach(callable -> { + template + .maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName())); + template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName()); + }); + return BulkOperationResult.from(clientBulkWriteResult); + } catch (MongoBulkWriteException e) { + DataAccessException dataAccessException = template.getExceptionTranslator().translateExceptionIfPossible(e); + if (dataAccessException != null) { + throw dataAccessException; + } + throw e; + } + } + + private void buildWriteModels(Bulk bulk, WriteModelCollector collector) { for (BulkOperation bulkOp : bulk.operations()) { - String collectionName = bulkOp.context().namespace().collection() != null - ? bulkOp.context().namespace().collection() - : template.getCollectionName(bulkOp.context().namespace().type()); + MongoNamespace namespace = collector.resolveNamespace(bulkOp, it -> template.getCollectionName(it.type())); - MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, collectionName); if (bulkOp instanceof Insert insert) { - SourceAwareDocument sourceAwareDocument = template.prepareObjectForSave(collectionName, insert.value(), - template.getConverter()); - writeModels.add(ClientNamespacedWriteModel.insertOne(mongoNamespace, sourceAwareDocument.document())); - afterSaveCallables.add(sourceAwareDocument); + SourceAwareDocument sourceAwareDocument = template.prepareObjectForSave(namespace.getCollectionName(), + insert.value(), template.getConverter()); + collector.addInsert(namespace, sourceAwareDocument.document(), sourceAwareDocument); } else if (bulkOp instanceof Update update) { Class domainType = update.context().namespace().type(); @@ -89,11 +158,7 @@ class BulkWriter { : updateContext.getMappedUpdate(entity); UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query()); - if (multi) { - writeModels.add(BulkWriteSupport.updateMany(mongoNamespace, mappedQuery, mappedUpdate, updateOptions)); - } else { - writeModels.add(BulkWriteSupport.updateOne(mongoNamespace, mappedQuery, mappedUpdate, updateOptions)); - } + collector.addUpdate(namespace, multi, mappedQuery, mappedUpdate, updateOptions); } else if (bulkOp instanceof Remove remove) { Class domainType = remove.context().namespace().type(); @@ -102,16 +167,12 @@ class BulkWriter { Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType)); DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType); - if (remove instanceof RemoveFirst) { - writeModels.add(BulkWriteSupport.removeOne(mongoNamespace, mappedQuery, deleteOptions)); - } else { - writeModels.add(BulkWriteSupport.removeMany(mongoNamespace, mappedQuery, deleteOptions)); - } + collector.addRemove(namespace, remove instanceof RemoveFirst, mappedQuery, deleteOptions); } else if (bulkOp instanceof Replace replace) { Class domainType = replace.context().namespace().type(); - SourceAwareDocument sourceAwareDocument = template.prepareObjectForSave(collectionName, + SourceAwareDocument sourceAwareDocument = template.prepareObjectForSave(namespace.getCollectionName(), replace.replacement(), template.getConverter()); UpdateContext updateContext = template.getQueryOperations().replaceSingleContext(replace.query(), @@ -120,29 +181,154 @@ class BulkWriter { Document mappedQuery = updateContext.getMappedQuery(template.getPersistentEntity(domainType)); UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, replace.query()); - writeModels.add( - BulkWriteSupport.replaceOne(mongoNamespace, mappedQuery, sourceAwareDocument.document(), updateOptions)); - afterSaveCallables.add(sourceAwareDocument); + collector.addReplace(namespace, mappedQuery, sourceAwareDocument.document(), updateOptions, + sourceAwareDocument); } } + } - try { + private interface WriteModelCollector { - ClientBulkWriteResult clientBulkWriteResult = template.doWithClient(client -> client.bulkWrite(writeModels, - ClientBulkWriteOptions.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED)))); + MongoNamespace resolveNamespace(String collectionName); - afterSaveCallables.forEach(callable -> { - template - .maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName())); - template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName()); - }); - return clientBulkWriteResult; - } catch (MongoBulkWriteException e) { - DataAccessException dataAccessException = template.getExceptionTranslator().translateExceptionIfPossible(e); - if (dataAccessException != null) { - throw dataAccessException; + default MongoNamespace resolveNamespace(BulkOperation operation, Function fallback) { + + TypedNamespace typedNamespace = operation.context().namespace(); + if (StringUtils.hasText(typedNamespace.collection())) { + return resolveNamespace(typedNamespace.collection()); } - throw e; + + return resolveNamespace(fallback.apply(typedNamespace)); + } + + void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument sourceDoc); + + void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, UpdateOptions options); + + void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options); + + void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options, + SourceAwareDocument sourceDoc); + + List> getAfterSaveCallables(); + } + + private static class SingleCollectionCollector implements WriteModelCollector> { + + private final List> writeModels = new ArrayList<>(); + private final List> afterSaveCallables = new ArrayList<>(); + private MongoNamespace namespace; + + public SingleCollectionCollector(MongoNamespace namespace) { + this.namespace = namespace; + } + + @Override + public MongoNamespace resolveNamespace(String collectionName) { + return namespace; + } + + @Override + public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument sourceDoc) { + writeModels.add(new InsertOneModel<>(document)); + afterSaveCallables.add(sourceDoc); + } + + @Override + public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, + UpdateOptions options) { + if (multi) { + writeModels.add(BulkWriteSupport.updateMany(query, update, options)); + } else { + writeModels.add(BulkWriteSupport.updateOne(query, update, options)); + } + } + + @Override + public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) { + if (removeFirst) { + writeModels.add(BulkWriteSupport.removeOne(query, options)); + } else { + writeModels.add(BulkWriteSupport.removeMany(query, options)); + } + } + + @Override + public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options, + SourceAwareDocument sourceDoc) { + writeModels.add(BulkWriteSupport.replaceOne(query, replacement, options)); + afterSaveCallables.add(sourceDoc); + } + + @Override + public List> getAfterSaveCallables() { + return afterSaveCallables; + } + + MongoNamespace getNamespace() { + return namespace; + } + + List> getWriteModels() { + return writeModels; + } + } + + private static class MultiCollectionCollector implements WriteModelCollector { + + private final List writeModels = new ArrayList<>(); + private final List> afterSaveCallables = new ArrayList<>(); + private final String defaultDatabaseName; + + public MultiCollectionCollector(String defaultDatabaseName) { + this.defaultDatabaseName = defaultDatabaseName; + } + + @Override + public MongoNamespace resolveNamespace(String collectionName) { + return new MongoNamespace(defaultDatabaseName, collectionName); + } + + @Override + public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument sourceDoc) { + writeModels.add(ClientNamespacedWriteModel.insertOne(namespace, document)); + afterSaveCallables.add(sourceDoc); + } + + @Override + public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, + UpdateOptions options) { + if (multi) { + writeModels.add(BulkWriteSupport.updateMany(namespace, query, update, options)); + } else { + writeModels.add(BulkWriteSupport.updateOne(namespace, query, update, options)); + } + } + + @Override + public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) { + if (removeFirst) { + writeModels.add(BulkWriteSupport.removeOne(namespace, query, options)); + } else { + writeModels.add(BulkWriteSupport.removeMany(namespace, query, options)); + } + } + + @Override + public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options, + SourceAwareDocument sourceDoc) { + writeModels.add(BulkWriteSupport.replaceOne(namespace, query, replacement, options)); + afterSaveCallables.add(sourceDoc); + } + + @Override + public List> getAfterSaveCallables() { + return afterSaveCallables; + } + + List getWriteModels() { + return writeModels; } } + } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index 7ef926711..1b1a85701 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -688,9 +688,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @Override public BulkOperationResult bulkWrite(Bulk bulk, BulkWriteOptions options) { - - ClientBulkWriteResult result = new BulkWriter(this).write(getDb().getName(), bulk, options); - return BulkOperationResult.from(result); + return new BulkWriter(this).write(getDb().getName(), bulk, options); } @Override diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkWriter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkWriter.java index 29dcc3231..3407393a8 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkWriter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkWriter.java @@ -15,12 +15,14 @@ */ package org.springframework.data.mongodb.core; -import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import org.bson.Document; import org.springframework.data.mongodb.core.QueryOperations.DeleteContext; @@ -34,12 +36,19 @@ import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst; import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace; import org.springframework.data.mongodb.core.bulk.BulkOperation.Update; import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst; +import org.springframework.data.mongodb.core.bulk.BulkOperationContext.TypedNamespace; +import org.springframework.data.mongodb.core.bulk.BulkOperationResult; +import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent; +import org.springframework.util.StringUtils; import com.mongodb.MongoNamespace; +import com.mongodb.bulk.BulkWriteResult; import com.mongodb.client.model.DeleteOptions; +import com.mongodb.client.model.InsertOneModel; import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.WriteModel; import com.mongodb.client.model.bulk.ClientBulkWriteOptions; import com.mongodb.client.model.bulk.ClientBulkWriteResult; import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; @@ -48,7 +57,7 @@ import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; * Internal API wrapping a {@link ReactiveMongoTemplate} to encapsulate {@link Bulk} handling using a reactive flow. * * @author Christoph Strobl - * @since 2026/02 + * @since 5.1 */ class ReactiveBulkWriter { @@ -58,51 +67,85 @@ class ReactiveBulkWriter { this.template = template; } - public Mono write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) { - - return Flux.fromIterable(bulk.operations()).concatMap(bulkOp -> toWriteModelAndAfterSave(defaultDatabase, bulkOp)) - .collectList().flatMap(results -> { - - List writeModels = new ArrayList<>(); - List> afterSaveCallables = new ArrayList<>(); - - for (WriteModelAndAfterSave result : results) { - writeModels.add(result.model()); - if (result.afterSave() != null) { - afterSaveCallables.add(result.afterSave()); - } - } - - return template - .doWithClient(client -> client.bulkWrite(writeModels, - ClientBulkWriteOptions.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED)))) - .doOnSuccess(v -> afterSaveCallables.forEach(callable -> { - template.maybeEmitEvent( - new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName())); - - })) - .flatMap(v -> Flux.concat(afterSaveCallables.stream().map(callable -> template - .maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName())).toList()) - .then(Mono.just(v))); - }); + public Mono> write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) { + + Set namespaces = bulk.operations().stream().map(it -> it.context().namespace()) + .collect(Collectors.toSet()); + if (namespaces.size() == 1) { + return writeToSingleCollection(defaultDatabase, bulk, options, namespaces.iterator().next()) + .map(r -> (BulkOperationResult) r); + } + return writeToMultipleCollections(defaultDatabase, bulk, options).map(r -> (BulkOperationResult) r); + } + + private Mono> writeToSingleCollection(String defaultDatabase, Bulk bulk, + BulkWriteOptions options, TypedNamespace namespace) { + + MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, + StringUtils.hasText(namespace.collection()) ? namespace.collection() + : template.getCollectionName(namespace.type())); + + SingleCollectionCollector collector = new SingleCollectionCollector(mongoNamespace); + return buildWriteModelsReactive(bulk, collector).then(Mono.defer(() -> { + + String collectionName = collector.getNamespace().getCollectionName(); + List> afterSaveCallables = collector.getAfterSaveCallables(); + + return template + .createMono(collectionName, + col -> col.bulkWrite(collector.getWriteModels(), + new com.mongodb.client.model.BulkWriteOptions() + .ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED)))) + .map( + BulkOperationResult::from) + .doOnSuccess( + v -> afterSaveCallables + .forEach(callable -> template.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), + callable.document(), callable.collectionName())))) + .flatMap(result -> Flux.concat(afterSaveCallables.stream().map(callable -> template + .maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName())).toList()) + .then(Mono.just(result))); + })); } - private Mono toWriteModelAndAfterSave(String defaultDatabase, BulkOperation bulkOp) { + private Mono> writeToMultipleCollections(String defaultDatabase, Bulk bulk, + BulkWriteOptions options) { + + MultiCollectionCollector collector = new MultiCollectionCollector(defaultDatabase); + return buildWriteModelsReactive(bulk, collector).then(Mono.defer(() -> { + + List writeModels = collector.getWriteModels(); + List> afterSaveCallables = collector.getAfterSaveCallables(); + + return template + .doWithClient(client -> client.bulkWrite(writeModels, + ClientBulkWriteOptions + .clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED)))) + .map( + BulkOperationResult::from) + .doOnSuccess( + v -> afterSaveCallables + .forEach(callable -> template.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), + callable.document(), callable.collectionName())))) + .flatMap(result -> Flux.concat(afterSaveCallables.stream().map(callable -> template + .maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName())).toList()) + .then(Mono.just(result))); + })); + } - String collectionName = bulkOp.context().namespace().collection() != null - ? bulkOp.context().namespace().collection() - : template.getCollectionName(bulkOp.context().namespace().type()); + private Mono buildWriteModelsReactive(Bulk bulk, WriteModelCollector collector) { + return Flux.fromIterable(bulk.operations()).concatMap(bulkOp -> addOperationReactive(bulkOp, collector)).then(); + } - MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, collectionName); + private Mono addOperationReactive(BulkOperation bulkOp, WriteModelCollector collector) { + + MongoNamespace namespace = collector.resolveNamespace(bulkOp, ns -> template.getCollectionName(ns.type())); if (bulkOp instanceof Insert insert) { - return template.prepareObjectForSaveReactive(collectionName, insert.value(), template.getConverter()) - .map(sourceAwareDocument -> { - ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(mongoNamespace, - sourceAwareDocument.document()); - return new WriteModelAndAfterSave(model, sourceAwareDocument); - }); + return template + .prepareObjectForSaveReactive(namespace.getCollectionName(), insert.value(), template.getConverter()) + .doOnNext(sad -> collector.addInsert(namespace, sad.document(), toObject(sad))).then(); } if (bulkOp instanceof Update update) { @@ -119,15 +162,8 @@ class ReactiveBulkWriter { : updateContext.getMappedUpdate(entity); UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query()); - if (multi) { - - ClientNamespacedWriteModel model = BulkWriteSupport.updateMany(mongoNamespace, mappedQuery, mappedUpdate, - updateOptions); - return Mono.just(new WriteModelAndAfterSave(model, null)); - } - ClientNamespacedWriteModel model = BulkWriteSupport.updateOne(mongoNamespace, mappedQuery, mappedUpdate, - updateOptions); - return Mono.just(new WriteModelAndAfterSave(model, null)); + collector.addUpdate(namespace, multi, mappedQuery, mappedUpdate, updateOptions); + return Mono.empty(); } if (bulkOp instanceof Remove remove) { @@ -138,37 +174,176 @@ class ReactiveBulkWriter { Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType)); DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType); - if (remove instanceof RemoveFirst) { - ClientNamespacedWriteModel model = BulkWriteSupport.removeOne(mongoNamespace, mappedQuery, deleteOptions); - return Mono.just(new WriteModelAndAfterSave(model, null)); - } else { - ClientNamespacedWriteModel model = BulkWriteSupport.removeMany(mongoNamespace, mappedQuery, deleteOptions); - return Mono.just(new WriteModelAndAfterSave(model, null)); - } + collector.addRemove(namespace, remove instanceof RemoveFirst, mappedQuery, deleteOptions); + return Mono.empty(); } if (bulkOp instanceof Replace replace) { - return template.prepareObjectForSaveReactive(collectionName, replace.replacement(), template.getConverter()) - .map(sourceAwareDocument -> { + return template + .prepareObjectForSaveReactive(namespace.getCollectionName(), replace.replacement(), template.getConverter()) + .doOnNext(sad -> { UpdateContext updateContext = template.getQueryOperations().replaceSingleContext(replace.query(), - MappedDocument.of(sourceAwareDocument.document()), replace.upsert()); + MappedDocument.of(sad.document()), replace.upsert()); Document mappedQuery = updateContext .getMappedQuery(template.getPersistentEntity(replace.context().namespace().type())); UpdateOptions updateOptions = updateContext.getUpdateOptions(replace.context().namespace().type(), replace.query()); - ClientNamespacedWriteModel model = BulkWriteSupport.replaceOne(mongoNamespace, mappedQuery, - sourceAwareDocument.document(), updateOptions); - return new WriteModelAndAfterSave(model, sourceAwareDocument); - }); + collector.addReplace(namespace, mappedQuery, sad.document(), updateOptions, toObject(sad)); + }).then(); } return Mono.error(new IllegalStateException("Unknown bulk operation type: " + bulkOp.getClass())); } - private record WriteModelAndAfterSave(ClientNamespacedWriteModel model, SourceAwareDocument afterSave) { + @SuppressWarnings("unchecked") + private static SourceAwareDocument toObject(SourceAwareDocument sad) { + return (SourceAwareDocument) sad; + } + + private interface WriteModelCollector { + + MongoNamespace resolveNamespace(String collectionName); + + default MongoNamespace resolveNamespace(BulkOperation operation, Function fallback) { + + TypedNamespace typedNamespace = operation.context().namespace(); + if (StringUtils.hasText(typedNamespace.collection())) { + return resolveNamespace(typedNamespace.collection()); + } + return resolveNamespace(fallback.apply(typedNamespace)); + } + + void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument sourceDoc); + + void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, UpdateOptions options); + + void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options); + + void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options, + SourceAwareDocument sourceDoc); + + List> getAfterSaveCallables(); + } + + private static class SingleCollectionCollector implements WriteModelCollector> { + + private final List> writeModels = new ArrayList<>(); + private final List> afterSaveCallables = new ArrayList<>(); + private final MongoNamespace namespace; + + SingleCollectionCollector(MongoNamespace namespace) { + this.namespace = namespace; + } + + @Override + public MongoNamespace resolveNamespace(String collectionName) { + return namespace; + } + + @Override + public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument sourceDoc) { + writeModels.add(new InsertOneModel<>(document)); + afterSaveCallables.add(sourceDoc); + } + + @Override + public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, + UpdateOptions options) { + if (multi) { + writeModels.add(BulkWriteSupport.updateMany(query, update, options)); + } else { + writeModels.add(BulkWriteSupport.updateOne(query, update, options)); + } + } + + @Override + public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) { + if (removeFirst) { + writeModels.add(BulkWriteSupport.removeOne(query, options)); + } else { + writeModels.add(BulkWriteSupport.removeMany(query, options)); + } + } + + @Override + public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options, + SourceAwareDocument sourceDoc) { + writeModels.add(BulkWriteSupport.replaceOne(query, replacement, options)); + afterSaveCallables.add(sourceDoc); + } + + @Override + public List> getAfterSaveCallables() { + return afterSaveCallables; + } + + MongoNamespace getNamespace() { + return namespace; + } + + List> getWriteModels() { + return writeModels; + } + } + + private static class MultiCollectionCollector implements WriteModelCollector { + + private final List writeModels = new ArrayList<>(); + private final List> afterSaveCallables = new ArrayList<>(); + private final String defaultDatabaseName; + + MultiCollectionCollector(String defaultDatabaseName) { + this.defaultDatabaseName = defaultDatabaseName; + } + + @Override + public MongoNamespace resolveNamespace(String collectionName) { + return new MongoNamespace(defaultDatabaseName, collectionName); + } + + @Override + public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument sourceDoc) { + writeModels.add(ClientNamespacedWriteModel.insertOne(namespace, document)); + afterSaveCallables.add(sourceDoc); + } + + @Override + public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, + UpdateOptions options) { + if (multi) { + writeModels.add(BulkWriteSupport.updateMany(namespace, query, update, options)); + } else { + writeModels.add(BulkWriteSupport.updateOne(namespace, query, update, options)); + } + } + + @Override + public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) { + if (removeFirst) { + writeModels.add(BulkWriteSupport.removeOne(namespace, query, options)); + } else { + writeModels.add(BulkWriteSupport.removeMany(namespace, query, options)); + } + } + + @Override + public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options, + SourceAwareDocument sourceDoc) { + writeModels.add(BulkWriteSupport.replaceOne(namespace, query, replacement, options)); + afterSaveCallables.add(sourceDoc); + } + + @Override + public List> getAfterSaveCallables() { + return afterSaveCallables; + } + + List getWriteModels() { + return writeModels; + } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 70dd69611..04e41c458 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -842,9 +842,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @Override public Mono> bulkWrite(Bulk bulk, BulkWriteOptions options) { - return doGetDatabase() - .flatMap(db -> new ReactiveBulkWriter(this).write(db.getName(), bulk, options)) - .map(BulkOperationResult::from); + return doGetDatabase().flatMap(db -> new ReactiveBulkWriter(this).write(db.getName(), bulk, options)); } public record SourceAwareDocument(T source, Document document, String collectionName) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/bulk/BulkOperationResult.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/bulk/BulkOperationResult.java index 84256c67e..6147e99fc 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/bulk/BulkOperationResult.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/bulk/BulkOperationResult.java @@ -15,6 +15,7 @@ */ package org.springframework.data.mongodb.core.bulk; +import com.mongodb.bulk.BulkWriteResult; import com.mongodb.client.model.bulk.ClientBulkWriteResult; /** @@ -73,6 +74,51 @@ public interface BulkOperationResult { }; } + /** + * Creates a {@link BulkOperationResult} from a MongoDB driver {@link BulkWriteResult}. + * + * @param result the driver result; must not be {@literal null}. + * @return a new {@link BulkOperationResult} wrapping the given result; never {@literal null}. + */ + static BulkOperationResult from(BulkWriteResult result) { + return new BulkOperationResult<>() { + @Override + public long insertCount() { + return result.getInsertedCount(); + } + + @Override + public long modifiedCount() { + return result.getModifiedCount(); + } + + @Override + public long deleteCount() { + return result.getDeletedCount(); + } + + @Override + public long upsertCount() { + return result.getUpserts().size(); + } + + @Override + public boolean acknowledged() { + return result.wasAcknowledged(); + } + + @Override + public long matchedCount() { + return result.getMatchedCount(); + } + + @Override + public BulkWriteResult rawResult() { + return result; + } + }; + } + /** * Returns the number of documents inserted. * diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateBulkUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateBulkUnitTests.java index eeb19401c..081549ffe 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateBulkUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateBulkUnitTests.java @@ -17,12 +17,14 @@ package org.springframework.data.mongodb.core; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.springframework.data.mongodb.core.query.Criteria.where; import static org.springframework.data.mongodb.core.query.Query.query; @@ -134,11 +136,36 @@ class MongoTemplateBulkUnitTests { ops = Bulk.builder().inCollection("default-collection"); } + @Test // GH-5087 + void delegatesToCollectionOnSingleNamespace() { + + Bulk bulk = ops.insert(new BaseDoc()).build(); + + template.bulkWrite(bulk, BulkWriteOptions.ordered()); + + verify(client).getDatabase(anyString()); + verifyNoMoreInteractions(client); + verify(collection).bulkWrite(anyList(), any()); + } + + @Test // GH-5087 + void delegatesToClientOnMultiNamespace() { + + Bulk bulk = ops.insert(new BaseDoc()).inCollection("other-collection").insert(new BaseDoc()).build(); + + template.bulkWrite(bulk, BulkWriteOptions.ordered()); + + verify(client).bulkWrite(anyList(), any()); + verifyNoInteractions(collection); + } + @Test // GH-5087 void updateOneShouldUseCollationWhenPresent() { Bulk bulk = ops - .updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen")) + .updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen")) // + .inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); @@ -155,6 +182,8 @@ class MongoTemplateBulkUnitTests { Bulk bulk = ops .updateMulti(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen")) + .inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); @@ -169,7 +198,9 @@ class MongoTemplateBulkUnitTests { @Test // GH-5087 void removeShouldUseCollationWhenPresent() { - Bulk bulk = ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).build(); + Bulk bulk = ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); @@ -183,7 +214,10 @@ class MongoTemplateBulkUnitTests { @Test // GH-5087 void replaceOneShouldUseCollationWhenPresent() { - Bulk bulk = ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).build(); + Bulk bulk = ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()) // + .inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); @@ -198,7 +232,10 @@ class MongoTemplateBulkUnitTests { void bulkUpdateShouldMapQueryAndUpdateCorrectly() { Bulk bulk = ops.inCollection("test", SomeDomainType.class) - .updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")).build(); + .updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")) // + .inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); verify(client).bulkWrite(captor.capture(), any()); @@ -212,7 +249,10 @@ class MongoTemplateBulkUnitTests { @Test // GH-5087 void bulkRemoveShouldMapQueryCorrectly() { - Bulk bulk = ops.inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys"))).build(); + Bulk bulk = ops.inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys"))) + .inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); verify(client).bulkWrite(captor.capture(), any()); @@ -230,7 +270,9 @@ class MongoTemplateBulkUnitTests { replacement.lastName = "Kim"; Bulk bulk = ops.inCollection("test", SomeDomainType.class) - .replaceOne(query(where("firstName").is("danerys")), replacement).build(); + .replaceOne(query(where("firstName").is("danerys")), replacement).inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); verify(client).bulkWrite(captor.capture(), any()); @@ -247,7 +289,9 @@ class MongoTemplateBulkUnitTests { void bulkInsertInvokesEntityCallbacks() { Person entity = new Person("init"); - Bulk bulk = ops.inCollection("person").insert(entity).build(); + Bulk bulk = ops.inCollection("person").insert(entity).inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); @@ -296,13 +340,16 @@ class MongoTemplateBulkUnitTests { MockingDetails mockingDetails = Mockito.mockingDetails(eventPublisher); Collection invocations = mockingDetails.getInvocations(); assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass) - .containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class); + .containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class); } @Test // GH-5087 void appliesArrayFilterWhenPresent() { - Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))).build(); + Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))) // + .inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); verify(client).bulkWrite(captor.capture(), any()); @@ -317,7 +364,10 @@ class MongoTemplateBulkUnitTests { @Test // GH-5087 void shouldRetainNestedArrayPathWithPlaceholdersForNoMatchingPaths() { - Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")).build(); + Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // + .inCollection("other-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) // + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); verify(client).bulkWrite(captor.capture(), any()); @@ -332,7 +382,10 @@ class MongoTemplateBulkUnitTests { void shouldRetainNestedArrayPathWithPlaceholdersForMappedEntity() { Bulk bulk = ops.inCollection("collection-1", OrderTest.class) - .updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")).build(); + .updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")) // + .inCollection("collection-2", OrderTest.class) + .updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")) // + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()); verify(client).bulkWrite(captor.capture(), any()); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateBulkUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateBulkUnitTests.java index 4af9b209e..41f638591 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateBulkUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateBulkUnitTests.java @@ -28,6 +28,8 @@ import static org.mockito.Mockito.when; import static org.springframework.data.mongodb.core.query.Criteria.where; import static org.springframework.data.mongodb.core.query.Query.query; +import reactor.core.publisher.Mono; + import java.util.Collection; import java.util.List; @@ -59,12 +61,12 @@ import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.mapping.Field; import org.springframework.data.mongodb.core.mapping.MongoMappingContext; -import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback; -import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback; -import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback; import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent; import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent; import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent; +import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback; +import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback; +import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback; import org.springframework.data.mongodb.core.query.BasicQuery; import org.springframework.data.mongodb.core.query.Collation; import org.springframework.data.mongodb.core.query.Update; @@ -82,10 +84,9 @@ import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; -import reactor.core.publisher.Mono; - /** - * Unit tests for {@link ReactiveMongoOperations#bulkWrite}. + * Unit tests for {@link ReactiveMongoOperations#bulkWrite}. Tests use at least two collections so that + * {@code client.bulkWrite} is exercised (multi-collection path). * * @author Christoph Strobl */ @@ -93,6 +94,13 @@ import reactor.core.publisher.Mono; @MockitoSettings(strictness = Strictness.LENIENT) class ReactiveMongoTemplateBulkUnitTests { + private static final String OTHER_COLLECTION = "other-collection"; + + /** Simple insert in another collection so bulk has multiple namespaces and client.bulkWrite is used. */ + private static Document simpleInsertInOtherCollection() { + return new Document("_id", 1); + } + private ReactiveMongoTemplate template; @Mock MongoClient client; @Mock MongoDatabase database; @@ -139,10 +147,33 @@ class ReactiveMongoTemplateBulkUnitTests { ops = Bulk.builder().inCollection("default-collection"); } + @Test // GH-5087 + void delegatesToCollectionOnSingleNamespace() { + + Bulk bulk = ops.insert(new BaseDoc()).build(); + + template.bulkWrite(bulk, BulkWriteOptions.ordered()).subscribe(); + + verifyNoInteractions(client); + verify(collection).bulkWrite(anyList(), any()); + } + + @Test // GH-5087 + void delegatesToClientOnMultiNamespace() { + + Bulk bulk = ops.insert(new BaseDoc()).inCollection("other-collection").insert(new BaseDoc()).build(); + + template.bulkWrite(bulk, BulkWriteOptions.ordered()).subscribe(); + + verify(client).bulkWrite(anyList(), any()); + verifyNoInteractions(collection); + } + @Test // GH-5087 void updateOneShouldUseCollationWhenPresent() { - Bulk bulk = ops + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("default-collection") .updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen")) .build(); @@ -151,14 +182,15 @@ class ReactiveMongoTemplateBulkUnitTests { verify(client).bulkWrite(captor.capture(), any()); assertThat( - extractWriteModel(ConcreteClientUpdateOneModel.class, captor.getValue().get(0)).getOptions().getCollation()) + extractWriteModel(ConcreteClientUpdateOneModel.class, captor.getValue().get(1)).getOptions().getCollation()) .contains(com.mongodb.client.model.Collation.builder().locale("de").build()); } @Test // GH-5087 void updateManyShouldUseCollationWhenPresent() { - Bulk bulk = ops + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("default-collection") .updateMulti(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen")) .build(); @@ -167,49 +199,53 @@ class ReactiveMongoTemplateBulkUnitTests { verify(client).bulkWrite(captor.capture(), any()); assertThat( - extractWriteModel(ConcreteClientUpdateManyModel.class, captor.getValue().get(0)).getOptions().getCollation()) + extractWriteModel(ConcreteClientUpdateManyModel.class, captor.getValue().get(1)).getOptions().getCollation()) .contains(com.mongodb.client.model.Collation.builder().locale("de").build()); } @Test // GH-5087 void removeShouldUseCollationWhenPresent() { - Bulk bulk = ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).build(); + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("default-collection").remove(new BasicQuery("{}").collation(Collation.of("de"))).build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); verify(client).bulkWrite(captor.capture(), any()); assertThat( - extractWriteModel(ConcreteClientDeleteManyModel.class, captor.getValue().get(0)).getOptions().getCollation()) + extractWriteModel(ConcreteClientDeleteManyModel.class, captor.getValue().get(1)).getOptions().getCollation()) .contains(com.mongodb.client.model.Collation.builder().locale("de").build()); } @Test // GH-5087 void replaceOneShouldUseCollationWhenPresent() { - Bulk bulk = ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).build(); + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("default-collection") + .replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); verify(client).bulkWrite(captor.capture(), any()); assertThat( - extractWriteModel(ConcreteClientReplaceOneModel.class, captor.getValue().get(0)).getOptions().getCollation()) + extractWriteModel(ConcreteClientReplaceOneModel.class, captor.getValue().get(1)).getOptions().getCollation()) .contains(com.mongodb.client.model.Collation.builder().locale("de").build()); } @Test // GH-5087 void bulkUpdateShouldMapQueryAndUpdateCorrectly() { - Bulk bulk = ops.inCollection("test", SomeDomainType.class) + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("test", SomeDomainType.class) .updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")).build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); verify(client).bulkWrite(captor.capture(), any()); ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class, - captor.getValue().get(0)); + captor.getValue().get(1)); assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys")); assertThat(updateModel.getUpdate()).contains(new Document("$set", new Document("first_name", "queen danerys"))); } @@ -217,13 +253,14 @@ class ReactiveMongoTemplateBulkUnitTests { @Test // GH-5087 void bulkRemoveShouldMapQueryCorrectly() { - Bulk bulk = ops.inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys"))).build(); + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys"))).build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); verify(client).bulkWrite(captor.capture(), any()); ConcreteClientDeleteManyModel deleteModel = extractWriteModel(ConcreteClientDeleteManyModel.class, - captor.getValue().get(0)); + captor.getValue().get(1)); assertThat(deleteModel.getFilter()).isEqualTo(new Document("first_name", "danerys")); } @@ -234,25 +271,26 @@ class ReactiveMongoTemplateBulkUnitTests { replacement.firstName = "Minsu"; replacement.lastName = "Kim"; - Bulk bulk = ops.inCollection("test", SomeDomainType.class) - .replaceOne(query(where("firstName").is("danerys")), replacement).build(); + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("test", SomeDomainType.class).replaceOne(query(where("firstName").is("danerys")), replacement) + .build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); verify(client).bulkWrite(captor.capture(), any()); ConcreteClientReplaceOneModel replaceModel = extractWriteModel(ConcreteClientReplaceOneModel.class, - captor.getValue().get(0)); + captor.getValue().get(1)); assertThat(replaceModel.getFilter()).isEqualTo(new Document("first_name", "danerys")); assertThat(replaceModel.getReplacement()).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class)) - .containsEntry("first_name", "Minsu") - .containsEntry("lastName", "Kim"); + .containsEntry("first_name", "Minsu").containsEntry("lastName", "Kim"); } @Test // GH-5087 void bulkInsertInvokesEntityCallbacks() { Person entity = new Person("init"); - Bulk bulk = ops.inCollection("person").insert(entity).build(); + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("person").insert(entity).build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); @@ -265,7 +303,7 @@ class ReactiveMongoTemplateBulkUnitTests { verify(client).bulkWrite(captor.capture(), any()); ConcreteClientInsertOneModel insertModel = extractWriteModel(ConcreteClientInsertOneModel.class, - captor.getValue().get(0)); + captor.getValue().get(1)); assertThat(insertModel.getDocument()).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class)) .containsEntry("firstName", "after-save"); } @@ -274,46 +312,57 @@ class ReactiveMongoTemplateBulkUnitTests { @SuppressWarnings("rawtypes") void bulkReplaceOneEmitsEventsCorrectly() { - ops.replaceOne(query(where("firstName").is("danerys")), new SomeDomainType()); + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("default-collection").replaceOne(query(where("firstName").is("danerys")), new SomeDomainType()) + .build(); verifyNoInteractions(applicationContext); - template.bulkWrite(ops.build(), BulkWriteOptions.ordered()).block(); + template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); MockingDetails mockingDetails = Mockito.mockingDetails(applicationContext); Collection invocations = mockingDetails.getInvocations(); - assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass) - .containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class); + // Insert in other-collection (3 events) + replace in default-collection (3 events) + assertThat(invocations).hasSize(6).extracting(tt -> tt.getArgument(0)).map(Object::getClass) + .containsExactlyInAnyOrder((Class) BeforeConvertEvent.class, (Class) BeforeConvertEvent.class, + (Class) BeforeSaveEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class, + (Class) AfterSaveEvent.class); } @Test // GH-5087 @SuppressWarnings("rawtypes") void bulkInsertEmitsEventsCorrectly() { - ops.insert(new SomeDomainType()); + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("default-collection").insert(new SomeDomainType()).build(); verify(applicationContext, never()).publishEvent(any(BeforeConvertEvent.class)); verify(applicationContext, never()).publishEvent(any(BeforeSaveEvent.class)); verify(applicationContext, never()).publishEvent(any(AfterSaveEvent.class)); - template.bulkWrite(ops.build(), BulkWriteOptions.ordered()).block(); + template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); MockingDetails mockingDetails = Mockito.mockingDetails(applicationContext); Collection invocations = mockingDetails.getInvocations(); - assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass) - .containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class); + // Two inserts (other-collection + default-collection) each emit BeforeConvert, BeforeSave, AfterSave + assertThat(invocations).hasSize(6).extracting(tt -> tt.getArgument(0)).map(Object::getClass) + .containsExactlyInAnyOrder((Class) BeforeConvertEvent.class, (Class) BeforeConvertEvent.class, + (Class) BeforeSaveEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class, + (Class) AfterSaveEvent.class); } @Test // GH-5087 void appliesArrayFilterWhenPresent() { - Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))).build(); + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("default-collection") + .updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))).build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); verify(client).bulkWrite(captor.capture(), any()); ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class, - captor.getValue().get(0)); + captor.getValue().get(1)); assertThat(updateModel.getOptions().getArrayFilters().get()).satisfies(it -> { assertThat((List) it).containsExactly(new Document("element", new Document("$gte", 100))); }); @@ -322,13 +371,15 @@ class ReactiveMongoTemplateBulkUnitTests { @Test // GH-5087 void shouldRetainNestedArrayPathWithPlaceholdersForNoMatchingPaths() { - Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")).build(); + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("default-collection") + .updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")).build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); verify(client).bulkWrite(captor.capture(), any()); ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class, - captor.getValue().get(0)); + captor.getValue().get(1)); assertThat(updateModel.getUpdate()) .contains(new Document("$set", new Document("items.$.documents.0.fileId", "new-id"))); } @@ -336,14 +387,15 @@ class ReactiveMongoTemplateBulkUnitTests { @Test // GH-5087 void shouldRetainNestedArrayPathWithPlaceholdersForMappedEntity() { - Bulk bulk = ops.inCollection("collection-1", OrderTest.class) + Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection()) + .inCollection("collection-1", OrderTest.class) .updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")).build(); template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); verify(client).bulkWrite(captor.capture(), any()); ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class, - captor.getValue().get(0)); + captor.getValue().get(1)); assertThat(updateModel.getUpdate()) .contains(new Document("$set", new Document("items.$.documents.0.the_file_id", "file-id"))); }