Browse Source

Route single and multi collection bulk writes.

pull/5169/head
Christoph Strobl 4 weeks ago
parent
commit
e94de2f9ad
No known key found for this signature in database
GPG Key ID: E6054036D0C37A4B
  1. 73
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriteSupport.java
  2. 270
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriter.java
  3. 4
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
  4. 303
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkWriter.java
  5. 4
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  6. 46
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/bulk/BulkOperationResult.java
  7. 75
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateBulkUnitTests.java
  8. 130
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateBulkUnitTests.java

73
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriteSupport.java

@ -17,15 +17,23 @@ package org.springframework.data.mongodb.core; @@ -17,15 +17,23 @@ package org.springframework.data.mongodb.core;
import java.util.List;
import com.mongodb.client.model.bulk.ClientDeleteOneOptions;
import com.mongodb.client.model.bulk.ClientReplaceOneOptions;
import org.bson.Document;
import org.springframework.util.ClassUtils;
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.DeleteManyModel;
import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateManyModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.bulk.ClientDeleteManyOptions;
import com.mongodb.client.model.bulk.ClientDeleteOneOptions;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.client.model.bulk.ClientReplaceOneOptions;
import com.mongodb.client.model.bulk.ClientUpdateManyOptions;
import com.mongodb.client.model.bulk.ClientUpdateOneOptions;
@ -34,6 +42,50 @@ import com.mongodb.client.model.bulk.ClientUpdateOneOptions; @@ -34,6 +42,50 @@ import com.mongodb.client.model.bulk.ClientUpdateOneOptions;
*/
abstract class BulkWriteSupport {
static WriteModel<Document> updateMany(Document query, Object update, UpdateOptions updateOptions) {
if (update instanceof List<?> pipeline) {
return new UpdateManyModel<>(query, (List<Document>) pipeline, updateOptions);
} else if (update instanceof Document updateDocument) {
return new UpdateManyModel<>(query, updateDocument, updateOptions);
} else {
throw new IllegalArgumentException(
"Update needs to be either a List or a Document, but was [%s]".formatted(ClassUtils.getUserClass(update)));
}
}
static WriteModel<Document> updateOne(Document query, Object update, UpdateOptions updateOptions) {
if (update instanceof List<?> pipeline) {
return new UpdateOneModel<>(query, (List<Document>) pipeline, updateOptions);
} else if (update instanceof Document updateDocument) {
return new UpdateOneModel<>(query, updateDocument, updateOptions);
} else {
throw new IllegalArgumentException(
"Update needs to be either a List or a Document, but was [%s]".formatted(ClassUtils.getUserClass(update)));
}
}
static WriteModel<Document> removeMany(Document query, DeleteOptions deleteOptions) {
return new DeleteManyModel<>(query, deleteOptions);
}
static WriteModel<Document> removeOne(Document query, DeleteOptions deleteOptions) {
return new DeleteOneModel<>(query, deleteOptions);
}
static WriteModel<Document> replaceOne(Document query, Document replacement, UpdateOptions updateOptions) {
ReplaceOptions replaceOptions = new ReplaceOptions();
replaceOptions.collation(updateOptions.getCollation());
replaceOptions.upsert(updateOptions.isUpsert());
replaceOptions.sort(updateOptions.getSort());
replaceOptions.hint(updateOptions.getHint());
replaceOptions.hintString(updateOptions.getHintString());
return new ReplaceOneModel<>(query, replacement, replaceOptions);
}
static ClientNamespacedWriteModel updateMany(MongoNamespace namespace, Document query, Object update,
UpdateOptions updateOptions) {
@ -46,8 +98,11 @@ abstract class BulkWriteSupport { @@ -46,8 +98,11 @@ abstract class BulkWriteSupport {
if (update instanceof List<?> pipeline) {
return ClientNamespacedWriteModel.updateMany(namespace, query, (List<Document>) pipeline, updateManyOptions);
} else if (update instanceof Document updateDocument) {
return ClientNamespacedWriteModel.updateMany(namespace, query, updateDocument, updateManyOptions);
} else {
return ClientNamespacedWriteModel.updateMany(namespace, query, (Document) update, updateManyOptions);
throw new IllegalArgumentException(
"Update needs to be either a List or a Document, but was [%s]".formatted(ClassUtils.getUserClass(update)));
}
}
@ -64,8 +119,11 @@ abstract class BulkWriteSupport { @@ -64,8 +119,11 @@ abstract class BulkWriteSupport {
if (update instanceof List<?> pipeline) {
return ClientNamespacedWriteModel.updateOne(namespace, query, (List<Document>) pipeline, updateOneOptions);
} else if (update instanceof Document updateDocument) {
return ClientNamespacedWriteModel.updateOne(namespace, query, updateDocument, updateOneOptions);
} else {
return ClientNamespacedWriteModel.updateOne(namespace, query, (Document) update, updateOneOptions);
throw new IllegalArgumentException(
"Update needs to be either a List or a Document, but was [%s]".formatted(ClassUtils.getUserClass(update)));
}
}
@ -87,11 +145,11 @@ abstract class BulkWriteSupport { @@ -87,11 +145,11 @@ abstract class BulkWriteSupport {
clientDeleteOneOptions.hint(deleteOptions.getHint());
clientDeleteOneOptions.hintString(deleteOptions.getHintString());
return ClientNamespacedWriteModel.deleteOne(namespace, query, clientDeleteOneOptions);
}
static ClientNamespacedWriteModel replaceOne(MongoNamespace namespace, Document query, Document replacement, UpdateOptions updateOptions) {
static ClientNamespacedWriteModel replaceOne(MongoNamespace namespace, Document query, Document replacement,
UpdateOptions updateOptions) {
ClientReplaceOneOptions replaceOptions = ClientReplaceOneOptions.clientReplaceOneOptions();
replaceOptions.sort(updateOptions.getSort());
@ -100,7 +158,6 @@ abstract class BulkWriteSupport { @@ -100,7 +158,6 @@ abstract class BulkWriteSupport {
replaceOptions.hintString(updateOptions.getHintString());
replaceOptions.collation(updateOptions.getCollation());
return ClientNamespacedWriteModel.replaceOne(namespace, query,
replacement, replaceOptions);
return ClientNamespacedWriteModel.replaceOne(namespace, query, replacement, replaceOptions);
}
}

270
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriter.java

@ -17,6 +17,9 @@ package org.springframework.data.mongodb.core; @@ -17,6 +17,9 @@ package org.springframework.data.mongodb.core;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.bson.Document;
import org.springframework.dao.DataAccessException;
@ -24,7 +27,6 @@ import org.springframework.data.mongodb.core.MongoTemplate.SourceAwareDocument; @@ -24,7 +27,6 @@ import org.springframework.data.mongodb.core.MongoTemplate.SourceAwareDocument;
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext;
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
import org.springframework.data.mongodb.core.bulk.Bulk;
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
import org.springframework.data.mongodb.core.bulk.BulkOperation;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Insert;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Remove;
@ -32,22 +34,29 @@ import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst; @@ -32,22 +34,29 @@ import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Update;
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst;
import org.springframework.data.mongodb.core.bulk.BulkOperationContext.TypedNamespace;
import org.springframework.data.mongodb.core.bulk.BulkOperationResult;
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.util.StringUtils;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoNamespace;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
/**
* Internal API wrapping a {@link MongoTemplate} to encapsulate {@link Bulk} handling.
*
*
* @author Christoph Strobl
* @since 2026/02
* @since 5.1
*/
class BulkWriter {
@ -57,24 +66,84 @@ class BulkWriter { @@ -57,24 +66,84 @@ class BulkWriter {
this.template = template;
}
public ClientBulkWriteResult write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) {
public BulkOperationResult<?> write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) {
Set<TypedNamespace> namespaces = bulk.operations().stream().map(it -> it.context().namespace())
.collect(Collectors.toSet());
if (namespaces.size() == 1) {
return writeToSingleCollection(defaultDatabase, bulk, options, namespaces.iterator().next());
}
return writeToMultipleCollections(defaultDatabase, bulk, options);
}
private BulkOperationResult<BulkWriteResult> writeToSingleCollection(String defaultDatabase, Bulk bulk,
BulkWriteOptions options, TypedNamespace namespace) {
List<ClientNamespacedWriteModel> writeModels = new ArrayList<>();
List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase,
StringUtils.hasText(namespace.collection()) ? namespace.collection()
: template.getCollectionName(namespace.type()));
SingleCollectionCollector collector = new SingleCollectionCollector(mongoNamespace);
buildWriteModels(bulk, collector);
try {
BulkWriteResult bulkWriteResult = template.execute(collector.getNamespace().getCollectionName(),
collection -> collection.bulkWrite(collector.getWriteModels(), new com.mongodb.client.model.BulkWriteOptions()
.ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED))));
collector.getAfterSaveCallables().forEach(callable -> {
template
.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName()));
template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName());
});
return BulkOperationResult.from(bulkWriteResult);
} catch (MongoBulkWriteException e) {
DataAccessException dataAccessException = template.getExceptionTranslator().translateExceptionIfPossible(e);
if (dataAccessException != null) {
throw dataAccessException;
}
throw e;
}
}
private BulkOperationResult<ClientBulkWriteResult> writeToMultipleCollections(String defaultDatabase, Bulk bulk,
BulkWriteOptions options) {
MultiCollectionCollector collector = new MultiCollectionCollector(defaultDatabase);
buildWriteModels(bulk, collector);
try {
ClientBulkWriteResult clientBulkWriteResult = template
.doWithClient(client -> client.bulkWrite(collector.getWriteModels(), ClientBulkWriteOptions
.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED))));
collector.getAfterSaveCallables().forEach(callable -> {
template
.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName()));
template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName());
});
return BulkOperationResult.from(clientBulkWriteResult);
} catch (MongoBulkWriteException e) {
DataAccessException dataAccessException = template.getExceptionTranslator().translateExceptionIfPossible(e);
if (dataAccessException != null) {
throw dataAccessException;
}
throw e;
}
}
private void buildWriteModels(Bulk bulk, WriteModelCollector<?> collector) {
for (BulkOperation bulkOp : bulk.operations()) {
String collectionName = bulkOp.context().namespace().collection() != null
? bulkOp.context().namespace().collection()
: template.getCollectionName(bulkOp.context().namespace().type());
MongoNamespace namespace = collector.resolveNamespace(bulkOp, it -> template.getCollectionName(it.type()));
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, collectionName);
if (bulkOp instanceof Insert insert) {
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(collectionName, insert.value(),
template.getConverter());
writeModels.add(ClientNamespacedWriteModel.insertOne(mongoNamespace, sourceAwareDocument.document()));
afterSaveCallables.add(sourceAwareDocument);
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(namespace.getCollectionName(),
insert.value(), template.getConverter());
collector.addInsert(namespace, sourceAwareDocument.document(), sourceAwareDocument);
} else if (bulkOp instanceof Update update) {
Class<?> domainType = update.context().namespace().type();
@ -89,11 +158,7 @@ class BulkWriter { @@ -89,11 +158,7 @@ class BulkWriter {
: updateContext.getMappedUpdate(entity);
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query());
if (multi) {
writeModels.add(BulkWriteSupport.updateMany(mongoNamespace, mappedQuery, mappedUpdate, updateOptions));
} else {
writeModels.add(BulkWriteSupport.updateOne(mongoNamespace, mappedQuery, mappedUpdate, updateOptions));
}
collector.addUpdate(namespace, multi, mappedQuery, mappedUpdate, updateOptions);
} else if (bulkOp instanceof Remove remove) {
Class<?> domainType = remove.context().namespace().type();
@ -102,16 +167,12 @@ class BulkWriter { @@ -102,16 +167,12 @@ class BulkWriter {
Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType));
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType);
if (remove instanceof RemoveFirst) {
writeModels.add(BulkWriteSupport.removeOne(mongoNamespace, mappedQuery, deleteOptions));
} else {
writeModels.add(BulkWriteSupport.removeMany(mongoNamespace, mappedQuery, deleteOptions));
}
collector.addRemove(namespace, remove instanceof RemoveFirst, mappedQuery, deleteOptions);
} else if (bulkOp instanceof Replace replace) {
Class<?> domainType = replace.context().namespace().type();
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(collectionName,
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(namespace.getCollectionName(),
replace.replacement(), template.getConverter());
UpdateContext updateContext = template.getQueryOperations().replaceSingleContext(replace.query(),
@ -120,29 +181,154 @@ class BulkWriter { @@ -120,29 +181,154 @@ class BulkWriter {
Document mappedQuery = updateContext.getMappedQuery(template.getPersistentEntity(domainType));
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, replace.query());
writeModels.add(
BulkWriteSupport.replaceOne(mongoNamespace, mappedQuery, sourceAwareDocument.document(), updateOptions));
afterSaveCallables.add(sourceAwareDocument);
collector.addReplace(namespace, mappedQuery, sourceAwareDocument.document(), updateOptions,
sourceAwareDocument);
}
}
}
try {
private interface WriteModelCollector<T> {
ClientBulkWriteResult clientBulkWriteResult = template.doWithClient(client -> client.bulkWrite(writeModels,
ClientBulkWriteOptions.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED))));
MongoNamespace resolveNamespace(String collectionName);
afterSaveCallables.forEach(callable -> {
template
.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName()));
template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName());
});
return clientBulkWriteResult;
} catch (MongoBulkWriteException e) {
DataAccessException dataAccessException = template.getExceptionTranslator().translateExceptionIfPossible(e);
if (dataAccessException != null) {
throw dataAccessException;
default MongoNamespace resolveNamespace(BulkOperation operation, Function<TypedNamespace, String> fallback) {
TypedNamespace typedNamespace = operation.context().namespace();
if (StringUtils.hasText(typedNamespace.collection())) {
return resolveNamespace(typedNamespace.collection());
}
throw e;
return resolveNamespace(fallback.apply(typedNamespace));
}
void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc);
void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, UpdateOptions options);
void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options);
void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
SourceAwareDocument<Object> sourceDoc);
List<SourceAwareDocument<Object>> getAfterSaveCallables();
}
private static class SingleCollectionCollector implements WriteModelCollector<WriteModel<Document>> {
private final List<WriteModel<Document>> writeModels = new ArrayList<>();
private final List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
private MongoNamespace namespace;
public SingleCollectionCollector(MongoNamespace namespace) {
this.namespace = namespace;
}
@Override
public MongoNamespace resolveNamespace(String collectionName) {
return namespace;
}
@Override
public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc) {
writeModels.add(new InsertOneModel<>(document));
afterSaveCallables.add(sourceDoc);
}
@Override
public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update,
UpdateOptions options) {
if (multi) {
writeModels.add(BulkWriteSupport.updateMany(query, update, options));
} else {
writeModels.add(BulkWriteSupport.updateOne(query, update, options));
}
}
@Override
public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) {
if (removeFirst) {
writeModels.add(BulkWriteSupport.removeOne(query, options));
} else {
writeModels.add(BulkWriteSupport.removeMany(query, options));
}
}
@Override
public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
SourceAwareDocument<Object> sourceDoc) {
writeModels.add(BulkWriteSupport.replaceOne(query, replacement, options));
afterSaveCallables.add(sourceDoc);
}
@Override
public List<SourceAwareDocument<Object>> getAfterSaveCallables() {
return afterSaveCallables;
}
MongoNamespace getNamespace() {
return namespace;
}
List<WriteModel<Document>> getWriteModels() {
return writeModels;
}
}
private static class MultiCollectionCollector implements WriteModelCollector<ClientNamespacedWriteModel> {
private final List<ClientNamespacedWriteModel> writeModels = new ArrayList<>();
private final List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
private final String defaultDatabaseName;
public MultiCollectionCollector(String defaultDatabaseName) {
this.defaultDatabaseName = defaultDatabaseName;
}
@Override
public MongoNamespace resolveNamespace(String collectionName) {
return new MongoNamespace(defaultDatabaseName, collectionName);
}
@Override
public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc) {
writeModels.add(ClientNamespacedWriteModel.insertOne(namespace, document));
afterSaveCallables.add(sourceDoc);
}
@Override
public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update,
UpdateOptions options) {
if (multi) {
writeModels.add(BulkWriteSupport.updateMany(namespace, query, update, options));
} else {
writeModels.add(BulkWriteSupport.updateOne(namespace, query, update, options));
}
}
@Override
public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) {
if (removeFirst) {
writeModels.add(BulkWriteSupport.removeOne(namespace, query, options));
} else {
writeModels.add(BulkWriteSupport.removeMany(namespace, query, options));
}
}
@Override
public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
SourceAwareDocument<Object> sourceDoc) {
writeModels.add(BulkWriteSupport.replaceOne(namespace, query, replacement, options));
afterSaveCallables.add(sourceDoc);
}
@Override
public List<SourceAwareDocument<Object>> getAfterSaveCallables() {
return afterSaveCallables;
}
List<ClientNamespacedWriteModel> getWriteModels() {
return writeModels;
}
}
}

