diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java new file mode 100644 index 000000000..ce24710f3 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java @@ -0,0 +1,591 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.bson.Document; +import org.bson.conversions.Bson; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.data.mapping.callback.ReactiveEntityCallbacks; +import org.springframework.data.mongodb.BulkOperationException; +import org.springframework.data.mongodb.core.BulkOperations.BulkMode; +import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.data.mongodb.core.convert.UpdateMapper; +import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; +import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent; +import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent; +import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent; +import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent; +import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback; +import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback; +import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback; +import org.springframework.data.mongodb.core.query.Collation; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.data.mongodb.core.query.UpdateDefinition; +import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + +import com.mongodb.MongoBulkWriteException; +import com.mongodb.WriteConcern; +import com.mongodb.bulk.BulkWriteResult; +import com.mongodb.client.model.*; +import com.mongodb.reactivestreams.client.MongoCollection; + +/** + * Default implementation for {@link ReactiveBulkOperations}. + * + * @author Christoph Strobl + * @since 4.1 + */ +class DefaultReactiveBulkOperations implements ReactiveBulkOperations { + + private final ReactiveMongoOperations mongoOperations; + private final String collectionName; + private final ReactiveBulkOperationContext bulkOperationContext; + private final List> models = new ArrayList<>(); + + private @Nullable WriteConcern defaultWriteConcern; + + private BulkWriteOptions bulkOptions; + + /** + * Creates a new {@link DefaultReactiveBulkOperations} for the given {@link MongoOperations}, collection name and + * {@link ReactiveBulkOperationContext}. + * + * @param mongoOperations must not be {@literal null}. + * @param collectionName must not be {@literal null}. + * @param bulkOperationContext must not be {@literal null}. + */ + DefaultReactiveBulkOperations(ReactiveMongoOperations mongoOperations, String collectionName, + ReactiveBulkOperationContext bulkOperationContext) { + + Assert.notNull(mongoOperations, "MongoOperations must not be null"); + Assert.hasText(collectionName, "CollectionName must not be null nor empty"); + Assert.notNull(bulkOperationContext, "BulkOperationContext must not be null"); + + this.mongoOperations = mongoOperations; + this.collectionName = collectionName; + this.bulkOperationContext = bulkOperationContext; + this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode()); + } + + /** + * Configures the default {@link WriteConcern} to be used. Defaults to {@literal null}. + * + * @param defaultWriteConcern can be {@literal null}. + */ + void setDefaultWriteConcern(@Nullable WriteConcern defaultWriteConcern) { + this.defaultWriteConcern = defaultWriteConcern; + } + + @Override + public ReactiveBulkOperations insert(Object document) { + + Assert.notNull(document, "Document must not be null"); + + this.models.add(Mono.just(document).flatMap(it -> { + maybeEmitEvent(new BeforeConvertEvent<>(it, collectionName)); + return maybeInvokeBeforeConvertCallback(it); + }).map(it -> new SourceAwareWriteModelHolder(it, new InsertOneModel(getMappedObject(it))))); + + return this; + } + + @Override + public ReactiveBulkOperations insert(List documents) { + + Assert.notNull(documents, "Documents must not be null"); + + documents.forEach(this::insert); + + return this; + } + + @Override + @SuppressWarnings("unchecked") + public ReactiveBulkOperations updateOne(Query query, UpdateDefinition update) { + + Assert.notNull(query, "Query must not be null"); + Assert.notNull(update, "Update must not be null"); + + update(query, update, false, false); + return this; + } + + @Override + @SuppressWarnings("unchecked") + public ReactiveBulkOperations updateMulti(Query query, UpdateDefinition update) { + + Assert.notNull(query, "Query must not be null"); + Assert.notNull(update, "Update must not be null"); + + update(query, update, false, true); + return this; + } + + @Override + public ReactiveBulkOperations upsert(Query query, UpdateDefinition update) { + return update(query, update, true, true); + } + + @Override + public ReactiveBulkOperations remove(Query query) { + + Assert.notNull(query, "Query must not be null"); + + DeleteOptions deleteOptions = new DeleteOptions(); + query.getCollation().map(Collation::toMongoCollation).ifPresent(deleteOptions::collation); + + this.models.add(Mono.just(query) + .map(it -> new SourceAwareWriteModelHolder(it, new DeleteManyModel<>(it.getQueryObject(), deleteOptions)))); + + return this; + } + + @Override + public ReactiveBulkOperations remove(List removes) { + + Assert.notNull(removes, "Removals must not be null"); + + for (Query query : removes) { + remove(query); + } + + return this; + } + + @Override + public ReactiveBulkOperations replaceOne(Query query, Object replacement, FindAndReplaceOptions options) { + + Assert.notNull(query, "Query must not be null"); + Assert.notNull(replacement, "Replacement must not be null"); + Assert.notNull(options, "Options must not be null"); + + ReplaceOptions replaceOptions = new ReplaceOptions(); + replaceOptions.upsert(options.isUpsert()); + query.getCollation().map(Collation::toMongoCollation).ifPresent(replaceOptions::collation); + + this.models.add(Mono.just(replacement).flatMap(it -> { + maybeEmitEvent(new BeforeConvertEvent<>(it, collectionName)); + return maybeInvokeBeforeConvertCallback(it); + }).map(it -> new SourceAwareWriteModelHolder(it, + new ReplaceOneModel<>(getMappedQuery(query.getQueryObject()), getMappedObject(it), replaceOptions)))); + + return this; + } + + @Override + public Mono execute() { + + try { + + Mono result = mongoOperations.execute(collectionName, this::bulkWriteTo).next(); + return result; + } finally { + this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode()); + } + } + + private Mono bulkWriteTo(MongoCollection collection) { + + if (defaultWriteConcern != null) { + collection = collection.withWriteConcern(defaultWriteConcern); + } + + try { + + Flux concat = Flux.concat(models).flatMap(it -> { + if (it.getModel()instanceof InsertOneModel insertOneModel) { + + Document target = insertOneModel.getDocument(); + maybeEmitBeforeSaveEvent(it); + return maybeInvokeBeforeSaveCallback(it.getSource(), target) + .map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(insertOneModel))); + } else if (it.getModel()instanceof ReplaceOneModel replaceOneModel) { + + Document target = replaceOneModel.getReplacement(); + maybeEmitBeforeSaveEvent(it); + return maybeInvokeBeforeSaveCallback(it.getSource(), target) + .map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(replaceOneModel))); + } + return Mono.just(new SourceAwareWriteModelHolder(it.getSource(), mapWriteModel(it.getModel()))); + }); + MongoCollection theCollection = collection; + return concat.collectList().flatMap(it -> { + + return Mono + .from(theCollection.bulkWrite( + it.stream().map(SourceAwareWriteModelHolder::getModel).collect(Collectors.toList()), bulkOptions)) + .doOnSuccess(state -> { + it.forEach(saved -> { + maybeEmitAfterSaveEvent(saved); + }); + }).flatMap(state -> { + List> monos = it.stream().map(saved -> { + return maybeInvokeAfterSaveCallback(saved); + }).collect(Collectors.toList()); + + return Flux.concat(monos).then(Mono.just(state)); + }); + }); + } catch (RuntimeException ex) { + + if (ex instanceof MongoBulkWriteException) { + + MongoBulkWriteException mongoBulkWriteException = (MongoBulkWriteException) ex; + if (mongoBulkWriteException.getWriteConcernError() != null) { + throw new DataIntegrityViolationException(ex.getMessage(), ex); + } + throw new BulkOperationException(ex.getMessage(), mongoBulkWriteException); + } + + throw ex; + } + } + + /** + * Performs update and upsert bulk operations. + * + * @param query the {@link Query} to determine documents to update. + * @param update the {@link Update} to perform, must not be {@literal null}. + * @param upsert whether to upsert. + * @param multi whether to issue a multi-update. + * @return the {@link BulkOperations} with the update registered. + */ + private ReactiveBulkOperations update(Query query, UpdateDefinition update, boolean upsert, boolean multi) { + + Assert.notNull(query, "Query must not be null"); + Assert.notNull(update, "Update must not be null"); + + UpdateOptions options = computeUpdateOptions(query, update, upsert); + + this.models.add(Mono.just(update).map(it -> { + if (multi) { + return new SourceAwareWriteModelHolder(update, + new UpdateManyModel<>(query.getQueryObject(), it.getUpdateObject(), options)); + } + return new SourceAwareWriteModelHolder(update, + new UpdateOneModel<>(query.getQueryObject(), it.getUpdateObject(), options)); + })); + + return this; + } + + private WriteModel mapWriteModel(WriteModel writeModel) { + + if (writeModel instanceof UpdateOneModel) { + + UpdateOneModel model = (UpdateOneModel) writeModel; + + return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()), + model.getOptions()); + } + + if (writeModel instanceof UpdateManyModel) { + + UpdateManyModel model = (UpdateManyModel) writeModel; + + return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()), + model.getOptions()); + } + + if (writeModel instanceof DeleteOneModel) { + + DeleteOneModel model = (DeleteOneModel) writeModel; + + return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions()); + } + + if (writeModel instanceof DeleteManyModel) { + + DeleteManyModel model = (DeleteManyModel) writeModel; + + return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions()); + } + + return writeModel; + } + + private Bson getMappedUpdate(Bson update) { + return bulkOperationContext.getUpdateMapper().getMappedObject(update, bulkOperationContext.getEntity()); + } + + private Bson getMappedQuery(Bson query) { + return bulkOperationContext.getQueryMapper().getMappedObject(query, bulkOperationContext.getEntity()); + } + + private Document getMappedObject(Object source) { + + if (source instanceof Document) { + return (Document) source; + } + + Document sink = new Document(); + + mongoOperations.getConverter().write(source, sink); + return sink; + } + + private void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) { + + if (holder.getModel() instanceof InsertOneModel) { + + Document target = ((InsertOneModel) holder.getModel()).getDocument(); + maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName)); + } else if (holder.getModel() instanceof ReplaceOneModel) { + + Document target = ((ReplaceOneModel) holder.getModel()).getReplacement(); + maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName)); + } + } + + private void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) { + + if (holder.getModel() instanceof InsertOneModel) { + + Document target = ((InsertOneModel) holder.getModel()).getDocument(); + maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName)); + } else if (holder.getModel() instanceof ReplaceOneModel) { + + Document target = ((ReplaceOneModel) holder.getModel()).getReplacement(); + maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName)); + } + } + + private Mono maybeInvokeAfterSaveCallback(SourceAwareWriteModelHolder holder) { + + if (holder.getModel() instanceof InsertOneModel) { + + Document target = ((InsertOneModel) holder.getModel()).getDocument(); + return maybeInvokeAfterSaveCallback(holder.getSource(), target); + } else if (holder.getModel() instanceof ReplaceOneModel) { + + Document target = ((ReplaceOneModel) holder.getModel()).getReplacement(); + return maybeInvokeAfterSaveCallback(holder.getSource(), target); + } + return Mono.just(holder.getSource()); + } + + private , T> E maybeEmitEvent(E event) { + + if (bulkOperationContext.getEventPublisher() == null) { + return event; + } + + bulkOperationContext.getEventPublisher().publishEvent(event); + return event; + } + + private Mono maybeInvokeBeforeConvertCallback(Object value) { + + if (bulkOperationContext.getEntityCallbacks() == null) { + return Mono.just(value); + } + + return bulkOperationContext.getEntityCallbacks().callback(ReactiveBeforeConvertCallback.class, value, + collectionName); + } + + private Mono maybeInvokeBeforeSaveCallback(Object value, Document mappedDocument) { + + if (bulkOperationContext.getEntityCallbacks() == null) { + return Mono.just(value); + } + + return bulkOperationContext.getEntityCallbacks().callback(ReactiveBeforeSaveCallback.class, value, mappedDocument, + collectionName); + } + + private Mono maybeInvokeAfterSaveCallback(Object value, Document mappedDocument) { + + if (bulkOperationContext.getEntityCallbacks() == null) { + return Mono.just(value); + } + + return bulkOperationContext.getEntityCallbacks().callback(ReactiveAfterSaveCallback.class, value, mappedDocument, + collectionName); + } + + private static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) { + + BulkWriteOptions options = new BulkWriteOptions(); + + switch (bulkMode) { + case ORDERED: + return options.ordered(true); + case UNORDERED: + return options.ordered(false); + } + + throw new IllegalStateException("BulkMode was null"); + } + + /** + * @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}. + * @param update The {@link Update} to apply + * @param upsert flag to indicate if document should be upserted. + * @return new instance of {@link UpdateOptions}. + */ + private static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) { + + UpdateOptions options = new UpdateOptions(); + options.upsert(upsert); + + if (update.hasArrayFilters()) { + List list = new ArrayList<>(update.getArrayFilters().size()); + for (ArrayFilter arrayFilter : update.getArrayFilters()) { + list.add(arrayFilter.asDocument()); + } + options.arrayFilters(list); + } + + filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation); + return options; + } + + /** + * {@link ReactiveBulkOperationContext} holds information about {@link BulkMode} the entity in use as well as + * references to {@link QueryMapper} and {@link UpdateMapper}. + * + * @author Christoph Strobl + * @since 2.0 + */ + static final class ReactiveBulkOperationContext { + + private final BulkMode bulkMode; + private final Optional> entity; + private final QueryMapper queryMapper; + private final UpdateMapper updateMapper; + private final ApplicationEventPublisher eventPublisher; + private final ReactiveEntityCallbacks entityCallbacks; + + ReactiveBulkOperationContext(BulkMode bulkMode, Optional> entity, + QueryMapper queryMapper, UpdateMapper updateMapper, ApplicationEventPublisher eventPublisher, + ReactiveEntityCallbacks entityCallbacks) { + + this.bulkMode = bulkMode; + this.entity = entity; + this.queryMapper = queryMapper; + this.updateMapper = updateMapper; + this.eventPublisher = eventPublisher; + this.entityCallbacks = entityCallbacks; + } + + public BulkMode getBulkMode() { + return this.bulkMode; + } + + public Optional> getEntity() { + return this.entity; + } + + public QueryMapper getQueryMapper() { + return this.queryMapper; + } + + public UpdateMapper getUpdateMapper() { + return this.updateMapper; + } + + public ApplicationEventPublisher getEventPublisher() { + return this.eventPublisher; + } + + public ReactiveEntityCallbacks getEntityCallbacks() { + return this.entityCallbacks; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ReactiveBulkOperationContext that = (ReactiveBulkOperationContext) o; + + if (bulkMode != that.bulkMode) + return false; + if (!ObjectUtils.nullSafeEquals(this.entity, that.entity)) { + return false; + } + if (!ObjectUtils.nullSafeEquals(this.queryMapper, that.queryMapper)) { + return false; + } + if (!ObjectUtils.nullSafeEquals(this.updateMapper, that.updateMapper)) { + return false; + } + if (!ObjectUtils.nullSafeEquals(this.eventPublisher, that.eventPublisher)) { + return false; + } + return ObjectUtils.nullSafeEquals(this.entityCallbacks, that.entityCallbacks); + } + + @Override + public int hashCode() { + int result = bulkMode != null ? bulkMode.hashCode() : 0; + result = 31 * result + ObjectUtils.nullSafeHashCode(entity); + result = 31 * result + ObjectUtils.nullSafeHashCode(queryMapper); + result = 31 * result + ObjectUtils.nullSafeHashCode(updateMapper); + result = 31 * result + ObjectUtils.nullSafeHashCode(eventPublisher); + result = 31 * result + ObjectUtils.nullSafeHashCode(entityCallbacks); + return result; + } + + public String toString() { + return "DefaultBulkOperations.BulkOperationContext(bulkMode=" + this.getBulkMode() + ", entity=" + + this.getEntity() + ", queryMapper=" + this.getQueryMapper() + ", updateMapper=" + this.getUpdateMapper() + + ", eventPublisher=" + this.getEventPublisher() + ", entityCallbacks=" + this.getEntityCallbacks() + ")"; + } + } + + /** + * Value object chaining together an actual source with its {@link WriteModel} representation. + * + * @since 4.1 + * @author Christoph Strobl + */ + private static final class SourceAwareWriteModelHolder { + + private final Object source; + private final WriteModel model; + + SourceAwareWriteModelHolder(Object source, WriteModel model) { + + this.source = source; + this.model = model; + } + + public Object getSource() { + return this.source; + } + + public WriteModel getModel() { + return this.model; + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkOperations.java new file mode 100644 index 000000000..b477c98fe --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkOperations.java @@ -0,0 +1,130 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import java.util.List; + +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.data.mongodb.core.query.UpdateDefinition; +import org.springframework.data.util.Pair; + +import com.mongodb.bulk.BulkWriteResult; +import reactor.core.publisher.Mono; + +/** + * Bulk operations for insert/update/remove actions on a collection. Bulk operations are available since MongoDB 2.6 and + * make use of low level bulk commands on the protocol level. This interface defines a fluent API to add multiple single + * operations or list of similar operations in sequence which can then eventually be executed by calling + * {@link #execute()}. + *

+ * Bulk operations are issued as one batch that pulls together all insert, update, and delete operations. Operations + * that require individual operation results such as optimistic locking (using {@code @Version}) are not supported and + * the version field remains not populated. + * + * @author Christoph Strobl + * @since 4.1 + */ +public interface ReactiveBulkOperations { + + /** + * Add a single insert to the bulk operation. + * + * @param documents the document to insert, must not be {@literal null}. + * @return the current {@link ReactiveBulkOperations} instance with the insert added, will never be {@literal null}. + */ + ReactiveBulkOperations insert(Object documents); + + /** + * Add a list of inserts to the bulk operation. + * + * @param documents List of documents to insert, must not be {@literal null}. + * @return the current {@link ReactiveBulkOperations} instance with the insert added, will never be {@literal null}. + */ + ReactiveBulkOperations insert(List documents); + + /** + * Add a single update to the bulk operation. For the update request, only the first matching document is updated. + * + * @param query update criteria, must not be {@literal null}. + * @param update {@link UpdateDefinition} operation to perform, must not be {@literal null}. + * @return the current {@link ReactiveBulkOperations} instance with the update added, will never be {@literal null}. + */ + ReactiveBulkOperations updateOne(Query query, UpdateDefinition update); + + /** + * Add a single update to the bulk operation. For the update request, all matching documents are updated. + * + * @param query Update criteria. + * @param update Update operation to perform. + * @return the current {@link ReactiveBulkOperations} instance with the update added, will never be {@literal null}. + */ + ReactiveBulkOperations updateMulti(Query query, UpdateDefinition update); + + /** + * Add a single upsert to the bulk operation. An upsert is an update if the set of matching documents is not empty, + * else an insert. + * + * @param query Update criteria. + * @param update Update operation to perform. + * @return the current {@link ReactiveBulkOperations} instance with the update added, will never be {@literal null}. + */ + ReactiveBulkOperations upsert(Query query, UpdateDefinition update); + + /** + * Add a single remove operation to the bulk operation. + * + * @param remove the {@link Query} to select the documents to be removed, must not be {@literal null}. + * @return the current {@link ReactiveBulkOperations} instance with the removal added, will never be {@literal null}. + */ + ReactiveBulkOperations remove(Query remove); + + /** + * Add a list of remove operations to the bulk operation. + * + * @param removes the remove operations to perform, must not be {@literal null}. + * @return the current {@link ReactiveBulkOperations} instance with the removal added, will never be {@literal null}. + */ + ReactiveBulkOperations remove(List removes); + + /** + * Add a single replace operation to the bulk operation. + * + * @param query Update criteria. + * @param replacement the replacement document. Must not be {@literal null}. + * @return the current {@link ReactiveBulkOperations} instance with the replace added, will never be {@literal null}. + */ + default ReactiveBulkOperations replaceOne(Query query, Object replacement) { + return replaceOne(query, replacement, FindAndReplaceOptions.empty()); + } + + /** + * Add a single replace operation to the bulk operation. + * + * @param query Update criteria. + * @param replacement the replacement document. Must not be {@literal null}. + * @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}. + * @return the current {@link ReactiveBulkOperations} instance with the replace added, will never be {@literal null}. + */ + ReactiveBulkOperations replaceOne(Query query, Object replacement, FindAndReplaceOptions options); + + /** + * Execute all bulk operations using the default write concern. + * + * @return a {@link Mono} emitting the result of the bulk operation providing counters for inserts/updates etc. + */ + Mono execute(); +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java index af3698965..20fb2fdf9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java @@ -29,6 +29,7 @@ import org.springframework.data.domain.KeysetScrollPosition; import org.springframework.data.domain.Window; import org.springframework.data.geo.GeoResult; import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; +import org.springframework.data.mongodb.core.BulkOperations.BulkMode; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.AggregationOperation; import org.springframework.data.mongodb.core.aggregation.AggregationOptions; @@ -1747,6 +1748,40 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { Flux mapReduce(Query filterQuery, Class domainType, String inputCollectionName, Class resultType, String mapFunction, String reduceFunction, MapReduceOptions options); + /** + * Returns a new {@link ReactiveBulkOperations} for the given collection.
+ * NOTE: Any additional support for field mapping, etc. is not available for {@literal update} or + * {@literal remove} operations in bulk mode due to the lack of domain type information. Use + * {@link #bulkOps(BulkMode, Class, String)} to get full type specific support. + * + * @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}. + * @param collectionName the name of the collection to work on, must not be {@literal null} or empty. + * @return {@link ReactiveBulkOperations} on the named collection + * @since 4.1 + */ + ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName); + + /** + * Returns a new {@link ReactiveBulkOperations} for the given entity type. + * + * @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}. + * @param entityClass the name of the entity class, must not be {@literal null}. + * @return {@link ReactiveBulkOperations} on the named collection associated of the given entity class. + * @since 4.1 + */ + ReactiveBulkOperations bulkOps(BulkMode mode, Class entityClass); + + /** + * Returns a new {@link ReactiveBulkOperations} for the given entity type and collection name. + * + * @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}. + * @param entityType the name of the entity class. Can be {@literal null}. + * @param collectionName the name of the collection to work on, must not be {@literal null} or empty. + * @return {@link ReactiveBulkOperations} on the named collection associated with the given entity class. + * @since 4.1 + */ + ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class entityType, String collectionName); + /** * Returns the underlying {@link MongoConverter}. * diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 594ca3541..63543dad5 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -17,6 +17,8 @@ package org.springframework.data.mongodb.core; import static org.springframework.data.mongodb.core.query.SerializationUtils.*; +import org.springframework.data.mongodb.core.BulkOperations.BulkMode; +import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -479,6 +481,31 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return new DefaultReactiveIndexOperations(this, getCollectionName(entityClass), this.queryMapper, entityClass); } + @Override + public ReactiveBulkOperations bulkOps(BulkMode bulkMode, String collectionName) { + return bulkOps(bulkMode, null, collectionName); + } + + @Override + public ReactiveBulkOperations bulkOps(BulkMode bulkMode, Class entityClass) { + return bulkOps(bulkMode, entityClass, getCollectionName(entityClass)); + } + + @Override + public ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class entityType, String collectionName) { + + Assert.notNull(mode, "BulkMode must not be null"); + Assert.hasText(collectionName, "Collection name must not be null or empty"); + + DefaultReactiveBulkOperations operations = new DefaultReactiveBulkOperations(this, collectionName, + new ReactiveBulkOperationContext(mode, Optional.ofNullable(getPersistentEntity(entityType)), queryMapper, updateMapper, + eventPublisher, entityCallbacks)); + + operations.setDefaultWriteConcern(writeConcern); + + return operations; + } + @Override public String getCollectionName(Class entityClass) { return operations.determineCollectionName(entityClass); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsTests.java new file mode 100644 index 000000000..c17f76fae --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsTests.java @@ -0,0 +1,346 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import static org.assertj.core.api.Assertions.*; + +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +import org.bson.Document; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.data.mongodb.core.BulkOperations.BulkMode; +import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext; +import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.data.mongodb.core.convert.UpdateMapper; +import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.data.mongodb.test.util.MongoTemplateExtension; +import org.springframework.data.mongodb.test.util.ReactiveMongoTestTemplate; +import org.springframework.data.mongodb.test.util.Template; + +import com.mongodb.MongoBulkWriteException; +import com.mongodb.WriteConcern; +import com.mongodb.bulk.BulkWriteResult; + +/** + * @author Christoph Strobl + */ +@ExtendWith(MongoTemplateExtension.class) +class DefaultReactiveBulkOperationsTests { + + static final String COLLECTION_NAME = "reactive-bulk-ops"; + + @Template(initialEntitySet = BaseDoc.class) static ReactiveMongoTestTemplate template; + + @BeforeEach + public void setUp() { + template.flush(COLLECTION_NAME).as(StepVerifier::create).verifyComplete(); + } + + @Test // GH-2821 + void insertOrdered() { + + List documents = Arrays.asList(newDoc("1"), newDoc("2")); + + createBulkOps(BulkMode.ORDERED).insert(documents) // + .execute().as(StepVerifier::create) // + .consumeNextWith(result -> { + assertThat(result.getInsertedCount()).isEqualTo(2); + }); + } + + @Test // GH-2821 + void insertOrderedFails() { + + List documents = Arrays.asList(newDoc("1"), newDoc("1"), newDoc("2")); + + createBulkOps(BulkMode.ORDERED).insert(documents) // + .execute().as(StepVerifier::create) // + .verifyErrorSatisfies(error -> { + assertThat(error).isInstanceOf(DuplicateKeyException.class); + }); + } + + @Test // GH-2821 + public void insertUnOrdered() { + + List documents = Arrays.asList(newDoc("1"), newDoc("2")); + + createBulkOps(BulkMode.UNORDERED).insert(documents) // + .execute().as(StepVerifier::create) // + .consumeNextWith(result -> { + assertThat(result.getInsertedCount()).isEqualTo(2); + }); + } + + @Test // GH-2821 + public void insertUnOrderedContinuesOnError() { + + List documents = Arrays.asList(newDoc("1"), newDoc("1"), newDoc("2")); + + createBulkOps(BulkMode.UNORDERED).insert(documents) // + .execute().as(StepVerifier::create) // + .verifyErrorSatisfies(error -> { + + assertThat(error).isInstanceOf(DuplicateKeyException.class); + assertThat(error.getCause()).isInstanceOf(MongoBulkWriteException.class); + + MongoBulkWriteException cause = (MongoBulkWriteException) error.getCause(); + assertThat(cause.getWriteResult().getInsertedCount()).isEqualTo(2); + assertThat(cause.getWriteErrors()).isNotNull(); + assertThat(cause.getWriteErrors().size()).isOne(); + }); + } + + @Test // GH-2821 + void upsertDoesUpdate() { + + insertSomeDocuments(); + + createBulkOps(BulkMode.ORDERED).// + upsert(where("value", "value1"), set("value", "value2")).// + execute().as(StepVerifier::create) // + .consumeNextWith(result -> { + assertThat(result).isNotNull(); + assertThat(result.getMatchedCount()).isEqualTo(2); + assertThat(result.getModifiedCount()).isEqualTo(2); + assertThat(result.getInsertedCount()).isZero(); + assertThat(result.getUpserts()).isNotNull(); + assertThat(result.getUpserts().size()).isZero(); + }) // + .verifyComplete(); + } + + @Test // GH-2821 + public void upsertDoesInsert() { + + createBulkOps(BulkMode.ORDERED).// + upsert(where("_id", "1"), set("value", "v1")).// + execute().as(StepVerifier::create) // + .consumeNextWith(result -> { + + assertThat(result).isNotNull(); + assertThat(result.getMatchedCount()).isZero(); + assertThat(result.getModifiedCount()).isZero(); + assertThat(result.getUpserts()).isNotNull(); + assertThat(result.getUpserts().size()).isOne(); + }) // + .verifyComplete(); + } + + @ParameterizedTest // GH-2821 + @MethodSource + public void testUpdates(BulkMode mode, boolean multi, int expectedUpdateCount) { + + insertSomeDocuments(); + ReactiveBulkOperations bulkOps = createBulkOps(mode); + + if (multi) { + bulkOps.updateMulti(where("value", "value1"), set("value", "value3")); + bulkOps.updateMulti(where("value", "value2"), set("value", "value4")); + } else { + bulkOps.updateOne(where("value", "value1"), set("value", "value3")); + bulkOps.updateOne(where("value", "value2"), set("value", "value4")); + } + + bulkOps.execute().map(BulkWriteResult::getModifiedCount) // + .as(StepVerifier::create) // + .expectNext(expectedUpdateCount) // + .verifyComplete(); + } + + private static Stream testUpdates() { + return Stream.of(Arguments.of(BulkMode.ORDERED, false, 2), Arguments.of(BulkMode.ORDERED, true, 4), + Arguments.of(BulkMode.UNORDERED, false, 2), Arguments.of(BulkMode.UNORDERED, false, 2)); + } + + @ParameterizedTest // GH-2821 + @EnumSource(BulkMode.class) + void testRemove(BulkMode mode) { + + insertSomeDocuments(); + + List removes = Arrays.asList(where("_id", "1"), where("value", "value2")); + + createBulkOps(mode).remove(removes).execute().map(BulkWriteResult::getDeletedCount).as(StepVerifier::create) + .expectNext(3).verifyComplete(); + } + + @ParameterizedTest // GH-2821 + @EnumSource(BulkMode.class) + void testReplaceOne(BulkMode mode) { + + insertSomeDocuments(); + + Query query = where("_id", "1"); + Document document = rawDoc("1", "value2"); + createBulkOps(mode).replaceOne(query, document).execute().map(BulkWriteResult::getModifiedCount) + .as(StepVerifier::create).expectNext(1).verifyComplete(); + } + + @Test // GH-2821 + public void replaceOneDoesReplace() { + + insertSomeDocuments(); + + createBulkOps(BulkMode.ORDERED).// + replaceOne(where("_id", "1"), rawDoc("1", "value2")).// + execute().as(StepVerifier::create).consumeNextWith(result -> { + + assertThat(result).isNotNull(); + assertThat(result.getMatchedCount()).isOne(); + assertThat(result.getModifiedCount()).isOne(); + assertThat(result.getInsertedCount()).isZero(); + }).verifyComplete(); + } + + @Test // GH-2821 + public void replaceOneWithUpsert() { + + createBulkOps(BulkMode.ORDERED).// + replaceOne(where("_id", "1"), rawDoc("1", "value2"), FindAndReplaceOptions.options().upsert()).// + execute().as(StepVerifier::create).consumeNextWith(result -> { + + assertThat(result).isNotNull(); + assertThat(result.getMatchedCount()).isZero(); + assertThat(result.getInsertedCount()).isZero(); + assertThat(result.getModifiedCount()).isZero(); + assertThat(result.getUpserts().size()).isOne(); + }); + } + + @Test // GH-2821 + public void mixedBulkOrdered() { + + createBulkOps(BulkMode.ORDERED, BaseDoc.class).insert(newDoc("1", "v1")).// + updateOne(where("_id", "1"), set("value", "v2")).// + remove(where("value", "v2")).// + execute().as(StepVerifier::create).consumeNextWith(result -> { + + assertThat(result).isNotNull(); + assertThat(result.getInsertedCount()).isOne(); + assertThat(result.getModifiedCount()).isOne(); + assertThat(result.getDeletedCount()).isOne(); + }).verifyComplete(); + } + + @Test // GH-2821 + public void mixedBulkOrderedWithList() { + + List inserts = Arrays.asList(newDoc("1", "v1"), newDoc("2", "v2"), newDoc("3", "v2")); + List removes = Arrays.asList(where("_id", "1")); + + createBulkOps(BulkMode.ORDERED, BaseDoc.class).insert(inserts).updateMulti(where("value", "v2"), set("value", "v3")) + .remove(removes).execute().as(StepVerifier::create).consumeNextWith(result -> { + + assertThat(result).isNotNull(); + assertThat(result.getInsertedCount()).isEqualTo(3); + assertThat(result.getModifiedCount()).isEqualTo(2); + assertThat(result.getDeletedCount()).isOne(); + }).verifyComplete(); + } + + @Test // GH-2821 + public void insertShouldConsiderInheritance() { + + SpecialDoc specialDoc = new SpecialDoc(); + specialDoc.id = "id-special"; + specialDoc.value = "normal-value"; + specialDoc.specialValue = "special-value"; + + createBulkOps(BulkMode.ORDERED, SpecialDoc.class).insert(Arrays.asList(specialDoc)).execute().then() + .as(StepVerifier::create).verifyComplete(); + + template.findOne(where("_id", specialDoc.id), BaseDoc.class, COLLECTION_NAME).as(StepVerifier::create) + .consumeNextWith(doc -> { + + assertThat(doc).isNotNull(); + assertThat(doc).isInstanceOf(SpecialDoc.class); + }).verifyComplete(); + } + + private void insertSomeDocuments() { + + template.execute(COLLECTION_NAME, collection -> { + return Flux.from(collection.insertMany( + List.of(rawDoc("1", "value1"), rawDoc("2", "value1"), rawDoc("3", "value2"), rawDoc("4", "value2")))); + }).then().as(StepVerifier::create).verifyComplete(); + + } + + private DefaultReactiveBulkOperations createBulkOps(BulkMode mode) { + return createBulkOps(mode, null); + } + + private DefaultReactiveBulkOperations createBulkOps(BulkMode mode, Class entityType) { + + Optional> entity = entityType != null + ? Optional.of(template.getConverter().getMappingContext().getPersistentEntity(entityType)) + : Optional.empty(); + + ReactiveBulkOperationContext bulkOperationContext = new ReactiveBulkOperationContext(mode, entity, + new QueryMapper(template.getConverter()), new UpdateMapper(template.getConverter()), null, null); + + DefaultReactiveBulkOperations bulkOps = new DefaultReactiveBulkOperations(template, COLLECTION_NAME, + bulkOperationContext); + bulkOps.setDefaultWriteConcern(WriteConcern.ACKNOWLEDGED); + + return bulkOps; + } + + private static BaseDoc newDoc(String id) { + + BaseDoc doc = new BaseDoc(); + doc.id = id; + + return doc; + } + + private static BaseDoc newDoc(String id, String value) { + + BaseDoc doc = newDoc(id); + doc.value = value; + + return doc; + } + + private static Query where(String field, String value) { + return new Query().addCriteria(Criteria.where(field).is(value)); + } + + private static Update set(String field, String value) { + return new Update().set(field, value); + } + + private static Document rawDoc(String id, String value) { + return new Document("_id", id).append("value", value); + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsUnitTests.java new file mode 100644 index 000000000..2d30d14d1 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsUnitTests.java @@ -0,0 +1,347 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; +import static org.springframework.data.mongodb.core.query.Criteria.*; +import static org.springframework.data.mongodb.core.query.Query.*; + +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.List; +import java.util.Optional; + +import org.bson.BsonDocument; +import org.bson.BsonString; +import org.bson.Document; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.data.annotation.Id; +import org.springframework.data.mapping.callback.ReactiveEntityCallbacks; +import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; +import org.springframework.data.mongodb.core.BulkOperations.BulkMode; +import org.springframework.data.mongodb.core.DefaultBulkOperationsUnitTests.NullExceptionTranslator; +import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver; +import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.data.mongodb.core.convert.UpdateMapper; +import org.springframework.data.mongodb.core.mapping.Field; +import org.springframework.data.mongodb.core.mapping.MongoMappingContext; +import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent; +import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent; +import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent; +import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback; +import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback; +import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback; +import org.springframework.data.mongodb.core.query.BasicQuery; +import org.springframework.data.mongodb.core.query.Collation; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Update; + +import com.mongodb.MongoWriteException; +import com.mongodb.WriteError; +import com.mongodb.bulk.BulkWriteResult; +import com.mongodb.client.model.BulkWriteOptions; +import com.mongodb.client.model.DeleteManyModel; +import com.mongodb.client.model.InsertOneModel; +import com.mongodb.client.model.ReplaceOneModel; +import com.mongodb.client.model.UpdateOneModel; +import com.mongodb.client.model.WriteModel; +import com.mongodb.reactivestreams.client.MongoCollection; +import com.mongodb.reactivestreams.client.MongoDatabase; + +/** + * @author Christoph Strobl + */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class DefaultReactiveBulkOperationsUnitTests { + + ReactiveMongoTemplate template; + @Mock ReactiveMongoDatabaseFactory factory; + + @Mock MongoDatabase database; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) MongoCollection collection; + @Captor ArgumentCaptor>> captor; + + private MongoConverter converter; + private MongoMappingContext mappingContext; + + private DefaultReactiveBulkOperations ops; + + @BeforeEach + void setUp() { + + when(factory.getMongoDatabase()).thenReturn(Mono.just(database)); + when(factory.getExceptionTranslator()).thenReturn(new NullExceptionTranslator()); + when(database.getCollection(anyString(), eq(Document.class))).thenReturn(collection); + when(collection.bulkWrite(anyList(), any())).thenReturn(Mono.just(mock(BulkWriteResult.class))); + + mappingContext = new MongoMappingContext(); + mappingContext.afterPropertiesSet(); + + converter = new MappingMongoConverter(NoOpDbRefResolver.INSTANCE, mappingContext); + template = new ReactiveMongoTemplate(factory, converter); + + ops = new DefaultReactiveBulkOperations(template, "collection-1", + new ReactiveBulkOperationContext(BulkMode.ORDERED, + Optional.of(mappingContext.getPersistentEntity(SomeDomainType.class)), new QueryMapper(converter), + new UpdateMapper(converter), null, null)); + } + + @Test // GH-2821 + void updateOneShouldUseCollationWhenPresent() { + + ops.updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen")) + .execute().subscribe(); + + verify(collection).bulkWrite(captor.capture(), any()); + assertThat(captor.getValue().get(0)).isInstanceOf(UpdateOneModel.class); + assertThat(((UpdateOneModel) captor.getValue().get(0)).getOptions().getCollation()) + .isEqualTo(com.mongodb.client.model.Collation.builder().locale("de").build()); + } + + @Test // GH-2821 + void replaceOneShouldUseCollationWhenPresent() { + + ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).execute().subscribe(); + + verify(collection).bulkWrite(captor.capture(), any()); + + assertThat(captor.getValue().get(0)).isInstanceOf(ReplaceOneModel.class); + assertThat(((ReplaceOneModel) captor.getValue().get(0)).getReplaceOptions().getCollation()) + .isEqualTo(com.mongodb.client.model.Collation.builder().locale("de").build()); + } + + @Test // GH-2821 + void removeShouldUseCollationWhenPresent() { + + ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).execute().subscribe(); + + verify(collection).bulkWrite(captor.capture(), any()); + + assertThat(captor.getValue().get(0)).isInstanceOf(DeleteManyModel.class); + assertThat(((DeleteManyModel) captor.getValue().get(0)).getOptions().getCollation()) + .isEqualTo(com.mongodb.client.model.Collation.builder().locale("de").build()); + } + + @Test // GH-2821 + void bulkUpdateShouldMapQueryAndUpdateCorrectly() { + + ops.updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")).execute() + .subscribe(); + + verify(collection).bulkWrite(captor.capture(), any()); + + UpdateOneModel updateModel = (UpdateOneModel) captor.getValue().get(0); + assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys")); + assertThat(updateModel.getUpdate()).isEqualTo(new Document("$set", new Document("first_name", "queen danerys"))); + } + + @Test // GH-2821 + void bulkRemoveShouldMapQueryCorrectly() { + + ops.remove(query(where("firstName").is("danerys"))).execute().subscribe(); + + verify(collection).bulkWrite(captor.capture(), any()); + + DeleteManyModel updateModel = (DeleteManyModel) captor.getValue().get(0); + assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys")); + } + + @Test // GH-2821 + void bulkReplaceOneShouldMapQueryCorrectly() { + + SomeDomainType replacement = new SomeDomainType(); + replacement.firstName = "Minsu"; + replacement.lastName = "Kim"; + + ops.replaceOne(query(where("firstName").is("danerys")), replacement).execute().subscribe(); + + verify(collection).bulkWrite(captor.capture(), any()); + + ReplaceOneModel updateModel = (ReplaceOneModel) captor.getValue().get(0); + assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys")); + assertThat(updateModel.getReplacement().getString("first_name")).isEqualTo("Minsu"); + assertThat(updateModel.getReplacement().getString("lastName")).isEqualTo("Kim"); + } + + @Test // GH-2821 + void bulkInsertInvokesEntityCallbacks() { + + BeforeConvertPersonCallback beforeConvertCallback = spy(new BeforeConvertPersonCallback()); + BeforeSavePersonCallback beforeSaveCallback = spy(new BeforeSavePersonCallback()); + AfterSavePersonCallback afterSaveCallback = spy(new AfterSavePersonCallback()); + + ops = new DefaultReactiveBulkOperations(template, "collection-1", + new ReactiveBulkOperationContext(BulkMode.ORDERED, + Optional.of(mappingContext.getPersistentEntity(Person.class)), new QueryMapper(converter), + new UpdateMapper(converter), null, + ReactiveEntityCallbacks.create(beforeConvertCallback, beforeSaveCallback, afterSaveCallback))); + + Person entity = new Person("init"); + ops.insert(entity); + + ArgumentCaptor personArgumentCaptor = ArgumentCaptor.forClass(Person.class); + verifyNoInteractions(beforeConvertCallback); + verifyNoInteractions(beforeSaveCallback); + + ops.execute().then().as(StepVerifier::create).verifyComplete(); + + verify(beforeConvertCallback).onBeforeConvert(personArgumentCaptor.capture(), eq("collection-1")); + verify(beforeSaveCallback).onBeforeSave(personArgumentCaptor.capture(), any(), eq("collection-1")); + verify(afterSaveCallback).onAfterSave(personArgumentCaptor.capture(), any(), eq("collection-1")); + assertThat(personArgumentCaptor.getAllValues()).extracting("firstName").containsExactly("init", "before-convert", + "before-save"); + verify(collection).bulkWrite(captor.capture(), any()); + + InsertOneModel updateModel = (InsertOneModel) captor.getValue().get(0); + assertThat(updateModel.getDocument()).containsEntry("firstName", "after-save"); + } + + @Test // GH-2821 + void bulkReplaceOneEmitsEventsCorrectly() { + + ApplicationEventPublisher eventPublisher = mock(ApplicationEventPublisher.class); + + ops = new DefaultReactiveBulkOperations(template, "collection-1", + new ReactiveBulkOperationContext(BulkMode.ORDERED, + Optional.of(mappingContext.getPersistentEntity(Person.class)), new QueryMapper(converter), + new UpdateMapper(converter), eventPublisher, null)); + + ops.replaceOne(query(where("firstName").is("danerys")), new SomeDomainType()); + + verify(eventPublisher, never()).publishEvent(any(BeforeConvertEvent.class)); + verify(eventPublisher, never()).publishEvent(any(BeforeSaveEvent.class)); + verify(eventPublisher, never()).publishEvent(any(AfterSaveEvent.class)); + + ops.execute().then().as(StepVerifier::create).verifyComplete(); + + verify(eventPublisher).publishEvent(any(BeforeConvertEvent.class)); + verify(eventPublisher).publishEvent(any(BeforeSaveEvent.class)); + verify(eventPublisher).publishEvent(any(AfterSaveEvent.class)); + } + + @Test // GH-2821 + void bulkInsertEmitsEventsCorrectly() { + + ApplicationEventPublisher eventPublisher = mock(ApplicationEventPublisher.class); + + ops = new DefaultReactiveBulkOperations(template, "collection-1", + new ReactiveBulkOperationContext(BulkMode.ORDERED, + Optional.of(mappingContext.getPersistentEntity(Person.class)), new QueryMapper(converter), + new UpdateMapper(converter), eventPublisher, null)); + + ops.insert(new SomeDomainType()); + + verify(eventPublisher, never()).publishEvent(any(BeforeConvertEvent.class)); + verify(eventPublisher, never()).publishEvent(any(BeforeSaveEvent.class)); + verify(eventPublisher, never()).publishEvent(any(AfterSaveEvent.class)); + + ops.execute().then().as(StepVerifier::create).verifyComplete(); + + verify(eventPublisher).publishEvent(any(BeforeConvertEvent.class)); + verify(eventPublisher).publishEvent(any(BeforeSaveEvent.class)); + verify(eventPublisher).publishEvent(any(AfterSaveEvent.class)); + } + + @Test // GH-2821 + void noAfterSaveEventOnFailure() { + + ApplicationEventPublisher eventPublisher = mock(ApplicationEventPublisher.class); + + when(collection.bulkWrite(anyList(), any(BulkWriteOptions.class))).thenThrow(new MongoWriteException( + new WriteError(89, "NetworkTimeout", new BsonDocument("hi", new BsonString("there"))), null)); + + ops = new DefaultReactiveBulkOperations(template, "collection-1", + new ReactiveBulkOperationContext(BulkMode.ORDERED, + Optional.of(mappingContext.getPersistentEntity(Person.class)), new QueryMapper(converter), + new UpdateMapper(converter), eventPublisher, null)); + + ops.insert(new SomeDomainType()); + + ops.execute().as(StepVerifier::create).expectError(); + + verify(eventPublisher, never()).publishEvent(any(AfterSaveEvent.class)); + } + + @Test // GH-2821 + void appliesArrayFilterWhenPresent() { + + ops.updateOne(new BasicQuery("{}"), new Update().filterArray(Criteria.where("element").gte(100))).execute() + .subscribe(); + + verify(collection).bulkWrite(captor.capture(), any()); + + UpdateOneModel updateModel = (UpdateOneModel) captor.getValue().get(0); + assertThat(updateModel.getOptions().getArrayFilters().get(0)) + .isEqualTo(new org.bson.Document("element", new Document("$gte", 100))); + } + + static class BeforeConvertPersonCallback implements ReactiveBeforeConvertCallback { + + @Override + public Mono onBeforeConvert(Person entity, String collection) { + return Mono.just(new Person("before-convert")); + } + } + + static class BeforeSavePersonCallback implements ReactiveBeforeSaveCallback { + + @Override + public Mono onBeforeSave(Person entity, Document document, String collection) { + + document.put("firstName", "before-save"); + return Mono.just(new Person("before-save")); + } + } + + static class AfterSavePersonCallback implements ReactiveAfterSaveCallback { + + @Override + public Mono onAfterSave(Person entity, Document document, String collection) { + + document.put("firstName", "after-save"); + return Mono.just(new Person("after-save")); + } + } + + class SomeDomainType { + + @Id String id; + DefaultBulkOperationsUnitTests.Gender gender; + @Field("first_name") String firstName; + @Field String lastName; + } + + enum Gender { + M, F + } +}