Browse Source

Add support for reactive bulk operations.

Closes #2821
Original pull request: #4342
pull/4373/head
Christoph Strobl 3 years ago committed by Mark Paluch
parent
commit
86dd81f770
No known key found for this signature in database
GPG Key ID: 4406B84C1661DCD1
  1. 591
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
  2. 130
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkOperations.java
  3. 35
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
  4. 27
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  5. 346
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsTests.java
  6. 347
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsUnitTests.java

591
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java

@ -0,0 +1,591 @@ @@ -0,0 +1,591 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.data.mongodb.BulkOperationException;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.*;
import com.mongodb.reactivestreams.client.MongoCollection;
/**
* Default implementation for {@link ReactiveBulkOperations}.
*
* @author Christoph Strobl
* @since 4.1
*/
class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
private final ReactiveMongoOperations mongoOperations;
private final String collectionName;
private final ReactiveBulkOperationContext bulkOperationContext;
private final List<Mono<SourceAwareWriteModelHolder>> models = new ArrayList<>();
private @Nullable WriteConcern defaultWriteConcern;
private BulkWriteOptions bulkOptions;
/**
* Creates a new {@link DefaultReactiveBulkOperations} for the given {@link MongoOperations}, collection name and
* {@link ReactiveBulkOperationContext}.
*
* @param mongoOperations must not be {@literal null}.
* @param collectionName must not be {@literal null}.
* @param bulkOperationContext must not be {@literal null}.
*/
DefaultReactiveBulkOperations(ReactiveMongoOperations mongoOperations, String collectionName,
ReactiveBulkOperationContext bulkOperationContext) {
Assert.notNull(mongoOperations, "MongoOperations must not be null");
Assert.hasText(collectionName, "CollectionName must not be null nor empty");
Assert.notNull(bulkOperationContext, "BulkOperationContext must not be null");
this.mongoOperations = mongoOperations;
this.collectionName = collectionName;
this.bulkOperationContext = bulkOperationContext;
this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode());
}
/**
* Configures the default {@link WriteConcern} to be used. Defaults to {@literal null}.
*
* @param defaultWriteConcern can be {@literal null}.
*/
void setDefaultWriteConcern(@Nullable WriteConcern defaultWriteConcern) {
this.defaultWriteConcern = defaultWriteConcern;
}
@Override
public ReactiveBulkOperations insert(Object document) {
Assert.notNull(document, "Document must not be null");
this.models.add(Mono.just(document).flatMap(it -> {
maybeEmitEvent(new BeforeConvertEvent<>(it, collectionName));
return maybeInvokeBeforeConvertCallback(it);
}).map(it -> new SourceAwareWriteModelHolder(it, new InsertOneModel(getMappedObject(it)))));
return this;
}
@Override
public ReactiveBulkOperations insert(List<? extends Object> documents) {
Assert.notNull(documents, "Documents must not be null");
documents.forEach(this::insert);
return this;
}
@Override
@SuppressWarnings("unchecked")
public ReactiveBulkOperations updateOne(Query query, UpdateDefinition update) {
Assert.notNull(query, "Query must not be null");
Assert.notNull(update, "Update must not be null");
update(query, update, false, false);
return this;
}
@Override
@SuppressWarnings("unchecked")
public ReactiveBulkOperations updateMulti(Query query, UpdateDefinition update) {
Assert.notNull(query, "Query must not be null");
Assert.notNull(update, "Update must not be null");
update(query, update, false, true);
return this;
}
@Override
public ReactiveBulkOperations upsert(Query query, UpdateDefinition update) {
return update(query, update, true, true);
}
@Override
public ReactiveBulkOperations remove(Query query) {
Assert.notNull(query, "Query must not be null");
DeleteOptions deleteOptions = new DeleteOptions();
query.getCollation().map(Collation::toMongoCollation).ifPresent(deleteOptions::collation);
this.models.add(Mono.just(query)
.map(it -> new SourceAwareWriteModelHolder(it, new DeleteManyModel<>(it.getQueryObject(), deleteOptions))));
return this;
}
@Override
public ReactiveBulkOperations remove(List<Query> removes) {
Assert.notNull(removes, "Removals must not be null");
for (Query query : removes) {
remove(query);
}
return this;
}
@Override
public ReactiveBulkOperations replaceOne(Query query, Object replacement, FindAndReplaceOptions options) {
Assert.notNull(query, "Query must not be null");
Assert.notNull(replacement, "Replacement must not be null");
Assert.notNull(options, "Options must not be null");
ReplaceOptions replaceOptions = new ReplaceOptions();
replaceOptions.upsert(options.isUpsert());
query.getCollation().map(Collation::toMongoCollation).ifPresent(replaceOptions::collation);
this.models.add(Mono.just(replacement).flatMap(it -> {
maybeEmitEvent(new BeforeConvertEvent<>(it, collectionName));
return maybeInvokeBeforeConvertCallback(it);
}).map(it -> new SourceAwareWriteModelHolder(it,
new ReplaceOneModel<>(getMappedQuery(query.getQueryObject()), getMappedObject(it), replaceOptions))));
return this;
}
@Override
public Mono<BulkWriteResult> execute() {
try {
Mono<BulkWriteResult> result = mongoOperations.execute(collectionName, this::bulkWriteTo).next();
return result;
} finally {
this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode());
}
}
private Mono<BulkWriteResult> bulkWriteTo(MongoCollection<Document> collection) {
if (defaultWriteConcern != null) {
collection = collection.withWriteConcern(defaultWriteConcern);
}
try {
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMap(it -> {
if (it.getModel()instanceof InsertOneModel<Document> insertOneModel) {
Document target = insertOneModel.getDocument();
maybeEmitBeforeSaveEvent(it);
return maybeInvokeBeforeSaveCallback(it.getSource(), target)
.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(insertOneModel)));
} else if (it.getModel()instanceof ReplaceOneModel<Document> replaceOneModel) {
Document target = replaceOneModel.getReplacement();
maybeEmitBeforeSaveEvent(it);
return maybeInvokeBeforeSaveCallback(it.getSource(), target)
.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(replaceOneModel)));
}
return Mono.just(new SourceAwareWriteModelHolder(it.getSource(), mapWriteModel(it.getModel())));
});
MongoCollection theCollection = collection;
return concat.collectList().flatMap(it -> {
return Mono
.from(theCollection.bulkWrite(
it.stream().map(SourceAwareWriteModelHolder::getModel).collect(Collectors.toList()), bulkOptions))
.doOnSuccess(state -> {
it.forEach(saved -> {
maybeEmitAfterSaveEvent(saved);
});
}).flatMap(state -> {
List<Mono<Object>> monos = it.stream().map(saved -> {
return maybeInvokeAfterSaveCallback(saved);
}).collect(Collectors.toList());
return Flux.concat(monos).then(Mono.just(state));
});
});
} catch (RuntimeException ex) {
if (ex instanceof MongoBulkWriteException) {
MongoBulkWriteException mongoBulkWriteException = (MongoBulkWriteException) ex;
if (mongoBulkWriteException.getWriteConcernError() != null) {
throw new DataIntegrityViolationException(ex.getMessage(), ex);
}
throw new BulkOperationException(ex.getMessage(), mongoBulkWriteException);
}
throw ex;
}
}
/**
* Performs update and upsert bulk operations.
*
* @param query the {@link Query} to determine documents to update.
* @param update the {@link Update} to perform, must not be {@literal null}.
* @param upsert whether to upsert.
* @param multi whether to issue a multi-update.
* @return the {@link BulkOperations} with the update registered.
*/
private ReactiveBulkOperations update(Query query, UpdateDefinition update, boolean upsert, boolean multi) {
Assert.notNull(query, "Query must not be null");
Assert.notNull(update, "Update must not be null");
UpdateOptions options = computeUpdateOptions(query, update, upsert);
this.models.add(Mono.just(update).map(it -> {
if (multi) {
return new SourceAwareWriteModelHolder(update,
new UpdateManyModel<>(query.getQueryObject(), it.getUpdateObject(), options));
}
return new SourceAwareWriteModelHolder(update,
new UpdateOneModel<>(query.getQueryObject(), it.getUpdateObject(), options));
}));
return this;
}
private WriteModel<Document> mapWriteModel(WriteModel<Document> writeModel) {
if (writeModel instanceof UpdateOneModel) {
UpdateOneModel<Document> model = (UpdateOneModel<Document>) writeModel;
return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
model.getOptions());
}
if (writeModel instanceof UpdateManyModel) {
UpdateManyModel<Document> model = (UpdateManyModel<Document>) writeModel;
return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
model.getOptions());
}
if (writeModel instanceof DeleteOneModel) {
DeleteOneModel<Document> model = (DeleteOneModel<Document>) writeModel;
return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
}
if (writeModel instanceof DeleteManyModel) {
DeleteManyModel<Document> model = (DeleteManyModel<Document>) writeModel;
return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
}
return writeModel;
}
private Bson getMappedUpdate(Bson update) {
return bulkOperationContext.getUpdateMapper().getMappedObject(update, bulkOperationContext.getEntity());
}
private Bson getMappedQuery(Bson query) {
return bulkOperationContext.getQueryMapper().getMappedObject(query, bulkOperationContext.getEntity());
}
private Document getMappedObject(Object source) {
if (source instanceof Document) {
return (Document) source;
}
Document sink = new Document();
mongoOperations.getConverter().write(source, sink);
return sink;
}
private void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
if (holder.getModel() instanceof InsertOneModel) {
Document target = ((InsertOneModel<Document>) holder.getModel()).getDocument();
maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName));
} else if (holder.getModel() instanceof ReplaceOneModel) {
Document target = ((ReplaceOneModel<Document>) holder.getModel()).getReplacement();
maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName));
}
}
private void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
if (holder.getModel() instanceof InsertOneModel) {
Document target = ((InsertOneModel<Document>) holder.getModel()).getDocument();
maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName));
} else if (holder.getModel() instanceof ReplaceOneModel) {
Document target = ((ReplaceOneModel<Document>) holder.getModel()).getReplacement();
maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName));
}
}
private Mono<Object> maybeInvokeAfterSaveCallback(SourceAwareWriteModelHolder holder) {
if (holder.getModel() instanceof InsertOneModel) {
Document target = ((InsertOneModel<Document>) holder.getModel()).getDocument();
return maybeInvokeAfterSaveCallback(holder.getSource(), target);
} else if (holder.getModel() instanceof ReplaceOneModel) {
Document target = ((ReplaceOneModel<Document>) holder.getModel()).getReplacement();
return maybeInvokeAfterSaveCallback(holder.getSource(), target);
}
return Mono.just(holder.getSource());
}
private <E extends MongoMappingEvent<T>, T> E maybeEmitEvent(E event) {
if (bulkOperationContext.getEventPublisher() == null) {
return event;
}
bulkOperationContext.getEventPublisher().publishEvent(event);
return event;
}
private Mono<Object> maybeInvokeBeforeConvertCallback(Object value) {
if (bulkOperationContext.getEntityCallbacks() == null) {
return Mono.just(value);
}
return bulkOperationContext.getEntityCallbacks().callback(ReactiveBeforeConvertCallback.class, value,
collectionName);
}
private Mono<Object> maybeInvokeBeforeSaveCallback(Object value, Document mappedDocument) {
if (bulkOperationContext.getEntityCallbacks() == null) {
return Mono.just(value);
}
return bulkOperationContext.getEntityCallbacks().callback(ReactiveBeforeSaveCallback.class, value, mappedDocument,
collectionName);
}
private Mono<Object> maybeInvokeAfterSaveCallback(Object value, Document mappedDocument) {
if (bulkOperationContext.getEntityCallbacks() == null) {
return Mono.just(value);
}
return bulkOperationContext.getEntityCallbacks().callback(ReactiveAfterSaveCallback.class, value, mappedDocument,
collectionName);
}
private static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) {
BulkWriteOptions options = new BulkWriteOptions();
switch (bulkMode) {
case ORDERED:
return options.ordered(true);
case UNORDERED:
return options.ordered(false);
}
throw new IllegalStateException("BulkMode was null");
}
/**
* @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}.
* @param update The {@link Update} to apply
* @param upsert flag to indicate if document should be upserted.
* @return new instance of {@link UpdateOptions}.
*/
private static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) {
UpdateOptions options = new UpdateOptions();
options.upsert(upsert);
if (update.hasArrayFilters()) {
List<Document> list = new ArrayList<>(update.getArrayFilters().size());
for (ArrayFilter arrayFilter : update.getArrayFilters()) {
list.add(arrayFilter.asDocument());
}
options.arrayFilters(list);
}
filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
return options;
}
/**
* {@link ReactiveBulkOperationContext} holds information about {@link BulkMode} the entity in use as well as
* references to {@link QueryMapper} and {@link UpdateMapper}.
*
* @author Christoph Strobl
* @since 2.0
*/
static final class ReactiveBulkOperationContext {
private final BulkMode bulkMode;
private final Optional<? extends MongoPersistentEntity<?>> entity;
private final QueryMapper queryMapper;
private final UpdateMapper updateMapper;
private final ApplicationEventPublisher eventPublisher;
private final ReactiveEntityCallbacks entityCallbacks;
ReactiveBulkOperationContext(BulkMode bulkMode, Optional<? extends MongoPersistentEntity<?>> entity,
QueryMapper queryMapper, UpdateMapper updateMapper, ApplicationEventPublisher eventPublisher,
ReactiveEntityCallbacks entityCallbacks) {
this.bulkMode = bulkMode;
this.entity = entity;
this.queryMapper = queryMapper;
this.updateMapper = updateMapper;
this.eventPublisher = eventPublisher;
this.entityCallbacks = entityCallbacks;
}
public BulkMode getBulkMode() {
return this.bulkMode;
}
public Optional<? extends MongoPersistentEntity<?>> getEntity() {
return this.entity;
}
public QueryMapper getQueryMapper() {
return this.queryMapper;
}
public UpdateMapper getUpdateMapper() {
return this.updateMapper;
}
public ApplicationEventPublisher getEventPublisher() {
return this.eventPublisher;
}
public ReactiveEntityCallbacks getEntityCallbacks() {
return this.entityCallbacks;
}
@Override
public boolean equals(@Nullable Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ReactiveBulkOperationContext that = (ReactiveBulkOperationContext) o;
if (bulkMode != that.bulkMode)
return false;
if (!ObjectUtils.nullSafeEquals(this.entity, that.entity)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.queryMapper, that.queryMapper)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.updateMapper, that.updateMapper)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.eventPublisher, that.eventPublisher)) {
return false;
}
return ObjectUtils.nullSafeEquals(this.entityCallbacks, that.entityCallbacks);
}
@Override
public int hashCode() {
int result = bulkMode != null ? bulkMode.hashCode() : 0;
result = 31 * result + ObjectUtils.nullSafeHashCode(entity);
result = 31 * result + ObjectUtils.nullSafeHashCode(queryMapper);
result = 31 * result + ObjectUtils.nullSafeHashCode(updateMapper);
result = 31 * result + ObjectUtils.nullSafeHashCode(eventPublisher);
result = 31 * result + ObjectUtils.nullSafeHashCode(entityCallbacks);
return result;
}
public String toString() {
return "DefaultBulkOperations.BulkOperationContext(bulkMode=" + this.getBulkMode() + ", entity="
+ this.getEntity() + ", queryMapper=" + this.getQueryMapper() + ", updateMapper=" + this.getUpdateMapper()
+ ", eventPublisher=" + this.getEventPublisher() + ", entityCallbacks=" + this.getEntityCallbacks() + ")";
}
}
/**
* Value object chaining together an actual source with its {@link WriteModel} representation.
*
* @since 4.1
* @author Christoph Strobl
*/
private static final class SourceAwareWriteModelHolder {
private final Object source;
private final WriteModel<Document> model;
SourceAwareWriteModelHolder(Object source, WriteModel<Document> model) {
this.source = source;
this.model = model;
}
public Object getSource() {
return this.source;
}
public WriteModel<Document> getModel() {
return this.model;
}
}
}

