From 67bd722cfd73a6cd782a24f2c5c059d818ab49ba Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 14 Apr 2023 14:50:54 +0200 Subject: [PATCH] Polishing. Extract common code into BulkOperationsSupport. Reorder methods. Add missing verifyComplete to tests. See #2821 Original pull request: #4342 --- .../data/mongodb/core/BulkOperations.java | 4 +- .../mongodb/core/BulkOperationsSupport.java | 221 ++++++++++ .../mongodb/core/DefaultBulkOperations.java | 167 ++------ .../core/DefaultReactiveBulkOperations.java | 395 +++++------------- .../mongodb/core/ReactiveBulkOperations.java | 14 +- .../mongodb/core/ReactiveMongoOperations.java | 67 +-- .../mongodb/core/ReactiveMongoTemplate.java | 55 ++- .../DefaultReactiveBulkOperationsTests.java | 6 +- 8 files changed, 425 insertions(+), 504 deletions(-) create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperationsSupport.java diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperations.java index 9e7dd2d2a..c9d67d1e3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperations.java @@ -31,9 +31,9 @@ import com.mongodb.bulk.BulkWriteResult; * {@link #execute()}. * *
- * MongoTemplate template = …;
+ * MongoOperations ops = …;
  *
- * template.bulkOps(BulkMode.UNORDERED, Person.class)
+ * ops.bulkOps(BulkMode.UNORDERED, Person.class)
  * 				.insert(newPerson)
  * 				.updateOne(where("firstname").is("Joe"), Update.update("lastname", "Doe"))
  * 				.execute();
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperationsSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperationsSupport.java
new file mode 100644
index 000000000..5bbcbd37d
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkOperationsSupport.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.core;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.data.mapping.PersistentEntity;
+import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
+import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
+import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
+import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
+import org.springframework.data.mongodb.core.convert.QueryMapper;
+import org.springframework.data.mongodb.core.convert.UpdateMapper;
+import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
+import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
+import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
+import org.springframework.data.mongodb.core.query.Collation;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
+import org.springframework.data.mongodb.core.query.UpdateDefinition;
+import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
+import org.springframework.util.Assert;
+
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+
+/**
+ * Support class for bulk operations.
+ *
+ * @author Mark Paluch
+ * @since 4.1
+ */
+abstract class BulkOperationsSupport {
+
+	private final String collectionName;
+
+	BulkOperationsSupport(String collectionName) {
+
+		Assert.hasText(collectionName, "CollectionName must not be null nor empty");
+
+		this.collectionName = collectionName;
+	}
+
+	/**
+	 * Emit a {@link BeforeSaveEvent}.
+	 *
+	 * @param holder
+	 */
+	void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
+
+		if (holder.model() instanceof InsertOneModel) {
+
+			Document target = ((InsertOneModel) holder.model()).getDocument();
+			maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
+		} else if (holder.model() instanceof ReplaceOneModel) {
+
+			Document target = ((ReplaceOneModel) holder.model()).getReplacement();
+			maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
+		}
+	}
+
+	/**
+	 * Emit a {@link AfterSaveEvent}.
+	 *
+	 * @param holder
+	 */
+	void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
+
+		if (holder.model() instanceof InsertOneModel) {
+
+			Document target = ((InsertOneModel) holder.model()).getDocument();
+			maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
+		} else if (holder.model() instanceof ReplaceOneModel) {
+
+			Document target = ((ReplaceOneModel) holder.model()).getReplacement();
+			maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
+		}
+	}
+
+	WriteModel mapWriteModel(Object source, WriteModel writeModel) {
+
+		if (writeModel instanceof UpdateOneModel model) {
+
+			if (source instanceof AggregationUpdate aggregationUpdate) {
+
+				List pipeline = mapUpdatePipeline(aggregationUpdate);
+				return new UpdateOneModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
+			}
+
+			return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
+					model.getOptions());
+		}
+
+		if (writeModel instanceof UpdateManyModel model) {
+
+			if (source instanceof AggregationUpdate aggregationUpdate) {
+
+				List pipeline = mapUpdatePipeline(aggregationUpdate);
+				return new UpdateManyModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
+			}
+
+			return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
+					model.getOptions());
+		}
+
+		if (writeModel instanceof DeleteOneModel model) {
+			return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
+		}
+
+		if (writeModel instanceof DeleteManyModel model) {
+			return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
+		}
+
+		return writeModel;
+	}
+
+	private List mapUpdatePipeline(AggregationUpdate source) {
+
+		Class type = entity().isPresent() ? entity().map(PersistentEntity::getType).get() : Object.class;
+		AggregationOperationContext context = new RelaxedTypeBasedAggregationOperationContext(type,
+				updateMapper().getMappingContext(), queryMapper());
+
+		return new AggregationUtil(queryMapper(), queryMapper().getMappingContext()).createPipeline(source, context);
+	}
+
+	/**
+	 * Emit a {@link ApplicationEvent} if event multicasting is enabled.
+	 *
+	 * @param event
+	 */
+	protected abstract void maybeEmitEvent(ApplicationEvent event);
+
+	/**
+	 * @return the {@link UpdateMapper} to use.
+	 */
+	protected abstract UpdateMapper updateMapper();
+
+	/**
+	 * @return the {@link QueryMapper} to use.
+	 */
+	protected abstract QueryMapper queryMapper();
+
+	/**
+	 * @return the associated {@link PersistentEntity}. Can be {@link Optional#empty()}.
+	 */
+	protected abstract Optional> entity();
+
+	protected Bson getMappedUpdate(Bson update) {
+		return updateMapper().getMappedObject(update, entity());
+	}
+
+	protected Bson getMappedQuery(Bson query) {
+		return queryMapper().getMappedObject(query, entity());
+	}
+
+	protected static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) {
+
+		BulkWriteOptions options = new BulkWriteOptions();
+
+		return switch (bulkMode) {
+			case ORDERED -> options.ordered(true);
+			case UNORDERED -> options.ordered(false);
+		};
+	}
+
+	/**
+	 * @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}.
+	 * @param update The {@link Update} to apply
+	 * @param upsert flag to indicate if document should be upserted.
+	 * @return new instance of {@link UpdateOptions}.
+	 */
+	protected static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) {
+
+		UpdateOptions options = new UpdateOptions();
+		options.upsert(upsert);
+
+		if (update.hasArrayFilters()) {
+			List list = new ArrayList<>(update.getArrayFilters().size());
+			for (ArrayFilter arrayFilter : update.getArrayFilters()) {
+				list.add(arrayFilter.asDocument());
+			}
+			options.arrayFilters(list);
+		}
+
+		filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
+		return options;
+	}
+
+	/**
+	 * Value object chaining together an actual source with its {@link WriteModel} representation.
+	 *
+	 * @author Christoph Strobl
+	 */
+	record SourceAwareWriteModelHolder(Object source, WriteModel model) {
+	}
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java
index bca9d1588..cee7d9624 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java
@@ -21,32 +21,24 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.bson.Document;
-import org.bson.conversions.Bson;
 import org.springframework.context.ApplicationEvent;
 import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.dao.DataIntegrityViolationException;
