30 changed files with 3641 additions and 89 deletions
@ -0,0 +1,35 @@
@@ -0,0 +1,35 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb; |
||||
|
||||
/**
 * Interface that can provide access to a MongoDB cluster.
 *
 * @param <T> the MongoDB cluster/client type (e.g. {@link com.mongodb.client.MongoCluster} or
 *          {@link com.mongodb.reactivestreams.client.MongoCluster}).
 * @author Christoph Strobl
 * @since 5.1
 */
public interface MongoClusterCapable<T> {

	/**
	 * Returns the MongoDB cluster used by this factory.
	 *
	 * @return the cluster; never {@literal null}.
	 * @throws IllegalStateException if cluster cannot be obtained.
	 */
	T getMongoCluster();
}
||||
@ -0,0 +1,106 @@
@@ -0,0 +1,106 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import java.util.List; |
||||
|
||||
import com.mongodb.client.model.bulk.ClientDeleteOneOptions; |
||||
import com.mongodb.client.model.bulk.ClientReplaceOneOptions; |
||||
import org.bson.Document; |
||||
|
||||
import com.mongodb.MongoNamespace; |
||||
import com.mongodb.client.model.DeleteOptions; |
||||
import com.mongodb.client.model.UpdateOptions; |
||||
import com.mongodb.client.model.bulk.ClientDeleteManyOptions; |
||||
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; |
||||
import com.mongodb.client.model.bulk.ClientUpdateManyOptions; |
||||
import com.mongodb.client.model.bulk.ClientUpdateOneOptions; |
||||
|
||||
/** |
||||
* @author Christoph Strobl |
||||
*/ |
||||
abstract class BulkWriteSupport { |
||||
|
||||
static ClientNamespacedWriteModel updateMany(MongoNamespace namespace, Document query, Object update, |
||||
UpdateOptions updateOptions) { |
||||
|
||||
ClientUpdateManyOptions updateManyOptions = ClientUpdateManyOptions.clientUpdateManyOptions(); |
||||
updateManyOptions.arrayFilters(updateOptions.getArrayFilters()); |
||||
updateManyOptions.collation(updateOptions.getCollation()); |
||||
updateManyOptions.upsert(updateOptions.isUpsert()); |
||||
updateManyOptions.hint(updateOptions.getHint()); |
||||
updateManyOptions.hintString(updateOptions.getHintString()); |
||||
|
||||
if (update instanceof List<?> pipeline) { |
||||
return ClientNamespacedWriteModel.updateMany(namespace, query, (List<Document>) pipeline, updateManyOptions); |
||||
} else { |
||||
return ClientNamespacedWriteModel.updateMany(namespace, query, (Document) update, updateManyOptions); |
||||
} |
||||
} |
||||
|
||||
static ClientNamespacedWriteModel updateOne(MongoNamespace namespace, Document query, Object update, |
||||
UpdateOptions updateOptions) { |
||||
|
||||
ClientUpdateOneOptions updateOneOptions = ClientUpdateOneOptions.clientUpdateOneOptions(); |
||||
updateOneOptions.sort(updateOptions.getSort()); |
||||
updateOneOptions.arrayFilters(updateOptions.getArrayFilters()); |
||||
updateOneOptions.collation(updateOptions.getCollation()); |
||||
updateOneOptions.upsert(updateOptions.isUpsert()); |
||||
updateOneOptions.hint(updateOptions.getHint()); |
||||
updateOneOptions.hintString(updateOptions.getHintString()); |
||||
|
||||
if (update instanceof List<?> pipeline) { |
||||
return ClientNamespacedWriteModel.updateOne(namespace, query, (List<Document>) pipeline, updateOneOptions); |
||||
} else { |
||||
return ClientNamespacedWriteModel.updateOne(namespace, query, (Document) update, updateOneOptions); |
||||
} |
||||
} |
||||
|
||||
static ClientNamespacedWriteModel removeMany(MongoNamespace namespace, Document query, DeleteOptions deleteOptions) { |
||||
|
||||
ClientDeleteManyOptions clientDeleteManyOptions = ClientDeleteManyOptions.clientDeleteManyOptions(); |
||||
clientDeleteManyOptions.collation(deleteOptions.getCollation()); |
||||
clientDeleteManyOptions.hint(deleteOptions.getHint()); |
||||
clientDeleteManyOptions.hintString(deleteOptions.getHintString()); |
||||
|
||||
return ClientNamespacedWriteModel.deleteMany(namespace, query, clientDeleteManyOptions); |
||||
} |
||||
|
||||
static ClientNamespacedWriteModel removeOne(MongoNamespace namespace, Document query, DeleteOptions deleteOptions) { |
||||
|
||||
ClientDeleteOneOptions clientDeleteOneOptions = ClientDeleteOneOptions.clientDeleteOneOptions(); |
||||
// TODO: open an issue with MongoDB to enable sort for deleteOne
|
||||
clientDeleteOneOptions.collation(deleteOptions.getCollation()); |
||||
clientDeleteOneOptions.hint(deleteOptions.getHint()); |
||||
clientDeleteOneOptions.hintString(deleteOptions.getHintString()); |
||||
|
||||
|
||||
return ClientNamespacedWriteModel.deleteOne(namespace, query, clientDeleteOneOptions); |
||||
} |
||||
|
||||
static ClientNamespacedWriteModel replaceOne(MongoNamespace namespace, Document query, Document replacement, UpdateOptions updateOptions) { |
||||
|
||||
ClientReplaceOneOptions replaceOptions = ClientReplaceOneOptions.clientReplaceOneOptions(); |
||||
replaceOptions.sort(updateOptions.getSort()); |
||||
replaceOptions.upsert(updateOptions.isUpsert()); |
||||
replaceOptions.hint(updateOptions.getHint()); |
||||
replaceOptions.hintString(updateOptions.getHintString()); |
||||
replaceOptions.collation(updateOptions.getCollation()); |
||||
|
||||
return ClientNamespacedWriteModel.replaceOne(namespace, query, |
||||
replacement, replaceOptions); |
||||
} |
||||
} |
||||
@ -0,0 +1,149 @@
@@ -0,0 +1,149 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
import org.bson.Document; |
||||
import org.springframework.dao.DataAccessException; |
||||
import org.springframework.data.mongodb.core.MongoTemplate.SourceAwareDocument; |
||||
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext; |
||||
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk; |
||||
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions.Order; |
||||
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.Insert; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.Remove; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.Update; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst; |
||||
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; |
||||
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent; |
||||
|
||||
import com.mongodb.MongoBulkWriteException; |
||||
import com.mongodb.MongoNamespace; |
||||
import com.mongodb.client.model.DeleteOptions; |
||||
import com.mongodb.client.model.UpdateOptions; |
||||
import com.mongodb.client.model.bulk.ClientBulkWriteOptions; |
||||
import com.mongodb.client.model.bulk.ClientBulkWriteResult; |
||||
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; |
||||
|
||||
/** |
||||
* Internal API wrapping a {@link MongoTemplate} to encapsulate {@link Bulk} handling. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 2026/02 |
||||
*/ |
||||
class BulkWriter { |
||||
|
||||
MongoTemplate template; |
||||
|
||||
BulkWriter(MongoTemplate template) { |
||||
this.template = template; |
||||
} |
||||
|
||||
public ClientBulkWriteResult write(String defaultDatabase, BulkWriteOptions options, Bulk bulk) { |
||||
|
||||
List<ClientNamespacedWriteModel> writeModels = new ArrayList<>(); |
||||
List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>(); |
||||
|
||||
for (BulkOperation bulkOp : bulk.operations()) { |
||||
|
||||
String collectionName = bulkOp.context().namespace().collection() != null |
||||
? bulkOp.context().namespace().collection() |
||||
: template.getCollectionName(bulkOp.context().namespace().type()); |
||||
|
||||
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, collectionName); |
||||
if (bulkOp instanceof Insert insert) { |
||||
|
||||
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(collectionName, insert.value(), |
||||
template.getConverter()); |
||||
writeModels.add(ClientNamespacedWriteModel.insertOne(mongoNamespace, sourceAwareDocument.document())); |
||||
afterSaveCallables.add(sourceAwareDocument); |
||||
} else if (bulkOp instanceof Update update) { |
||||
|
||||
Class<?> domainType = update.context().namespace().type(); |
||||
boolean multi = !(bulkOp instanceof UpdateFirst); |
||||
|
||||
UpdateContext updateContext = template.getQueryOperations().updateContext(update.update(), update.query(), |
||||
update.upsert()); |
||||
MongoPersistentEntity<?> entity = template.getPersistentEntity(domainType); |
||||
|
||||
Document mappedQuery = updateContext.getMappedQuery(entity); |
||||
Object mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(domainType) |
||||
: updateContext.getMappedUpdate(entity); |
||||
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query()); |
||||
|
||||
if (multi) { |
||||
writeModels.add(BulkWriteSupport.updateMany(mongoNamespace, mappedQuery, mappedUpdate, updateOptions)); |
||||
} else { |
||||
writeModels.add(BulkWriteSupport.updateOne(mongoNamespace, mappedQuery, mappedUpdate, updateOptions)); |
||||
} |
||||
} else if (bulkOp instanceof Remove remove) { |
||||
|
||||
Class<?> domainType = remove.context().namespace().type(); |
||||
DeleteContext deleteContext = template.getQueryOperations().deleteQueryContext(remove.query()); |
||||
|
||||
Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType)); |
||||
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType); |
||||
|
||||
if (remove instanceof RemoveFirst) { |
||||
writeModels.add(BulkWriteSupport.removeOne(mongoNamespace, mappedQuery, deleteOptions)); |
||||
} else { |
||||
writeModels.add(BulkWriteSupport.removeMany(mongoNamespace, mappedQuery, deleteOptions)); |
||||
} |
||||
} else if (bulkOp instanceof Replace replace) { |
||||
|
||||
Class<?> domainType = replace.context().namespace().type(); |
||||
|
||||
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(collectionName, |
||||
replace.replacement(), template.getConverter()); |
||||
|
||||
UpdateContext updateContext = template.getQueryOperations().replaceSingleContext(replace.query(), |
||||
MappedDocument.of(sourceAwareDocument.document()), replace.upsert()); |
||||
|
||||
Document mappedQuery = updateContext.getMappedQuery(template.getPersistentEntity(domainType)); |
||||
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, replace.query()); |
||||
|
||||
writeModels.add( |
||||
BulkWriteSupport.replaceOne(mongoNamespace, mappedQuery, sourceAwareDocument.document(), updateOptions)); |
||||
afterSaveCallables.add(sourceAwareDocument); |
||||
} |
||||
} |
||||
|
||||
try { |
||||
|
||||
ClientBulkWriteResult clientBulkWriteResult = template.doWithClient(client -> client.bulkWrite(writeModels, |
||||
ClientBulkWriteOptions.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED)))); |
||||
|
||||
afterSaveCallables.forEach(callable -> { |
||||
template |
||||
.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName())); |
||||
template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName()); |
||||
}); |
||||
return clientBulkWriteResult; |
||||
} catch (MongoBulkWriteException e) { |
||||
DataAccessException dataAccessException = template.getExceptionTranslator().translateExceptionIfPossible(e); |
||||
if (dataAccessException != null) { |
||||
throw dataAccessException; |
||||
} |
||||
throw e; |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,175 @@
@@ -0,0 +1,175 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
import org.bson.Document; |
||||
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext; |
||||
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext; |
||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate.SourceAwareDocument; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk; |
||||
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions.Order; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.Insert; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.Remove; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.Update; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst; |
||||
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; |
||||
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent; |
||||
|
||||
import com.mongodb.MongoNamespace; |
||||
import com.mongodb.client.model.DeleteOptions; |
||||
import com.mongodb.client.model.UpdateOptions; |
||||
import com.mongodb.client.model.bulk.ClientBulkWriteOptions; |
||||
import com.mongodb.client.model.bulk.ClientBulkWriteResult; |
||||
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; |
||||
|
||||
/** |
||||
* Internal API wrapping a {@link ReactiveMongoTemplate} to encapsulate {@link Bulk} handling using a reactive flow. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 2026/02 |
||||
*/ |
||||
class ReactiveBulkWriter { |
||||
|
||||
ReactiveMongoTemplate template; |
||||
|
||||
ReactiveBulkWriter(ReactiveMongoTemplate template) { |
||||
this.template = template; |
||||
} |
||||
|
||||
public Mono<ClientBulkWriteResult> write(String defaultDatabase, BulkWriteOptions options, Bulk bulk) { |
||||
|
||||
return Flux.fromIterable(bulk.operations()).concatMap(bulkOp -> toWriteModelAndAfterSave(defaultDatabase, bulkOp)) |
||||
.collectList().flatMap(results -> { |
||||
|
||||
List<ClientNamespacedWriteModel> writeModels = new ArrayList<>(); |
||||
List<SourceAwareDocument<?>> afterSaveCallables = new ArrayList<>(); |
||||
|
||||
for (WriteModelAndAfterSave result : results) { |
||||
writeModels.add(result.model()); |
||||
if (result.afterSave() != null) { |
||||
afterSaveCallables.add(result.afterSave()); |
||||
} |
||||
} |
||||
|
||||
return template |
||||
.doWithClient(client -> client.bulkWrite(writeModels, |
||||
ClientBulkWriteOptions.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED)))) |
||||
.doOnSuccess(v -> afterSaveCallables.forEach(callable -> { |
||||
template.maybeEmitEvent( |
||||
new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName())); |
||||
|
||||
})) |
||||
.flatMap(v -> Flux.concat(afterSaveCallables.stream().map(callable -> template |
||||
.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName())).toList()) |
||||
.then(Mono.just(v))); |
||||
}); |
||||
} |
||||
|
||||
private Mono<WriteModelAndAfterSave> toWriteModelAndAfterSave(String defaultDatabase, BulkOperation bulkOp) { |
||||
|
||||
String collectionName = bulkOp.context().namespace().collection() != null |
||||
? bulkOp.context().namespace().collection() |
||||
: template.getCollectionName(bulkOp.context().namespace().type()); |
||||
|
||||
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, collectionName); |
||||
|
||||
if (bulkOp instanceof Insert insert) { |
||||
|
||||
return template.prepareObjectForSaveReactive(collectionName, insert.value(), template.getConverter()) |
||||
.map(sourceAwareDocument -> { |
||||
ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(mongoNamespace, |
||||
sourceAwareDocument.document()); |
||||
return new WriteModelAndAfterSave(model, sourceAwareDocument); |
||||
}); |
||||
} |
||||
|
||||
if (bulkOp instanceof Update update) { |
||||
|
||||
Class<?> domainType = update.context().namespace().type(); |
||||
boolean multi = !(bulkOp instanceof UpdateFirst); |
||||
|
||||
UpdateContext updateContext = template.getQueryOperations().updateContext(update.update(), update.query(), |
||||
update.upsert()); |
||||
MongoPersistentEntity<?> entity = template.getPersistentEntity(domainType); |
||||
|
||||
Document mappedQuery = updateContext.getMappedQuery(entity); |
||||
Object mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(domainType) |
||||
: updateContext.getMappedUpdate(entity); |
||||
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query()); |
||||
|
||||
if (multi) { |
||||
|
||||
ClientNamespacedWriteModel model = BulkWriteSupport.updateMany(mongoNamespace, mappedQuery, mappedUpdate, |
||||
updateOptions); |
||||
return Mono.just(new WriteModelAndAfterSave(model, null)); |
||||
} |
||||
ClientNamespacedWriteModel model = BulkWriteSupport.updateOne(mongoNamespace, mappedQuery, mappedUpdate, |
||||
updateOptions); |
||||
return Mono.just(new WriteModelAndAfterSave(model, null)); |
||||
} |
||||
|
||||
if (bulkOp instanceof Remove remove) { |
||||
|
||||
Class<?> domainType = remove.context().namespace().type(); |
||||
DeleteContext deleteContext = template.getQueryOperations().deleteQueryContext(remove.query()); |
||||
|
||||
Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType)); |
||||
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType); |
||||
|
||||
if (remove instanceof RemoveFirst) { |
||||
ClientNamespacedWriteModel model = BulkWriteSupport.removeOne(mongoNamespace, mappedQuery, deleteOptions); |
||||
return Mono.just(new WriteModelAndAfterSave(model, null)); |
||||
} else { |
||||
ClientNamespacedWriteModel model = BulkWriteSupport.removeMany(mongoNamespace, mappedQuery, deleteOptions); |
||||
return Mono.just(new WriteModelAndAfterSave(model, null)); |
||||
} |
||||
} |
||||
|
||||
if (bulkOp instanceof Replace replace) { |
||||
|
||||
return template.prepareObjectForSaveReactive(collectionName, replace.replacement(), template.getConverter()) |
||||
.map(sourceAwareDocument -> { |
||||
|
||||
UpdateContext updateContext = template.getQueryOperations().replaceSingleContext(replace.query(), |
||||
MappedDocument.of(sourceAwareDocument.document()), replace.upsert()); |
||||
|
||||
Document mappedQuery = updateContext |
||||
.getMappedQuery(template.getPersistentEntity(replace.context().namespace().type())); |
||||
UpdateOptions updateOptions = updateContext.getUpdateOptions(replace.context().namespace().type(), |
||||
replace.query()); |
||||
|
||||
ClientNamespacedWriteModel model = BulkWriteSupport.replaceOne(mongoNamespace, mappedQuery, |
||||
sourceAwareDocument.document(), updateOptions); |
||||
return new WriteModelAndAfterSave(model, sourceAwareDocument); |
||||
}); |
||||
} |
||||
|
||||
return Mono.error(new IllegalStateException("Unknown bulk operation type: " + bulkOp.getClass())); |
||||
} |
||||
|
||||
private record WriteModelAndAfterSave(ClientNamespacedWriteModel model, SourceAwareDocument<?> afterSave) { |
||||
} |
||||
} |
||||
@ -0,0 +1,295 @@
@@ -0,0 +1,295 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core.bulk; |
||||
|
||||
import java.util.List; |
||||
import java.util.function.Consumer; |
||||
|
||||
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions.Order; |
||||
import org.springframework.data.mongodb.core.query.CriteriaDefinition; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition; |
||||
|
||||
/**
 * Container for an ordered list of {@link BulkOperation bulk operations} that modify documents in one or more
 * collections within a single request. Execution can be {@link Order#ORDERED ordered} (serial, stop on first error) or
 * {@link Order#UNORDERED unordered} (possibly parallel, continue on errors).
 *
 * @author Christoph Strobl
 * @since 5.1
 */