130
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkOperations.java

@ -0,0 +1,130 @@ @@ -0,0 +1,130 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import java.util.List;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.util.Pair;
import com.mongodb.bulk.BulkWriteResult;
import reactor.core.publisher.Mono;
/**
* Bulk operations for insert/update/remove actions on a collection. Bulk operations are available since MongoDB 2.6 and
* make use of low level bulk commands on the protocol level. This interface defines a fluent API to add multiple single
* operations or list of similar operations in sequence which can then eventually be executed by calling
* {@link #execute()}.
* <p>
* Bulk operations are issued as one batch that pulls together all insert, update, and delete operations. Operations
* that require individual operation results such as optimistic locking (using {@code @Version}) are not supported and
* the version field remains not populated.
*
* @author Christoph Strobl
* @since 4.1
*/
public interface ReactiveBulkOperations {
/**
* Add a single insert to the bulk operation.
*
* @param documents the document to insert, must not be {@literal null}.
* @return the current {@link ReactiveBulkOperations} instance with the insert added, will never be {@literal null}.
*/
ReactiveBulkOperations insert(Object documents);
/**
* Add a list of inserts to the bulk operation.
*
* @param documents List of documents to insert, must not be {@literal null}.
* @return the current {@link ReactiveBulkOperations} instance with the insert added, will never be {@literal null}.
*/
ReactiveBulkOperations insert(List<? extends Object> documents);
/**
* Add a single update to the bulk operation. For the update request, only the first matching document is updated.
*
* @param query update criteria, must not be {@literal null}.
* @param update {@link UpdateDefinition} operation to perform, must not be {@literal null}.
* @return the current {@link ReactiveBulkOperations} instance with the update added, will never be {@literal null}.
*/
ReactiveBulkOperations updateOne(Query query, UpdateDefinition update);
/**
* Add a single update to the bulk operation. For the update request, all matching documents are updated.
*
* @param query Update criteria.
* @param update Update operation to perform.
* @return the current {@link ReactiveBulkOperations} instance with the update added, will never be {@literal null}.
*/
ReactiveBulkOperations updateMulti(Query query, UpdateDefinition update);
/**
* Add a single upsert to the bulk operation. An upsert is an update if the set of matching documents is not empty,
* else an insert.
*
* @param query Update criteria.
* @param update Update operation to perform.
* @return the current {@link ReactiveBulkOperations} instance with the update added, will never be {@literal null}.
*/
ReactiveBulkOperations upsert(Query query, UpdateDefinition update);
/**
* Add a single remove operation to the bulk operation.
*
* @param remove the {@link Query} to select the documents to be removed, must not be {@literal null}.
* @return the current {@link ReactiveBulkOperations} instance with the removal added, will never be {@literal null}.
*/
ReactiveBulkOperations remove(Query remove);
/**
* Add a list of remove operations to the bulk operation.
*
* @param removes the remove operations to perform, must not be {@literal null}.
* @return the current {@link ReactiveBulkOperations} instance with the removal added, will never be {@literal null}.
*/
ReactiveBulkOperations remove(List<Query> removes);
/**
* Add a single replace operation to the bulk operation.
*
* @param query Update criteria.
* @param replacement the replacement document. Must not be {@literal null}.
* @return the current {@link ReactiveBulkOperations} instance with the replace added, will never be {@literal null}.
*/
default ReactiveBulkOperations replaceOne(Query query, Object replacement) {
return replaceOne(query, replacement, FindAndReplaceOptions.empty());
}
/**
* Add a single replace operation to the bulk operation.
*
* @param query Update criteria.
* @param replacement the replacement document. Must not be {@literal null}.
* @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}.
* @return the current {@link ReactiveBulkOperations} instance with the replace added, will never be {@literal null}.
*/
ReactiveBulkOperations replaceOne(Query query, Object replacement, FindAndReplaceOptions options);
/**
* Execute all bulk operations using the default write concern.
*
* @return a {@link Mono} emitting the result of the bulk operation providing counters for inserts/updates etc.
*/
Mono<BulkWriteResult> execute();
}

