diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperations.java index 9e7dd2d2a..c9d67d1e3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperations.java @@ -31,9 +31,9 @@ import com.mongodb.bulk.BulkWriteResult; * {@link #execute()}. * *
- * MongoTemplate template = …;
+ * MongoOperations ops = …;
*
- * template.bulkOps(BulkMode.UNORDERED, Person.class)
+ * ops.bulkOps(BulkMode.UNORDERED, Person.class)
* .insert(newPerson)
* .updateOne(where("firstname").is("Joe"), Update.update("lastname", "Doe"))
* .execute();
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperationsSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperationsSupport.java
new file mode 100644
index 000000000..5bbcbd37d
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperationsSupport.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.data.mapping.PersistentEntity;
+import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
+import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
+import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
+import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
+import org.springframework.data.mongodb.core.convert.QueryMapper;
+import org.springframework.data.mongodb.core.convert.UpdateMapper;
+import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
+import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
+import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
+import org.springframework.data.mongodb.core.query.Collation;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
+import org.springframework.data.mongodb.core.query.UpdateDefinition;
+import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
+import org.springframework.util.Assert;
+
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+
+/**
+ * Support class for bulk operations.
+ *
+ * @author Mark Paluch
+ * @since 4.1
+ */
+abstract class BulkOperationsSupport {
+
+ private final String collectionName;
+
+ BulkOperationsSupport(String collectionName) {
+
+ Assert.hasText(collectionName, "CollectionName must not be null nor empty");
+
+ this.collectionName = collectionName;
+ }
+
+ /**
+ * Emit a {@link BeforeSaveEvent}.
+ *
+ * @param holder
+ */
+ void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
+
+ if (holder.model() instanceof InsertOneModel) {
+
+ Document target = ((InsertOneModel) holder.model()).getDocument();
+ maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
+ } else if (holder.model() instanceof ReplaceOneModel) {
+
+ Document target = ((ReplaceOneModel) holder.model()).getReplacement();
+ maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
+ }
+ }
+
+ /**
+ * Emit a {@link AfterSaveEvent}.
+ *
+ * @param holder
+ */
+ void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
+
+ if (holder.model() instanceof InsertOneModel) {
+
+ Document target = ((InsertOneModel) holder.model()).getDocument();
+ maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
+ } else if (holder.model() instanceof ReplaceOneModel) {
+
+ Document target = ((ReplaceOneModel) holder.model()).getReplacement();
+ maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
+ }
+ }
+
+ WriteModel mapWriteModel(Object source, WriteModel writeModel) {
+
+ if (writeModel instanceof UpdateOneModel model) {
+
+ if (source instanceof AggregationUpdate aggregationUpdate) {
+
+ List pipeline = mapUpdatePipeline(aggregationUpdate);
+ return new UpdateOneModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
+ }
+
+ return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
+ model.getOptions());
+ }
+
+ if (writeModel instanceof UpdateManyModel model) {
+
+ if (source instanceof AggregationUpdate aggregationUpdate) {
+
+ List pipeline = mapUpdatePipeline(aggregationUpdate);
+ return new UpdateManyModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
+ }
+
+ return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
+ model.getOptions());
+ }
+
+ if (writeModel instanceof DeleteOneModel model) {
+ return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
+ }
+
+ if (writeModel instanceof DeleteManyModel model) {
+ return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
+ }
+
+ return writeModel;
+ }
+
+ private List mapUpdatePipeline(AggregationUpdate source) {
+
+ Class> type = entity().isPresent() ? entity().map(PersistentEntity::getType).get() : Object.class;
+ AggregationOperationContext context = new RelaxedTypeBasedAggregationOperationContext(type,
+ updateMapper().getMappingContext(), queryMapper());
+
+ return new AggregationUtil(queryMapper(), queryMapper().getMappingContext()).createPipeline(source, context);
+ }
+
+ /**
+ * Emit a {@link ApplicationEvent} if event multicasting is enabled.
+ *
+ * @param event
+ */
+ protected abstract void maybeEmitEvent(ApplicationEvent event);
+
+ /**
+ * @return the {@link UpdateMapper} to use.
+ */
+ protected abstract UpdateMapper updateMapper();
+
+ /**
+ * @return the {@link QueryMapper} to use.
+ */
+ protected abstract QueryMapper queryMapper();
+
+ /**
+ * @return the associated {@link PersistentEntity}. Can be {@link Optional#empty()}.
+ */
+ protected abstract Optional extends MongoPersistentEntity>> entity();
+
+ protected Bson getMappedUpdate(Bson update) {
+ return updateMapper().getMappedObject(update, entity());
+ }
+
+ protected Bson getMappedQuery(Bson query) {
+ return queryMapper().getMappedObject(query, entity());
+ }
+
+ protected static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) {
+
+ BulkWriteOptions options = new BulkWriteOptions();
+
+ return switch (bulkMode) {
+ case ORDERED -> options.ordered(true);
+ case UNORDERED -> options.ordered(false);
+ };
+ }
+
+ /**
+ * @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}.
+ * @param update The {@link Update} to apply
+ * @param upsert flag to indicate if document should be upserted.
+ * @return new instance of {@link UpdateOptions}.
+ */
+ protected static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) {
+
+ UpdateOptions options = new UpdateOptions();
+ options.upsert(upsert);
+
+ if (update.hasArrayFilters()) {
+ List list = new ArrayList<>(update.getArrayFilters().size());
+ for (ArrayFilter arrayFilter : update.getArrayFilters()) {
+ list.add(arrayFilter.asDocument());
+ }
+ options.arrayFilters(list);
+ }
+
+ filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
+ return options;
+ }
+
+ /**
+ * Value object chaining together an actual source with its {@link WriteModel} representation.
+ *
+ * @author Christoph Strobl
+ */
+ record SourceAwareWriteModelHolder(Object source, WriteModel model) {
+ }
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java
index bca9d1588..cee7d9624 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java
@@ -21,32 +21,24 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.bson.Document;
-import org.bson.conversions.Bson;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.dao.DataIntegrityViolationException;
-import org.springframework.data.mapping.PersistentEntity;
import org.springframework.data.mapping.callback.EntityCallback;
import org.springframework.data.mapping.callback.EntityCallbacks;
import org.springframework.data.mongodb.BulkOperationException;
-import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
-import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
-import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveCallback;
-import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertCallback;
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveCallback;
-import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
-import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
import org.springframework.data.util.Pair;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -55,7 +47,16 @@ import com.mongodb.MongoBulkWriteException;
import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoCollection;
-import com.mongodb.client.model.*;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
/**
* Default implementation for {@link BulkOperations}.
@@ -71,7 +72,7 @@ import com.mongodb.client.model.*;
* @author Jacob Botuck
* @since 1.9
*/
-class DefaultBulkOperations implements BulkOperations {
+class DefaultBulkOperations extends BulkOperationsSupport implements BulkOperations {
private final MongoOperations mongoOperations;
private final String collectionName;
@@ -93,6 +94,7 @@ class DefaultBulkOperations implements BulkOperations {
DefaultBulkOperations(MongoOperations mongoOperations, String collectionName,
BulkOperationContext bulkOperationContext) {
+ super(collectionName);
Assert.notNull(mongoOperations, "MongoOperations must not be null");
Assert.hasText(collectionName, "CollectionName must not be null nor empty");
Assert.notNull(bulkOperationContext, "BulkOperationContext must not be null");
@@ -135,7 +137,6 @@ class DefaultBulkOperations implements BulkOperations {
}
@Override
- @SuppressWarnings("unchecked")
public BulkOperations updateOne(Query query, UpdateDefinition update) {
Assert.notNull(query, "Query must not be null");
@@ -157,7 +158,6 @@ class DefaultBulkOperations implements BulkOperations {
}
@Override
- @SuppressWarnings("unchecked")
public BulkOperations updateMulti(Query query, UpdateDefinition update) {
Assert.notNull(query, "Query must not be null");
@@ -326,61 +326,24 @@ class DefaultBulkOperations implements BulkOperations {
return this;
}
- private WriteModel mapWriteModel(Object source, WriteModel writeModel) {
-
- if (writeModel instanceof UpdateOneModel model) {
-
- if (source instanceof AggregationUpdate aggregationUpdate) {
-
- List pipeline = mapUpdatePipeline(aggregationUpdate);
- return new UpdateOneModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
- }
-
- return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
- model.getOptions());
- }
-
- if (writeModel instanceof UpdateManyModel model) {
-
- if (source instanceof AggregationUpdate aggregationUpdate) {
-
- List pipeline = mapUpdatePipeline(aggregationUpdate);
- return new UpdateManyModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
- }
-
- return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
- model.getOptions());
- }
-
- if (writeModel instanceof DeleteOneModel model) {
- return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
- }
-
- if (writeModel instanceof DeleteManyModel model) {
- return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
- }
-
- return writeModel;
+ @Override
+ protected void maybeEmitEvent(ApplicationEvent event) {
+ bulkOperationContext.publishEvent(event);
}
- private List mapUpdatePipeline(AggregationUpdate source) {
-
- Class> type = bulkOperationContext.entity().isPresent()
- ? bulkOperationContext.entity().map(PersistentEntity::getType).get()
- : Object.class;
- AggregationOperationContext context = new RelaxedTypeBasedAggregationOperationContext(type,
- bulkOperationContext.updateMapper().getMappingContext(), bulkOperationContext.queryMapper());
-
- return new AggregationUtil(bulkOperationContext.queryMapper(),
- bulkOperationContext.queryMapper().getMappingContext()).createPipeline(source, context);
+ @Override
+ protected UpdateMapper updateMapper() {
+ return bulkOperationContext.updateMapper();
}
- private Bson getMappedUpdate(Bson update) {
- return bulkOperationContext.updateMapper().getMappedObject(update, bulkOperationContext.entity());
+ @Override
+ protected QueryMapper queryMapper() {
+ return bulkOperationContext.queryMapper();
}
- private Bson getMappedQuery(Bson query) {
- return bulkOperationContext.queryMapper().getMappedObject(query, bulkOperationContext.entity());
+ @Override
+ protected Optional extends MongoPersistentEntity>> entity() {
+ return bulkOperationContext.entity();
}
private Document getMappedObject(Object source) {
@@ -399,32 +362,6 @@ class DefaultBulkOperations implements BulkOperations {
models.add(new SourceAwareWriteModelHolder(source, model));
}
- private void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
-
- if (holder.model() instanceof InsertOneModel) {
-
- Document target = ((InsertOneModel) holder.model()).getDocument();
- maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
- } else if (holder.model() instanceof ReplaceOneModel) {
-
- Document target = ((ReplaceOneModel) holder.model()).getReplacement();
- maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
- }
- }
-
- private void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
-
- if (holder.model() instanceof InsertOneModel) {
-
- Document target = ((InsertOneModel) holder.model()).getDocument();
- maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
- } else if (holder.model() instanceof ReplaceOneModel) {
-
- Document target = ((ReplaceOneModel) holder.model()).getReplacement();
- maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
- }
- }
-
private void maybeInvokeAfterSaveCallback(SourceAwareWriteModelHolder holder) {
if (holder.model() instanceof InsertOneModel) {
@@ -438,7 +375,7 @@ class DefaultBulkOperations implements BulkOperations {
}
}
- private void maybeEmitEvent(MongoMappingEvent> event) {
+ private void publishEvent(MongoMappingEvent> event) {
bulkOperationContext.publishEvent(event);
}
@@ -454,43 +391,6 @@ class DefaultBulkOperations implements BulkOperations {
return bulkOperationContext.callback(AfterSaveCallback.class, value, mappedDocument, collectionName);
}
- private static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) {
-
- BulkWriteOptions options = new BulkWriteOptions();
-
- switch (bulkMode) {
- case ORDERED:
- return options.ordered(true);
- case UNORDERED:
- return options.ordered(false);
- }
-
- throw new IllegalStateException("BulkMode was null");
- }
-
- /**
- * @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}.
- * @param update The {@link Update} to apply
- * @param upsert flag to indicate if document should be upserted.
- * @return new instance of {@link UpdateOptions}.
- */
- private static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) {
-
- UpdateOptions options = new UpdateOptions();
- options.upsert(upsert);
-
- if (update.hasArrayFilters()) {
- List list = new ArrayList<>(update.getArrayFilters().size());
- for (ArrayFilter arrayFilter : update.getArrayFilters()) {
- list.add(arrayFilter.asDocument());
- }
- options.arrayFilters(list);
- }
-
- filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
- return options;
- }
-
/**
* {@link BulkOperationContext} holds information about {@link BulkMode} the entity in use as well as references to
* {@link QueryMapper} and {@link UpdateMapper}.
@@ -502,14 +402,14 @@ class DefaultBulkOperations implements BulkOperations {
QueryMapper queryMapper, UpdateMapper updateMapper, @Nullable ApplicationEventPublisher eventPublisher,
@Nullable EntityCallbacks entityCallbacks) {
- public boolean skipEventPublishing() {
- return eventPublisher == null;
- }
-
public boolean skipEntityCallbacks() {
return entityCallbacks == null;
}
+ public boolean skipEventPublishing() {
+ return eventPublisher == null;
+ }
+
@SuppressWarnings("rawtypes")
public T callback(Class extends EntityCallback> callbackType, T entity, String collectionName) {
@@ -541,13 +441,4 @@ class DefaultBulkOperations implements BulkOperations {
}
}
- /**
- * Value object chaining together an actual source with its {@link WriteModel} representation.
- *
- * @author Christoph Strobl
- * @since 2.2
- */
- record SourceAwareWriteModelHolder(Object source, WriteModel model) {
-
- }
}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
index ce24710f3..45d6709c9 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
@@ -24,19 +24,15 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.bson.Document;
-import org.bson.conversions.Bson;
+import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
-import org.springframework.dao.DataIntegrityViolationException;
+import org.springframework.data.mapping.callback.EntityCallback;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
-import org.springframework.data.mongodb.BulkOperationException;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
-import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
-import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
-import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback;
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback;
@@ -44,24 +40,30 @@ import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
-import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
-import org.springframework.util.ObjectUtils;
-import com.mongodb.MongoBulkWriteException;
import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
-import com.mongodb.client.model.*;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
import com.mongodb.reactivestreams.client.MongoCollection;
/**
* Default implementation for {@link ReactiveBulkOperations}.
*
* @author Christoph Strobl
+ * @author Mark Paluch
* @since 4.1
*/
-class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
+class DefaultReactiveBulkOperations extends BulkOperationsSupport implements ReactiveBulkOperations {
private final ReactiveMongoOperations mongoOperations;
private final String collectionName;
@@ -83,6 +85,8 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
DefaultReactiveBulkOperations(ReactiveMongoOperations mongoOperations, String collectionName,
ReactiveBulkOperationContext bulkOperationContext) {
+ super(collectionName);
+
Assert.notNull(mongoOperations, "MongoOperations must not be null");
Assert.hasText(collectionName, "CollectionName must not be null nor empty");
Assert.notNull(bulkOperationContext, "BulkOperationContext must not be null");
@@ -90,7 +94,7 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
this.mongoOperations = mongoOperations;
this.collectionName = collectionName;
this.bulkOperationContext = bulkOperationContext;
- this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode());
+ this.bulkOptions = getBulkWriteOptions(bulkOperationContext.bulkMode());
}
/**
@@ -110,7 +114,7 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
this.models.add(Mono.just(document).flatMap(it -> {
maybeEmitEvent(new BeforeConvertEvent<>(it, collectionName));
return maybeInvokeBeforeConvertCallback(it);
- }).map(it -> new SourceAwareWriteModelHolder(it, new InsertOneModel(getMappedObject(it)))));
+ }).map(it -> new SourceAwareWriteModelHolder(it, new InsertOneModel<>(getMappedObject(it)))));
return this;
}
@@ -126,7 +130,6 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
}
@Override
- @SuppressWarnings("unchecked")
public ReactiveBulkOperations updateOne(Query query, UpdateDefinition update) {
Assert.notNull(query, "Query must not be null");
@@ -137,7 +140,6 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
}
@Override
- @SuppressWarnings("unchecked")
public ReactiveBulkOperations updateMulti(Query query, UpdateDefinition update) {
Assert.notNull(query, "Query must not be null");
@@ -202,11 +204,9 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
public Mono execute() {
try {
-
- Mono result = mongoOperations.execute(collectionName, this::bulkWriteTo).next();
- return result;
+ return mongoOperations.execute(collectionName, this::bulkWriteTo).next();
} finally {
- this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode());
+ this.bulkOptions = getBulkWriteOptions(bulkOperationContext.bulkMode());
}
}
@@ -216,55 +216,39 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
collection = collection.withWriteConcern(defaultWriteConcern);
}
- try {
+ Flux concat = Flux.concat(models).flatMap(it -> {
- Flux concat = Flux.concat(models).flatMap(it -> {
- if (it.getModel()instanceof InsertOneModel insertOneModel) {
-
- Document target = insertOneModel.getDocument();
- maybeEmitBeforeSaveEvent(it);
- return maybeInvokeBeforeSaveCallback(it.getSource(), target)
- .map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(insertOneModel)));
- } else if (it.getModel()instanceof ReplaceOneModel replaceOneModel) {
-
- Document target = replaceOneModel.getReplacement();
- maybeEmitBeforeSaveEvent(it);
- return maybeInvokeBeforeSaveCallback(it.getSource(), target)
- .map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(replaceOneModel)));
- }
- return Mono.just(new SourceAwareWriteModelHolder(it.getSource(), mapWriteModel(it.getModel())));
- });
- MongoCollection theCollection = collection;
- return concat.collectList().flatMap(it -> {
-
- return Mono
- .from(theCollection.bulkWrite(
- it.stream().map(SourceAwareWriteModelHolder::getModel).collect(Collectors.toList()), bulkOptions))
- .doOnSuccess(state -> {
- it.forEach(saved -> {
- maybeEmitAfterSaveEvent(saved);
- });
- }).flatMap(state -> {
- List> monos = it.stream().map(saved -> {
- return maybeInvokeAfterSaveCallback(saved);
- }).collect(Collectors.toList());
-
- return Flux.concat(monos).then(Mono.just(state));
- });
- });
- } catch (RuntimeException ex) {
-
- if (ex instanceof MongoBulkWriteException) {
-
- MongoBulkWriteException mongoBulkWriteException = (MongoBulkWriteException) ex;
- if (mongoBulkWriteException.getWriteConcernError() != null) {
- throw new DataIntegrityViolationException(ex.getMessage(), ex);
- }
- throw new BulkOperationException(ex.getMessage(), mongoBulkWriteException);
+ if (it.model()instanceof InsertOneModel iom) {
+
+ Document target = iom.getDocument();
+ maybeEmitBeforeSaveEvent(it);
+ return maybeInvokeBeforeSaveCallback(it.source(), target)
+ .map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(afterCallback, iom)));
+ } else if (it.model()instanceof ReplaceOneModel rom) {
+
+ Document target = rom.getReplacement();
+ maybeEmitBeforeSaveEvent(it);
+ return maybeInvokeBeforeSaveCallback(it.source(), target)
+ .map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(afterCallback, rom)));
}
- throw ex;
- }
+ return Mono.just(new SourceAwareWriteModelHolder(it.source(), mapWriteModel(it.source(), it.model())));
+ });
+
+ MongoCollection theCollection = collection;
+ return concat.collectList().flatMap(it -> {
+
+ return Mono
+ .from(theCollection
+ .bulkWrite(it.stream().map(SourceAwareWriteModelHolder::model).collect(Collectors.toList()), bulkOptions))
+ .doOnSuccess(state -> {
+ it.forEach(this::maybeEmitAfterSaveEvent);
+ }).flatMap(state -> {
+ List> monos = it.stream().map(this::maybeInvokeAfterSaveCallback).collect(Collectors.toList());
+
+ return Flux.concat(monos).then(Mono.just(state));
+ });
+ });
}
/**
@@ -295,47 +279,24 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
return this;
}
- private WriteModel mapWriteModel(WriteModel writeModel) {
-
- if (writeModel instanceof UpdateOneModel) {
-
- UpdateOneModel model = (UpdateOneModel) writeModel;
-
- return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
- model.getOptions());
- }
-
- if (writeModel instanceof UpdateManyModel) {
-
- UpdateManyModel model = (UpdateManyModel) writeModel;
-
- return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
- model.getOptions());
- }
-
- if (writeModel instanceof DeleteOneModel) {
-
- DeleteOneModel model = (DeleteOneModel) writeModel;
-
- return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
- }
-
- if (writeModel instanceof DeleteManyModel) {
-
- DeleteManyModel model = (DeleteManyModel) writeModel;
-
- return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
- }
+ @Override
+ protected void maybeEmitEvent(ApplicationEvent event) {
+ bulkOperationContext.publishEvent(event);
+ }
- return writeModel;
+ @Override
+ protected UpdateMapper updateMapper() {
+ return bulkOperationContext.updateMapper();
}
- private Bson getMappedUpdate(Bson update) {
- return bulkOperationContext.getUpdateMapper().getMappedObject(update, bulkOperationContext.getEntity());
+ @Override
+ protected QueryMapper queryMapper() {
+ return bulkOperationContext.queryMapper();
}
- private Bson getMappedQuery(Bson query) {
- return bulkOperationContext.getQueryMapper().getMappedObject(query, bulkOperationContext.getEntity());
+ @Override
+ protected Optional extends MongoPersistentEntity>> entity() {
+ return bulkOperationContext.entity();
}
private Document getMappedObject(Object source) {
@@ -350,121 +311,30 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
return sink;
}
- private void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
-
- if (holder.getModel() instanceof InsertOneModel) {
-
- Document target = ((InsertOneModel) holder.getModel()).getDocument();
- maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName));
- } else if (holder.getModel() instanceof ReplaceOneModel) {
-
- Document target = ((ReplaceOneModel) holder.getModel()).getReplacement();
- maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName));
- }
- }
-
- private void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
-
- if (holder.getModel() instanceof InsertOneModel) {
-
- Document target = ((InsertOneModel) holder.getModel()).getDocument();
- maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName));
- } else if (holder.getModel() instanceof ReplaceOneModel) {
-
- Document target = ((ReplaceOneModel) holder.getModel()).getReplacement();
- maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName));
- }
- }
-
private Mono