public interface Bulk {

	/**
	 * Returns the ordered list of bulk operations to execute.
	 *
	 * @return the ordered list of {@link BulkOperation operations}; never {@literal null}.
	 */
	List<BulkOperation> operations();

	/**
	 * Creates a new {@link Bulk} by applying the given consumer to a fresh {@link BulkBuilder}.
	 *
	 * @param consumer the consumer that configures the builder with operations; must not be {@literal null}.
	 * @return a new {@link Bulk} instance; never {@literal null}.
	 */
	static Bulk create(Consumer<BulkBuilder> consumer) {
		BulkBuilder bulkBuilder = Bulk.builder();
		consumer.accept(bulkBuilder);
		return bulkBuilder.build();
	}

	/**
	 * Returns a new {@link BulkBuilder} to define and build a {@link Bulk}.
	 *
	 * <pre class="code">
	 * Bulk bulk = Bulk.builder()
	 *   .inCollection(Person.class).insert(p1).upsert(where(....
	 *   .inCollection("user").update(where(...
	 *   .build();
	 * </pre>
	 *
	 * @return a new {@link BulkBuilder}; never {@literal null}.
	 */
	static BulkBuilder builder() {
		return new NamespaceAwareBulkBuilder<>();
	}

	/**
	 * Builder for defining {@link BulkOperation bulk operations} across one or more collections.
	 */
	interface BulkBuilder {

		/**
		 * Adds operations for the given collection name within a scoped consumer.
		 *
		 * @param collectionName the target collection name; must not be {@literal null} or empty.
		 * @param scoped the consumer that defines operations for that collection; must not be {@literal null}.
		 * @return this.
		 */
		BulkBuilder inCollection(String collectionName, Consumer<BulkBuilderBase<Object>> scoped);

		/**
		 * Adds operations for the collection mapped to the given domain type within a scoped consumer.
		 *
		 * @param type the domain type used to resolve the collection; must not be {@literal null}.
		 * @param scoped the consumer that defines operations for that collection; must not be {@literal null}.
		 * @param <T> the domain type.
		 * @return this.
		 */
		<T> BulkBuilder inCollection(Class<T> type, Consumer<BulkBuilderBase<T>> scoped);

		/**
		 * Switches the target to the given collection by name. Subsequent operations apply to this collection until another
		 * collection is selected.
		 *
		 * @param collectionName the target collection name; must not be {@literal null} or empty.
		 * @return a collection bound builder; never {@literal null}.
		 */
		NamespaceBoundBulkBuilder<Object> inCollection(String collectionName);

		/**
		 * Switches the target to the collection mapped to the given domain type.
		 *
		 * @param type the domain type used to resolve the collection; must not be {@literal null}.
		 * @param <T> the domain type.
		 * @return a collection bound builder; never {@literal null}.
		 */
		<T> NamespaceBoundBulkBuilder<T> inCollection(Class<T> type);

		/**
		 * Switches the target to the given collection name, using the given type for mapping.
		 *
		 * @param collectionName the target collection name; must not be {@literal null} or empty.
		 * @param type the domain type used for mapping; must not be {@literal null}.
		 * @param <T> the domain type.
		 * @return a collection bound builder; never {@literal null}.
		 */
		<T> NamespaceBoundBulkBuilder<T> inCollection(String collectionName, Class<T> type);

		/**
		 * Builds the {@link Bulk} with all operations added so far.
		 *
		 * @return the built {@link Bulk}; never {@literal null}.
		 */
		Bulk build();
	}

	/**
	 * Builder for adding bulk operations (insert, update, replace, remove) to a single collection.
	 *
	 * @param <T> the domain type for the target collection.
	 */
	interface BulkBuilderBase<T> {

		/**
		 * Adds an insert of the given document.
		 *
		 * @param object the document to insert; must not be {@literal null}.
		 * @return this.
		 */
		BulkBuilderBase<T> insert(T object);

		/**
		 * Adds inserts for all given documents.
		 *
		 * @param objects the documents to insert; must not be {@literal null}.
		 * @return this.
		 */
		BulkBuilderBase<T> insertAll(Iterable<? extends T> objects);

		/** Adds an update-one operation (update at most one document matching the criteria). */
		default BulkBuilderBase<T> updateOne(CriteriaDefinition where, UpdateDefinition update) {
			return updateOne(Query.query(where), update);
		}

		/**
		 * Adds an update-one operation (update at most one document matching the filter).
		 *
		 * @param filter the query to select the document; must not be {@literal null}.
		 * @param update the update to apply; must not be {@literal null}.
		 * @return this.
		 */
		BulkBuilderBase<T> updateOne(Query filter, UpdateDefinition update);

		/** Adds an update-many operation (update all documents matching the criteria). */
		default BulkBuilderBase<T> updateMulti(CriteriaDefinition where, UpdateDefinition update) {
			return updateMulti(Query.query(where), update);
		}

		/**
		 * Adds an update-many operation (update all documents matching the filter).
		 *
		 * @param filter the query to select documents; must not be {@literal null}.
		 * @param update the update to apply; must not be {@literal null}.
		 * @return this.
		 */
		BulkBuilderBase<T> updateMulti(Query filter, UpdateDefinition update);

		/** Adds an upsert operation (update if a document matches, otherwise insert). */
		default BulkBuilderBase<T> upsert(CriteriaDefinition where, UpdateDefinition update) {
			return upsert(Query.query(where), update);
		}

		/**
		 * Adds an upsert operation (update if a document matches the filter, otherwise insert).
		 *
		 * @param filter the query to find an existing document; must not be {@literal null}.
		 * @param update the update to apply or use for the new document; must not be {@literal null}.
		 * @return this.
		 */
		BulkBuilderBase<T> upsert(Query filter, UpdateDefinition update);

		/** Adds a remove operation (delete all documents matching the criteria). */
		default BulkBuilderBase<T> remove(CriteriaDefinition where) {
			return remove(Query.query(where));
		}

		/**
		 * Adds a remove operation (delete all documents matching the filter).
		 *
		 * @param filter the query to select documents to delete; must not be {@literal null}.
		 * @return this.
		 */
		BulkBuilderBase<T> remove(Query filter);

		/** Adds a replace-one operation (replace at most one document matching the criteria). */
		default BulkBuilderBase<T> replaceOne(CriteriaDefinition where, Object replacement) {
			return replaceOne(Query.query(where), replacement);
		}

		/**
		 * Adds a replace-one operation (replace at most one document matching the filter).
		 *
		 * @param filter the query to select the document; must not be {@literal null}.
		 * @param replacement the replacement document; must not be {@literal null}.
		 * @return this.
		 */
		BulkBuilderBase<T> replaceOne(Query filter, Object replacement);

		/** Adds a replace-one-if-exists operation (replace only if a document matches the criteria). */
		default BulkBuilderBase<T> replaceIfExists(CriteriaDefinition where, Object replacement) {
			return replaceIfExists(Query.query(where), replacement);
		}

		/**
		 * Adds a replace-one-if-exists operation (replace only if a document matches the filter).
		 *
		 * @param filter the query to select the document; must not be {@literal null}.
		 * @param replacement the replacement document; must not be {@literal null}.
		 * @return this.
		 */
		BulkBuilderBase<T> replaceIfExists(Query filter, Object replacement);
	}

	/**
	 * Builder for bulk operations that is bound to a specific collection (namespace). Extends both {@link BulkBuilder}
	 * (to switch collection or build) and {@link BulkBuilderBase} (to add operations in the current collection). All
	 * overrides narrow the return type so chained calls stay bound to the current collection.
	 *
	 * @param <T> the domain type for the bound collection.
	 */
	interface NamespaceBoundBulkBuilder<T> extends BulkBuilderBase<T>, BulkBuilder {

		@Override
		NamespaceBoundBulkBuilder<T> insert(T object);

		@Override
		NamespaceBoundBulkBuilder<T> insertAll(Iterable<? extends T> objects);

		/** Adds an update-one operation (update at most one document matching the criteria). */
		default NamespaceBoundBulkBuilder<T> updateOne(CriteriaDefinition where, UpdateDefinition update) {
			return updateOne(Query.query(where), update);
		}

		@Override
		NamespaceBoundBulkBuilder<T> updateOne(Query filter, UpdateDefinition update);

		/** Adds an update-many operation (update all documents matching the criteria). */
		default NamespaceBoundBulkBuilder<T> updateMulti(CriteriaDefinition where, UpdateDefinition update) {
			return updateMulti(Query.query(where), update);
		}

		@Override
		NamespaceBoundBulkBuilder<T> updateMulti(Query filter, UpdateDefinition update);

		/** Adds an upsert operation (update if a document matches, otherwise insert). */
		default NamespaceBoundBulkBuilder<T> upsert(CriteriaDefinition where, UpdateDefinition update) {
			return upsert(Query.query(where), update);
		}

		@Override
		NamespaceBoundBulkBuilder<T> upsert(Query filter, UpdateDefinition update);

		/** Adds a remove operation (delete all documents matching the criteria). */
		default NamespaceBoundBulkBuilder<T> remove(CriteriaDefinition where) {
			return remove(Query.query(where));
		}

		@Override
		NamespaceBoundBulkBuilder<T> remove(Query filter);

		/** Adds a replace-one operation (replace at most one document matching the criteria). */
		default NamespaceBoundBulkBuilder<T> replaceOne(CriteriaDefinition where, Object replacement) {
			return replaceOne(Query.query(where), replacement);
		}

		@Override
		NamespaceBoundBulkBuilder<T> replaceOne(Query filter, Object replacement);

		/** Adds a replace-one-if-exists operation (replace only if a document matches the criteria). */
		default NamespaceBoundBulkBuilder<T> replaceIfExists(CriteriaDefinition where, Object replacement) {
			return replaceIfExists(Query.query(where), replacement);
		}