4
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

@ -688,9 +688,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @@ -688,9 +688,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
@Override
public BulkOperationResult<?> bulkWrite(Bulk bulk, BulkWriteOptions options) {
ClientBulkWriteResult result = new BulkWriter(this).write(getDb().getName(), bulk, options);
return BulkOperationResult.from(result);
return new BulkWriter(this).write(getDb().getName(), bulk, options);
}
@Override

303
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkWriter.java

@ -15,12 +15,14 @@ @@ -15,12 +15,14 @@
*/
package org.springframework.data.mongodb.core;
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.bson.Document;
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext;
@ -34,12 +36,19 @@ import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst; @@ -34,12 +36,19 @@ import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace;
import org.springframework.data.mongodb.core.bulk.BulkOperation.Update;
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst;
import org.springframework.data.mongodb.core.bulk.BulkOperationContext.TypedNamespace;
import org.springframework.data.mongodb.core.bulk.BulkOperationResult;
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.util.StringUtils;
import com.mongodb.MongoNamespace;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
@ -48,7 +57,7 @@ import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; @@ -48,7 +57,7 @@ import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
* Internal API wrapping a {@link ReactiveMongoTemplate} to encapsulate {@link Bulk} handling using a reactive flow.
*
* @author Christoph Strobl
* @since 2026/02
* @since 5.1
*/
class ReactiveBulkWriter {
@ -58,51 +67,85 @@ class ReactiveBulkWriter { @@ -58,51 +67,85 @@ class ReactiveBulkWriter {
this.template = template;
}
public Mono<ClientBulkWriteResult> write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) {
return Flux.fromIterable(bulk.operations()).concatMap(bulkOp -> toWriteModelAndAfterSave(defaultDatabase, bulkOp))
.collectList().flatMap(results -> {
List<ClientNamespacedWriteModel> writeModels = new ArrayList<>();
List<SourceAwareDocument<?>> afterSaveCallables = new ArrayList<>();
for (WriteModelAndAfterSave result : results) {
writeModels.add(result.model());
if (result.afterSave() != null) {
afterSaveCallables.add(result.afterSave());
}
}
return template
.doWithClient(client -> client.bulkWrite(writeModels,
ClientBulkWriteOptions.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED))))
.doOnSuccess(v -> afterSaveCallables.forEach(callable -> {
template.maybeEmitEvent(
new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName()));
}))
.flatMap(v -> Flux.concat(afterSaveCallables.stream().map(callable -> template
.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName())).toList())
.then(Mono.just(v)));
});
public Mono<BulkOperationResult<?>> write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) {
Set<TypedNamespace> namespaces = bulk.operations().stream().map(it -> it.context().namespace())
.collect(Collectors.toSet());
if (namespaces.size() == 1) {
return writeToSingleCollection(defaultDatabase, bulk, options, namespaces.iterator().next())
.map(r -> (BulkOperationResult<?>) r);
}
return writeToMultipleCollections(defaultDatabase, bulk, options).map(r -> (BulkOperationResult<?>) r);
}
private Mono<BulkOperationResult<BulkWriteResult>> writeToSingleCollection(String defaultDatabase, Bulk bulk,
BulkWriteOptions options, TypedNamespace namespace) {
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase,
StringUtils.hasText(namespace.collection()) ? namespace.collection()
: template.getCollectionName(namespace.type()));
SingleCollectionCollector collector = new SingleCollectionCollector(mongoNamespace);
return buildWriteModelsReactive(bulk, collector).then(Mono.defer(() -> {
String collectionName = collector.getNamespace().getCollectionName();
List<SourceAwareDocument<Object>> afterSaveCallables = collector.getAfterSaveCallables();
return template
.createMono(collectionName,
col -> col.bulkWrite(collector.getWriteModels(),
new com.mongodb.client.model.BulkWriteOptions()
.ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED))))
.map(
BulkOperationResult::from)
.doOnSuccess(
v -> afterSaveCallables
.forEach(callable -> template.maybeEmitEvent(new AfterSaveEvent<>(callable.source(),
callable.document(), callable.collectionName()))))
.flatMap(result -> Flux.concat(afterSaveCallables.stream().map(callable -> template
.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName())).toList())
.then(Mono.just(result)));
}));
}
private Mono<WriteModelAndAfterSave> toWriteModelAndAfterSave(String defaultDatabase, BulkOperation bulkOp) {
private Mono<BulkOperationResult<ClientBulkWriteResult>> writeToMultipleCollections(String defaultDatabase, Bulk bulk,
BulkWriteOptions options) {
MultiCollectionCollector collector = new MultiCollectionCollector(defaultDatabase);
return buildWriteModelsReactive(bulk, collector).then(Mono.defer(() -> {
List<ClientNamespacedWriteModel> writeModels = collector.getWriteModels();
List<SourceAwareDocument<Object>> afterSaveCallables = collector.getAfterSaveCallables();
return template
.doWithClient(client -> client.bulkWrite(writeModels,
ClientBulkWriteOptions
.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED))))
.map(
BulkOperationResult::from)
.doOnSuccess(
v -> afterSaveCallables
.forEach(callable -> template.maybeEmitEvent(new AfterSaveEvent<>(callable.source(),
callable.document(), callable.collectionName()))))
.flatMap(result -> Flux.concat(afterSaveCallables.stream().map(callable -> template
.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName())).toList())
.then(Mono.just(result)));
}));
}
String collectionName = bulkOp.context().namespace().collection() != null
? bulkOp.context().namespace().collection()
: template.getCollectionName(bulkOp.context().namespace().type());
private Mono<Void> buildWriteModelsReactive(Bulk bulk, WriteModelCollector<?> collector) {
return Flux.fromIterable(bulk.operations()).concatMap(bulkOp -> addOperationReactive(bulkOp, collector)).then();
}
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, collectionName);
private Mono<Void> addOperationReactive(BulkOperation bulkOp, WriteModelCollector<?> collector) {
MongoNamespace namespace = collector.resolveNamespace(bulkOp, ns -> template.getCollectionName(ns.type()));
if (bulkOp instanceof Insert insert) {
return template.prepareObjectForSaveReactive(collectionName, insert.value(), template.getConverter())
.map(sourceAwareDocument -> {
ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(mongoNamespace,
sourceAwareDocument.document());
return new WriteModelAndAfterSave(model, sourceAwareDocument);
});
return template
.prepareObjectForSaveReactive(namespace.getCollectionName(), insert.value(), template.getConverter())
.doOnNext(sad -> collector.addInsert(namespace, sad.document(), toObject(sad))).then();
}
if (bulkOp instanceof Update update) {
@ -119,15 +162,8 @@ class ReactiveBulkWriter { @@ -119,15 +162,8 @@ class ReactiveBulkWriter {
: updateContext.getMappedUpdate(entity);
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query());
if (multi) {
ClientNamespacedWriteModel model = BulkWriteSupport.updateMany(mongoNamespace, mappedQuery, mappedUpdate,
updateOptions);
return Mono.just(new WriteModelAndAfterSave(model, null));
}
ClientNamespacedWriteModel model = BulkWriteSupport.updateOne(mongoNamespace, mappedQuery, mappedUpdate,
updateOptions);
return Mono.just(new WriteModelAndAfterSave(model, null));
collector.addUpdate(namespace, multi, mappedQuery, mappedUpdate, updateOptions);
return Mono.empty();
}
if (bulkOp instanceof Remove remove) {
@ -138,37 +174,176 @@ class ReactiveBulkWriter { @@ -138,37 +174,176 @@ class ReactiveBulkWriter {
Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType));
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType);
if (remove instanceof RemoveFirst) {
ClientNamespacedWriteModel model = BulkWriteSupport.removeOne(mongoNamespace, mappedQuery, deleteOptions);
return Mono.just(new WriteModelAndAfterSave(model, null));
} else {
ClientNamespacedWriteModel model = BulkWriteSupport.removeMany(mongoNamespace, mappedQuery, deleteOptions);
return Mono.just(new WriteModelAndAfterSave(model, null));
}
collector.addRemove(namespace, remove instanceof RemoveFirst, mappedQuery, deleteOptions);
return Mono.empty();
}
if (bulkOp instanceof Replace replace) {
return template.prepareObjectForSaveReactive(collectionName, replace.replacement(), template.getConverter())
.map(sourceAwareDocument -> {
return template
.prepareObjectForSaveReactive(namespace.getCollectionName(), replace.replacement(), template.getConverter())
.doOnNext(sad -> {
UpdateContext updateContext = template.getQueryOperations().replaceSingleContext(replace.query(),
MappedDocument.of(sourceAwareDocument.document()), replace.upsert());
MappedDocument.of(sad.document()), replace.upsert());
Document mappedQuery = updateContext
.getMappedQuery(template.getPersistentEntity(replace.context().namespace().type()));
UpdateOptions updateOptions = updateContext.getUpdateOptions(replace.context().namespace().type(),
replace.query());
ClientNamespacedWriteModel model = BulkWriteSupport.replaceOne(mongoNamespace, mappedQuery,
sourceAwareDocument.document(), updateOptions);
return new WriteModelAndAfterSave(model, sourceAwareDocument);
});
collector.addReplace(namespace, mappedQuery, sad.document(), updateOptions, toObject(sad));
}).then();
}
return Mono.error(new IllegalStateException("Unknown bulk operation type: " + bulkOp.getClass()));
}
private record WriteModelAndAfterSave(ClientNamespacedWriteModel model, SourceAwareDocument<?> afterSave) {
@SuppressWarnings("unchecked")
private static SourceAwareDocument<Object> toObject(SourceAwareDocument<?> sad) {
return (SourceAwareDocument<Object>) sad;
}
private interface WriteModelCollector<T> {
MongoNamespace resolveNamespace(String collectionName);
default MongoNamespace resolveNamespace(BulkOperation operation, Function<TypedNamespace, String> fallback) {
TypedNamespace typedNamespace = operation.context().namespace();
if (StringUtils.hasText(typedNamespace.collection())) {
return resolveNamespace(typedNamespace.collection());
}
return resolveNamespace(fallback.apply(typedNamespace));
}
void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc);
void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, UpdateOptions options);
void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options);
void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
SourceAwareDocument<Object> sourceDoc);
List<SourceAwareDocument<Object>> getAfterSaveCallables();
}
private static class SingleCollectionCollector implements WriteModelCollector<WriteModel<Document>> {
private final List<WriteModel<Document>> writeModels = new ArrayList<>();
private final List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
private final MongoNamespace namespace;
SingleCollectionCollector(MongoNamespace namespace) {
this.namespace = namespace;
}
@Override
public MongoNamespace resolveNamespace(String collectionName) {
return namespace;
}
@Override
public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc) {
writeModels.add(new InsertOneModel<>(document));
afterSaveCallables.add(sourceDoc);
}
@Override
public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update,
UpdateOptions options) {
if (multi) {
writeModels.add(BulkWriteSupport.updateMany(query, update, options));
} else {
writeModels.add(BulkWriteSupport.updateOne(query, update, options));
}
}
@Override
public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) {
if (removeFirst) {
writeModels.add(BulkWriteSupport.removeOne(query, options));
} else {
writeModels.add(BulkWriteSupport.removeMany(query, options));
}
}
@Override
public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
SourceAwareDocument<Object> sourceDoc) {
writeModels.add(BulkWriteSupport.replaceOne(query, replacement, options));
afterSaveCallables.add(sourceDoc);
}
@Override
public List<SourceAwareDocument<Object>> getAfterSaveCallables() {
return afterSaveCallables;
}
MongoNamespace getNamespace() {
return namespace;
}
List<WriteModel<Document>> getWriteModels() {
return writeModels;
}
}
private static class MultiCollectionCollector implements WriteModelCollector<ClientNamespacedWriteModel> {
private final List<ClientNamespacedWriteModel> writeModels = new ArrayList<>();
private final List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
private final String defaultDatabaseName;
MultiCollectionCollector(String defaultDatabaseName) {
this.defaultDatabaseName = defaultDatabaseName;
}
@Override
public MongoNamespace resolveNamespace(String collectionName) {
return new MongoNamespace(defaultDatabaseName, collectionName);
}
@Override
public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc) {
writeModels.add(ClientNamespacedWriteModel.insertOne(namespace, document));
afterSaveCallables.add(sourceDoc);
}
@Override
public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update,
UpdateOptions options) {
if (multi) {
writeModels.add(BulkWriteSupport.updateMany(namespace, query, update, options));
} else {
writeModels.add(BulkWriteSupport.updateOne(namespace, query, update, options));
}
}
@Override
public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) {
if (removeFirst) {
writeModels.add(BulkWriteSupport.removeOne(namespace, query, options));
} else {
writeModels.add(BulkWriteSupport.removeMany(namespace, query, options));
}
}
@Override
public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
SourceAwareDocument<Object> sourceDoc) {
writeModels.add(BulkWriteSupport.replaceOne(namespace, query, replacement, options));
afterSaveCallables.add(sourceDoc);
}
@Override
public List<SourceAwareDocument<Object>> getAfterSaveCallables() {
return afterSaveCallables;
}
List<ClientNamespacedWriteModel> getWriteModels() {
return writeModels;
}
}
}

