|
|
|
|
@ -15,9 +15,6 @@
@@ -15,9 +15,6 @@
|
|
|
|
|
*/ |
|
|
|
|
package org.springframework.data.mongodb.core; |
|
|
|
|
|
|
|
|
|
import lombok.NonNull; |
|
|
|
|
import lombok.Value; |
|
|
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Collections; |
|
|
|
|
import java.util.List; |
|
|
|
|
@ -46,8 +43,13 @@ import org.springframework.lang.Nullable;
@@ -46,8 +43,13 @@ import org.springframework.lang.Nullable;
|
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
|
|
|
|
|
|
import com.mongodb.WriteConcern; |
|
|
|
|
import com.mongodb.bulk.BulkWriteResult; |
|
|
|
|
import com.mongodb.client.MongoCollection; |
|
|
|
|
import com.mongodb.client.model.*; |
|
|
|
|
|
|
|
|
|
import lombok.NonNull; |
|
|
|
|
import lombok.Value; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Default implementation for {@link BulkOperations}. |
|
|
|
|
* |
|
|
|
|
@ -56,6 +58,7 @@ import com.mongodb.client.model.*;
@@ -56,6 +58,7 @@ import com.mongodb.client.model.*;
|
|
|
|
|
* @author Christoph Strobl |
|
|
|
|
* @author Mark Paluch |
|
|
|
|
* @author Minsu Kim |
|
|
|
|
* @author Jens Schauder |
|
|
|
|
* @since 1.9 |
|
|
|
|
*/ |
|
|
|
|
class DefaultBulkOperations implements BulkOperations { |
|
|
|
|
@ -289,32 +292,44 @@ class DefaultBulkOperations implements BulkOperations {
@@ -289,32 +292,44 @@ class DefaultBulkOperations implements BulkOperations {
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
|
|
com.mongodb.bulk.BulkWriteResult result = mongoOperations.execute(collectionName, collection -> { |
|
|
|
|
return collection.bulkWrite(models.stream().map(it -> { |
|
|
|
|
com.mongodb.bulk.BulkWriteResult result = mongoOperations.execute(collectionName, this::bulkWriteTo); |
|
|
|
|
|
|
|
|
|
maybeEmitBeforeSaveEvent(it); |
|
|
|
|
Assert.state(result != null, "Result must not be null."); |
|
|
|
|
|
|
|
|
|
if (it.getModel() instanceof InsertOneModel) { |
|
|
|
|
models.forEach(this::maybeEmitAfterSaveEvent); |
|
|
|
|
|
|
|
|
|
Document target = ((InsertOneModel<Document>) it.getModel()).getDocument(); |
|
|
|
|
maybeInvokeBeforeSaveCallback(it.getSource(), target); |
|
|
|
|
} else if (it.getModel() instanceof ReplaceOneModel) { |
|
|
|
|
|
|
|
|
|
Document target = ((ReplaceOneModel<Document>) it.getModel()).getReplacement(); |
|
|
|
|
maybeInvokeBeforeSaveCallback(it.getSource(), target); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return mapWriteModel(it.getModel()); |
|
|
|
|
}).collect(Collectors.toList()), bulkOptions); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
models.stream().forEach(this::maybeEmitAfterSaveEvent); |
|
|
|
|
return result; |
|
|
|
|
} finally { |
|
|
|
|
this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private BulkWriteResult bulkWriteTo(MongoCollection<Document> collection) { |
|
|
|
|
|
|
|
|
|
return collection.bulkWrite( //
|
|
|
|
|
models.stream() //
|
|
|
|
|
.map(this::extractAndMapWriteModel) //
|
|
|
|
|
.collect(Collectors.toList()), //
|
|
|
|
|
bulkOptions); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private WriteModel<Document> extractAndMapWriteModel(SourceAwareWriteModelHolder it) { |
|
|
|
|
|
|
|
|
|
maybeEmitBeforeSaveEvent(it); |
|
|
|
|
|
|
|
|
|
if (it.getModel() instanceof InsertOneModel) { |
|
|
|
|
|
|
|
|
|
Document target = ((InsertOneModel<Document>) it.getModel()).getDocument(); |
|
|
|
|
maybeInvokeBeforeSaveCallback(it.getSource(), target); |
|
|
|
|
} else if (it.getModel() instanceof ReplaceOneModel) { |
|
|
|
|
|
|
|
|
|
Document target = ((ReplaceOneModel<Document>) it.getModel()).getReplacement(); |
|
|
|
|
maybeInvokeBeforeSaveCallback(it.getSource(), target); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return mapWriteModel(it.getModel()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Performs update and upsert bulk operations. |
|
|
|
|
* |
|
|
|
|
|