		@Override
		NamespaceBoundBulkBuilder<T> replaceIfExists(Query filter, Object replacement);
	}

}
||||
@ -0,0 +1,121 @@
@@ -0,0 +1,121 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core.bulk; |
||||
|
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition; |
||||
|
||||
/** |
||||
* A single operation (insert, update, replace, or remove) within a {@link Bulk} . Each operation has a |
||||
* {@link #context()} that provides the target namespace (database and collection). |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 5.1 |
||||
*/ |
||||
public interface BulkOperation { |
||||
|
||||
/** |
||||
* Returns the context for this operation. |
||||
* |
||||
* @return the {@link BulkOperationContext}; never {@literal null}. |
||||
*/ |
||||
BulkOperationContext context(); |
||||
|
||||
/** |
||||
* Insert operation: insert a single document. |
||||
*/ |
||||
interface Insert extends BulkOperation { |
||||
|
||||
/** |
||||
* Returns the document to insert. |
||||
* |
||||
* @return the document; never {@literal null}. |
||||
*/ |
||||
Object value(); |
||||
} |
||||
|
||||
/** |
||||
* Update operation: update documents matching the {@link #query()}. |
||||
*/ |
||||
interface Update extends BulkOperation { |
||||
|
||||
/** |
||||
* Returns the update definition to apply. |
||||
* |
||||
* @return the update; never {@literal null}. |
||||
*/ |
||||
UpdateDefinition update(); |
||||
|
||||
/** |
||||
* Returns the query that selects which documents to update. |
||||
* |
||||
* @return the query; never {@literal null}. |
||||
*/ |
||||
Query query(); |
||||
|
||||
/** |
||||
* Returns whether to perform an upsert if no document matches. |
||||
* |
||||
* @return {@literal true} for upsert. |
||||
*/ |
||||
boolean upsert(); |
||||
} |
||||
|
||||
/** Update-one operation: update the first document matching the {@link #query()}. */ |
||||
interface UpdateFirst extends Update {} |
||||
|
||||
/** |
||||
* Remove operation: delete documents matching the {@link #query()}. |
||||
*/ |
||||
interface Remove extends BulkOperation { |
||||
/** |
||||
* Returns the query that selects which documents to remove. |
||||
* |
||||
* @return the query; never {@literal null}. |
||||
*/ |
||||
Query query(); |
||||
} |
||||
|
||||
/** Remove-one operation: delete the first document matching the {@link #query()}. */ |
||||
interface RemoveFirst extends Remove {} |
||||
|
||||
/** |
||||
* Replace operation: replace the document matching the {@link #query()} with the {@link #replacement()} document. |
||||
*/ |
||||
interface Replace extends BulkOperation { |
||||
|
||||
/** |
||||
* Returns the query that selects the document to replace. |
||||
* |
||||
* @return the query; never {@literal null}. |
||||
*/ |
||||
Query query(); |
||||
|
||||
/** |
||||
* Returns the replacement document. |
||||
* |
||||
* @return the replacement; never {@literal null}. |
||||
*/ |
||||
Object replacement(); |
||||
|
||||
/** |
||||
* Returns whether to perform an upsert if no document matches. |
||||
* |
||||
* @return {@literal true} for upsert. |
||||
*/ |
||||
boolean upsert(); |
||||
} |
||||
} |
||||
@ -0,0 +1,51 @@
@@ -0,0 +1,51 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core.bulk; |
||||
|
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
/** |
||||
* Context for a {@link BulkOperation}, providing the target namespace (database and collection) and optional domain |
||||
* type used for mapping. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 5.1 |
||||
*/ |
||||
public interface BulkOperationContext { |
||||
|
||||
/** |
||||
* Returns the target namespace for this bulk operation. |
||||
* |
||||
* @return the {@link TypedNamespace}; never {@literal null}. |
||||
*/ |
||||
TypedNamespace namespace(); |
||||
|
||||
/** |
||||
* Value object holding namespace (database and collection) information and an optional domain type used for |
||||
* mapping {@link BulkOperation bulk operations}. |
||||
* <p> |
||||
* <strong>NOTE:</strong> Provide at least either {@literal type} or {@literal collection}. An explicit |
||||
* {@literal collection} name takes precedence over the collection name derived from {@literal type}. |
||||
* |
||||
* @param type target domain type for mapping queries and updates; used to derive collection name when |
||||
* {@literal collection} is {@literal null}; may be {@literal null}. |
||||
* @param database target database; use {@literal null} for the configured default database. |
||||
* @param collection target collection name; may be {@literal null}. |
||||
*/ |
||||
record TypedNamespace(@Nullable Class<?> type, @Nullable String database, @Nullable String collection) { |
||||
|
||||
} |
||||
} |
||||
@ -0,0 +1,125 @@
@@ -0,0 +1,125 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core.bulk; |
||||
|
||||
import com.mongodb.client.model.bulk.ClientBulkWriteResult; |
||||
|
||||
/** |
||||
* Result of a {@link Bulk} write execution. Exposes counts for inserted, modified, deleted, and upserted documents, and |
||||
* whether the operation was acknowledged by the server. |
||||
* |
||||
* @param <T> the type of the raw driver result (e.g. {@link ClientBulkWriteResult}). |
||||
* @author Christoph Strobl |
||||
* @since 5.1 |
||||
*/ |
||||
public interface BulkOperationResult<T> { |
||||
|
||||
/** |
||||
* Creates a {@link BulkOperationResult} from a MongoDB driver {@link ClientBulkWriteResult}. |
||||
* |
||||
* @param result the driver result; must not be {@literal null}. |
||||
* @return a new {@link BulkOperationResult} wrapping the given result; never {@literal null}. |
||||
*/ |
||||
static BulkOperationResult<ClientBulkWriteResult> from(ClientBulkWriteResult result) { |
||||
|
||||
return new BulkOperationResult<>() { |
||||
@Override |
||||
public long insertCount() { |
||||
return result.getInsertedCount(); |
||||
} |
||||
|
||||
@Override |
||||
public long modifiedCount() { |
||||
return result.getModifiedCount(); |
||||
} |
||||
|
||||
@Override |
||||
public long deleteCount() { |
||||
return result.getDeletedCount(); |
||||
} |
||||
|
||||
@Override |
||||
public long upsertCount() { |
||||
return result.getUpsertedCount(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean acknowledged() { |
||||
return result.isAcknowledged(); |
||||
} |
||||
|
||||
@Override |
||||
public long matchedCount() { |
||||
return result.getMatchedCount(); |
||||
} |
||||
|
||||
@Override |
||||
public ClientBulkWriteResult rawResult() { |
||||
return result; |
||||
} |
||||
}; |
||||
} |
||||
|
||||
/** |
||||
* Returns the number of documents inserted. |
||||
* |
||||
* @return the insert count. |
||||
*/ |
||||
long insertCount(); |
||||
|
||||
/** |
||||
* Returns the number of documents modified by update operations. |
||||
* |
||||
* @return the modified count. |
||||
*/ |
||||
long modifiedCount(); |
||||
|
||||
/** |
||||
* Returns the number of documents deleted. |
||||
* |
||||
* @return the delete count. |
||||
*/ |
||||
long deleteCount(); |
||||
|
||||
/** |
||||
* Returns the number of documents upserted. |
||||
* |
||||
* @return the upsert count. |
||||
*/ |
||||
long upsertCount(); |
||||
|
||||
/** |
||||
* Returns whether the bulk write was acknowledged by the server. |
||||
* |
||||
* @return {@literal true} if acknowledged. |
||||
*/ |
||||
boolean acknowledged(); |
||||
|
||||
/** |
||||
* Returns the number of documents that matched the query criteria in update, replace, or remove operations. |
||||
* |
||||
* @return the matched count. |
||||
*/ |
||||
long matchedCount(); |
||||
|
||||
/** |
||||
* Returns the raw result from the MongoDB driver. |
||||
* |
||||
* @return the raw result. |
||||
*/ |
||||
T rawResult(); |
||||
|
||||
} |
||||
@ -0,0 +1,76 @@
@@ -0,0 +1,76 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core.bulk; |
||||
|
||||
/**
 * Options for executing a {@link Bulk} write, such as whether operations run in {@link Order#ORDERED ordered} or
 * {@link Order#UNORDERED unordered} mode.
 * <p>
 * Instances are immutable; the two factory methods return cached singletons.
 *
 * @author Christoph Strobl
 * @since 5.1
 */
public class BulkWriteOptions {

	// immutable value object -> cache the only two possible instances instead of allocating per call
	private static final BulkWriteOptions ORDERED = new BulkWriteOptions(Order.ORDERED);
	private static final BulkWriteOptions UNORDERED = new BulkWriteOptions(Order.UNORDERED);

	private final Order order;

	/**
	 * Creates new {@link BulkWriteOptions} for the given execution {@link Order}.
	 *
	 * @param order must not be {@literal null}.
	 * @throws IllegalArgumentException if {@code order} is {@literal null}.
	 */
	BulkWriteOptions(Order order) {

		if (order == null) {
			throw new IllegalArgumentException("Order must not be null");
		}
		this.order = order;
	}

	/**
	 * Returns options for ordered execution: operations run serially and execution stops on the first error.
	 *
	 * @return options for ordered bulk write; never {@literal null}.
	 */
	public static BulkWriteOptions ordered() {
		return ORDERED;
	}

	/**
	 * Returns options for unordered execution: operations may run in any order (possibly in parallel) and execution
	 * continues even if some operations fail.
	 *
	 * @return options for unordered bulk write; never {@literal null}.
	 */
	public static BulkWriteOptions unordered() {
		return UNORDERED;
	}

	/**
	 * Returns the execution order for the bulk write.
	 *
	 * @return the {@link Order}; never {@literal null}.
	 */
	public Order getOrder() {
		return order;
	}

	/**
	 * Execution order for bulk write operations.
	 */
	public enum Order {

		/**
		 * Execute {@link BulkOperation operations} in the order of {@link Bulk#operations()}; stop on first error.
		 */
		ORDERED,

		/**
		 * Execute {@link BulkOperation operations} in no particular order; continue despite errors.
		 */
		UNORDERED
	}
}
||||
@ -0,0 +1,272 @@
@@ -0,0 +1,272 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core.bulk; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.function.Consumer; |
||||
|
||||
import org.springframework.data.mongodb.core.bulk.Bulk.BulkBuilder; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk.BulkBuilderBase; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk.NamespaceBoundBulkBuilder; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperationContext.TypedNamespace; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition; |
||||
|
||||
/** |
||||
* Default implementation of {@link BulkBuilder} and {@link Bulk.NamespaceBoundBulkBuilder} that tracks the current |
||||
* collection (namespace) and builds a list of {@link BulkOperation bulk operations} for execution. Supports bulk |
||||
* writes across multiple collections as in MongoDB's bulk write model. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 5.1 |
||||
*/ |
||||
class NamespaceAwareBulkBuilder<T> implements BulkBuilder, NamespaceBoundBulkBuilder<T> { |
||||
|
||||
TypedNamespace namespace; |
||||
List<BulkOperation> bulkOperations = new ArrayList<>(); |
||||
|
||||
@Override |
||||
public NamespaceBoundBulkBuilder<T> insert(T object) { |
||||
bulkOperations.add(new BulkInsert(object, namespace)); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public NamespaceBoundBulkBuilder<T> insertAll(Iterable<? extends T> objects) { |
||||
objects.forEach(this::insert); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public NamespaceBoundBulkBuilder<T> updateOne(Query filter, UpdateDefinition update) { |
||||
bulkOperations.add(new BulkUpdateFirst(filter, update, false, namespace)); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public NamespaceBoundBulkBuilder<T> updateMulti(Query filter, UpdateDefinition update) { |
||||
bulkOperations.add(new BulkUpdate(filter, update, false, namespace)); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public NamespaceBoundBulkBuilder<T> upsert(Query filter, UpdateDefinition update) { |
||||
bulkOperations.add(new BulkUpdate(filter, update, true, namespace)); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public NamespaceBoundBulkBuilder<T> remove(Query filter) { |
||||
bulkOperations.add(new BulkRemove(filter, namespace)); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public NamespaceBoundBulkBuilder<T> replaceOne(Query filter, Object replacement) { |
||||
bulkOperations.add(new BulkReplace(filter, replacement, true, namespace)); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public NamespaceBoundBulkBuilder<T> replaceIfExists(Query filter, Object replacement) { |
||||
bulkOperations.add(new BulkReplace(filter, replacement, false, namespace)); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public BulkBuilder inCollection(String collectionName, Consumer<BulkBuilderBase<Object>> scoped) { |
||||
|
||||
TypedNamespace currentNamespace = this.namespace; |
||||
|
||||
NamespaceBoundBulkBuilder<Object> builder = inCollection(collectionName); |
||||
scoped.accept(builder); |
||||
|
||||
this.namespace = currentNamespace; |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
@SuppressWarnings("unchecked") |
||||
public <S> NamespaceBoundBulkBuilder<S> inCollection(Class<S> type) { |
||||
this.namespace = new TypedNamespace(type, null, null); |
||||
return (NamespaceBoundBulkBuilder<S>) this; |
||||
} |
||||
|
||||
@Override |
||||
@SuppressWarnings("unchecked") |
||||
public NamespaceBoundBulkBuilder<Object> inCollection(String collectionName) { |
||||
this.namespace = new TypedNamespace(null, null, collectionName); |
||||
return (NamespaceBoundBulkBuilder<Object>) this; |
||||
} |
||||
|
||||
@Override |
||||
@SuppressWarnings("unchecked") |
||||
public <S> NamespaceBoundBulkBuilder<S> inCollection(String collectionName, Class<S> type) { |
||||
this.namespace = new TypedNamespace(type, null, collectionName); |
||||
return (NamespaceBoundBulkBuilder<S>) this; |
||||
} |
||||
|
||||
@Override |
||||
public Bulk build() { |
||||
return () -> List.copyOf(NamespaceAwareBulkBuilder.this.bulkOperations); |
||||
} |
||||
|
||||
@Override |
||||
public <S> BulkBuilder inCollection(Class<S> type, Consumer<BulkBuilderBase<S>> scoped) { |
||||
TypedNamespace currentNamespace = this.namespace; |
||||
|
||||
NamespaceBoundBulkBuilder<S> builder = inCollection(type); |
||||
scoped.accept(builder); |
||||
|
||||
this.namespace = currentNamespace; |
||||
return this; |
||||
} |
||||
|
||||
private static class ContextAware { |
||||
|
||||
protected BulkOperationContext context; |
||||
|
||||
public void setContext(BulkOperationContext context) { |
||||
this.context = context; |
||||
} |
||||
|
||||
public BulkOperationContext context() { |
||||
return context; |
||||
} |
||||
} |
||||
|
||||
private static class DefaultBulkOperationContext implements BulkOperationContext { |
||||
|
||||
TypedNamespace namespace; |
||||
|
||||
public DefaultBulkOperationContext(TypedNamespace namespace) { |
||||
this.namespace = namespace; |
||||
} |
||||
|
||||
@Override |
||||
public TypedNamespace namespace() { |
||||
return namespace; |
||||
} |
||||
} |
||||
|
||||
static class BulkInsert extends ContextAware implements BulkOperation.Insert { |
||||
|
||||
private final Object value; |
||||
|
||||
public BulkInsert(Object value, TypedNamespace namespace) { |
||||
|
||||
this.value = value; |
||||
setContext(new DefaultBulkOperationContext(namespace)); |
||||
} |
||||
|
||||
@Override |
||||
public Object value() { |
||||
return value; |
||||
} |
||||
} |
||||
|
||||
static class BulkUpdate extends ContextAware implements BulkOperation.Update { |
||||
|
||||
private final Query query; |
||||
private final UpdateDefinition update; |
||||
private final boolean upsert; |
||||
|
||||
public BulkUpdate(Query query, UpdateDefinition update, boolean upsert, TypedNamespace namespace) { |
||||
this.query = query; |
||||
this.update = update; |
||||
this.upsert = upsert; |
||||
setContext(new DefaultBulkOperationContext(namespace)); |
||||
} |
||||
|
||||
@Override |
||||
public UpdateDefinition update() { |
||||
return update; |
||||
} |
||||
|
||||
@Override |
||||
public Query query() { |
||||
return query; |
||||
} |
||||
|
||||
@Override |
||||
public boolean upsert() { |
||||
return upsert; |
||||
} |
||||
} |
||||
|
||||
static class BulkUpdateFirst extends BulkUpdate implements UpdateFirst { |
||||
|
||||
public BulkUpdateFirst(Query query, UpdateDefinition update, boolean upsert, TypedNamespace namespace) { |
||||
super(query, update, upsert, namespace); |
||||
} |
||||
|
||||
} |
||||
|
||||
static class BulkRemove extends ContextAware implements BulkOperation.Remove { |
||||
|
||||
private final Query query; |
||||
|
||||
public BulkRemove(Query query, TypedNamespace namespace) { |
||||
this.query = query; |
||||
setContext(new DefaultBulkOperationContext(namespace)); |
||||
} |
||||
|
||||
@Override |
||||
public Query query() { |
||||
return query; |
||||
} |
||||
} |
||||
|
||||
static class BulkRemoveFirst extends BulkRemove implements RemoveFirst { |
||||
|
||||
public BulkRemoveFirst(Query query, TypedNamespace namespace) { |
||||
super(query, namespace); |
||||
} |
||||
} |
||||
|
||||
static class BulkReplace extends ContextAware implements BulkOperation.Replace { |
||||
|
||||
private final Query query; |
||||
private final Object replacement; |
||||
private final boolean upsert; |
||||
|
||||
BulkReplace(Query query, Object replacement, boolean upsert, TypedNamespace namespace) { |
||||
this.query = query; |
||||
this.replacement = replacement; |
||||
this.upsert = upsert; |
||||
setContext(new DefaultBulkOperationContext(namespace)); |
||||
} |
||||
|
||||
@Override |
||||
public Query query() { |
||||
return query; |
||||
} |
||||
|
||||
@Override |
||||
public Object replacement() { |
||||
return replacement; |
||||
} |
||||
|
||||
@Override |
||||
public boolean upsert() { |
||||
return upsert; |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,6 @@
@@ -0,0 +1,6 @@
|
||||
/** |
||||
* Abstraction for MongoDB <a href="https://www.mongodb.com/docs/manual/core/bulk-write-operations/">bulk write |
||||
* operations</a>. |
||||
*/ |
||||
@org.jspecify.annotations.NullMarked |
||||
package org.springframework.data.mongodb.core.bulk; |
||||
@ -0,0 +1,472 @@
@@ -0,0 +1,472 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; |
||||
import static org.springframework.data.mongodb.core.query.Criteria.where; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
|
||||
import org.bson.Document; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk; |
||||
import org.springframework.data.mongodb.core.bulk.BulkOperationResult; |
||||
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.core.query.Update; |
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition; |
||||
import org.springframework.data.mongodb.test.util.Client; |
||||
import org.springframework.data.mongodb.test.util.EnableIfMongoServerVersion; |
||||
import org.springframework.data.mongodb.test.util.MongoTestTemplate; |
||||
import org.springframework.data.mongodb.test.util.Template; |
||||
import org.springframework.data.util.Pair; |
||||
|
||||
import com.mongodb.ClientBulkWriteException; |
||||
import com.mongodb.client.MongoClient; |
||||
import com.mongodb.client.MongoCollection; |
||||
|
||||
/** |
||||
* @author Christoph Strobl |
||||
* @since 2026/02 |
||||
*/ |
||||
@EnableIfMongoServerVersion(isGreaterThanEqual = "8.0") |
||||
public class MongoTemplateBulkTests { |
||||
|
||||
@Client static MongoClient mongoClient; |
||||
|
||||
@Template(initialEntitySet = { BaseDoc.class, SpecialDoc.class }) //
|
||||
static MongoTestTemplate operations; |
||||
|
||||
@BeforeEach |
||||
public void setUp() { |
||||
operations.flushDatabase(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void bulkWriteMultipleCollections() { |
||||
|
||||
operations.flushDatabase(); |
||||
|
||||
BaseDoc doc1 = new BaseDoc(); |
||||
doc1.id = "id-doc1"; |
||||
doc1.value = "value-doc1"; |
||||
|
||||
BaseDoc doc2 = new BaseDoc(); |
||||
doc2.id = "id-doc2"; |
||||
doc2.value = "value-doc2"; |
||||
|
||||
Bulk bulk = Bulk |
||||
.create( |
||||
builder -> builder |
||||
.inCollection(BaseDoc.class, |
||||
ops -> ops.insert(doc1).insert(doc2).upsert(where("_id").is("id-doc3"), |
||||
new Update().set("value", "upserted"))) |
||||
.inCollection(SpecialDoc.class).insert(new SpecialDoc())); |
||||
|
||||
operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
Long inBaseDocCollection = operations.execute(BaseDoc.class, MongoCollection::countDocuments); |
||||
Long inSpecialCollection = operations.execute(SpecialDoc.class, MongoCollection::countDocuments); |
||||
assertThat(inBaseDocCollection).isEqualTo(3L); |
||||
assertThat(inSpecialCollection).isOne(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void insertOrderedAcrossCollections() { |
||||
|
||||
BaseDoc doc1 = newDoc("1"); |
||||
BaseDoc doc2 = newDoc("2"); |
||||
SpecialDoc specialDoc = new SpecialDoc(); |
||||
specialDoc.id = "id-special"; |
||||
specialDoc.value = "value-special"; |
||||
|
||||
Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, ops -> ops.insert(doc1).insert(doc2)) |
||||
.inCollection(SpecialDoc.class).insert(specialDoc).build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.insertCount()).isEqualTo(3); |
||||
|
||||
Long baseCount = operations.execute(BaseDoc.class, MongoCollection::countDocuments); |
||||
Long specialCount = operations.execute(SpecialDoc.class, MongoCollection::countDocuments); |
||||
assertThat(baseCount).isEqualTo(2L); |
||||
assertThat(specialCount).isOne(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void insertOrderedFailsStopsAtDuplicateInCollection() { |
||||
|
||||
BaseDoc doc1 = newDoc("1"); |
||||
Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, ops -> ops.insert(doc1).insert(doc1)) |
||||
.inCollection(SpecialDoc.class).insert(new SpecialDoc()).build(); |
||||
|
||||
assertThatThrownBy(() -> operations.bulkWrite(bulk, BulkWriteOptions.ordered())) //
|
||||
// .isInstanceOf(BulkOperationException.class) // TODO
|
||||
.hasCauseInstanceOf(ClientBulkWriteException.class) //
|
||||
.extracting(Throwable::getCause) //
|
||||
.satisfies(it -> { |
||||
ClientBulkWriteException ex = (ClientBulkWriteException) it; |
||||
assertThat(ex.getPartialResult().get().getInsertedCount()).isOne(); |
||||
assertThat(ex.getWriteErrors()).isNotNull(); |
||||
assertThat(ex.getWriteErrors().size()).isOne(); |
||||
}); |
||||
|
||||
Long baseCount = operations.execute(BaseDoc.class, MongoCollection::countDocuments); |
||||
Long specialCount = operations.execute(SpecialDoc.class, MongoCollection::countDocuments); |
||||
assertThat(baseCount).isOne(); |
||||
assertThat(specialCount).isZero(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void insertUnOrderedAcrossCollections() { |
||||
|
||||
BaseDoc doc1 = newDoc("1"); |
||||
BaseDoc doc2 = newDoc("2"); |
||||
SpecialDoc specialDoc = new SpecialDoc(); |
||||
specialDoc.id = "id-special"; |
||||
|
||||
Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, ops -> ops.insert(doc1).insert(doc2)) |
||||
.inCollection(SpecialDoc.class).insert(specialDoc).build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.unordered()); |
||||
|
||||
assertThat(result.insertCount()).isEqualTo(3); |
||||
Long baseCount = operations.execute(BaseDoc.class, MongoCollection::countDocuments); |
||||
Long specialCount = operations.execute(SpecialDoc.class, MongoCollection::countDocuments); |
||||
assertThat(baseCount).isEqualTo(2L); |
||||
assertThat(specialCount).isOne(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void insertUnOrderedContinuesOnErrorInOneCollection() { |
||||
|
||||
BaseDoc doc1 = newDoc("1"); |
||||
Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, ops -> ops.insert(doc1).insert(doc1)) |
||||
.inCollection(SpecialDoc.class).insert(new SpecialDoc()).build(); |
||||
|
||||
assertThatThrownBy(() -> operations.bulkWrite(bulk, BulkWriteOptions.unordered())) //
|
||||
// .isInstanceOf(BulkOperationException.class) // TODO
|
||||
.hasCauseInstanceOf(ClientBulkWriteException.class) //
|
||||
.extracting(Throwable::getCause) //
|
||||
.satisfies(it -> { |
||||
ClientBulkWriteException ex = (ClientBulkWriteException) it; |
||||
assertThat(ex.getPartialResult().get().getInsertedCount()).isEqualTo(2); |
||||
assertThat(ex.getWriteErrors()).isNotNull(); |
||||
assertThat(ex.getWriteErrors().size()).isOne(); |
||||
}); |
||||
|
||||
Long baseCount = operations.execute(BaseDoc.class, MongoCollection::countDocuments); |
||||
Long specialCount = operations.execute(SpecialDoc.class, MongoCollection::countDocuments); |
||||
assertThat(baseCount).isOne(); |
||||
assertThat(specialCount).isOne(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void updateOneAcrossCollections() { |
||||
|
||||
insertSomeDocumentsIntoBaseDoc(); |
||||
insertSomeDocumentsIntoSpecialDoc(); |
||||
|
||||
List<Pair<Query, UpdateDefinition>> updatesBase = Arrays |
||||
.asList(Pair.of(queryWhere("value", "value1"), set("value", "value3"))); |
||||
List<Pair<Query, UpdateDefinition>> updatesSpecial = Arrays |
||||
.asList(Pair.of(queryWhere("value", "value1"), set("value", "value3"))); |
||||
|
||||
Bulk bulk = Bulk.builder() |
||||
.inCollection(BaseDoc.class, ops -> updatesBase.forEach(p -> ops.updateOne(p.getFirst(), p.getSecond()))) |
||||
.inCollection(SpecialDoc.class, ops -> updatesSpecial.forEach(p -> ops.updateOne(p.getFirst(), p.getSecond()))) |
||||
.build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.modifiedCount()).isEqualTo(2); |
||||
|
||||
Long baseWithValue3 = operations.execute(BaseDoc.class, col -> col.countDocuments(new Document("value", "value3"))); |
||||
Long specialWithValue3 = operations.execute(SpecialDoc.class, |
||||
col -> col.countDocuments(new Document("value", "value3"))); |
||||
assertThat(baseWithValue3).isEqualTo(1L); |
||||
assertThat(specialWithValue3).isEqualTo(1L); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void updateMultiAcrossCollections() { |
||||
|
||||
insertSomeDocumentsIntoBaseDoc(); |
||||
insertSomeDocumentsIntoSpecialDoc(); |
||||
|
||||
List<Pair<Query, UpdateDefinition>> updatesBase = Arrays.asList( |
||||
Pair.of(queryWhere("value", "value1"), set("value", "value3")), |
||||
Pair.of(queryWhere("value", "value2"), set("value", "value4"))); |
||||
List<Pair<Query, UpdateDefinition>> updatesSpecial = Arrays.asList( |
||||
Pair.of(queryWhere("value", "value1"), set("value", "value3")), |
||||
Pair.of(queryWhere("value", "value2"), set("value", "value4"))); |
||||
|
||||
Bulk bulk = Bulk.builder() |
||||
.inCollection(BaseDoc.class, ops -> updatesBase.forEach(p -> ops.updateMulti(p.getFirst(), p.getSecond()))) |
||||
.inCollection(SpecialDoc.class, |
||||
ops -> updatesSpecial.forEach(p -> ops.updateMulti(p.getFirst(), p.getSecond()))) |
||||
.build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.modifiedCount()).isEqualTo(8); |
||||
|
||||
Long baseValue3 = operations.execute(BaseDoc.class, col -> col.countDocuments(new Document("value", "value3"))); |
||||
Long baseValue4 = operations.execute(BaseDoc.class, col -> col.countDocuments(new Document("value", "value4"))); |
||||
Long specialValue3 = operations.execute(SpecialDoc.class, |
||||
col -> col.countDocuments(new Document("value", "value3"))); |
||||
Long specialValue4 = operations.execute(SpecialDoc.class, |
||||
col -> col.countDocuments(new Document("value", "value4"))); |
||||
assertThat(baseValue3).isEqualTo(2L); |
||||
assertThat(baseValue4).isEqualTo(2L); |
||||
assertThat(specialValue3).isEqualTo(2L); |
||||
assertThat(specialValue4).isEqualTo(2L); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void upsertDoesUpdateInEachCollection() { |
||||
|
||||
insertSomeDocumentsIntoBaseDoc(); |
||||
insertSomeDocumentsIntoSpecialDoc(); |
||||
|
||||
Bulk bulk = Bulk.builder() |
||||
.inCollection(BaseDoc.class, ops -> ops.upsert(queryWhere("value", "value1"), set("value", "value2"))) |
||||
.inCollection(SpecialDoc.class, ops -> ops.upsert(queryWhere("value", "value1"), set("value", "value2"))) |
||||
.build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.matchedCount()).isEqualTo(4); |
||||
assertThat(result.modifiedCount()).isEqualTo(4); |
||||
assertThat(result.insertCount()).isZero(); |
||||
assertThat(result.upsertCount()).isZero(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void upsertDoesInsertInEachCollection() { |
||||
|
||||
Bulk bulk = Bulk.builder() |
||||
.inCollection(BaseDoc.class, ops -> ops.upsert(queryWhere("_id", "new-id-1"), set("value", "upserted1"))) |
||||
.inCollection(SpecialDoc.class, ops -> ops.upsert(queryWhere("_id", "new-id-2"), set("value", "upserted2"))) |
||||
.build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.matchedCount()).isZero(); |
||||
assertThat(result.modifiedCount()).isZero(); |
||||
assertThat(result.upsertCount()).isEqualTo(2); |
||||
|
||||
assertThat(operations.findOne(queryWhere("_id", "new-id-1"), BaseDoc.class)).isNotNull(); |
||||
assertThat(operations.findOne(queryWhere("_id", "new-id-2"), SpecialDoc.class)).isNotNull(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void removeAcrossCollections() { |
||||
|
||||
insertSomeDocumentsIntoBaseDoc(); |
||||
insertSomeDocumentsIntoSpecialDoc(); |
||||
|
||||
List<Query> removesBase = Arrays.asList(queryWhere("_id", "1"), queryWhere("value", "value2")); |
||||
List<Query> removesSpecial = Arrays.asList(queryWhere("_id", "1"), queryWhere("value", "value2")); |
||||
|
||||
Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, ops -> removesBase.forEach(ops::remove)) |
||||
.inCollection(SpecialDoc.class, ops -> removesSpecial.forEach(ops::remove)).build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.deleteCount()).isEqualTo(6); |
||||
|
||||
Long baseCount = operations.execute(BaseDoc.class, MongoCollection::countDocuments); |
||||
Long specialCount = operations.execute(SpecialDoc.class, MongoCollection::countDocuments); |
||||
assertThat(baseCount).isOne(); |
||||
assertThat(specialCount).isOne(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void replaceOneAcrossCollections() { |
||||
|
||||
insertSomeDocumentsIntoBaseDoc(); |
||||
insertSomeDocumentsIntoSpecialDoc(); |
||||
|
||||
Document replacementBase = rawDoc("1", "replaced-base"); |
||||
Document replacementSpecial = new Document("_id", "1").append("value", "replaced-special").append("specialValue", |
||||
"special"); |
||||
|
||||
Bulk bulk = Bulk.builder() |
||||
.inCollection(BaseDoc.class, ops -> ops.replaceOne(queryWhere("_id", "1"), replacementBase)) |
||||
.inCollection(SpecialDoc.class, ops -> ops.replaceOne(queryWhere("_id", "1"), replacementSpecial)).build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.matchedCount()).isEqualTo(2); |
||||
assertThat(result.modifiedCount()).isEqualTo(2); |
||||
|
||||
Document inBase = operations.execute(BaseDoc.class, col -> col.find(new Document("_id", "1")).first()); |
||||
Document inSpecial = operations.execute(SpecialDoc.class, col -> col.find(new Document("_id", "1")).first()); |
||||
assertThat(inBase).containsEntry("value", "replaced-base"); |
||||
assertThat(inSpecial).containsEntry("value", "replaced-special").containsEntry("specialValue", "special"); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void replaceOneWithUpsertInCollection() { |
||||
|
||||
Document replacement = rawDoc("new-id", "upserted-value"); |
||||
|
||||
Bulk bulk = Bulk.builder() |
||||
.inCollection(BaseDoc.class, ops -> ops.replaceOne(queryWhere("_id", "new-id"), replacement)).build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.matchedCount()).isZero(); |
||||
assertThat(result.modifiedCount()).isZero(); |
||||
assertThat(result.upsertCount()).isOne(); |
||||
|
||||
assertThat(operations.findOne(queryWhere("_id", "new-id"), BaseDoc.class)).isNotNull(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void mixedBulkOrderedAcrossCollections() { |
||||
|
||||
BaseDoc doc1 = newDoc("1", "v1"); |
||||
SpecialDoc doc2 = new SpecialDoc(); |
||||
doc2.id = "2"; |
||||
doc2.value = "v2"; |
||||
|
||||
Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, |
||||
ops -> ops.insert(doc1).updateOne(queryWhere("_id", "1"), set("value", "v2")).remove(queryWhere("value", "v2"))) |
||||
.inCollection(SpecialDoc.class).insert(doc2).build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.insertCount()).isEqualTo(2); |
||||
assertThat(result.modifiedCount()).isOne(); |
||||
assertThat(result.deleteCount()).isOne(); |
||||
|
||||
Long baseCount = operations.execute(BaseDoc.class, MongoCollection::countDocuments); |
||||
Long specialCount = operations.execute(SpecialDoc.class, MongoCollection::countDocuments); |
||||
assertThat(baseCount).isZero(); |
||||
assertThat(specialCount).isOne(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void mixedBulkOrderedWithListAcrossCollections() { |
||||
|
||||
List<BaseDoc> insertsBase = Arrays.asList(newDoc("1", "v1"), newDoc("2", "v2"), newDoc("3", "v2")); |
||||
List<Pair<Query, UpdateDefinition>> updatesBase = Arrays |
||||
.asList(Pair.of(queryWhere("value", "v2"), set("value", "v3"))); |
||||
List<Query> removesBase = Arrays.asList(queryWhere("_id", "1")); |
||||
|
||||
SpecialDoc specialDoc = new SpecialDoc(); |
||||
specialDoc.id = "s1"; |
||||
specialDoc.value = "sv1"; |
||||
|
||||
Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, ops -> { |
||||
ops.insertAll(insertsBase); |
||||
updatesBase.forEach(p -> ops.updateMulti(p.getFirst(), p.getSecond())); |
||||
removesBase.forEach(ops::remove); |
||||
}).inCollection(SpecialDoc.class).insert(specialDoc).build(); |
||||
BulkOperationResult result = operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
assertThat(result.insertCount()).isEqualTo(4); |
||||
assertThat(result.modifiedCount()).isEqualTo(2); |
||||
assertThat(result.deleteCount()).isOne(); |
||||
|
||||
Long baseCount = operations.execute(BaseDoc.class, MongoCollection::countDocuments); |
||||
Long specialCount = operations.execute(SpecialDoc.class, MongoCollection::countDocuments); |
||||
assertThat(baseCount).isEqualTo(2L); |
||||
assertThat(specialCount).isOne(); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void insertShouldConsiderInheritancePerCollection() { |
||||
|
||||
SpecialDoc specialDoc = new SpecialDoc(); |
||||
specialDoc.id = "id-special"; |
||||
specialDoc.value = "normal-value"; |
||||
specialDoc.specialValue = "special-value"; |
||||
|
||||
Bulk bulk = Bulk.builder().inCollection(SpecialDoc.class).insert(specialDoc).build(); |
||||
operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
|
||||
BaseDoc doc = operations.findOne(queryWhere("_id", specialDoc.id), BaseDoc.class, |
||||
operations.getCollectionName(SpecialDoc.class)); |
||||
assertThat(doc).isNotNull(); |
||||
assertThat(doc).isInstanceOf(SpecialDoc.class); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void switchingDatabasesBackAndForth/* srly, why? */() { |
||||
|
||||
mongoClient.getDatabase(operations.getDb().getName()).drop(); |
||||
mongoClient.getDatabase("bulk-ops-db-2").drop(); |
||||
mongoClient.getDatabase("bulk-ops-db-3").drop(); |
||||
|
||||
Bulk bulk = Bulk.builder().inCollection("c1").insert(newDoc("c1-id-1", "v1")).build(); |
||||
operations.bulkWrite(bulk, BulkWriteOptions.ordered()); |
||||
mongoClient.getDatabase("bulk-ops-db-2").getCollection("c1").insertOne(rawDoc("c1-id-1", "v1")); |
||||
mongoClient.getDatabase("bulk-ops-db-3").getCollection("c1").insertOne(rawDoc("c1-id-1", "v1")); |
||||
|
||||
Document inDefaultDB = mongoClient.getDatabase(operations.getDb().getName()).getCollection("c1") |
||||
.find(new Document("_id", "c1-id-1")).first(); |
||||
Document inDB2 = mongoClient.getDatabase("bulk-ops-db-2").getCollection("c1").find(new Document("_id", "c1-id-1")) |
||||
.first(); |
||||
Document inDB3 = mongoClient.getDatabase("bulk-ops-db-3").getCollection("c1").find(new Document("_id", "c1-id-1")) |
||||
.first(); |
||||
assertThat(inDefaultDB).isNotNull(); |
||||
assertThat(inDB2).isNotNull(); |
||||
assertThat(inDB3).isNotNull(); |
||||
} |
||||
|
||||
private void insertSomeDocumentsIntoBaseDoc() { |
||||
String coll = operations.getCollectionName(BaseDoc.class); |
||||
operations.execute(coll, col -> { |
||||
col.insertOne(rawDoc("1", "value1")); |
||||
col.insertOne(rawDoc("2", "value1")); |
||||
col.insertOne(rawDoc("3", "value2")); |
||||
col.insertOne(rawDoc("4", "value2")); |
||||
return null; |
||||
}); |
||||
} |
||||
|
||||
private void insertSomeDocumentsIntoSpecialDoc() { |
||||
String coll = operations.getCollectionName(SpecialDoc.class); |
||||
operations.execute(coll, col -> { |
||||
col.insertOne(rawDoc("1", "value1")); |
||||
col.insertOne(rawDoc("2", "value1")); |
||||
col.insertOne(rawDoc("3", "value2")); |
||||
col.insertOne(rawDoc("4", "value2")); |
||||
return null; |
||||
}); |
||||
} |
||||
|
||||
private static BaseDoc newDoc(String id) { |
||||
BaseDoc doc = new BaseDoc(); |
||||
doc.id = id; |
||||
return doc; |
||||
} |
||||
|
||||
private static BaseDoc newDoc(String id, String value) { |
||||
BaseDoc doc = newDoc(id); |
||||
doc.value = value; |
||||
return doc; |
||||
} |
||||
|
||||
private static Query queryWhere(String field, String value) { |
||||
return new Query(org.springframework.data.mongodb.core.query.Criteria.where(field).is(value)); |
||||
} |
||||
|
||||
private static Update set(String field, String value) { |
||||
return new Update().set(field, value); |
||||
} |
||||
|
||||
private static Document rawDoc(String id, String value) { |
||||
return new Document("_id", id).append("value", value); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,421 @@
@@ -0,0 +1,421 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.anyString; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.Mockito.never; |
||||
import static org.mockito.Mockito.spy; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.verifyNoInteractions; |
||||
import static org.mockito.Mockito.when; |
||||
import static org.springframework.data.mongodb.core.query.Criteria.where; |
||||
import static org.springframework.data.mongodb.core.query.Query.query; |
||||
|
||||
import java.util.Collection; |
||||
import java.util.List; |
||||
|
||||
import org.assertj.core.api.InstanceOfAssertFactories; |
||||
import org.bson.Document; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.junit.jupiter.api.extension.ExtendWith; |
||||
import org.mockito.Answers; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.Captor; |
||||
import org.mockito.Mock; |
||||
import org.mockito.MockingDetails; |
||||
import org.mockito.Mockito; |
||||
import org.mockito.invocation.Invocation; |
||||
import org.mockito.junit.jupiter.MockitoExtension; |
||||
import org.mockito.junit.jupiter.MockitoSettings; |
||||
import org.mockito.quality.Strictness; |
||||
import org.springframework.context.ApplicationEventPublisher; |
||||
import org.springframework.dao.DataAccessException; |
||||
import org.springframework.dao.support.PersistenceExceptionTranslator; |
||||
import org.springframework.data.annotation.Id; |
||||
import org.springframework.data.mapping.callback.EntityCallbacks; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk.NamespaceBoundBulkBuilder; |
||||
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; |
||||
import org.springframework.data.mongodb.core.convert.DbRefResolver; |
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter; |
||||
import org.springframework.data.mongodb.core.convert.MongoConverter; |
||||
import org.springframework.data.mongodb.core.mapping.Field; |
||||
import org.springframework.data.mongodb.core.mapping.MongoMappingContext; |
||||
import org.springframework.data.mongodb.core.mapping.event.AfterSaveCallback; |
||||
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent; |
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertCallback; |
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent; |
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveCallback; |
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent; |
||||
import org.springframework.data.mongodb.core.query.BasicQuery; |
||||
import org.springframework.data.mongodb.core.query.Collation; |
||||
import org.springframework.data.mongodb.core.query.Update; |
||||
|
||||
import com.mongodb.client.MongoClient; |
||||
import com.mongodb.client.MongoCollection; |
||||
import com.mongodb.client.MongoDatabase; |
||||
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; |
||||
import com.mongodb.internal.client.model.bulk.AbstractClientNamespacedWriteModel; |
||||
import com.mongodb.internal.client.model.bulk.ClientWriteModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientDeleteManyModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientInsertOneModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientReplaceOneModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateManyModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateOneModel; |
||||
|
||||
/**
 * Unit tests for {@link MongoTemplate} bulk write support. Verifies that {@code Bulk} definitions are
 * translated into the driver's {@link ClientNamespacedWriteModel}s with correctly mapped queries, updates,
 * collations and array filters, and that lifecycle events and entity callbacks fire during a bulk write.
 *
 * @author Christoph Strobl
 */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT) // setUp stubs more than each individual test consumes
class MongoTemplateBulkUnitTests {

	private MongoTemplate template;
	@Mock MongoClient client;
	@Mock MongoDatabase database;
	@Mock(answer = Answers.RETURNS_DEEP_STUBS) MongoCollection<Document> collection;
	SimpleMongoClientDatabaseFactory factory;
	@Mock DbRefResolver dbRefResolver;

	@Mock ApplicationEventPublisher eventPublisher;
	// Captures the namespaced write models handed to MongoClient.bulkWrite(...).
	@Captor ArgumentCaptor<List<ClientNamespacedWriteModel>> captor;
	private MongoConverter converter;
	private MongoMappingContext mappingContext;

	BeforeConvertPersonCallback beforeConvertCallback;
	BeforeSavePersonCallback beforeSaveCallback;
	AfterSavePersonCallback afterSaveCallback;
	EntityCallbacks entityCallbacks;

	// Builder pre-bound to "default-collection"; tests may rebind via inCollection(...).
	private NamespaceBoundBulkBuilder<Object> ops;

	@BeforeEach
	void setUp() {

		// Spy on a real factory so getMongoCluster()/getMongoDatabase() can be stubbed while keeping
		// the remaining factory behavior intact.
		factory = spy(new SimpleMongoClientDatabaseFactory(client, "default-db"));
		when(factory.getMongoCluster()).thenReturn(client);
		when(factory.getMongoDatabase()).thenReturn(database);
		when(factory.getExceptionTranslator()).thenReturn(new NullExceptionTranslator());
		when(database.getCollection(anyString(), eq(Document.class))).thenReturn(collection);
		when(database.getName()).thenReturn("default-db");

		// Spies allow verifying callback invocation while preserving the callbacks' behavior.
		beforeConvertCallback = spy(new BeforeConvertPersonCallback());
		beforeSaveCallback = spy(new BeforeSavePersonCallback());
		afterSaveCallback = spy(new AfterSavePersonCallback());
		entityCallbacks = EntityCallbacks.create(beforeConvertCallback, beforeSaveCallback, afterSaveCallback);

		mappingContext = new MongoMappingContext();
		mappingContext.afterPropertiesSet();

		converter = new MappingMongoConverter(dbRefResolver, mappingContext);
		template = new MongoTemplate(factory, converter);
		template.setApplicationEventPublisher(eventPublisher);
		template.setEntityCallbacks(entityCallbacks);

		ops = Bulk.builder().inCollection("default-collection");
	}

	@Test // GH-5087
	void updateOneShouldUseCollationWhenPresent() {

		Bulk bulk = ops
				.updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen"))
				.build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());

		verify(client).bulkWrite(captor.capture(), any());

		// The query's collation must be carried over into the driver's update options.
		assertThat(
				extractWriteModel(ConcreteClientUpdateOneModel.class, captor.getValue().get(0)).getOptions().getCollation())
				.contains(com.mongodb.client.model.Collation.builder().locale("de").build());
	}

	@Test // GH-5087
	void updateManyShouldUseCollationWhenPresent() {

		Bulk bulk = ops
				.updateMulti(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen"))
				.build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());

		verify(client).bulkWrite(captor.capture(), any());

		assertThat(
				extractWriteModel(ConcreteClientUpdateManyModel.class, captor.getValue().get(0)).getOptions().getCollation())
				.contains(com.mongodb.client.model.Collation.builder().locale("de").build());
	}

	@Test // GH-5087
	void removeShouldUseCollationWhenPresent() {

		Bulk bulk = ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());

		verify(client).bulkWrite(captor.capture(), any());

		assertThat(
				extractWriteModel(ConcreteClientDeleteManyModel.class, captor.getValue().get(0)).getOptions().getCollation())
				.contains(com.mongodb.client.model.Collation.builder().locale("de").build());
	}

	@Test // GH-5087
	void replaceOneShouldUseCollationWhenPresent() {

		Bulk bulk = ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());

		verify(client).bulkWrite(captor.capture(), any());

		assertThat(
				extractWriteModel(ConcreteClientReplaceOneModel.class, captor.getValue().get(0)).getOptions().getCollation())
				.contains(com.mongodb.client.model.Collation.builder().locale("de").build());
	}

	@Test // GH-5087
	void bulkUpdateShouldMapQueryAndUpdateCorrectly() {

		Bulk bulk = ops.inCollection("test", SomeDomainType.class)
				.updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());
		verify(client).bulkWrite(captor.capture(), any());

		// Both filter and update must use the mapped field name ("first_name" via @Field).
		ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class,
				captor.getValue().get(0));
		assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys"));
		assertThat(updateModel.getUpdate()).contains(new Document("$set", new Document("first_name", "queen danerys")));
	}

	@Test // GH-5087
	void bulkRemoveShouldMapQueryCorrectly() {

		Bulk bulk = ops.inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys"))).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());
		verify(client).bulkWrite(captor.capture(), any());

		ConcreteClientDeleteManyModel deleteModel = extractWriteModel(ConcreteClientDeleteManyModel.class,
				captor.getValue().get(0));
		assertThat(deleteModel.getFilter()).isEqualTo(new Document("first_name", "danerys"));
	}

	@Test // GH-5087
	void bulkReplaceOneShouldMapQueryCorrectly() {

		SomeDomainType replacement = new SomeDomainType();
		replacement.firstName = "Minsu";
		replacement.lastName = "Kim";

		Bulk bulk = ops.inCollection("test", SomeDomainType.class)
				.replaceOne(query(where("firstName").is("danerys")), replacement).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());
		verify(client).bulkWrite(captor.capture(), any());

		// firstName maps to "first_name"; lastName has a bare @Field and keeps its property name.
		ConcreteClientReplaceOneModel replaceModel = extractWriteModel(ConcreteClientReplaceOneModel.class,
				captor.getValue().get(0));
		assertThat(replaceModel.getFilter()).isEqualTo(new Document("first_name", "danerys"));
		assertThat(replaceModel.getReplacement()).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class))
				.containsEntry("first_name", "Minsu") //
				.containsEntry("lastName", "Kim");
	}

	@Test // GH-5087
	void bulkInsertInvokesEntityCallbacks() {

		Person entity = new Person("init");
		Bulk bulk = ops.inCollection("person").insert(entity).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());

		// Callbacks must run in order: before-convert -> before-save -> after-save.
		ArgumentCaptor<Person> personArgumentCaptor = ArgumentCaptor.forClass(Person.class);
		verify(beforeConvertCallback).onBeforeConvert(personArgumentCaptor.capture(), eq("person"));
		verify(beforeSaveCallback).onBeforeSave(personArgumentCaptor.capture(), any(), eq("person"));
		verify(afterSaveCallback).onAfterSave(personArgumentCaptor.capture(), any(), eq("person"));
		assertThat(personArgumentCaptor.getAllValues()).extracting("firstName").containsExactly("init", "before-convert",
				"before-save");
		verify(client).bulkWrite(captor.capture(), any());

		// The document handed to the driver reflects the last callback's mutation.
		ConcreteClientInsertOneModel insertModel = extractWriteModel(ConcreteClientInsertOneModel.class,
				captor.getValue().get(0));
		assertThat(insertModel.getDocument()).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class))
				.containsEntry("firstName", "after-save");
	}

	@Test // GH-5087
	@SuppressWarnings("rawtypes")
	void bulkReplaceOneEmitsEventsCorrectly() {

		ops.replaceOne(query(where("firstName").is("danerys")), new SomeDomainType());

		// Merely defining the bulk must not publish anything; events fire on execution only.
		verifyNoInteractions(eventPublisher);

		template.bulkWrite(ops.build(), BulkWriteOptions.ordered());

		MockingDetails mockingDetails = Mockito.mockingDetails(eventPublisher);
		Collection<Invocation> invocations = mockingDetails.getInvocations();
		assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass)
				.containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class);
	}

	@Test // GH-5087
	@SuppressWarnings("rawtypes")
	void bulkInsertEmitsEventsCorrectly() {

		ops.insert(new SomeDomainType());

		// No events while the bulk is only being assembled.
		verify(eventPublisher, never()).publishEvent(any(BeforeConvertEvent.class));
		verify(eventPublisher, never()).publishEvent(any(BeforeSaveEvent.class));
		verify(eventPublisher, never()).publishEvent(any(AfterSaveEvent.class));

		template.bulkWrite(ops.build(), BulkWriteOptions.ordered());

		MockingDetails mockingDetails = Mockito.mockingDetails(eventPublisher);
		Collection<Invocation> invocations = mockingDetails.getInvocations();
		assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass)
				.containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class);
	}

	@Test // GH-5087
	void appliesArrayFilterWhenPresent() {

		Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());
		verify(client).bulkWrite(captor.capture(), any());

		ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class,
				captor.getValue().get(0));
		assertThat(updateModel.getOptions().getArrayFilters().get()).satisfies(it -> {
			assertThat((List<Document>) it).containsExactly(new Document("element", new Document("$gte", 100)));
		});
	}

	@Test // GH-5087
	void shouldRetainNestedArrayPathWithPlaceholdersForNoMatchingPaths() {

		// No type information available: the positional path must pass through unmapped.
		Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());
		verify(client).bulkWrite(captor.capture(), any());

		ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class,
				captor.getValue().get(0));
		assertThat(updateModel.getUpdate())
				.contains(new Document("$set", new Document("items.$.documents.0.fileId", "new-id")));
	}

	@Test // GH-5087
	void shouldRetainNestedArrayPathWithPlaceholdersForMappedEntity() {

		// With type information, the leaf property is mapped ("fileId" -> "the_file_id") while the
		// positional/index path segments are preserved.
		Bulk bulk = ops.inCollection("collection-1", OrderTest.class)
				.updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered());
		verify(client).bulkWrite(captor.capture(), any());

		ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class,
				captor.getValue().get(0));
		assertThat(updateModel.getUpdate())
				.contains(new Document("$set", new Document("items.$.documents.0.the_file_id", "file-id")));
	}

	/**
	 * Unwraps the driver-internal {@link ClientWriteModel} from a namespaced write model and casts it to
	 * the expected concrete model type.
	 */
	static <T extends ClientWriteModel> T extractWriteModel(Class<T> type, ClientNamespacedWriteModel source) {

		if (!(source instanceof AbstractClientNamespacedWriteModel cnwm)) {
			throw new IllegalArgumentException("Expected AbstractClientNamespacedWriteModel, got " + source.getClass());
		}
		ClientWriteModel model = cnwm.getModel();

		return type.cast(model);

	}

	// Fixture type with a nested array path used for update-mapping tests.
	static class OrderTest {

		String id;
		List<OrderTestItem> items;
	}

	static class OrderTestItem {

		private String cartId;
		private List<OrderTestDocument> documents;
	}

	static class OrderTestDocument {

		// Mapped field name differs from the property name on purpose.
		@Field("the_file_id") private String fileId;
	}

	// Fixture type exercising @Field mapping (explicit name vs. bare annotation).
	static class SomeDomainType {

		@Id String id;
		Gender gender;
		@Field("first_name") String firstName;
		@Field String lastName;
	}

	enum Gender {
		M, F
	}

	// Callback fixtures: each returns a fresh Person marking which stage ran.
	static class BeforeConvertPersonCallback implements BeforeConvertCallback<Person> {

		@Override
		public Person onBeforeConvert(Person entity, String collection) {
			return new Person("before-convert");
		}
	}

	static class BeforeSavePersonCallback implements BeforeSaveCallback<Person> {

		@Override
		public Person onBeforeSave(Person entity, Document document, String collection) {

			document.put("firstName", "before-save");
			return new Person("before-save");
		}
	}

	static class AfterSavePersonCallback implements AfterSaveCallback<Person> {

		@Override
		public Person onAfterSave(Person entity, Document document, String collection) {

			document.put("firstName", "after-save");
			return new Person("after-save");
		}
	}

	// Translator that declines translation, so raw driver exceptions surface in tests.
	static class NullExceptionTranslator implements PersistenceExceptionTranslator {

		@Override
		public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
			return null;
		}
	}
}
||||
@ -0,0 +1,446 @@
@@ -0,0 +1,446 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.springframework.data.mongodb.core.query.Criteria.where; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
|
||||
import org.bson.Document; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk; |
||||
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.core.query.Update; |
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition; |
||||
import org.springframework.data.mongodb.test.util.Client; |
||||
import org.springframework.data.mongodb.test.util.EnableIfMongoServerVersion; |
||||
import org.springframework.data.mongodb.test.util.ReactiveMongoTestTemplate; |
||||
import org.springframework.data.mongodb.test.util.Template; |
||||
import org.springframework.data.util.Pair; |
||||
|
||||
import com.mongodb.ClientBulkWriteException; |
||||
import com.mongodb.reactivestreams.client.MongoClient; |
||||
import com.mongodb.reactivestreams.client.MongoCollection; |
||||
|
||||
import reactor.core.publisher.Mono; |
||||
import reactor.test.StepVerifier; |
||||
|
||||
/** |
||||
* Reactive integration tests for {@link ReactiveMongoOperations#bulkWrite}. |
||||
* |
||||
* @author Christoph Strobl |
||||
*/ |
||||
@EnableIfMongoServerVersion(isGreaterThanEqual = "8.0")
public class ReactiveMongoTemplateBulkTests {

	@Client static MongoClient mongoClient;

	// Template bootstrapped with the two entity types used throughout these tests.
	@Template(initialEntitySet = { BaseDoc.class, SpecialDoc.class }) static ReactiveMongoTestTemplate operations;

	@BeforeEach
	void setUp() {
		// Start every test from an empty database.
		operations.flushDatabase().block();
	}

	// A single bulk write may target several collections: inserts + an upsert in one, an insert in another.
	@Test // GH-5087
	void bulkWriteMultipleCollections() {

		// NOTE(review): redundant — setUp() already flushes the database before each test.
		operations.flushDatabase().block();

		BaseDoc doc1 = new BaseDoc();
		doc1.id = "id-doc1";
		doc1.value = "value-doc1";
		BaseDoc doc2 = new BaseDoc();
		doc2.id = "id-doc2";
		doc2.value = "value-doc2";

		Bulk bulk = Bulk.builder().inCollection(BaseDoc.class,
				ops -> ops.insert(doc1).insert(doc2).upsert(where("_id").is("id-doc3"), new Update().set("value", "upserted")))
				.inCollection(SpecialDoc.class).insert(new SpecialDoc()).build();

		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create).expectNextCount(1).verifyComplete();

		// 2 inserts + 1 upserted document in BaseDoc, 1 insert in SpecialDoc.
		operations.execute(BaseDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(3L)
				.verifyComplete();
		operations.execute(SpecialDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
	}

	// Ordered inserts spanning two collections are all applied and reported in the combined result.
	@Test // GH-5087
	void insertOrderedAcrossCollections() {

		BaseDoc doc1 = newDoc("1");
		BaseDoc doc2 = newDoc("2");
		SpecialDoc specialDoc = new SpecialDoc();
		specialDoc.id = "id-special";
		specialDoc.value = "value-special";

		Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, o -> o.insert(doc1).insert(doc2))
				.inCollection(SpecialDoc.class).insert(specialDoc).build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(result -> result.insertCount() == 3).verifyComplete();

		operations.execute(BaseDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(2L)
				.verifyComplete();
		operations.execute(SpecialDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
	}

	// An ordered bulk stops at the first error (duplicate key), so the later SpecialDoc insert never runs.
	@Test // GH-5087
	void insertOrderedFailsStopsAtDuplicateInCollection() {

		BaseDoc doc1 = newDoc("1");
		Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, o -> o.insert(doc1).insert(doc1))
				.inCollection(SpecialDoc.class).insert(new SpecialDoc()).build();

		StepVerifier.create(operations.bulkWrite(bulk, BulkWriteOptions.ordered())).expectErrorMatches(throwable -> {
			if (!(throwable.getCause() instanceof ClientBulkWriteException))
				return false;
			ClientBulkWriteException ex = (ClientBulkWriteException) throwable.getCause();
			// Only the first insert succeeded before the duplicate aborted the batch.
			assertThat(ex.getPartialResult().get().getInsertedCount()).isOne();
			assertThat(ex.getWriteErrors()).isNotNull();
			assertThat(ex.getWriteErrors().size()).isOne();
			return true;
		}).verify();

		operations.execute(BaseDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
		operations.execute(SpecialDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(0L)
				.verifyComplete();
	}

	// Unordered inserts spanning two collections are all applied.
	@Test // GH-5087
	void insertUnOrderedAcrossCollections() {

		BaseDoc doc1 = newDoc("1");
		BaseDoc doc2 = newDoc("2");
		SpecialDoc specialDoc = new SpecialDoc();
		specialDoc.id = "id-special";

		Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, o -> o.insert(doc1).insert(doc2))
				.inCollection(SpecialDoc.class).insert(specialDoc).build();
		operations.bulkWrite(bulk, BulkWriteOptions.unordered()).as(StepVerifier::create).expectNextMatches(result -> result.insertCount() == 3)
				.verifyComplete();

		operations.execute(BaseDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(2L)
				.verifyComplete();
		operations.execute(SpecialDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
	}

	// An unordered bulk keeps going past the duplicate-key error, so the SpecialDoc insert still runs.
	@Test // GH-5087
	void insertUnOrderedContinuesOnErrorInOneCollection() {

		BaseDoc doc1 = newDoc("1");
		Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, o -> o.insert(doc1).insert(doc1))
				.inCollection(SpecialDoc.class).insert(new SpecialDoc()).build();

		StepVerifier.create(operations.bulkWrite(bulk, BulkWriteOptions.unordered())).expectErrorMatches(throwable -> {
			if (!(throwable.getCause() instanceof ClientBulkWriteException))
				return false;
			ClientBulkWriteException ex = (ClientBulkWriteException) throwable.getCause();
			// First BaseDoc insert plus the SpecialDoc insert succeeded despite the error in between.
			assertThat(ex.getPartialResult().get().getInsertedCount()).isEqualTo(2);
			assertThat(ex.getWriteErrors()).isNotNull();
			assertThat(ex.getWriteErrors().size()).isOne();
			return true;
		}).verify();

		operations.execute(BaseDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
		operations.execute(SpecialDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
	}

	// updateOne modifies a single matching document per collection (one of the two "value1" docs each).
	@Test // GH-5087
	void updateOneAcrossCollections() {

		insertSomeDocumentsIntoBaseDoc();
		insertSomeDocumentsIntoSpecialDoc();

		List<Pair<Query, UpdateDefinition>> updatesBase = Arrays
				.asList(Pair.of(queryWhere("value", "value1"), set("value", "value3")));
		List<Pair<Query, UpdateDefinition>> updatesSpecial = Arrays
				.asList(Pair.of(queryWhere("value", "value1"), set("value", "value3")));

		Bulk bulk = Bulk.builder()
				.inCollection(BaseDoc.class, o -> updatesBase.forEach(p -> o.updateOne(p.getFirst(), p.getSecond())))
				.inCollection(SpecialDoc.class, o -> updatesSpecial.forEach(p -> o.updateOne(p.getFirst(), p.getSecond())))
				.build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(result -> result.modifiedCount() == 2).verifyComplete();

		operations.execute(BaseDoc.class, col -> col.countDocuments(new Document("value", "value3")))
				.as(StepVerifier::create).expectNext(1L).verifyComplete();
		operations.execute(SpecialDoc.class, col -> col.countDocuments(new Document("value", "value3")))
				.as(StepVerifier::create).expectNext(2L).verifyComplete();
	}

	// updateMulti modifies every matching document: 2 docs per query, 2 queries, 2 collections = 8.
	@Test // GH-5087
	void updateMultiAcrossCollections() {

		insertSomeDocumentsIntoBaseDoc();
		insertSomeDocumentsIntoSpecialDoc();

		List<Pair<Query, UpdateDefinition>> updatesBase = Arrays.asList(
				Pair.of(queryWhere("value", "value1"), set("value", "value3")),
				Pair.of(queryWhere("value", "value2"), set("value", "value4")));
		List<Pair<Query, UpdateDefinition>> updatesSpecial = Arrays.asList(
				Pair.of(queryWhere("value", "value1"), set("value", "value3")),
				Pair.of(queryWhere("value", "value2"), set("value", "value4")));

		Bulk bulk = Bulk.builder()
				.inCollection(BaseDoc.class, o -> updatesBase.forEach(p -> o.updateMulti(p.getFirst(), p.getSecond())))
				.inCollection(SpecialDoc.class, o -> updatesSpecial.forEach(p -> o.updateMulti(p.getFirst(), p.getSecond())))
				.build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(result -> result.modifiedCount() == 8).verifyComplete();

		operations.execute(BaseDoc.class, col -> col.countDocuments(new Document("value", "value3")))
				.as(StepVerifier::create).expectNext(2L).verifyComplete();
		operations.execute(BaseDoc.class, col -> col.countDocuments(new Document("value", "value4")))
				.as(StepVerifier::create).expectNext(2L).verifyComplete();
		operations.execute(SpecialDoc.class, col -> col.countDocuments(new Document("value", "value3")))
				.as(StepVerifier::create).expectNext(2L).verifyComplete();
		operations.execute(SpecialDoc.class, col -> col.countDocuments(new Document("value", "value4")))
				.as(StepVerifier::create).expectNext(2L).verifyComplete();
	}

	// When the upsert query matches existing documents (two "value1" docs per collection), it updates rather than inserts.
	@Test // GH-5087
	void upsertDoesUpdateInEachCollection() {

		insertSomeDocumentsIntoBaseDoc();
		insertSomeDocumentsIntoSpecialDoc();

		Bulk bulk = Bulk.builder()
				.inCollection(BaseDoc.class, o -> o.upsert(queryWhere("value", "value1"), set("value", "value2")))
				.inCollection(SpecialDoc.class, o -> o.upsert(queryWhere("value", "value1"), set("value", "value2"))).build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(result -> result.matchedCount() == 4 && result.modifiedCount() == 4
						&& result.insertCount() == 0 && result.upsertCount() == 0)
				.verifyComplete();
	}

	// When the upsert query matches nothing, a new document is inserted in each collection.
	@Test // GH-5087
	void upsertDoesInsertInEachCollection() {

		Bulk bulk = Bulk.builder()
				.inCollection(BaseDoc.class, o -> o.upsert(queryWhere("_id", "new-id-1"), set("value", "upserted1")))
				.inCollection(SpecialDoc.class, o -> o.upsert(queryWhere("_id", "new-id-2"), set("value", "upserted2")))
				.build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(
						result -> result.matchedCount() == 0 && result.modifiedCount() == 0 && result.upsertCount() == 2)
				.verifyComplete();

		operations.findOne(queryWhere("_id", "new-id-1"), BaseDoc.class).as(StepVerifier::create)
				.expectNextMatches(doc -> doc != null).verifyComplete();
		operations.findOne(queryWhere("_id", "new-id-2"), SpecialDoc.class).as(StepVerifier::create)
				.expectNextMatches(doc -> doc != null).verifyComplete();
	}

	// Removes delete every match: per collection, _id "1" (1 doc) + value "value2" (2 docs) = 3; total 6.
	@Test // GH-5087
	void removeAcrossCollections() {

		insertSomeDocumentsIntoBaseDoc();
		insertSomeDocumentsIntoSpecialDoc();

		List<Query> removesBase = Arrays.asList(queryWhere("_id", "1"), queryWhere("value", "value2"));
		List<Query> removesSpecial = Arrays.asList(queryWhere("_id", "1"), queryWhere("value", "value2"));

		Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, o -> removesBase.forEach(o::remove))
				.inCollection(SpecialDoc.class, o -> removesSpecial.forEach(o::remove)).build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(result -> result.deleteCount() == 6).verifyComplete();

		operations.execute(BaseDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
		operations.execute(SpecialDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
	}

	// Replacement documents swap the full body of the matched document in each collection.
	@Test // GH-5087
	void replaceOneAcrossCollections() {

		insertSomeDocumentsIntoBaseDoc();
		insertSomeDocumentsIntoSpecialDoc();

		Document replacementBase = rawDoc("1", "replaced-base");
		Document replacementSpecial = new Document("_id", "1").append("value", "replaced-special").append("specialValue",
				"special");

		Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, o -> o.replaceOne(queryWhere("_id", "1"), replacementBase))
				.inCollection(SpecialDoc.class, o -> o.replaceOne(queryWhere("_id", "1"), replacementSpecial)).build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(result -> result.matchedCount() == 2 && result.modifiedCount() == 2).verifyComplete();

		operations.execute(BaseDoc.class, col -> Mono.from(col.find(new Document("_id", "1")).first()))
				.as(StepVerifier::create)
				.expectNextMatches(inBase -> inBase != null && "replaced-base".equals(inBase.get("value"))).verifyComplete();
		operations.execute(SpecialDoc.class, col -> Mono.from(col.find(new Document("_id", "1")).first()))
				.as(StepVerifier::create).expectNextMatches(inSpecial -> inSpecial != null
						&& "replaced-special".equals(inSpecial.get("value")) && "special".equals(inSpecial.get("specialValue")))
				.verifyComplete();
	}

	// A replaceOne against an absent _id results in an upsert (upsertCount == 1).
	@Test // GH-5087
	void replaceOneWithUpsertInCollection() {

		Document replacement = rawDoc("new-id", "upserted-value");

		Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, o -> o.replaceOne(queryWhere("_id", "new-id"), replacement))
				.build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(
						result -> result.matchedCount() == 0 && result.modifiedCount() == 0 && result.upsertCount() == 1)
				.verifyComplete();

		operations.findOne(queryWhere("_id", "new-id"), BaseDoc.class).as(StepVerifier::create)
				.expectNextMatches(doc -> doc != null).verifyComplete();
	}

	// Mixed insert/update/remove in one collection plus an insert in another, executed in order:
	// the BaseDoc document is inserted, updated to "v2", then removed by that very value.
	@Test // GH-5087
	void mixedBulkOrderedAcrossCollections() {

		BaseDoc doc1 = newDoc("1", "v1");
		SpecialDoc doc2 = new SpecialDoc();
		doc2.id = "2";
		doc2.value = "v2";

		Bulk bulk = Bulk.builder()
				.inCollection(BaseDoc.class,
						o -> o.insert(doc1).updateOne(queryWhere("_id", "1"), set("value", "v2")).remove(queryWhere("value", "v2")))
				.inCollection(SpecialDoc.class).insert(doc2).build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(
						result -> result.insertCount() == 2 && result.modifiedCount() == 1 && result.deleteCount() == 1)
				.verifyComplete();

		operations.execute(BaseDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(0L)
				.verifyComplete();
		operations.execute(SpecialDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
	}

	// Same as above but driven by pre-built lists: insertAll, then multi-updates, then removes.
	@Test // GH-5087
	void mixedBulkOrderedWithListAcrossCollections() {

		List<BaseDoc> insertsBase = Arrays.asList(newDoc("1", "v1"), newDoc("2", "v2"), newDoc("3", "v2"));
		List<Pair<Query, UpdateDefinition>> updatesBase = Arrays
				.asList(Pair.of(queryWhere("value", "v2"), set("value", "v3")));
		List<Query> removesBase = Arrays.asList(queryWhere("_id", "1"));

		SpecialDoc specialDoc = new SpecialDoc();
		specialDoc.id = "s1";
		specialDoc.value = "sv1";

		Bulk bulk = Bulk.builder().inCollection(BaseDoc.class, o -> {
			o.insertAll(insertsBase);
			updatesBase.forEach(p -> o.updateMulti(p.getFirst(), p.getSecond()));
			removesBase.forEach(o::remove);
		}).inCollection(SpecialDoc.class).insert(specialDoc).build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create)
				.expectNextMatches(
						result -> result.insertCount() == 4 && result.modifiedCount() == 2 && result.deleteCount() == 1)
				.verifyComplete();

		operations.execute(BaseDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(2L)
				.verifyComplete();
		operations.execute(SpecialDoc.class, MongoCollection::countDocuments).as(StepVerifier::create).expectNext(1L)
				.verifyComplete();
	}

	// Inserting a subtype keeps its type information so reading it back as the base type yields the subtype.
	@Test // GH-5087
	void insertShouldConsiderInheritancePerCollection() {

		SpecialDoc specialDoc = new SpecialDoc();
		specialDoc.id = "id-special";
		specialDoc.value = "normal-value";
		specialDoc.specialValue = "special-value";

		Bulk bulk = Bulk.builder().inCollection(SpecialDoc.class).insert(specialDoc).build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create).expectNextCount(1).verifyComplete();

		operations.findOne(queryWhere("_id", specialDoc.id), BaseDoc.class, operations.getCollectionName(SpecialDoc.class))
				.as(StepVerifier::create).expectNextMatches(doc -> doc != null && doc instanceof SpecialDoc).verifyComplete();
	}

	// NOTE(review): only the default database is written through the bulk API here; "bulk-ops-db-2"/"-3"
	// are populated directly via the client. Each of the three databases then holds exactly one "c1" document.
	@Test // GH-5087
	void switchingDatabasesBackAndForth() {

		String dbName = operations.getMongoDatabase().map(db -> db.getName()).block();
		Mono.from(mongoClient.getDatabase(dbName).drop()).block();
		Mono.from(mongoClient.getDatabase("bulk-ops-db-2").drop()).block();
		Mono.from(mongoClient.getDatabase("bulk-ops-db-3").drop()).block();

		Bulk bulk = Bulk.builder().inCollection("c1").insert(newDoc("c1-id-1", "v1")).build();
		operations.bulkWrite(bulk, BulkWriteOptions.ordered()).as(StepVerifier::create).expectNextCount(1).verifyComplete();
		Mono.from(mongoClient.getDatabase("bulk-ops-db-2").getCollection("c1").insertOne(rawDoc("c1-id-1", "v1"))).block();
		Mono.from(mongoClient.getDatabase("bulk-ops-db-3").getCollection("c1").insertOne(rawDoc("c1-id-1", "v1"))).block();

		operations.execute("c1", col -> Mono.from(col.find(new Document("_id", "c1-id-1")).first()))
				.as(StepVerifier::create).expectNextMatches(doc -> doc != null).verifyComplete();
		Mono.from(mongoClient.getDatabase("bulk-ops-db-2").getCollection("c1").find(new Document("_id", "c1-id-1")).first())
				.as(StepVerifier::create).expectNextMatches(doc -> doc != null).verifyComplete();
		Mono.from(mongoClient.getDatabase("bulk-ops-db-3").getCollection("c1").find(new Document("_id", "c1-id-1")).first())
				.as(StepVerifier::create).expectNextMatches(doc -> doc != null).verifyComplete();
	}

	// Seeds the BaseDoc collection with 4 raw documents: ids 1..4, values value1, value1, value2, value2.
	private void insertSomeDocumentsIntoBaseDoc() {
		String coll = operations.getCollectionName(BaseDoc.class);
		operations.execute(coll,
				col -> Mono.from(col.insertOne(rawDoc("1", "value1"))).then(Mono.from(col.insertOne(rawDoc("2", "value1"))))
						.then(Mono.from(col.insertOne(rawDoc("3", "value2"))))
						.then(Mono.from(col.insertOne(rawDoc("4", "value2")))))
				.then().block();
	}

	// Seeds the SpecialDoc collection with the same 4 raw documents as the BaseDoc helper.
	private void insertSomeDocumentsIntoSpecialDoc() {
		String coll = operations.getCollectionName(SpecialDoc.class);
		operations.execute(coll,
				col -> Mono.from(col.insertOne(rawDoc("1", "value1"))).then(Mono.from(col.insertOne(rawDoc("2", "value1"))))
						.then(Mono.from(col.insertOne(rawDoc("3", "value2"))))
						.then(Mono.from(col.insertOne(rawDoc("4", "value2")))))
				.then().block();
	}

	// Creates a BaseDoc with the given id and no value.
	private static BaseDoc newDoc(String id) {
		BaseDoc doc = new BaseDoc();
		doc.id = id;
		return doc;
	}

	// Creates a BaseDoc with the given id and value.
	private static BaseDoc newDoc(String id, String value) {
		BaseDoc doc = newDoc(id);
		doc.value = value;
		return doc;
	}

	// Shorthand for a single-criterion equality query.
	private static Query queryWhere(String field, String value) {
		return new Query(org.springframework.data.mongodb.core.query.Criteria.where(field).is(value));
	}

	// Shorthand for a single-field $set update.
	private static Update set(String field, String value) {
		return new Update().set(field, value);
	}

	// Raw driver document with _id and value fields, bypassing entity mapping.
	private static Document rawDoc(String id, String value) {
		return new Document("_id", id).append("value", value);
	}
}
||||
@ -0,0 +1,425 @@
@@ -0,0 +1,425 @@
|
||||
/* |
||||
* Copyright 2026-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.anyList; |
||||
import static org.mockito.ArgumentMatchers.anyString; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.Mockito.never; |
||||
import static org.mockito.Mockito.spy; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.verifyNoInteractions; |
||||
import static org.mockito.Mockito.when; |
||||
import static org.springframework.data.mongodb.core.query.Criteria.where; |
||||
import static org.springframework.data.mongodb.core.query.Query.query; |
||||
|
||||
import java.util.Collection; |
||||
import java.util.List; |
||||
|
||||
import org.assertj.core.api.InstanceOfAssertFactories; |
||||
import org.bson.Document; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.junit.jupiter.api.extension.ExtendWith; |
||||
import org.mockito.Answers; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.Captor; |
||||
import org.mockito.Mock; |
||||
import org.mockito.MockingDetails; |
||||
import org.mockito.Mockito; |
||||
import org.mockito.invocation.Invocation; |
||||
import org.mockito.junit.jupiter.MockitoExtension; |
||||
import org.mockito.junit.jupiter.MockitoSettings; |
||||
import org.mockito.quality.Strictness; |
||||
import org.springframework.context.ApplicationContext; |
||||
import org.springframework.dao.DataAccessException; |
||||
import org.springframework.dao.support.PersistenceExceptionTranslator; |
||||
import org.springframework.data.annotation.Id; |
||||
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk; |
||||
import org.springframework.data.mongodb.core.bulk.Bulk.NamespaceBoundBulkBuilder; |
||||
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions; |
||||
import org.springframework.data.mongodb.core.convert.DbRefResolver; |
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter; |
||||
import org.springframework.data.mongodb.core.convert.MongoConverter; |
||||
import org.springframework.data.mongodb.core.mapping.Field; |
||||
import org.springframework.data.mongodb.core.mapping.MongoMappingContext; |
||||
import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback; |
||||
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback; |
||||
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback; |
||||
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent; |
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent; |
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent; |
||||
import org.springframework.data.mongodb.core.query.BasicQuery; |
||||
import org.springframework.data.mongodb.core.query.Collation; |
||||
import org.springframework.data.mongodb.core.query.Update; |
||||
|
||||
import com.mongodb.client.model.bulk.ClientBulkWriteResult; |
||||
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; |
||||
import com.mongodb.internal.client.model.bulk.AbstractClientNamespacedWriteModel; |
||||
import com.mongodb.internal.client.model.bulk.ClientWriteModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientDeleteManyModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientInsertOneModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientReplaceOneModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateManyModel; |
||||
import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateOneModel; |
||||
import com.mongodb.reactivestreams.client.MongoClient; |
||||
import com.mongodb.reactivestreams.client.MongoCollection; |
||||
import com.mongodb.reactivestreams.client.MongoDatabase; |
||||
|
||||
import reactor.core.publisher.Mono; |
||||
|
||||
/** |
||||
* Unit tests for {@link ReactiveMongoOperations#bulkWrite}. |
||||
* |
||||
* @author Christoph Strobl |
||||
*/ |
||||
@ExtendWith(MockitoExtension.class) |
||||
@MockitoSettings(strictness = Strictness.LENIENT) |
||||
class ReactiveMongoTemplateBulkUnitTests { |
||||
|
||||
private ReactiveMongoTemplate template; |
||||
@Mock MongoClient client; |
||||
@Mock MongoDatabase database; |
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS) MongoCollection<Document> collection; |
||||
SimpleReactiveMongoDatabaseFactory factory; |
||||
@Mock DbRefResolver dbRefResolver; |
||||
|
||||
@Mock ApplicationContext applicationContext; |
||||
@Captor ArgumentCaptor<List<ClientNamespacedWriteModel>> captor; |
||||
private MongoConverter converter; |
||||
private MongoMappingContext mappingContext; |
||||
|
||||
ReactiveBeforeConvertPersonCallback beforeConvertCallback; |
||||
ReactiveBeforeSavePersonCallback beforeSaveCallback; |
||||
ReactiveAfterSavePersonCallback afterSaveCallback; |
||||
ReactiveEntityCallbacks entityCallbacks; |
||||
|
||||
private NamespaceBoundBulkBuilder<Object> ops; |
||||
|
||||
	@BeforeEach
	void setUp() {

		// Spy on a real factory so cluster/database/translator lookups can be stubbed.
		factory = spy(new SimpleReactiveMongoDatabaseFactory(client, "default-db"));
		when(factory.getMongoCluster()).thenReturn(client);
		when(factory.getMongoDatabase()).thenReturn(Mono.just(database));
		// Null translator: driver exceptions pass through untranslated, so tests see raw types.
		when(factory.getExceptionTranslator()).thenReturn(new NullExceptionTranslator());
		when(database.getCollection(anyString(), eq(Document.class))).thenReturn(collection);
		when(database.getName()).thenReturn("default-db");
		// Every bulkWrite invocation succeeds with a mocked result; arguments are captured per test.
		when(client.bulkWrite(anyList(), any())).thenReturn(Mono.just(Mockito.mock(ClientBulkWriteResult.class)));

		// Spied callbacks let tests verify invocation while keeping the stub behavior.
		beforeConvertCallback = spy(new ReactiveBeforeConvertPersonCallback());
		beforeSaveCallback = spy(new ReactiveBeforeSavePersonCallback());
		afterSaveCallback = spy(new ReactiveAfterSavePersonCallback());
		entityCallbacks = ReactiveEntityCallbacks.create(beforeConvertCallback, beforeSaveCallback, afterSaveCallback);

		mappingContext = new MongoMappingContext();
		mappingContext.afterPropertiesSet();

		converter = new MappingMongoConverter(dbRefResolver, mappingContext);
		template = new ReactiveMongoTemplate(factory, converter);
		template.setApplicationEventPublisher(applicationContext);
		template.setEntityCallbacks(entityCallbacks);

		// Builder pre-bound to a default collection; individual tests may rebind via inCollection(...).
		ops = Bulk.builder().inCollection("default-collection");
	}
||||
|
||||
@Test // GH-5087
|
||||
void updateOneShouldUseCollationWhenPresent() { |
||||
|
||||
Bulk bulk = ops |
||||
.updateOne(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen")) |
||||
.build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
|
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
assertThat( |
||||
extractWriteModel(ConcreteClientUpdateOneModel.class, captor.getValue().get(0)).getOptions().getCollation()) |
||||
.contains(com.mongodb.client.model.Collation.builder().locale("de").build()); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void updateManyShouldUseCollationWhenPresent() { |
||||
|
||||
Bulk bulk = ops |
||||
.updateMulti(new BasicQuery("{}").collation(Collation.of("de")), new Update().set("lastName", "targaryen")) |
||||
.build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
|
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
assertThat( |
||||
extractWriteModel(ConcreteClientUpdateManyModel.class, captor.getValue().get(0)).getOptions().getCollation()) |
||||
.contains(com.mongodb.client.model.Collation.builder().locale("de").build()); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void removeShouldUseCollationWhenPresent() { |
||||
|
||||
Bulk bulk = ops.remove(new BasicQuery("{}").collation(Collation.of("de"))).build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
|
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
assertThat( |
||||
extractWriteModel(ConcreteClientDeleteManyModel.class, captor.getValue().get(0)).getOptions().getCollation()) |
||||
.contains(com.mongodb.client.model.Collation.builder().locale("de").build()); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void replaceOneShouldUseCollationWhenPresent() { |
||||
|
||||
Bulk bulk = ops.replaceOne(new BasicQuery("{}").collation(Collation.of("de")), new SomeDomainType()).build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
|
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
assertThat( |
||||
extractWriteModel(ConcreteClientReplaceOneModel.class, captor.getValue().get(0)).getOptions().getCollation()) |
||||
.contains(com.mongodb.client.model.Collation.builder().locale("de").build()); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void bulkUpdateShouldMapQueryAndUpdateCorrectly() { |
||||
|
||||
Bulk bulk = ops.inCollection("test", SomeDomainType.class) |
||||
.updateOne(query(where("firstName").is("danerys")), Update.update("firstName", "queen danerys")).build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class, |
||||
captor.getValue().get(0)); |
||||
assertThat(updateModel.getFilter()).isEqualTo(new Document("first_name", "danerys")); |
||||
assertThat(updateModel.getUpdate()).contains(new Document("$set", new Document("first_name", "queen danerys"))); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void bulkRemoveShouldMapQueryCorrectly() { |
||||
|
||||
Bulk bulk = ops.inCollection("test", SomeDomainType.class).remove(query(where("firstName").is("danerys"))).build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
ConcreteClientDeleteManyModel deleteModel = extractWriteModel(ConcreteClientDeleteManyModel.class, |
||||
captor.getValue().get(0)); |
||||
assertThat(deleteModel.getFilter()).isEqualTo(new Document("first_name", "danerys")); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void bulkReplaceOneShouldMapQueryCorrectly() { |
||||
|
||||
SomeDomainType replacement = new SomeDomainType(); |
||||
replacement.firstName = "Minsu"; |
||||
replacement.lastName = "Kim"; |
||||
|
||||
Bulk bulk = ops.inCollection("test", SomeDomainType.class) |
||||
.replaceOne(query(where("firstName").is("danerys")), replacement).build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
ConcreteClientReplaceOneModel replaceModel = extractWriteModel(ConcreteClientReplaceOneModel.class, |
||||
captor.getValue().get(0)); |
||||
assertThat(replaceModel.getFilter()).isEqualTo(new Document("first_name", "danerys")); |
||||
assertThat(replaceModel.getReplacement()).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class)) |
||||
.containsEntry("first_name", "Minsu") |
||||
.containsEntry("lastName", "Kim"); |
||||
} |
||||
|
||||
	@Test // GH-5087
	void bulkInsertInvokesEntityCallbacks() {

		Person entity = new Person("init");
		Bulk bulk = ops.inCollection("person").insert(entity).build();

		template.bulkWrite(bulk, BulkWriteOptions.ordered()).block();

		// each registered callback returns a new Person carrying the stage name, so the values
		// captured below trace the callback chain: original -> before-convert -> before-save
		ArgumentCaptor<Person> personArgumentCaptor = ArgumentCaptor.forClass(Person.class);
		verify(beforeConvertCallback).onBeforeConvert(personArgumentCaptor.capture(), eq("person"));
		verify(beforeSaveCallback).onBeforeSave(personArgumentCaptor.capture(), any(), eq("person"));
		verify(afterSaveCallback).onAfterSave(personArgumentCaptor.capture(), any(), eq("person"));
		assertThat(personArgumentCaptor.getAllValues()).extracting("firstName").containsExactly("init", "before-convert",
				"before-save");
		verify(client).bulkWrite(captor.capture(), any());

		// the document sent to the driver carries the mutation applied by the after-save callback,
		// i.e. callbacks run against the same Document instance that ends up in the insert model
		ConcreteClientInsertOneModel insertModel = extractWriteModel(ConcreteClientInsertOneModel.class,
				captor.getValue().get(0));
		assertThat(insertModel.getDocument()).asInstanceOf(InstanceOfAssertFactories.map(String.class, Object.class))
				.containsEntry("firstName", "after-save");
	}
||||
|
||||
	@Test // GH-5087
	@SuppressWarnings("rawtypes")
	void bulkReplaceOneEmitsEventsCorrectly() {

		ops.replaceOne(query(where("firstName").is("danerys")), new SomeDomainType());

		// merely staging the operation must not publish any application events yet
		verifyNoInteractions(applicationContext);

		template.bulkWrite(ops.build(), BulkWriteOptions.ordered()).block();

		// executing the bulk publishes the full lifecycle sequence exactly once, in order
		MockingDetails mockingDetails = Mockito.mockingDetails(applicationContext);
		Collection<Invocation> invocations = mockingDetails.getInvocations();
		assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass)
				.containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class);
	}
||||
|
||||
	@Test // GH-5087
	@SuppressWarnings("rawtypes")
	void bulkInsertEmitsEventsCorrectly() {

		ops.insert(new SomeDomainType());

		// staging the insert must not publish any lifecycle event yet
		verify(applicationContext, never()).publishEvent(any(BeforeConvertEvent.class));
		verify(applicationContext, never()).publishEvent(any(BeforeSaveEvent.class));
		verify(applicationContext, never()).publishEvent(any(AfterSaveEvent.class));

		template.bulkWrite(ops.build(), BulkWriteOptions.ordered()).block();

		// executing the bulk publishes the full lifecycle sequence exactly once, in order
		MockingDetails mockingDetails = Mockito.mockingDetails(applicationContext);
		Collection<Invocation> invocations = mockingDetails.getInvocations();
		assertThat(invocations).hasSize(3).extracting(tt -> tt.getArgument(0)).map(Object::getClass)
				.containsExactly((Class) BeforeConvertEvent.class, (Class) BeforeSaveEvent.class, (Class) AfterSaveEvent.class);
	}
||||
|
||||
@Test // GH-5087
|
||||
void appliesArrayFilterWhenPresent() { |
||||
|
||||
Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().filterArray(where("element").gte(100))).build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class, |
||||
captor.getValue().get(0)); |
||||
assertThat(updateModel.getOptions().getArrayFilters().get()).satisfies(it -> { |
||||
assertThat((List<Document>) it).containsExactly(new Document("element", new Document("$gte", 100))); |
||||
}); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void shouldRetainNestedArrayPathWithPlaceholdersForNoMatchingPaths() { |
||||
|
||||
Bulk bulk = ops.updateOne(new BasicQuery("{}"), new Update().set("items.$.documents.0.fileId", "new-id")).build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class, |
||||
captor.getValue().get(0)); |
||||
assertThat(updateModel.getUpdate()) |
||||
.contains(new Document("$set", new Document("items.$.documents.0.fileId", "new-id"))); |
||||
} |
||||
|
||||
@Test // GH-5087
|
||||
void shouldRetainNestedArrayPathWithPlaceholdersForMappedEntity() { |
||||
|
||||
Bulk bulk = ops.inCollection("collection-1", OrderTest.class) |
||||
.updateOne(new BasicQuery("{}"), Update.update("items.$.documents.0.fileId", "file-id")).build(); |
||||
|
||||
template.bulkWrite(bulk, BulkWriteOptions.ordered()).block(); |
||||
verify(client).bulkWrite(captor.capture(), any()); |
||||
|
||||
ConcreteClientUpdateOneModel updateModel = extractWriteModel(ConcreteClientUpdateOneModel.class, |
||||
captor.getValue().get(0)); |
||||
assertThat(updateModel.getUpdate()) |
||||
.contains(new Document("$set", new Document("items.$.documents.0.the_file_id", "file-id"))); |
||||
} |
||||
|
||||
static <T extends ClientWriteModel> T extractWriteModel(Class<T> type, ClientNamespacedWriteModel source) { |
||||
|
||||
if (!(source instanceof AbstractClientNamespacedWriteModel)) { |
||||
throw new IllegalArgumentException("Expected AbstractClientNamespacedWriteModel, got " + source.getClass()); |
||||
} |
||||
ClientWriteModel model = ((AbstractClientNamespacedWriteModel) source).getModel(); |
||||
|
||||
return type.cast(model); |
||||
} |
||||
|
||||
	// fixture root used to verify mapping of nested array paths ("items.$.documents.0...")
	static class OrderTest {

		String id;
		List<OrderTestItem> items;
	}
||||
|
||||
	// intermediate nesting level of the OrderTest fixture
	static class OrderTestItem {

		private String cartId;
		private List<OrderTestDocument> documents;
	}
||||
|
||||
	// leaf level of the OrderTest fixture; "fileId" is stored as "the_file_id"
	static class OrderTestDocument {

		@Field("the_file_id") private String fileId;
	}
||||
|
||||
	// fixture type exercising @Field-driven property renaming ("firstName" -> "first_name");
	// the empty @Field on lastName keeps the property name unchanged
	static class SomeDomainType {

		@Id String id;
		Gender gender;
		@Field("first_name") String firstName;
		@Field String lastName;
	}
||||
|
||||
	// minimal enum used as a plain property on SomeDomainType
	enum Gender {
		M, F
	}
||||
|
||||
	// replaces the entity with a marker instance so the callback invocation can be traced by name
	static class ReactiveBeforeConvertPersonCallback implements ReactiveBeforeConvertCallback<Person> {

		@Override
		public Mono<Person> onBeforeConvert(Person entity, String collection) {
			return Mono.just(new Person("before-convert"));
		}
	}
||||
|
||||
	// mutates the target document and swaps the entity for a marker instance, so tests can
	// observe both the document state and the callback chain
	static class ReactiveBeforeSavePersonCallback implements ReactiveBeforeSaveCallback<Person> {

		@Override
		public Mono<Person> onBeforeSave(Person entity, Document document, String collection) {

			document.put("firstName", "before-save");
			return Mono.just(new Person("before-save"));
		}
	}
||||
|
||||
	// mutates the target document after save and swaps the entity for a marker instance
	static class ReactiveAfterSavePersonCallback implements ReactiveAfterSaveCallback<Person> {

		@Override
		public Mono<Person> onAfterSave(Person entity, Document document, String collection) {

			document.put("firstName", "after-save");
			return Mono.just(new Person("after-save"));
		}
	}
||||
|
||||
	// translator that never translates; returning null signals "cannot translate" per the
	// PersistenceExceptionTranslator contract, letting raw driver exceptions propagate
	static class NullExceptionTranslator implements PersistenceExceptionTranslator {

		@Override
		public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
			return null;
		}
	}
||||
} |
||||
Loading…
Reference in new issue