@ -15,14 +15,21 @@
* /
* /
package org.springframework.data.mongodb.core ;
package org.springframework.data.mongodb.core ;
import lombok.Data ;
import java.util.ArrayList ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.Arrays ;
import java.util.List ;
import java.util.List ;
import java.util.Optional ;
import java.util.stream.Collectors ;
import com.mongodb.client.model.DeleteOptions ;
import org.bson.Document ;
import org.bson.Document ;
import org.bson.conversions.Bson ;
import org.springframework.dao.DataAccessException ;
import org.springframework.dao.DataAccessException ;
import org.springframework.dao.support.PersistenceExceptionTranslator ;
import org.springframework.dao.support.PersistenceExceptionTranslator ;
import org.springframework.data.mongodb.core.convert.QueryMapper ;
import org.springframework.data.mongodb.core.convert.UpdateMapper ;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity ;
import org.springframework.data.mongodb.core.query.Query ;
import org.springframework.data.mongodb.core.query.Query ;
import org.springframework.data.mongodb.core.query.Update ;
import org.springframework.data.mongodb.core.query.Update ;
import org.springframework.data.util.Pair ;
import org.springframework.data.util.Pair ;
@ -33,6 +40,8 @@ import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection ;
import com.mongodb.client.MongoCollection ;
import com.mongodb.client.model.BulkWriteOptions ;
import com.mongodb.client.model.BulkWriteOptions ;
import com.mongodb.client.model.DeleteManyModel ;
import com.mongodb.client.model.DeleteManyModel ;
import com.mongodb.client.model.DeleteOneModel ;
import com.mongodb.client.model.DeleteOptions ;
import com.mongodb.client.model.InsertOneModel ;
import com.mongodb.client.model.InsertOneModel ;
import com.mongodb.client.model.UpdateManyModel ;
import com.mongodb.client.model.UpdateManyModel ;
import com.mongodb.client.model.UpdateOneModel ;
import com.mongodb.client.model.UpdateOneModel ;
@ -41,7 +50,7 @@ import com.mongodb.client.model.WriteModel;
/ * *
/ * *
* Default implementation for { @link BulkOperations } .
* Default implementation for { @link BulkOperations } .
*
*
* @author Tobias Trelle
* @author Tobias Trelle
* @author Oliver Gierke
* @author Oliver Gierke
* @author Christoph Strobl
* @author Christoph Strobl
@ -50,11 +59,11 @@ import com.mongodb.client.model.WriteModel;
class DefaultBulkOperations implements BulkOperations {
class DefaultBulkOperations implements BulkOperations {
private final MongoOperations mongoOperations ;
private final MongoOperations mongoOperations ;
private final BulkMode bulkMode ;
private final String collectionName ;
private final String collectionName ;
private final BulkOperationContext bulkOperationContext ;
private PersistenceExceptionTranslator exceptionTranslator ;
private WriteConcernResolver writeConcernResolver ;
private WriteConcernResolver writeConcernResolver ;
private PersistenceExceptionTranslator exceptionTranslator ;
private WriteConcern defaultWriteConcern ;
private WriteConcern defaultWriteConcern ;
private BulkWriteOptions bulkOptions ;
private BulkWriteOptions bulkOptions ;
@ -62,28 +71,25 @@ class DefaultBulkOperations implements BulkOperations {
List < WriteModel < Document > > models = new ArrayList < > ( ) ;
List < WriteModel < Document > > models = new ArrayList < > ( ) ;
/ * *
/ * *
* Creates a new { @link DefaultBulkOperations } for the given { @link MongoOperations } , { @link BulkMode } , collection
* Creates a new { @link DefaultBulkOperations } for the given { @link MongoOperations } , collection name and
* name and { @link WriteConcern } .
* { @link BulkOperationContext } .
*
*
* @param mongoOperations The underlying { @link MongoOperations } , must not be { @literal null } .
* @param mongoOperations must not be { @literal null } .
* @param bulkMod e must not be { @literal null } .
* @param collectionNam e must not be { @literal null } .
* @param collectionName Name of the collection to work on , must not be { @literal null } or empty .
* @param bulkOperationContext must not be { @literal null } .
* @param entityType the entity type , can be { @literal null } .
* @since 2 . 0
* /
* /
DefaultBulkOperations ( MongoOperations mongoOperations , BulkMode bulkMode , String collectionName ,
DefaultBulkOperations ( MongoOperations mongoOperations , String collectionName ,
Class < ? > entityType ) {
BulkOperationContext bulkOperationContext ) {
Assert . notNull ( mongoOperations , "MongoOperations must not be null!" ) ;
Assert . notNull ( mongoOperations , "MongoOperations must not be null!" ) ;
Assert . notNull ( bulkMode , "BulkMode must not be null !") ;
Assert . hasText ( collectionName , "CollectionName must not be null nor empty !") ;
Assert . hasText ( collectionName , "Collection name must not be null or empty !") ;
Assert . notNull ( bulkOperationContext , "BulkOperationContext must not be null !") ;
this . mongoOperations = mongoOperations ;
this . mongoOperations = mongoOperations ;
this . bulkMode = bulkMode ;
this . collectionName = collectionName ;
this . collectionName = collectionName ;
this . bulkOperationContext = bulkOperationContext ;
this . exceptionTranslator = new MongoExceptionTranslator ( ) ;
this . exceptionTranslator = new MongoExceptionTranslator ( ) ;
this . writeConcernResolver = DefaultWriteConcernResolver . INSTANCE ;
this . bulkOptions = initBulkOperation ( ) ;
this . bulkOptions = initBulkOperation ( ) ;
}
}
@ -282,7 +288,7 @@ class DefaultBulkOperations implements BulkOperations {
collection = collection . withWriteConcern ( defaultWriteConcern ) ;
collection = collection . withWriteConcern ( defaultWriteConcern ) ;
}
}
return collection . bulkWrite ( models , bulkOptions ) ;
return collection . bulkWrite ( models . stream ( ) . map ( this : : mapWriteModel ) . collect ( Collectors . toList ( ) ) , bulkOptions ) ;
} catch ( BulkWriteException o_O ) {
} catch ( BulkWriteException o_O ) {
@ -323,7 +329,8 @@ class DefaultBulkOperations implements BulkOperations {
private final BulkWriteOptions initBulkOperation ( ) {
private final BulkWriteOptions initBulkOperation ( ) {
BulkWriteOptions options = new BulkWriteOptions ( ) ;
BulkWriteOptions options = new BulkWriteOptions ( ) ;
switch ( bulkMode ) {
switch ( bulkOperationContext . getBulkMode ( ) ) {
case ORDERED :
case ORDERED :
return options . ordered ( true ) ;
return options . ordered ( true ) ;
case UNORDERED :
case UNORDERED :
@ -331,4 +338,64 @@ class DefaultBulkOperations implements BulkOperations {
}
}
throw new IllegalStateException ( "BulkMode was null!" ) ;
throw new IllegalStateException ( "BulkMode was null!" ) ;
}
}
private WriteModel < Document > mapWriteModel ( WriteModel < Document > writeModel ) {
if ( writeModel instanceof UpdateOneModel ) {
UpdateOneModel < Document > model = ( UpdateOneModel < Document > ) writeModel ;
return new UpdateOneModel ( getMappedQuery ( model . getFilter ( ) ) , getMappedUpdate ( model . getUpdate ( ) ) ,
model . getOptions ( ) ) ;
}
if ( writeModel instanceof UpdateManyModel ) {
UpdateManyModel < Document > model = ( UpdateManyModel < Document > ) writeModel ;
return new UpdateManyModel ( getMappedQuery ( model . getFilter ( ) ) , getMappedUpdate ( model . getUpdate ( ) ) ,
model . getOptions ( ) ) ;
}
if ( writeModel instanceof DeleteOneModel ) {
DeleteOneModel < Document > model = ( DeleteOneModel < Document > ) writeModel ;
return new DeleteOneModel ( getMappedQuery ( model . getFilter ( ) ) , model . getOptions ( ) ) ;
}
if ( writeModel instanceof DeleteManyModel ) {
DeleteManyModel < Document > model = ( DeleteManyModel < Document > ) writeModel ;
return new DeleteManyModel ( getMappedQuery ( model . getFilter ( ) ) , model . getOptions ( ) ) ;
}
return writeModel ;
}
private Bson getMappedUpdate ( Bson update ) {
return bulkOperationContext . getUpdateMapper ( ) . getMappedObject ( update , bulkOperationContext . getEntity ( ) ) ;
}
private Bson getMappedQuery ( Bson query ) {
return bulkOperationContext . getQueryMapper ( ) . getMappedObject ( query , bulkOperationContext . getEntity ( ) ) ;
}
/ * *
* { @link BulkOperationContext } holds information about
* { @link org . springframework . data . mongodb . core . BulkOperations . BulkMode } the entity in use as well as references to
* { @link QueryMapper } and { @link UpdateMapper } .
*
* @author Christoph Strobl
* @since 2 . 0
* /
@Data
static class BulkOperationContext {
final BulkMode bulkMode ;
final Optional < ? extends MongoPersistentEntity < ? > > entity ;
final QueryMapper queryMapper ;
final UpdateMapper updateMapper ;
}
}
}