-import org.springframework.data.mapping.PersistentEntity;
 import org.springframework.data.mapping.callback.EntityCallback;
 import org.springframework.data.mapping.callback.EntityCallbacks;
 import org.springframework.data.mongodb.BulkOperationException;
-import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
-import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
-import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
 import org.springframework.data.mongodb.core.convert.QueryMapper;
 import org.springframework.data.mongodb.core.convert.UpdateMapper;
 import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
 import org.springframework.data.mongodb.core.mapping.event.AfterSaveCallback;
-import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
 import org.springframework.data.mongodb.core.mapping.event.BeforeConvertCallback;
 import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
 import org.springframework.data.mongodb.core.mapping.event.BeforeSaveCallback;
-import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
 import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
 import org.springframework.data.mongodb.core.query.Collation;
 import org.springframework.data.mongodb.core.query.Query;
 import org.springframework.data.mongodb.core.query.Update;
 import org.springframework.data.mongodb.core.query.UpdateDefinition;
-import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
 import org.springframework.data.util.Pair;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
@@ -55,7 +47,16 @@ import com.mongodb.MongoBulkWriteException;
 import com.mongodb.WriteConcern;
 import com.mongodb.bulk.BulkWriteResult;
 import com.mongodb.client.MongoCollection;
-import com.mongodb.client.model.*;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
 
 /**
  * Default implementation for {@link BulkOperations}.
@@ -71,7 +72,7 @@ import com.mongodb.client.model.*;
  * @author Jacob Botuck
  * @since 1.9
  */