35
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java

@ -29,6 +29,7 @@ import org.springframework.data.domain.KeysetScrollPosition; @@ -29,6 +29,7 @@ import org.springframework.data.domain.KeysetScrollPosition;
import org.springframework.data.domain.Window;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
@ -1747,6 +1748,40 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { @@ -1747,6 +1748,40 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
<T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inputCollectionName, Class<T> resultType,
String mapFunction, String reduceFunction, MapReduceOptions options);
/**
* Returns a new {@link ReactiveBulkOperations} for the given collection. <br />
* <strong>NOTE:</strong> Any additional support for field mapping, etc. is not available for {@literal update} or
* {@literal remove} operations in bulk mode due to the lack of domain type information. Use
* {@link #bulkOps(BulkMode, Class, String)} to get full type specific support.
*
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
* @param collectionName the name of the collection to work on, must not be {@literal null} or empty.
* @return {@link ReactiveBulkOperations} on the named collection
* @since 4.1
*/
ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName);
/**
* Returns a new {@link ReactiveBulkOperations} for the given entity type.
*
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
* @param entityClass the name of the entity class, must not be {@literal null}.
* @return {@link ReactiveBulkOperations} on the named collection associated of the given entity class.
* @since 4.1
*/
ReactiveBulkOperations bulkOps(BulkMode mode, Class<?> entityClass);
/**
* Returns a new {@link ReactiveBulkOperations} for the given entity type and collection name.
*
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
* @param entityType the name of the entity class. Can be {@literal null}.
* @param collectionName the name of the collection to work on, must not be {@literal null} or empty.
* @return {@link ReactiveBulkOperations} on the named collection associated with the given entity class.
* @since 4.1
*/
ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class<?> entityType, String collectionName);
/**
* Returns the underlying {@link MongoConverter}.
*

27
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -17,6 +17,8 @@ package org.springframework.data.mongodb.core; @@ -17,6 +17,8 @@ package org.springframework.data.mongodb.core;
import static org.springframework.data.mongodb.core.query.SerializationUtils.*;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@ -479,6 +481,31 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -479,6 +481,31 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return new DefaultReactiveIndexOperations(this, getCollectionName(entityClass), this.queryMapper, entityClass);
}
@Override
public ReactiveBulkOperations bulkOps(BulkMode bulkMode, String collectionName) {
return bulkOps(bulkMode, null, collectionName);
}
@Override
public ReactiveBulkOperations bulkOps(BulkMode bulkMode, Class<?> entityClass) {
return bulkOps(bulkMode, entityClass, getCollectionName(entityClass));
}
@Override
public ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class<?> entityType, String collectionName) {
Assert.notNull(mode, "BulkMode must not be null");
Assert.hasText(collectionName, "Collection name must not be null or empty");
DefaultReactiveBulkOperations operations = new DefaultReactiveBulkOperations(this, collectionName,
new ReactiveBulkOperationContext(mode, Optional.ofNullable(getPersistentEntity(entityType)), queryMapper, updateMapper,
eventPublisher, entityCallbacks));
operations.setDefaultWriteConcern(writeConcern);
return operations;
}
@Override
public String getCollectionName(Class<?> entityClass) {
return operations.determineCollectionName(entityClass);

346
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsTests.java

@ -0,0 +1,346 @@ @@ -0,0 +1,346 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import static org.assertj.core.api.Assertions.*;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.bson.Document;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.test.util.MongoTemplateExtension;
import org.springframework.data.mongodb.test.util.ReactiveMongoTestTemplate;
import org.springframework.data.mongodb.test.util.Template;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
/**
* @author Christoph Strobl
*/
@ExtendWith(MongoTemplateExtension.class)
class DefaultReactiveBulkOperationsTests {
static final String COLLECTION_NAME = "reactive-bulk-ops";
@Template(initialEntitySet = BaseDoc.class) static ReactiveMongoTestTemplate template;
@BeforeEach
public void setUp() {
template.flush(COLLECTION_NAME).as(StepVerifier::create).verifyComplete();
}
@Test // GH-2821
void insertOrdered() {
List<BaseDoc> documents = Arrays.asList(newDoc("1"), newDoc("2"));
createBulkOps(BulkMode.ORDERED).insert(documents) //
.execute().as(StepVerifier::create) //
.consumeNextWith(result -> {
assertThat(result.getInsertedCount()).isEqualTo(2);
});
}
@Test // GH-2821
void insertOrderedFails() {
List<BaseDoc> documents = Arrays.asList(newDoc("1"), newDoc("1"), newDoc("2"));
createBulkOps(BulkMode.ORDERED).insert(documents) //
.execute().as(StepVerifier::create) //
.verifyErrorSatisfies(error -> {
assertThat(error).isInstanceOf(DuplicateKeyException.class);
});
}
@Test // GH-2821
public void insertUnOrdered() {
List<BaseDoc> documents = Arrays.asList(newDoc("1"), newDoc("2"));
createBulkOps(BulkMode.UNORDERED).insert(documents) //
.execute().as(StepVerifier::create) //
.consumeNextWith(result -> {
assertThat(result.getInsertedCount()).isEqualTo(2);
});
}
@Test // GH-2821
public void insertUnOrderedContinuesOnError() {
List<BaseDoc> documents = Arrays.asList(newDoc("1"), newDoc("1"), newDoc("2"));
createBulkOps(BulkMode.UNORDERED).insert(documents) //
.execute().as(StepVerifier::create) //
.verifyErrorSatisfies(error -> {
assertThat(error).isInstanceOf(DuplicateKeyException.class);
assertThat(error.getCause()).isInstanceOf(MongoBulkWriteException.class);
MongoBulkWriteException cause = (MongoBulkWriteException) error.getCause();
assertThat(cause.getWriteResult().getInsertedCount()).isEqualTo(2);
assertThat(cause.getWriteErrors()).isNotNull();
assertThat(cause.getWriteErrors().size()).isOne();
});
}
@Test // GH-2821
void upsertDoesUpdate() {
insertSomeDocuments();
createBulkOps(BulkMode.ORDERED).//
upsert(where("value", "value1"), set("value", "value2")).//
execute().as(StepVerifier::create) //
.consumeNextWith(result -> {
assertThat(result).isNotNull();
assertThat(result.getMatchedCount()).isEqualTo(2);
assertThat(result.getModifiedCount()).isEqualTo(2);
assertThat(result.getInsertedCount()).isZero();
assertThat(result.getUpserts()).isNotNull();
assertThat(result.getUpserts().size()).isZero();
}) //
.verifyComplete();
}
@Test // GH-2821
public void upsertDoesInsert() {
createBulkOps(BulkMode.ORDERED).//
upsert(where("_id", "1"), set("value", "v1")).//
execute().as(StepVerifier::create) //
.consumeNextWith(result -> {
assertThat(result).isNotNull();
assertThat(result.getMatchedCount()).isZero();
assertThat(result.getModifiedCount()).isZero();
assertThat(result.getUpserts()).isNotNull();
assertThat(result.getUpserts().size()).isOne();
}) //
.verifyComplete();
}
@ParameterizedTest // GH-2821
@MethodSource
public void testUpdates(BulkMode mode, boolean multi, int expectedUpdateCount) {
insertSomeDocuments();
ReactiveBulkOperations bulkOps = createBulkOps(mode);
if (multi) {
bulkOps.updateMulti(where("value", "value1"), set("value", "value3"));
bulkOps.updateMulti(where("value", "value2"), set("value", "value4"));
} else {
bulkOps.updateOne(where("value", "value1"), set("value", "value3"));
bulkOps.updateOne(where("value", "value2"), set("value", "value4"));
}
bulkOps.execute().map(BulkWriteResult::getModifiedCount) //
.as(StepVerifier::create) //
.expectNext(expectedUpdateCount) //
.verifyComplete();
}
private static Stream<Arguments> testUpdates() {
return Stream.of(Arguments.of(BulkMode.ORDERED, false, 2), Arguments.of(BulkMode.ORDERED, true, 4),
Arguments.of(BulkMode.UNORDERED, false, 2), Arguments.of(BulkMode.UNORDERED, false, 2));
}
@ParameterizedTest // GH-2821
@EnumSource(BulkMode.class)
void testRemove(BulkMode mode) {
insertSomeDocuments();
List<Query> removes = Arrays.asList(where("_id", "1"), where("value", "value2"));
createBulkOps(mode).remove(removes).execute().map(BulkWriteResult::getDeletedCount).as(StepVerifier::create)
.expectNext(3).verifyComplete();
}
@ParameterizedTest // GH-2821
@EnumSource(BulkMode.class)
void testReplaceOne(BulkMode mode) {
insertSomeDocuments();
Query query = where("_id", "1");
Document document = rawDoc("1", "value2");
createBulkOps(mode).replaceOne(query, document).execute().map(BulkWriteResult::getModifiedCount)
.as(StepVerifier::create).expectNext(1).verifyComplete();
}
@Test // GH-2821
public void replaceOneDoesReplace() {
insertSomeDocuments();
createBulkOps(BulkMode.ORDERED).//
replaceOne(where("_id", "1"), rawDoc("1", "value2")).//
execute().as(StepVerifier::create).consumeNextWith(result -> {
assertThat(result).isNotNull();
assertThat(result.getMatchedCount()).isOne();
assertThat(result.getModifiedCount()).isOne();
assertThat(result.getInsertedCount()).isZero();
}).verifyComplete();
}
@Test // GH-2821
public void replaceOneWithUpsert() {
createBulkOps(BulkMode.ORDERED).//
replaceOne(where("_id", "1"), rawDoc("1", "value2"), FindAndReplaceOptions.options().upsert()).//
execute().as(StepVerifier::create).consumeNextWith(result -> {
assertThat(result).isNotNull();
assertThat(result.getMatchedCount()).isZero();
assertThat(result.getInsertedCount()).isZero();
assertThat(result.getModifiedCount()).isZero();
assertThat(result.getUpserts().size()).isOne();
});
}
@Test // GH-2821
public void mixedBulkOrdered() {
createBulkOps(BulkMode.ORDERED, BaseDoc.class).insert(newDoc("1", "v1")).//
updateOne(where("_id", "1"), set("value", "v2")).//
remove(where("value", "v2")).//
execute().as(StepVerifier::create).consumeNextWith(result -> {
assertThat(result).isNotNull();
assertThat(result.getInsertedCount()).isOne();
assertThat(result.getModifiedCount()).isOne();
assertThat(result.getDeletedCount()).isOne();
}).verifyComplete();
}
@Test // GH-2821
public void mixedBulkOrderedWithList() {
List<BaseDoc> inserts = Arrays.asList(newDoc("1", "v1"), newDoc("2", "v2"), newDoc("3", "v2"));
List<Query> removes = Arrays.asList(where("_id", "1"));
createBulkOps(BulkMode.ORDERED, BaseDoc.class).insert(inserts).updateMulti(where("value", "v2"), set("value", "v3"))
.remove(removes).execute().as(StepVerifier::create).consumeNextWith(result -> {
assertThat(result).isNotNull();
assertThat(result.getInsertedCount()).isEqualTo(3);
assertThat(result.getModifiedCount()).isEqualTo(2);
assertThat(result.getDeletedCount()).isOne();
}).verifyComplete();
}
@Test // GH-2821
public void insertShouldConsiderInheritance() {
SpecialDoc specialDoc = new SpecialDoc();
specialDoc.id = "id-special";
specialDoc.value = "normal-value";
specialDoc.specialValue = "special-value";
createBulkOps(BulkMode.ORDERED, SpecialDoc.class).insert(Arrays.asList(specialDoc)).execute().then()
.as(StepVerifier::create).verifyComplete();
template.findOne(where("_id", specialDoc.id), BaseDoc.class, COLLECTION_NAME).as(StepVerifier::create)
.consumeNextWith(doc -> {
assertThat(doc).isNotNull();
assertThat(doc).isInstanceOf(SpecialDoc.class);
}).verifyComplete();
}
private void insertSomeDocuments() {
template.execute(COLLECTION_NAME, collection -> {
return Flux.from(collection.insertMany(
List.of(rawDoc("1", "value1"), rawDoc("2", "value1"), rawDoc("3", "value2"), rawDoc("4", "value2"))));
}).then().as(StepVerifier::create).verifyComplete();
}
private DefaultReactiveBulkOperations createBulkOps(BulkMode mode) {
return createBulkOps(mode, null);
}
private DefaultReactiveBulkOperations createBulkOps(BulkMode mode, Class<?> entityType) {
Optional<? extends MongoPersistentEntity<?>> entity = entityType != null
? Optional.of(template.getConverter().getMappingContext().getPersistentEntity(entityType))
: Optional.empty();
ReactiveBulkOperationContext bulkOperationContext = new ReactiveBulkOperationContext(mode, entity,
new QueryMapper(template.getConverter()), new UpdateMapper(template.getConverter()), null, null);
DefaultReactiveBulkOperations bulkOps = new DefaultReactiveBulkOperations(template, COLLECTION_NAME,
bulkOperationContext);
bulkOps.setDefaultWriteConcern(WriteConcern.ACKNOWLEDGED);
return bulkOps;
}
private static BaseDoc newDoc(String id) {
BaseDoc doc = new BaseDoc();
doc.id = id;
return doc;
}
private static BaseDoc newDoc(String id, String value) {
BaseDoc doc = newDoc(id);
doc.value = value;
return doc;
}
private static Query where(String field, String value) {
return new Query().addCriteria(Criteria.where(field).is(value));
}
private static Update set(String field, String value) {
return new Update().set(field, value);
}
private static Document rawDoc(String id, String value) {
return new Document("_id", id).append("value", value);
}
}