4
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -842,9 +842,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -842,9 +842,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
@Override
public Mono<BulkOperationResult<?>> bulkWrite(Bulk bulk, BulkWriteOptions options) {
return doGetDatabase()
.flatMap(db -> new ReactiveBulkWriter(this).write(db.getName(), bulk, options))
.map(BulkOperationResult::from);
return doGetDatabase().flatMap(db -> new ReactiveBulkWriter(this).write(db.getName(), bulk, options));
}
public record SourceAwareDocument<T>(T source, Document document, String collectionName) {

46
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/bulk/BulkOperationResult.java

@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
*/
package org.springframework.data.mongodb.core.bulk;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
/**
@ -73,6 +74,51 @@ public interface BulkOperationResult<T> { @@ -73,6 +74,51 @@ public interface BulkOperationResult<T> {
};
}
/**
* Creates a {@link BulkOperationResult} from a MongoDB driver {@link BulkWriteResult}.
*
* @param result the driver result; must not be {@literal null}.
* @return a new {@link BulkOperationResult} wrapping the given result; never {@literal null}.
*/
static BulkOperationResult<BulkWriteResult> from(BulkWriteResult result) {
return new BulkOperationResult<>() {
@Override
public long insertCount() {
return result.getInsertedCount();
}
@Override
public long modifiedCount() {
return result.getModifiedCount();
}
@Override
public long deleteCount() {
return result.getDeletedCount();
}
@Override
public long upsertCount() {
return result.getUpserts().size();
}
@Override
public boolean acknowledged() {
return result.wasAcknowledged();
}
@Override
public long matchedCount() {
return result.getMatchedCount();
}
@Override
public BulkWriteResult rawResult() {
return result;
}
};
}
/**
* Returns the number of documents inserted.
*

75
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateBulkUnitTests.java

@ -17,12 +17,14 @@ package org.springframework.data.mongodb.core; @@ -17,12 +17,14 @@ package org.springframework.data.mongodb.core;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;
@ -134,11 +136,36 @@ class MongoTemplateBulkUnitTests { @@ -134,11 +136,36 @@ class MongoTemplateBulkUnitTests {
ops = Bulk.builder().inCollection("default-collection");
}
@Test // GH-5087
void delegatesToCollectionOnSingleNamespace() {
Bulk bulk = ops.insert(new BaseDoc()).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
verify(client).getDatabase(anyString());
verifyNoMoreInteractions(client);
verify(collection).bulkWrite(anyList(), any());
}
@Test // GH-5087
void delegatesToClientOnMultiNamespace() {
Bulk bulk = ops.insert(new BaseDoc()).inCollection("other-collection").insert(new BaseDoc()).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
verify(client).bulkWrite(anyList(), any());
verifyNoInteractions(collection);
}
@Test // GH-5087
void updateOneShouldUseCollationWhenPresent() {
Bulk bulk = ops
.updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen"))
.updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen")) //
.inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
@ -155,6 +182,8 @@ class MongoTemplateBulkUnitTests { @@ -155,6 +182,8 @@ class MongoTemplateBulkUnitTests {
Bulk bulk = ops
.updateMulti(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen"))
.inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
@ -169,7 +198,9 @@ class MongoTemplateBulkUnitTests { @@ -169,7 +198,9 @@ class MongoTemplateBulkUnitTests {
@Test // GH-5087
void removeShouldUseCollationWhenPresent() {
Bulk bulk = ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).build();
Bulk bulk = ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
@ -183,7 +214,10 @@ class MongoTemplateBulkUnitTests { @@ -183,7 +214,10 @@ class MongoTemplateBulkUnitTests {
@Test // GH-5087
void replaceOneShouldUseCollationWhenPresent() {
Bulk bulk = ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).build();
Bulk bulk = ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()) //
.inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
@ -198,7 +232,10 @@ class MongoTemplateBulkUnitTests { @@ -198,7 +232,10 @@ class MongoTemplateBulkUnitTests {
void bulkUpdateShouldMapQueryAndUpdateCorrectly() {
Bulk bulk = ops.inCollection("test", SomeDomainType.class)
.updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")).build();
.updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")) //
.inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
verify(client).bulkWrite(captor.capture(), any());
@ -212,7 +249,10 @@ class MongoTemplateBulkUnitTests { @@ -212,7 +249,10 @@ class MongoTemplateBulkUnitTests {
@Test // GH-5087
void bulkRemoveShouldMapQueryCorrectly() {
Bulk bulk = ops.inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys"))).build();
Bulk bulk = ops.inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys")))
.inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
verify(client).bulkWrite(captor.capture(), any());
@ -230,7 +270,9 @@ class MongoTemplateBulkUnitTests { @@ -230,7 +270,9 @@ class MongoTemplateBulkUnitTests {
replacement.lastName = "Kim";
Bulk bulk = ops.inCollection("test", SomeDomainType.class)
.replaceOne(query(where("firstName").is("danerys")), replacement).build();
.replaceOne(query(where("firstName").is("danerys")), replacement).inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
verify(client).bulkWrite(captor.capture(), any());
@ -247,7 +289,9 @@ class MongoTemplateBulkUnitTests { @@ -247,7 +289,9 @@ class MongoTemplateBulkUnitTests {
void bulkInsertInvokesEntityCallbacks() {
Person entity = new Person("init");
Bulk bulk = ops.inCollection("person").insert(entity).build();
Bulk bulk = ops.inCollection("person").insert(entity).inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
@ -296,13 +340,16 @@ class MongoTemplateBulkUnitTests { @@ -296,13 +340,16 @@ class MongoTemplateBulkUnitTests {
MockingDetails mockingDetails = Mockito.mockingDetails(eventPublisher);
Collection<Invocation> invocations = mockingDetails.getInvocations();
assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass)
.containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class);
.containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class);
}
@Test // GH-5087
void appliesArrayFilterWhenPresent() {
Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))).build();
Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))) //
.inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
verify(client).bulkWrite(captor.capture(), any());
@ -317,7 +364,10 @@ class MongoTemplateBulkUnitTests { @@ -317,7 +364,10 @@ class MongoTemplateBulkUnitTests {
@Test // GH-5087
void shouldRetainNestedArrayPathWithPlaceholdersForNoMatchingPaths() {
Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")).build();
Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.inCollection("other-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
verify(client).bulkWrite(captor.capture(), any());
@ -332,7 +382,10 @@ class MongoTemplateBulkUnitTests { @@ -332,7 +382,10 @@ class MongoTemplateBulkUnitTests {
void shouldRetainNestedArrayPathWithPlaceholdersForMappedEntity() {
Bulk bulk = ops.inCollection("collection-1", OrderTest.class)
.updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")).build();
.updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")) //
.inCollection("collection-2", OrderTest.class)
.updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")) //
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered());
verify(client).bulkWrite(captor.capture(), any());

130
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateBulkUnitTests.java

@ -28,6 +28,8 @@ import static org.mockito.Mockito.when; @@ -28,6 +28,8 @@ import static org.mockito.Mockito.when;
import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.List;
@ -59,12 +61,12 @@ import org.springframework.data.mongodb.core.convert.MappingMongoConverter; @@ -59,12 +61,12 @@ import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Update;
@ -82,10 +84,9 @@ import com.mongodb.reactivestreams.client.MongoClient; @@ -82,10 +84,9 @@ import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import reactor.core.publisher.Mono;
/**
* Unit tests for {@link ReactiveMongoOperations#bulkWrite}.
* Unit tests for {@link ReactiveMongoOperations#bulkWrite}. Tests use at least two collections so that
* {@code client.bulkWrite} is exercised (multi-collection path).
*
* @author Christoph Strobl
*/
@ -93,6 +94,13 @@ import reactor.core.publisher.Mono; @@ -93,6 +94,13 @@ import reactor.core.publisher.Mono;
@MockitoSettings(strictness = Strictness.LENIENT)
class ReactiveMongoTemplateBulkUnitTests {
private static final String OTHER_COLLECTION = "other-collection";
/** Simple insert in another collection so bulk has multiple namespaces and client.bulkWrite is used. */
private static Document simpleInsertInOtherCollection() {
return new Document("_id", 1);
}
private ReactiveMongoTemplate template;
@Mock MongoClient client;
@Mock MongoDatabase database;
@ -139,10 +147,33 @@ class ReactiveMongoTemplateBulkUnitTests { @@ -139,10 +147,33 @@ class ReactiveMongoTemplateBulkUnitTests {
ops = Bulk.builder().inCollection("default-collection");
}
@Test // GH-5087
void delegatesToCollectionOnSingleNamespace() {
Bulk bulk = ops.insert(new BaseDoc()).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).subscribe();
verifyNoInteractions(client);
verify(collection).bulkWrite(anyList(), any());
}
@Test // GH-5087
void delegatesToClientOnMultiNamespace() {
Bulk bulk = ops.insert(new BaseDoc()).inCollection("other-collection").insert(new BaseDoc()).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).subscribe();
verify(client).bulkWrite(anyList(), any());
verifyNoInteractions(collection);
}
@Test // GH-5087
void updateOneShouldUseCollationWhenPresent() {
Bulk bulk = ops
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("default-collection")
.updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen"))
.build();
@ -151,14 +182,15 @@ class ReactiveMongoTemplateBulkUnitTests { @@ -151,14 +182,15 @@ class ReactiveMongoTemplateBulkUnitTests {
verify(client).bulkWrite(captor.capture(), any());
assertThat(
extractWriteModel(ConcreteClientUpdateOneModel.class, captor.getValue().get(0)).getOptions().getCollation())
extractWriteModel(ConcreteClientUpdateOneModel.class, captor.getValue().get(1)).getOptions().getCollation())
.contains(com.mongodb.client.model.Collation.builder().locale("de").build());
}
@Test // GH-5087
void updateManyShouldUseCollationWhenPresent() {
Bulk bulk = ops
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("default-collection")
.updateMulti(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen"))
.build();
@ -167,49 +199,53 @@ class ReactiveMongoTemplateBulkUnitTests { @@ -167,49 +199,53 @@ class ReactiveMongoTemplateBulkUnitTests {
verify(client).bulkWrite(captor.capture(), any());
assertThat(
extractWriteModel(ConcreteClientUpdateManyModel.class, captor.getValue().get(0)).getOptions().getCollation())
extractWriteModel(ConcreteClientUpdateManyModel.class, captor.getValue().get(1)).getOptions().getCollation())
.contains(com.mongodb.client.model.Collation.builder().locale("de").build());
}
@Test // GH-5087
void removeShouldUseCollationWhenPresent() {
Bulk bulk = ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).build();
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("default-collection").remove(new BasicQuery("{}").collation(Collation.of("de"))).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
verify(client).bulkWrite(captor.capture(), any());
assertThat(
extractWriteModel(ConcreteClientDeleteManyModel.class, captor.getValue().get(0)).getOptions().getCollation())
extractWriteModel(ConcreteClientDeleteManyModel.class, captor.getValue().get(1)).getOptions().getCollation())
.contains(com.mongodb.client.model.Collation.builder().locale("de").build());
}
@Test // GH-5087
void replaceOneShouldUseCollationWhenPresent() {
Bulk bulk = ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).build();
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("default-collection")
.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
verify(client).bulkWrite(captor.capture(), any());
assertThat(
extractWriteModel(ConcreteClientReplaceOneModel.class, captor.getValue().get(0)).getOptions().getCollation())
extractWriteModel(ConcreteClientReplaceOneModel.class, captor.getValue().get(1)).getOptions().getCollation())
.contains(com.mongodb.client.model.Collation.builder().locale("de").build());
}
@Test // GH-5087
void bulkUpdateShouldMapQueryAndUpdateCorrectly() {
Bulk bulk = ops.inCollection("test", SomeDomainType.class)
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("test", SomeDomainType.class)
.updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
verify(client).bulkWrite(captor.capture(), any());
ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class,
captor.getValue().get(0));
captor.getValue().get(1));
assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys"));
assertThat(updateModel.getUpdate()).contains(new Document("$set", new Document("first_name", "queen danerys")));
}
@ -217,13 +253,14 @@ class ReactiveMongoTemplateBulkUnitTests { @@ -217,13 +253,14 @@ class ReactiveMongoTemplateBulkUnitTests {
@Test // GH-5087
void bulkRemoveShouldMapQueryCorrectly() {
Bulk bulk = ops.inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys"))).build();
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys"))).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
verify(client).bulkWrite(captor.capture(), any());
ConcreteClientDeleteManyModel deleteModel = extractWriteModel(ConcreteClientDeleteManyModel.class,
captor.getValue().get(0));
captor.getValue().get(1));
assertThat(deleteModel.getFilter()).isEqualTo(new Document("first_name", "danerys"));
}
@ -234,25 +271,26 @@ class ReactiveMongoTemplateBulkUnitTests { @@ -234,25 +271,26 @@ class ReactiveMongoTemplateBulkUnitTests {
replacement.firstName = "Minsu";
replacement.lastName = "Kim";
Bulk bulk = ops.inCollection("test", SomeDomainType.class)
.replaceOne(query(where("firstName").is("danerys")), replacement).build();
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("test", SomeDomainType.class).replaceOne(query(where("firstName").is("danerys")), replacement)
.build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
verify(client).bulkWrite(captor.capture(), any());
ConcreteClientReplaceOneModel replaceModel = extractWriteModel(ConcreteClientReplaceOneModel.class,
captor.getValue().get(0));
captor.getValue().get(1));
assertThat(replaceModel.getFilter()).isEqualTo(new Document("first_name", "danerys"));
assertThat(replaceModel.getReplacement()).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class))
.containsEntry("first_name", "Minsu")
.containsEntry("lastName", "Kim");
.containsEntry("first_name", "Minsu").containsEntry("lastName", "Kim");
}
@Test // GH-5087
void bulkInsertInvokesEntityCallbacks() {
Person entity = new Person("init");
Bulk bulk = ops.inCollection("person").insert(entity).build();
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("person").insert(entity).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
@ -265,7 +303,7 @@ class ReactiveMongoTemplateBulkUnitTests { @@ -265,7 +303,7 @@ class ReactiveMongoTemplateBulkUnitTests {
verify(client).bulkWrite(captor.capture(), any());
ConcreteClientInsertOneModel insertModel = extractWriteModel(ConcreteClientInsertOneModel.class,
captor.getValue().get(0));
captor.getValue().get(1));
assertThat(insertModel.getDocument()).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class))
.containsEntry("firstName", "after-save");
}
@ -274,46 +312,57 @@ class ReactiveMongoTemplateBulkUnitTests { @@ -274,46 +312,57 @@ class ReactiveMongoTemplateBulkUnitTests {
@SuppressWarnings("rawtypes")
void bulkReplaceOneEmitsEventsCorrectly() {
ops.replaceOne(query(where("firstName").is("danerys")), new SomeDomainType());
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("default-collection").replaceOne(query(where("firstName").is("danerys")), new SomeDomainType())
.build();
verifyNoInteractions(applicationContext);
template.bulkWrite(ops.build(), BulkWriteOptions.ordered()).block();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
MockingDetails mockingDetails = Mockito.mockingDetails(applicationContext);
Collection<Invocation> invocations = mockingDetails.getInvocations();
assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass)
.containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class);
// Insert in other-collection (3 events) + replace in default-collection (3 events)
assertThat(invocations).hasSize(6).extracting(tt -> tt.getArgument(0)).map(Object::getClass)
.containsExactlyInAnyOrder((Class) BeforeConvertEvent.class, (Class) BeforeConvertEvent.class,
(Class) BeforeSaveEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class,
(Class) AfterSaveEvent.class);
}
@Test // GH-5087
@SuppressWarnings("rawtypes")
void bulkInsertEmitsEventsCorrectly() {
ops.insert(new SomeDomainType());
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("default-collection").insert(new SomeDomainType()).build();
verify(applicationContext, never()).publishEvent(any(BeforeConvertEvent.class));
verify(applicationContext, never()).publishEvent(any(BeforeSaveEvent.class));
verify(applicationContext, never()).publishEvent(any(AfterSaveEvent.class));
template.bulkWrite(ops.build(), BulkWriteOptions.ordered()).block();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
MockingDetails mockingDetails = Mockito.mockingDetails(applicationContext);
Collection<Invocation> invocations = mockingDetails.getInvocations();
assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass)
.containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class);
// Two inserts (other-collection + default-collection) each emit BeforeConvert, BeforeSave, AfterSave
assertThat(invocations).hasSize(6).extracting(tt -> tt.getArgument(0)).map(Object::getClass)
.containsExactlyInAnyOrder((Class) BeforeConvertEvent.class, (Class) BeforeConvertEvent.class,
(Class) BeforeSaveEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class,
(Class) AfterSaveEvent.class);
}
@Test // GH-5087
void appliesArrayFilterWhenPresent() {
Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))).build();
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("default-collection")
.updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
verify(client).bulkWrite(captor.capture(), any());
ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class,
captor.getValue().get(0));
captor.getValue().get(1));
assertThat(updateModel.getOptions().getArrayFilters().get()).satisfies(it -> {
assertThat((List<Document>) it).containsExactly(new Document("element", new Document("$gte", 100)));
});
@ -322,13 +371,15 @@ class ReactiveMongoTemplateBulkUnitTests { @@ -322,13 +371,15 @@ class ReactiveMongoTemplateBulkUnitTests {
@Test // GH-5087
void shouldRetainNestedArrayPathWithPlaceholdersForNoMatchingPaths() {
Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")).build();
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("default-collection")
.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
verify(client).bulkWrite(captor.capture(), any());
ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class,
captor.getValue().get(0));
captor.getValue().get(1));
assertThat(updateModel.getUpdate())
.contains(new Document("$set", new Document("items.$.documents.0.fileId", "new-id")));
}
@ -336,14 +387,15 @@ class ReactiveMongoTemplateBulkUnitTests { @@ -336,14 +387,15 @@ class ReactiveMongoTemplateBulkUnitTests {
@Test // GH-5087
void shouldRetainNestedArrayPathWithPlaceholdersForMappedEntity() {
Bulk bulk = ops.inCollection("collection-1", OrderTest.class)
Bulk bulk = Bulk.builder().inCollection(OTHER_COLLECTION).insert(simpleInsertInOtherCollection())
.inCollection("collection-1", OrderTest.class)
.updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")).build();
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();
verify(client).bulkWrite(captor.capture(), any());
ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class,
captor.getValue().get(0));
captor.getValue().get(1));
assertThat(updateModel.getUpdate())
.contains(new Document("$set", new Document("items.$.documents.0.the_file_id", "file-id")));
}

Loading…
Cancel
Save