-class DefaultBulkOperations implements BulkOperations {
+class DefaultBulkOperations extends BulkOperationsSupport implements BulkOperations {
 
 	private final MongoOperations mongoOperations;
 	private final String collectionName;
@@ -93,6 +94,7 @@ class DefaultBulkOperations implements BulkOperations {
 	DefaultBulkOperations(MongoOperations mongoOperations, String collectionName,
 			BulkOperationContext bulkOperationContext) {
 
+		super(collectionName);
 		Assert.notNull(mongoOperations, "MongoOperations must not be null");
 		Assert.hasText(collectionName, "CollectionName must not be null nor empty");
 		Assert.notNull(bulkOperationContext, "BulkOperationContext must not be null");
@@ -135,7 +137,6 @@ class DefaultBulkOperations implements BulkOperations {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public BulkOperations updateOne(Query query, UpdateDefinition update) {
 
 		Assert.notNull(query, "Query must not be null");
@@ -157,7 +158,6 @@ class DefaultBulkOperations implements BulkOperations {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public BulkOperations updateMulti(Query query, UpdateDefinition update) {
 
 		Assert.notNull(query, "Query must not be null");
@@ -326,61 +326,24 @@ class DefaultBulkOperations implements BulkOperations {
 		return this;
 	}
 
-	private WriteModel mapWriteModel(Object source, WriteModel writeModel) {
-
-		if (writeModel instanceof UpdateOneModel model) {
-
-			if (source instanceof AggregationUpdate aggregationUpdate) {
-
-				List pipeline = mapUpdatePipeline(aggregationUpdate);
-				return new UpdateOneModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
-			}
-
-			return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
-					model.getOptions());
-		}
-
-		if (writeModel instanceof UpdateManyModel model) {
-
-			if (source instanceof AggregationUpdate aggregationUpdate) {
-
-				List pipeline = mapUpdatePipeline(aggregationUpdate);
-				return new UpdateManyModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
-			}
-
-			return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
-					model.getOptions());
-		}
-
-		if (writeModel instanceof DeleteOneModel model) {
-			return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
-		}
-
-		if (writeModel instanceof DeleteManyModel model) {
-			return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
-		}
-
-		return writeModel;
+	@Override
+	protected void maybeEmitEvent(ApplicationEvent event) {
+		bulkOperationContext.publishEvent(event);
 	}
 
-	private List mapUpdatePipeline(AggregationUpdate source) {
-
-		Class type = bulkOperationContext.entity().isPresent()
-				? bulkOperationContext.entity().map(PersistentEntity::getType).get()
-				: Object.class;
-		AggregationOperationContext context = new RelaxedTypeBasedAggregationOperationContext(type,
-				bulkOperationContext.updateMapper().getMappingContext(), bulkOperationContext.queryMapper());
-
-		return new AggregationUtil(bulkOperationContext.queryMapper(),
-				bulkOperationContext.queryMapper().getMappingContext()).createPipeline(source, context);
+	@Override
+	protected UpdateMapper updateMapper() {
+		return bulkOperationContext.updateMapper();
 	}
 
-	private Bson getMappedUpdate(Bson update) {
-		return bulkOperationContext.updateMapper().getMappedObject(update, bulkOperationContext.entity());
+	@Override
+	protected QueryMapper queryMapper() {
+		return bulkOperationContext.queryMapper();
 	}
 
-	private Bson getMappedQuery(Bson query) {
-		return bulkOperationContext.queryMapper().getMappedObject(query, bulkOperationContext.entity());
+	@Override
+	protected Optional> entity() {
+		return bulkOperationContext.entity();
 	}
 
 	private Document getMappedObject(Object source) {
@@ -399,32 +362,6 @@ class DefaultBulkOperations implements BulkOperations {
 		models.add(new SourceAwareWriteModelHolder(source, model));
 	}
 
-	private void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
-
-		if (holder.model() instanceof InsertOneModel) {
-
-			Document target = ((InsertOneModel) holder.model()).getDocument();
-			maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
-		} else if (holder.model() instanceof ReplaceOneModel) {
-
-			Document target = ((ReplaceOneModel) holder.model()).getReplacement();
-			maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
-		}
-	}
-
-	private void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
-
-		if (holder.model() instanceof InsertOneModel) {
-
-			Document target = ((InsertOneModel) holder.model()).getDocument();
-			maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
-		} else if (holder.model() instanceof ReplaceOneModel) {
-
-			Document target = ((ReplaceOneModel) holder.model()).getReplacement();
-			maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
-		}
-	}
-
 	private void maybeInvokeAfterSaveCallback(SourceAwareWriteModelHolder holder) {
 
 		if (holder.model() instanceof InsertOneModel) {
@@ -438,7 +375,7 @@ class DefaultBulkOperations implements BulkOperations {
 		}
 	}
 
-	private void maybeEmitEvent(MongoMappingEvent event) {
+	private void publishEvent(MongoMappingEvent event) {
 		bulkOperationContext.publishEvent(event);
 	}
 
@@ -454,43 +391,6 @@ class DefaultBulkOperations implements BulkOperations {
 		return bulkOperationContext.callback(AfterSaveCallback.class, value, mappedDocument, collectionName);
 	}
 
-	private static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) {
-
-		BulkWriteOptions options = new BulkWriteOptions();
-
-		switch (bulkMode) {
-			case ORDERED:
-				return options.ordered(true);
-			case UNORDERED:
-				return options.ordered(false);
-		}
-
-		throw new IllegalStateException("BulkMode was null");
-	}
-
-	/**
-	 * @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}.
-	 * @param update The {@link Update} to apply
-	 * @param upsert flag to indicate if document should be upserted.
-	 * @return new instance of {@link UpdateOptions}.
-	 */
-	private static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) {
-
-		UpdateOptions options = new UpdateOptions();
-		options.upsert(upsert);
-
-		if (update.hasArrayFilters()) {
-			List list = new ArrayList<>(update.getArrayFilters().size());
-			for (ArrayFilter arrayFilter : update.getArrayFilters()) {
-				list.add(arrayFilter.asDocument());
-			}
-			options.arrayFilters(list);
-		}
-
-		filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
-		return options;
-	}
-
 	/**
 	 * {@link BulkOperationContext} holds information about {@link BulkMode} the entity in use as well as references to
 	 * {@link QueryMapper} and {@link UpdateMapper}.
@@ -502,14 +402,14 @@ class DefaultBulkOperations implements BulkOperations {
 			QueryMapper queryMapper, UpdateMapper updateMapper, @Nullable ApplicationEventPublisher eventPublisher,
 			@Nullable EntityCallbacks entityCallbacks) {
 
-		public boolean skipEventPublishing() {
-			return eventPublisher == null;
-		}
-
 		public boolean skipEntityCallbacks() {
 			return entityCallbacks == null;
 		}
 
+		public boolean skipEventPublishing() {
+			return eventPublisher == null;
+		}
+
 		@SuppressWarnings("rawtypes")
 		public  T callback(Class callbackType, T entity, String collectionName) {
 
@@ -541,13 +441,4 @@ class DefaultBulkOperations implements BulkOperations {
 		}
 	}
 
-	/**
-	 * Value object chaining together an actual source with its {@link WriteModel} representation.
-	 *
-	 * @author Christoph Strobl
-	 * @since 2.2
-	 */
-	record SourceAwareWriteModelHolder(Object source, WriteModel model) {
-
-	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
index ce24710f3..45d6709c9 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
@@ -24,19 +24,15 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.bson.Document;
-import org.bson.conversions.Bson;
+import org.springframework.context.ApplicationEvent;
 import org.springframework.context.ApplicationEventPublisher;
-import org.springframework.dao.DataIntegrityViolationException;
+import org.springframework.data.mapping.callback.EntityCallback;
 import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
-import org.springframework.data.mongodb.BulkOperationException;
 import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
 import org.springframework.data.mongodb.core.convert.QueryMapper;
 import org.springframework.data.mongodb.core.convert.UpdateMapper;
 import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
-import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
 import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
-import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
-import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
 import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback;
 import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback;
 import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback;
@@ -44,24 +40,30 @@ import org.springframework.data.mongodb.core.query.Collation;
 import org.springframework.data.mongodb.core.query.Query;
 import org.springframework.data.mongodb.core.query.Update;
 import org.springframework.data.mongodb.core.query.UpdateDefinition;
-import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
-import org.springframework.util.ObjectUtils;
 
-import com.mongodb.MongoBulkWriteException;
 import com.mongodb.WriteConcern;
 import com.mongodb.bulk.BulkWriteResult;
-import com.mongodb.client.model.*;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
 import com.mongodb.reactivestreams.client.MongoCollection;
 
 /**
  * Default implementation for {@link ReactiveBulkOperations}.
  *
  * @author Christoph Strobl
+ * @author Mark Paluch
  * @since 4.1
  */
-class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
+class DefaultReactiveBulkOperations extends BulkOperationsSupport implements ReactiveBulkOperations {
 
 	private final ReactiveMongoOperations mongoOperations;
 	private final String collectionName;
@@ -83,6 +85,8 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 	DefaultReactiveBulkOperations(ReactiveMongoOperations mongoOperations, String collectionName,
 			ReactiveBulkOperationContext bulkOperationContext) {
 
+		super(collectionName);
+
 		Assert.notNull(mongoOperations, "MongoOperations must not be null");
 		Assert.hasText(collectionName, "CollectionName must not be null nor empty");
 		Assert.notNull(bulkOperationContext, "BulkOperationContext must not be null");
@@ -90,7 +94,7 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 		this.mongoOperations = mongoOperations;
 		this.collectionName = collectionName;
 		this.bulkOperationContext = bulkOperationContext;
-		this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode());
+		this.bulkOptions = getBulkWriteOptions(bulkOperationContext.bulkMode());
 	}
 
 	/**
@@ -110,7 +114,7 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 		this.models.add(Mono.just(document).flatMap(it -> {
 			maybeEmitEvent(new BeforeConvertEvent<>(it, collectionName));
 			return maybeInvokeBeforeConvertCallback(it);
-		}).map(it -> new SourceAwareWriteModelHolder(it, new InsertOneModel(getMappedObject(it)))));
+		}).map(it -> new SourceAwareWriteModelHolder(it, new InsertOneModel<>(getMappedObject(it)))));
 
 		return this;
 	}
@@ -126,7 +130,6 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public ReactiveBulkOperations updateOne(Query query, UpdateDefinition update) {
 
 		Assert.notNull(query, "Query must not be null");
@@ -137,7 +140,6 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public ReactiveBulkOperations updateMulti(Query query, UpdateDefinition update) {
 
 		Assert.notNull(query, "Query must not be null");
@@ -202,11 +204,9 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 	public Mono execute() {
 
 		try {
-
-			Mono result = mongoOperations.execute(collectionName, this::bulkWriteTo).next();
-			return result;
+			return mongoOperations.execute(collectionName, this::bulkWriteTo).next();
 		} finally {
-			this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode());
+			this.bulkOptions = getBulkWriteOptions(bulkOperationContext.bulkMode());
 		}
 	}
 
@@ -216,55 +216,39 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 			collection = collection.withWriteConcern(defaultWriteConcern);
 		}
 
-		try {
+		Flux concat = Flux.concat(models).flatMap(it -> {
 
-			Flux concat = Flux.concat(models).flatMap(it -> {
-				if (it.getModel()instanceof InsertOneModel insertOneModel) {
-
-					Document target = insertOneModel.getDocument();
-					maybeEmitBeforeSaveEvent(it);
-					return maybeInvokeBeforeSaveCallback(it.getSource(), target)
-							.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(insertOneModel)));
-				} else if (it.getModel()instanceof ReplaceOneModel replaceOneModel) {
-
-					Document target = replaceOneModel.getReplacement();
-					maybeEmitBeforeSaveEvent(it);
-					return maybeInvokeBeforeSaveCallback(it.getSource(), target)
-							.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(replaceOneModel)));
-				}
-				return Mono.just(new SourceAwareWriteModelHolder(it.getSource(), mapWriteModel(it.getModel())));
-			});
-			MongoCollection theCollection = collection;
-			return concat.collectList().flatMap(it -> {
-
-				return Mono
-						.from(theCollection.bulkWrite(
-								it.stream().map(SourceAwareWriteModelHolder::getModel).collect(Collectors.toList()), bulkOptions))
-						.doOnSuccess(state -> {
-							it.forEach(saved -> {
-								maybeEmitAfterSaveEvent(saved);
-							});
-						}).flatMap(state -> {
-							List> monos = it.stream().map(saved -> {
-								return maybeInvokeAfterSaveCallback(saved);
-							}).collect(Collectors.toList());
-
-							return Flux.concat(monos).then(Mono.just(state));
-						});
-			});
-		} catch (RuntimeException ex) {
-
-			if (ex instanceof MongoBulkWriteException) {
-
-				MongoBulkWriteException mongoBulkWriteException = (MongoBulkWriteException) ex;
-				if (mongoBulkWriteException.getWriteConcernError() != null) {
-					throw new DataIntegrityViolationException(ex.getMessage(), ex);
-				}
-				throw new BulkOperationException(ex.getMessage(), mongoBulkWriteException);
+			if (it.model()instanceof InsertOneModel iom) {
+
+				Document target = iom.getDocument();
+				maybeEmitBeforeSaveEvent(it);
+				return maybeInvokeBeforeSaveCallback(it.source(), target)
+						.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(afterCallback, iom)));
+			} else if (it.model()instanceof ReplaceOneModel rom) {
+
+				Document target = rom.getReplacement();
+				maybeEmitBeforeSaveEvent(it);
+				return maybeInvokeBeforeSaveCallback(it.source(), target)
+						.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(afterCallback, rom)));
 			}
 
-			throw ex;
-		}
+			return Mono.just(new SourceAwareWriteModelHolder(it.source(), mapWriteModel(it.source(), it.model())));
+		});
+
+		MongoCollection theCollection = collection;
+		return concat.collectList().flatMap(it -> {
+
+			return Mono
+					.from(theCollection
+							.bulkWrite(it.stream().map(SourceAwareWriteModelHolder::model).collect(Collectors.toList()), bulkOptions))
+					.doOnSuccess(state -> {
+						it.forEach(this::maybeEmitAfterSaveEvent);
+					}).flatMap(state -> {
+						List> monos = it.stream().map(this::maybeInvokeAfterSaveCallback).collect(Collectors.toList());
+
+						return Flux.concat(monos).then(Mono.just(state));
+					});
+		});
 	}
 
 	/**
@@ -295,47 +279,24 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 		return this;
 	}
 
-	private WriteModel mapWriteModel(WriteModel writeModel) {
-
-		if (writeModel instanceof UpdateOneModel) {
-
-			UpdateOneModel model = (UpdateOneModel) writeModel;
-
-			return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
-					model.getOptions());
-		}
-
-		if (writeModel instanceof UpdateManyModel) {
-
-			UpdateManyModel model = (UpdateManyModel) writeModel;
-
-			return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
-					model.getOptions());
-		}
-
-		if (writeModel instanceof DeleteOneModel) {
-
-			DeleteOneModel model = (DeleteOneModel) writeModel;
-
-			return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
-		}
-
-		if (writeModel instanceof DeleteManyModel) {
-
-			DeleteManyModel model = (DeleteManyModel) writeModel;
-
-			return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
-		}
+	@Override
+	protected void maybeEmitEvent(ApplicationEvent event) {
+		bulkOperationContext.publishEvent(event);
+	}
 
-		return writeModel;
+	@Override
+	protected UpdateMapper updateMapper() {
+		return bulkOperationContext.updateMapper();
 	}
 
-	private Bson getMappedUpdate(Bson update) {
-		return bulkOperationContext.getUpdateMapper().getMappedObject(update, bulkOperationContext.getEntity());
+	@Override
+	protected QueryMapper queryMapper() {
+		return bulkOperationContext.queryMapper();
 	}
 
-	private Bson getMappedQuery(Bson query) {
-		return bulkOperationContext.getQueryMapper().getMappedObject(query, bulkOperationContext.getEntity());
+	@Override
+	protected Optional> entity() {
+		return bulkOperationContext.entity();
 	}
 
 	private Document getMappedObject(Object source) {
@@ -350,121 +311,30 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 		return sink;
 	}
 
-	private void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
-
-		if (holder.getModel() instanceof InsertOneModel) {
-
-			Document target = ((InsertOneModel) holder.getModel()).getDocument();
-			maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName));
-		} else if (holder.getModel() instanceof ReplaceOneModel) {
-
-			Document target = ((ReplaceOneModel) holder.getModel()).getReplacement();
-			maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName));
-		}
-	}
-
-	private void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
-
-		if (holder.getModel() instanceof InsertOneModel) {
-
-			Document target = ((InsertOneModel) holder.getModel()).getDocument();
-			maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName));
-		} else if (holder.getModel() instanceof ReplaceOneModel) {
-
-			Document target = ((ReplaceOneModel) holder.getModel()).getReplacement();
-			maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName));
-		}
-	}
-
 	private Mono maybeInvokeAfterSaveCallback(SourceAwareWriteModelHolder holder) {
 
-		if (holder.getModel() instanceof InsertOneModel) {
+		if (holder.model() instanceof InsertOneModel) {
 
-			Document target = ((InsertOneModel) holder.getModel()).getDocument();
-			return maybeInvokeAfterSaveCallback(holder.getSource(), target);
-		} else if (holder.getModel() instanceof ReplaceOneModel) {
+			Document target = ((InsertOneModel) holder.model()).getDocument();
+			return maybeInvokeAfterSaveCallback(holder.source(), target);
+		} else if (holder.model() instanceof ReplaceOneModel) {
 
-			Document target = ((ReplaceOneModel) holder.getModel()).getReplacement();
-			return maybeInvokeAfterSaveCallback(holder.getSource(), target);
+			Document target = ((ReplaceOneModel) holder.model()).getReplacement();
+			return maybeInvokeAfterSaveCallback(holder.source(), target);
 		}
-		return Mono.just(holder.getSource());
-	}
-
-	private , T> E maybeEmitEvent(E event) {
-
-		if (bulkOperationContext.getEventPublisher() == null) {
-			return event;
-		}
-
-		bulkOperationContext.getEventPublisher().publishEvent(event);
-		return event;
+		return Mono.just(holder.source());
 	}
 
 	private Mono maybeInvokeBeforeConvertCallback(Object value) {
-
-		if (bulkOperationContext.getEntityCallbacks() == null) {
-			return Mono.just(value);
-		}
-
-		return bulkOperationContext.getEntityCallbacks().callback(ReactiveBeforeConvertCallback.class, value,
-				collectionName);
+		return bulkOperationContext.callback(ReactiveBeforeConvertCallback.class, value, collectionName);
 	}
 
 	private Mono maybeInvokeBeforeSaveCallback(Object value, Document mappedDocument) {
-
-		if (bulkOperationContext.getEntityCallbacks() == null) {
-			return Mono.just(value);
-		}
-
-		return bulkOperationContext.getEntityCallbacks().callback(ReactiveBeforeSaveCallback.class, value, mappedDocument,
-				collectionName);
+		return bulkOperationContext.callback(ReactiveBeforeSaveCallback.class, value, mappedDocument, collectionName);
 	}
 
 	private Mono maybeInvokeAfterSaveCallback(Object value, Document mappedDocument) {
-
-		if (bulkOperationContext.getEntityCallbacks() == null) {
-			return Mono.just(value);
-		}
-
-		return bulkOperationContext.getEntityCallbacks().callback(ReactiveAfterSaveCallback.class, value, mappedDocument,
-				collectionName);
-	}
-
-	private static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) {
-
-		BulkWriteOptions options = new BulkWriteOptions();
-
-		switch (bulkMode) {
-			case ORDERED:
-				return options.ordered(true);
-			case UNORDERED:
-				return options.ordered(false);
-		}
-
-		throw new IllegalStateException("BulkMode was null");
-	}
-
-	/**
-	 * @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}.
-	 * @param update The {@link Update} to apply
-	 * @param upsert flag to indicate if document should be upserted.
-	 * @return new instance of {@link UpdateOptions}.
-	 */
-	private static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) {
-
-		UpdateOptions options = new UpdateOptions();
-		options.upsert(upsert);
-
-		if (update.hasArrayFilters()) {
-			List list = new ArrayList<>(update.getArrayFilters().size());
-			for (ArrayFilter arrayFilter : update.getArrayFilters()) {
-				list.add(arrayFilter.asDocument());
-			}
-			options.arrayFilters(list);
-		}
-
-		filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
-		return options;
+		return bulkOperationContext.callback(ReactiveAfterSaveCallback.class, value, mappedDocument, collectionName);
 	}
 
 	/**
@@ -474,118 +344,47 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
 	 * @author Christoph Strobl
 	 * @since 2.0
 	 */
-	static final class ReactiveBulkOperationContext {
-
-		private final BulkMode bulkMode;
-		private final Optional> entity;
-		private final QueryMapper queryMapper;
-		private final UpdateMapper updateMapper;
-		private final ApplicationEventPublisher eventPublisher;
-		private final ReactiveEntityCallbacks entityCallbacks;
-
-		ReactiveBulkOperationContext(BulkMode bulkMode, Optional> entity,
-				QueryMapper queryMapper, UpdateMapper updateMapper, ApplicationEventPublisher eventPublisher,
-				ReactiveEntityCallbacks entityCallbacks) {
-
-			this.bulkMode = bulkMode;
-			this.entity = entity;
-			this.queryMapper = queryMapper;
-			this.updateMapper = updateMapper;
-			this.eventPublisher = eventPublisher;
-			this.entityCallbacks = entityCallbacks;
-		}
+	record ReactiveBulkOperationContext(BulkMode bulkMode, Optional> entity,
+			QueryMapper queryMapper, UpdateMapper updateMapper, @Nullable ApplicationEventPublisher eventPublisher,
+			@Nullable ReactiveEntityCallbacks entityCallbacks) {
 
-		public BulkMode getBulkMode() {
-			return this.bulkMode;
+		public boolean skipEntityCallbacks() {
+			return entityCallbacks == null;
 		}
 
-		public Optional> getEntity() {
-			return this.entity;
+		public boolean skipEventPublishing() {
+			return eventPublisher == null;
 		}
 
-		public QueryMapper getQueryMapper() {
-			return this.queryMapper;
-		}
-
-		public UpdateMapper getUpdateMapper() {
-			return this.updateMapper;
-		}
+		@SuppressWarnings("rawtypes")
+		public  Mono callback(Class callbackType, T entity, String collectionName) {
 
-		public ApplicationEventPublisher getEventPublisher() {
-			return this.eventPublisher;
-		}
+			if (skipEntityCallbacks()) {
+				return Mono.just(entity);
+			}
 
-		public ReactiveEntityCallbacks getEntityCallbacks() {
-			return this.entityCallbacks;
+			return entityCallbacks.callback(callbackType, entity, collectionName);
 		}
 
-		@Override
-		public boolean equals(@Nullable Object o) {
-			if (this == o)
-				return true;
-			if (o == null || getClass() != o.getClass())
-				return false;
-
-			ReactiveBulkOperationContext that = (ReactiveBulkOperationContext) o;
+		@SuppressWarnings("rawtypes")
+		public  Mono callback(Class callbackType, T entity, Document document,
+				String collectionName) {
 
-			if (bulkMode != that.bulkMode)
-				return false;
-			if (!ObjectUtils.nullSafeEquals(this.entity, that.entity)) {
-				return false;
+			if (skipEntityCallbacks()) {
+				return Mono.just(entity);
 			}
-			if (!ObjectUtils.nullSafeEquals(this.queryMapper, that.queryMapper)) {
-				return false;
-			}
-			if (!ObjectUtils.nullSafeEquals(this.updateMapper, that.updateMapper)) {
-				return false;
-			}
-			if (!ObjectUtils.nullSafeEquals(this.eventPublisher, that.eventPublisher)) {
-				return false;
-			}
-			return ObjectUtils.nullSafeEquals(this.entityCallbacks, that.entityCallbacks);
-		}
-
-		@Override
-		public int hashCode() {
-			int result = bulkMode != null ? bulkMode.hashCode() : 0;
-			result = 31 * result + ObjectUtils.nullSafeHashCode(entity);
-			result = 31 * result + ObjectUtils.nullSafeHashCode(queryMapper);
-			result = 31 * result + ObjectUtils.nullSafeHashCode(updateMapper);
-			result = 31 * result + ObjectUtils.nullSafeHashCode(eventPublisher);
-			result = 31 * result + ObjectUtils.nullSafeHashCode(entityCallbacks);
-			return result;
-		}
 
-		public String toString() {
-			return "DefaultBulkOperations.BulkOperationContext(bulkMode=" + this.getBulkMode() + ", entity="
-					+ this.getEntity() + ", queryMapper=" + this.getQueryMapper() + ", updateMapper=" + this.getUpdateMapper()
-					+ ", eventPublisher=" + this.getEventPublisher() + ", entityCallbacks=" + this.getEntityCallbacks() + ")";
+			return entityCallbacks.callback(callbackType, entity, document, collectionName);
 		}
-	}
-
-	/**
-	 * Value object chaining together an actual source with its {@link WriteModel} representation.
-	 *
-	 * @since 4.1
-	 * @author Christoph Strobl
-	 */
-	private static final class SourceAwareWriteModelHolder {
 
-		private final Object source;
-		private final WriteModel model;
+		public void publishEvent(ApplicationEvent event) {
 
-		SourceAwareWriteModelHolder(Object source, WriteModel model) {
-
-			this.source = source;
-			this.model = model;
-		}
-
-		public Object getSource() {
-			return this.source;
-		}
+			if (skipEventPublishing()) {
+				return;
+			}
 
-		public WriteModel getModel() {
-			return this.model;
+			eventPublisher.publishEvent(event);
 		}
 	}
+
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkOperations.java
index b477c98fe..92d81f49f 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveBulkOperations.java
@@ -15,21 +15,29 @@
  */
 package org.springframework.data.mongodb.core;
 
+import reactor.core.publisher.Mono;
+
 import java.util.List;
 
 import org.springframework.data.mongodb.core.query.Query;
-import org.springframework.data.mongodb.core.query.Update;
 import org.springframework.data.mongodb.core.query.UpdateDefinition;
-import org.springframework.data.util.Pair;
 
 import com.mongodb.bulk.BulkWriteResult;
-import reactor.core.publisher.Mono;
 
 /**
  * Bulk operations for insert/update/remove actions on a collection. Bulk operations are available since MongoDB 2.6 and
  * make use of low level bulk commands on the protocol level. This interface defines a fluent API to add multiple single
  * operations or list of similar operations in sequence which can then eventually be executed by calling
  * {@link #execute()}.
+ *
+ * 
+ * ReactiveMongoOperations ops = …;
+ *
+ * ops.bulkOps(BulkMode.UNORDERED, Person.class)
+ * 				.insert(newPerson)
+ * 				.updateOne(where("firstname").is("Joe"), Update.update("lastname", "Doe"))
+ * 				.execute();
+ * 
*

* Bulk operations are issued as one batch that pulls together all insert, update, and delete operations. Operations * that require individual operation results such as optimistic locking (using {@code @Version}) are not supported and diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java index 20fb2fdf9..9f391bb79 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java @@ -350,6 +350,40 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { */ Mono dropCollection(String collectionName); + /** + * Returns a new {@link ReactiveBulkOperations} for the given collection.
+ * NOTE: Any additional support for field mapping, etc. is not available for {@literal update} or + * {@literal remove} operations in bulk mode due to the lack of domain type information. Use + * {@link #bulkOps(BulkMode, Class, String)} to get full type specific support. + * + * @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}. + * @param collectionName the name of the collection to work on, must not be {@literal null} or empty. + * @return {@link ReactiveBulkOperations} on the named collection + * @since 4.1 + */ + ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName); + + /** + * Returns a new {@link ReactiveBulkOperations} for the given entity type. + * + * @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}. + * @param entityClass the name of the entity class, must not be {@literal null}. + * @return {@link ReactiveBulkOperations} on the named collection associated of the given entity class. + * @since 4.1 + */ + ReactiveBulkOperations bulkOps(BulkMode mode, Class entityClass); + + /** + * Returns a new {@link ReactiveBulkOperations} for the given entity type and collection name. + * + * @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}. + * @param entityType the name of the entity class. Can be {@literal null}. + * @param collectionName the name of the collection to work on, must not be {@literal null} or empty. + * @return {@link ReactiveBulkOperations} on the named collection associated with the given entity class. + * @since 4.1 + */ + ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class entityType, String collectionName); + /** * Query for a {@link Flux} of objects of type T from the collection used by the entity class.
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless @@ -1748,39 +1782,6 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { Flux mapReduce(Query filterQuery, Class domainType, String inputCollectionName, Class resultType, String mapFunction, String reduceFunction, MapReduceOptions options); - /** - * Returns a new {@link ReactiveBulkOperations} for the given collection.
- * NOTE: Any additional support for field mapping, etc. is not available for {@literal update} or - * {@literal remove} operations in bulk mode due to the lack of domain type information. Use - * {@link #bulkOps(BulkMode, Class, String)} to get full type specific support. - * - * @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}. - * @param collectionName the name of the collection to work on, must not be {@literal null} or empty. - * @return {@link ReactiveBulkOperations} on the named collection - * @since 4.1 - */ - ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName); - - /** - * Returns a new {@link ReactiveBulkOperations} for the given entity type. - * - * @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}. - * @param entityClass the name of the entity class, must not be {@literal null}. - * @return {@link ReactiveBulkOperations} on the named collection associated of the given entity class. - * @since 4.1 - */ - ReactiveBulkOperations bulkOps(BulkMode mode, Class entityClass); - - /** - * Returns a new {@link ReactiveBulkOperations} for the given entity type and collection name. - * - * @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}. - * @param entityType the name of the entity class. Can be {@literal null}. - * @param collectionName the name of the collection to work on, must not be {@literal null} or empty. - * @return {@link ReactiveBulkOperations} on the named collection associated with the given entity class. - * @since 4.1 - */ - ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class entityType, String collectionName); /** * Returns the underlying {@link MongoConverter}. diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 63543dad5..abf9e2ea1 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -17,8 +17,6 @@ package org.springframework.data.mongodb.core; import static org.springframework.data.mongodb.core.query.SerializationUtils.*; -import org.springframework.data.mongodb.core.BulkOperations.BulkMode; -import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -74,7 +72,9 @@ import org.springframework.data.mongodb.MongoDatabaseFactory; import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.ReactiveMongoDatabaseUtils; import org.springframework.data.mongodb.SessionSynchronization; +import org.springframework.data.mongodb.core.BulkOperations.BulkMode; import org.springframework.data.mongodb.core.CollectionPreparerSupport.ReactiveCollectionPreparerDelegate; +import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext; import org.springframework.data.mongodb.core.EntityOperations.AdaptibleEntity; import org.springframework.data.mongodb.core.QueryOperations.AggregationDefinition; import org.springframework.data.mongodb.core.QueryOperations.CountContext; @@ -481,31 +481,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return new DefaultReactiveIndexOperations(this, getCollectionName(entityClass), this.queryMapper, entityClass); } - @Override - public ReactiveBulkOperations bulkOps(BulkMode bulkMode, String collectionName) { - return bulkOps(bulkMode, null, collectionName); - } - - @Override - public ReactiveBulkOperations bulkOps(BulkMode bulkMode, Class entityClass) { - return bulkOps(bulkMode, entityClass, getCollectionName(entityClass)); - } - - @Override - public ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class entityType, String collectionName) { - - Assert.notNull(mode, "BulkMode must not be null"); - Assert.hasText(collectionName, "Collection name must not be null or empty"); - - DefaultReactiveBulkOperations operations = new DefaultReactiveBulkOperations(this, collectionName, - new ReactiveBulkOperationContext(mode, Optional.ofNullable(getPersistentEntity(entityType)), queryMapper, updateMapper, - eventPublisher, entityCallbacks)); - - operations.setDefaultWriteConcern(writeConcern); - - return operations; - } - @Override public String getCollectionName(Class entityClass) { return operations.determineCollectionName(entityClass); @@ -763,7 +738,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati public Mono dropCollection(Class entityClass) { return dropCollection(getCollectionName(entityClass)); } - @Override public Mono dropCollection(String collectionName) { @@ -774,6 +748,31 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati }).then(); } + @Override + public ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName) { + return bulkOps(mode, null, collectionName); + } + + @Override + public ReactiveBulkOperations bulkOps(BulkMode mode, Class entityClass) { + return bulkOps(mode, entityClass, getCollectionName(entityClass)); + } + + @Override + public ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class entityType, String collectionName) { + + Assert.notNull(mode, "BulkMode must not be null"); + Assert.hasText(collectionName, "Collection name must not be null or empty"); + + DefaultReactiveBulkOperations operations = new DefaultReactiveBulkOperations(this, collectionName, + new ReactiveBulkOperationContext(mode, Optional.ofNullable(getPersistentEntity(entityType)), queryMapper, + updateMapper, eventPublisher, entityCallbacks)); + + operations.setDefaultWriteConcern(writeConcern); + + return operations; + } + @Override public Flux getCollectionNames() { return createFlux(MongoDatabase::listCollectionNames); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsTests.java index c17f76fae..43d593556 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperationsTests.java @@ -51,6 +51,8 @@ import com.mongodb.WriteConcern; import com.mongodb.bulk.BulkWriteResult; /** + * Tests for {@link DefaultReactiveBulkOperations}. + * * @author Christoph Strobl */ @ExtendWith(MongoTemplateExtension.class) @@ -74,7 +76,7 @@ class DefaultReactiveBulkOperationsTests { .execute().as(StepVerifier::create) // .consumeNextWith(result -> { assertThat(result.getInsertedCount()).isEqualTo(2); - }); + }).verifyComplete(); } @Test // GH-2821 @@ -98,7 +100,7 @@ class DefaultReactiveBulkOperationsTests { .execute().as(StepVerifier::create) // .consumeNextWith(result -> { assertThat(result.getInsertedCount()).isEqualTo(2); - }); + }).verifyComplete(); } @Test // GH-2821