347
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsUnitTests.java

@ -0,0 +1,347 @@ @@ -0,0 +1,347 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.List;
import java.util.Optional;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.Document;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.annotation.Id;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.DefaultBulkOperationsUnitTests.NullExceptionTranslator;
import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Update;
import com.mongodb.MongoWriteException;
import com.mongodb.WriteError;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.DeleteManyModel;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
/**
* @author Christoph Strobl
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class DefaultReactiveBulkOperationsUnitTests {
ReactiveMongoTemplate template;
@Mock ReactiveMongoDatabaseFactory factory;
@Mock MongoDatabase database;
@Mock(answer = Answers.RETURNS_DEEP_STUBS) MongoCollection<Document> collection;
@Captor ArgumentCaptor<List<WriteModel<Document>>> captor;
private MongoConverter converter;
private MongoMappingContext mappingContext;
private DefaultReactiveBulkOperations ops;
@BeforeEach
void setUp() {
when(factory.getMongoDatabase()).thenReturn(Mono.just(database));
when(factory.getExceptionTranslator()).thenReturn(new NullExceptionTranslator());
when(database.getCollection(anyString(), eq(Document.class))).thenReturn(collection);
when(collection.bulkWrite(anyList(), any())).thenReturn(Mono.just(mock(BulkWriteResult.class)));
mappingContext = new MongoMappingContext();
mappingContext.afterPropertiesSet();
converter = new MappingMongoConverter(NoOpDbRefResolver.INSTANCE, mappingContext);
template = new ReactiveMongoTemplate(factory, converter);
ops = new DefaultReactiveBulkOperations(template, "collection-1",
new ReactiveBulkOperationContext(BulkMode.ORDERED,
Optional.of(mappingContext.getPersistentEntity(SomeDomainType.class)), new QueryMapper(converter),
new UpdateMapper(converter), null, null));
}
@Test // GH-2821
void updateOneShouldUseCollationWhenPresent() {
ops.updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen"))
.execute().subscribe();
verify(collection).bulkWrite(captor.capture(), any());
assertThat(captor.getValue().get(0)).isInstanceOf(UpdateOneModel.class);
assertThat(((UpdateOneModel<Document>) captor.getValue().get(0)).getOptions().getCollation())
.isEqualTo(com.mongodb.client.model.Collation.builder().locale("de").build());
}
@Test // GH-2821
void replaceOneShouldUseCollationWhenPresent() {
ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).execute().subscribe();
verify(collection).bulkWrite(captor.capture(), any());
assertThat(captor.getValue().get(0)).isInstanceOf(ReplaceOneModel.class);
assertThat(((ReplaceOneModel<Document>) captor.getValue().get(0)).getReplaceOptions().getCollation())
.isEqualTo(com.mongodb.client.model.Collation.builder().locale("de").build());
}
@Test // GH-2821
void removeShouldUseCollationWhenPresent() {
ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).execute().subscribe();
verify(collection).bulkWrite(captor.capture(), any());
assertThat(captor.getValue().get(0)).isInstanceOf(DeleteManyModel.class);
assertThat(((DeleteManyModel<Document>) captor.getValue().get(0)).getOptions().getCollation())
.isEqualTo(com.mongodb.client.model.Collation.builder().locale("de").build());
}
@Test // GH-2821
void bulkUpdateShouldMapQueryAndUpdateCorrectly() {
ops.updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")).execute()
.subscribe();
verify(collection).bulkWrite(captor.capture(), any());
UpdateOneModel<Document> updateModel = (UpdateOneModel<Document>) captor.getValue().get(0);
assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys"));
assertThat(updateModel.getUpdate()).isEqualTo(new Document("$set", new Document("first_name", "queen danerys")));
}
@Test // GH-2821
void bulkRemoveShouldMapQueryCorrectly() {
ops.remove(query(where("firstName").is("danerys"))).execute().subscribe();
verify(collection).bulkWrite(captor.capture(), any());
DeleteManyModel<Document> updateModel = (DeleteManyModel<Document>) captor.getValue().get(0);
assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys"));
}
@Test // GH-2821
void bulkReplaceOneShouldMapQueryCorrectly() {
SomeDomainType replacement = new SomeDomainType();
replacement.firstName = "Minsu";
replacement.lastName = "Kim";
ops.replaceOne(query(where("firstName").is("danerys")), replacement).execute().subscribe();
verify(collection).bulkWrite(captor.capture(), any());
ReplaceOneModel<Document> updateModel = (ReplaceOneModel<Document>) captor.getValue().get(0);
assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys"));
assertThat(updateModel.getReplacement().getString("first_name")).isEqualTo("Minsu");
assertThat(updateModel.getReplacement().getString("lastName")).isEqualTo("Kim");
}
@Test // GH-2821
void bulkInsertInvokesEntityCallbacks() {
BeforeConvertPersonCallback beforeConvertCallback = spy(new BeforeConvertPersonCallback());
BeforeSavePersonCallback beforeSaveCallback = spy(new BeforeSavePersonCallback());
AfterSavePersonCallback afterSaveCallback = spy(new AfterSavePersonCallback());
ops = new DefaultReactiveBulkOperations(template, "collection-1",
new ReactiveBulkOperationContext(BulkMode.ORDERED,
Optional.of(mappingContext.getPersistentEntity(Person.class)), new QueryMapper(converter),
new UpdateMapper(converter), null,
ReactiveEntityCallbacks.create(beforeConvertCallback, beforeSaveCallback, afterSaveCallback)));
Person entity = new Person("init");
ops.insert(entity);
ArgumentCaptor<Person> personArgumentCaptor = ArgumentCaptor.forClass(Person.class);
verifyNoInteractions(beforeConvertCallback);
verifyNoInteractions(beforeSaveCallback);
ops.execute().then().as(StepVerifier::create).verifyComplete();
verify(beforeConvertCallback).onBeforeConvert(personArgumentCaptor.capture(), eq("collection-1"));
verify(beforeSaveCallback).onBeforeSave(personArgumentCaptor.capture(), any(), eq("collection-1"));
verify(afterSaveCallback).onAfterSave(personArgumentCaptor.capture(), any(), eq("collection-1"));
assertThat(personArgumentCaptor.getAllValues()).extracting("firstName").containsExactly("init", "before-convert",
"before-save");
verify(collection).bulkWrite(captor.capture(), any());
InsertOneModel<Document> updateModel = (InsertOneModel<Document>) captor.getValue().get(0);
assertThat(updateModel.getDocument()).containsEntry("firstName", "after-save");
}
@Test // GH-2821
void bulkReplaceOneEmitsEventsCorrectly() {
ApplicationEventPublisher eventPublisher = mock(ApplicationEventPublisher.class);
ops = new DefaultReactiveBulkOperations(template, "collection-1",
new ReactiveBulkOperationContext(BulkMode.ORDERED,
Optional.of(mappingContext.getPersistentEntity(Person.class)), new QueryMapper(converter),
new UpdateMapper(converter), eventPublisher, null));
ops.replaceOne(query(where("firstName").is("danerys")), new SomeDomainType());
verify(eventPublisher, never()).publishEvent(any(BeforeConvertEvent.class));
verify(eventPublisher, never()).publishEvent(any(BeforeSaveEvent.class));
verify(eventPublisher, never()).publishEvent(any(AfterSaveEvent.class));
ops.execute().then().as(StepVerifier::create).verifyComplete();
verify(eventPublisher).publishEvent(any(BeforeConvertEvent.class));
verify(eventPublisher).publishEvent(any(BeforeSaveEvent.class));
verify(eventPublisher).publishEvent(any(AfterSaveEvent.class));
}
@Test // GH-2821
void bulkInsertEmitsEventsCorrectly() {
ApplicationEventPublisher eventPublisher = mock(ApplicationEventPublisher.class);
ops = new DefaultReactiveBulkOperations(template, "collection-1",
new ReactiveBulkOperationContext(BulkMode.ORDERED,
Optional.of(mappingContext.getPersistentEntity(Person.class)), new QueryMapper(converter),
new UpdateMapper(converter), eventPublisher, null));
ops.insert(new SomeDomainType());
verify(eventPublisher, never()).publishEvent(any(BeforeConvertEvent.class));
verify(eventPublisher, never()).publishEvent(any(BeforeSaveEvent.class));
verify(eventPublisher, never()).publishEvent(any(AfterSaveEvent.class));
ops.execute().then().as(StepVerifier::create).verifyComplete();
verify(eventPublisher).publishEvent(any(BeforeConvertEvent.class));
verify(eventPublisher).publishEvent(any(BeforeSaveEvent.class));
verify(eventPublisher).publishEvent(any(AfterSaveEvent.class));
}
@Test // GH-2821
void noAfterSaveEventOnFailure() {
ApplicationEventPublisher eventPublisher = mock(ApplicationEventPublisher.class);
when(collection.bulkWrite(anyList(), any(BulkWriteOptions.class))).thenThrow(new MongoWriteException(
new WriteError(89, "NetworkTimeout", new BsonDocument("hi", new BsonString("there"))), null));
ops = new DefaultReactiveBulkOperations(template, "collection-1",
new ReactiveBulkOperationContext(BulkMode.ORDERED,
Optional.of(mappingContext.getPersistentEntity(Person.class)), new QueryMapper(converter),
new UpdateMapper(converter), eventPublisher, null));
ops.insert(new SomeDomainType());
ops.execute().as(StepVerifier::create).expectError();
verify(eventPublisher, never()).publishEvent(any(AfterSaveEvent.class));
}
@Test // GH-2821
void appliesArrayFilterWhenPresent() {
ops.updateOne(new BasicQuery("{}"), new Update().filterArray(Criteria.where("element").gte(100))).execute()
.subscribe();
verify(collection).bulkWrite(captor.capture(), any());
UpdateOneModel<Document> updateModel = (UpdateOneModel<Document>) captor.getValue().get(0);
assertThat(updateModel.getOptions().getArrayFilters().get(0))
.isEqualTo(new org.bson.Document("element", new Document("$gte", 100)));
}
static class BeforeConvertPersonCallback implements ReactiveBeforeConvertCallback<Person> {
@Override
public Mono<Person> onBeforeConvert(Person entity, String collection) {
return Mono.just(new Person("before-convert"));
}
}
static class BeforeSavePersonCallback implements ReactiveBeforeSaveCallback<Person> {
@Override
public Mono<Person> onBeforeSave(Person entity, Document document, String collection) {
document.put("firstName", "before-save");
return Mono.just(new Person("before-save"));
}
}
static class AfterSavePersonCallback implements ReactiveAfterSaveCallback<Person> {
@Override
public Mono<Person> onAfterSave(Person entity, Document document, String collection) {
document.put("firstName", "after-save");
return Mono.just(new Person("after-save"));
}
}
class SomeDomainType {
@Id String id;
DefaultBulkOperationsUnitTests.Gender gender;
@Field("first_name") String firstName;
@Field String lastName;
}
enum Gender {
M, F
}
}
Loading…
Cancel
Save