@ -22,9 +22,11 @@ import java.util.stream.Collectors;
@@ -22,9 +22,11 @@ import java.util.stream.Collectors;
import org.bson.Document ;
import org.bson.conversions.Bson ;
import org.springframework.context.ApplicationEvent ;
import org.springframework.context.ApplicationEventPublisher ;
import org.springframework.dao.DataIntegrityViolationException ;
import org.springframework.data.mapping.PersistentEntity ;
import org.springframework.data.mapping.callback.EntityCallback ;
import org.springframework.data.mapping.callback.EntityCallbacks ;
import org.springframework.data.mongodb.BulkOperationException ;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext ;
@ -48,7 +50,6 @@ import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
@@ -48,7 +50,6 @@ import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
import org.springframework.data.util.Pair ;
import org.springframework.lang.Nullable ;
import org.springframework.util.Assert ;
import org.springframework.util.ObjectUtils ;
import com.mongodb.MongoBulkWriteException ;
import com.mongodb.WriteConcern ;
@ -78,7 +79,6 @@ class DefaultBulkOperations implements BulkOperations {
@@ -78,7 +79,6 @@ class DefaultBulkOperations implements BulkOperations {
private final List < SourceAwareWriteModelHolder > models = new ArrayList < > ( ) ;
private @Nullable WriteConcern defaultWriteConcern ;
private BulkWriteOptions bulkOptions ;
/ * *
@ -100,7 +100,7 @@ class DefaultBulkOperations implements BulkOperations {
@@ -100,7 +100,7 @@ class DefaultBulkOperations implements BulkOperations {
this . mongoOperations = mongoOperations ;
this . collectionName = collectionName ;
this . bulkOperationContext = bulkOperationContext ;
this . bulkOptions = getBulkWriteOptions ( bulkOperationContext . getB ulkMode( ) ) ;
this . bulkOptions = getBulkWriteOptions ( bulkOperationContext . b ulkMode( ) ) ;
}
/ * *
@ -145,11 +145,11 @@ class DefaultBulkOperations implements BulkOperations {
@@ -145,11 +145,11 @@ class DefaultBulkOperations implements BulkOperations {
}
@Override
public BulkOperations updateOne ( List < Pair < Query , Update > > updates ) {
public BulkOperations updateOne ( List < Pair < Query , UpdateDefinition > > updates ) {
Assert . notNull ( updates , "Updates must not be null" ) ;
for ( Pair < Query , Update > update : updates ) {
for ( Pair < Query , UpdateDefinition > update : updates ) {
update ( update . getFirst ( ) , update . getSecond ( ) , false , false ) ;
}
@ -169,11 +169,11 @@ class DefaultBulkOperations implements BulkOperations {
@@ -169,11 +169,11 @@ class DefaultBulkOperations implements BulkOperations {
}
@Override
public BulkOperations updateMulti ( List < Pair < Query , Update > > updates ) {
public BulkOperations updateMulti ( List < Pair < Query , UpdateDefinition > > updates ) {
Assert . notNull ( updates , "Updates must not be null" ) ;
for ( Pair < Query , Update > update : updates ) {
for ( Pair < Query , UpdateDefinition > update : updates ) {
update ( update . getFirst ( ) , update . getSecond ( ) , false , true ) ;
}
@ -253,7 +253,7 @@ class DefaultBulkOperations implements BulkOperations {
@@ -253,7 +253,7 @@ class DefaultBulkOperations implements BulkOperations {
return result ;
} finally {
this . bulkOptions = getBulkWriteOptions ( bulkOperationContext . getB ulkMode( ) ) ;
this . bulkOptions = getBulkWriteOptions ( bulkOperationContext . b ulkMode( ) ) ;
}
}
@ -272,9 +272,8 @@ class DefaultBulkOperations implements BulkOperations {
@@ -272,9 +272,8 @@ class DefaultBulkOperations implements BulkOperations {
bulkOptions ) ;
} catch ( RuntimeException ex ) {
if ( ex instanceof MongoBulkWriteException ) {
if ( ex instanceof MongoBulkWriteException mongoBulkWriteException ) {
MongoBulkWriteException mongoBulkWriteException = ( MongoBulkWriteException ) ex ;
if ( mongoBulkWriteException . getWriteConcernError ( ) ! = null ) {
throw new DataIntegrityViolationException ( ex . getMessage ( ) , ex ) ;
}
@ -289,17 +288,17 @@ class DefaultBulkOperations implements BulkOperations {
@@ -289,17 +288,17 @@ class DefaultBulkOperations implements BulkOperations {
maybeEmitBeforeSaveEvent ( it ) ;
if ( it . getM odel( ) instanceof InsertOneModel ) {
if ( it . m odel( ) instanceof InsertOneModel ) {
Document target = ( ( InsertOneModel < Document > ) it . getM odel( ) ) . getDocument ( ) ;
maybeInvokeBeforeSaveCallback ( it . getS ource( ) , target ) ;
} else if ( it . getM odel( ) instanceof ReplaceOneModel ) {
Document target = ( ( InsertOneModel < Document > ) it . m odel( ) ) . getDocument ( ) ;
maybeInvokeBeforeSaveCallback ( it . s ource( ) , target ) ;
} else if ( it . m odel( ) instanceof ReplaceOneModel ) {
Document target = ( ( ReplaceOneModel < Document > ) it . getM odel( ) ) . getReplacement ( ) ;
maybeInvokeBeforeSaveCallback ( it . getS ource( ) , target ) ;
Document target = ( ( ReplaceOneModel < Document > ) it . m odel( ) ) . getReplacement ( ) ;
maybeInvokeBeforeSaveCallback ( it . s ource( ) , target ) ;
}
return mapWriteModel ( it . getS ource( ) , it . getM odel( ) ) ;
return mapWriteModel ( it . s ource( ) , it . m odel( ) ) ;
}
/ * *
@ -329,9 +328,8 @@ class DefaultBulkOperations implements BulkOperations {
@@ -329,9 +328,8 @@ class DefaultBulkOperations implements BulkOperations {
private WriteModel < Document > mapWriteModel ( Object source , WriteModel < Document > writeModel ) {
if ( writeModel instanceof UpdateOneModel ) {
if ( writeModel instanceof UpdateOneModel < Document > model ) {
UpdateOneModel < Document > model = ( UpdateOneModel < Document > ) writeModel ;
if ( source instanceof AggregationUpdate aggregationUpdate ) {
List < Document > pipeline = mapUpdatePipeline ( aggregationUpdate ) ;
@ -342,9 +340,8 @@ class DefaultBulkOperations implements BulkOperations {
@@ -342,9 +340,8 @@ class DefaultBulkOperations implements BulkOperations {
model . getOptions ( ) ) ;
}
if ( writeModel instanceof UpdateManyModel ) {
if ( writeModel instanceof UpdateManyModel < Document > model ) {
UpdateManyModel < Document > model = ( UpdateManyModel < Document > ) writeModel ;
if ( source instanceof AggregationUpdate aggregationUpdate ) {
List < Document > pipeline = mapUpdatePipeline ( aggregationUpdate ) ;
@ -355,17 +352,11 @@ class DefaultBulkOperations implements BulkOperations {
@@ -355,17 +352,11 @@ class DefaultBulkOperations implements BulkOperations {
model . getOptions ( ) ) ;
}
if ( writeModel instanceof DeleteOneModel ) {
DeleteOneModel < Document > model = ( DeleteOneModel < Document > ) writeModel ;
if ( writeModel instanceof DeleteOneModel < Document > model ) {
return new DeleteOneModel < > ( getMappedQuery ( model . getFilter ( ) ) , model . getOptions ( ) ) ;
}
if ( writeModel instanceof DeleteManyModel ) {
DeleteManyModel < Document > model = ( DeleteManyModel < Document > ) writeModel ;
if ( writeModel instanceof DeleteManyModel < Document > model ) {
return new DeleteManyModel < > ( getMappedQuery ( model . getFilter ( ) ) , model . getOptions ( ) ) ;
}
@ -373,24 +364,23 @@ class DefaultBulkOperations implements BulkOperations {
@@ -373,24 +364,23 @@ class DefaultBulkOperations implements BulkOperations {
}
private List < Document > mapUpdatePipeline ( AggregationUpdate source ) {
Class < ? > type = bulkOperationContext . getEntity ( ) . isPresent ( )
? bulkOperationContext . getEntity ( ) . map ( PersistentEntity : : getType ) . get ( )
Class < ? > type = bulkOperationContext . entity ( ) . isPresent ( )
? bulkOperationContext . entity ( ) . map ( PersistentEntity : : getType ) . get ( )
: Object . class ;
AggregationOperationContext context = new RelaxedTypeBasedAggregationOperationContext ( type ,
bulkOperationContext . getU pdateMapper( ) . getMappingContext ( ) , bulkOperationContext . getQ ueryMapper( ) ) ;
bulkOperationContext . u pdateMapper( ) . getMappingContext ( ) , bulkOperationContext . q ueryMapper( ) ) ;
List < Document > pipeline = new AggregationUtil ( bulkOperationContext . getQueryMapper ( ) ,
bulkOperationContext . getQueryMapper ( ) . getMappingContext ( ) ) . createPipeline ( source ,
context ) ;
return pipeline ;
return new AggregationUtil ( bulkOperationContext . queryMapper ( ) ,
bulkOperationContext . queryMapper ( ) . getMappingContext ( ) ) . createPipeline ( source , context ) ;
}
private Bson getMappedUpdate ( Bson update ) {
return bulkOperationContext . getU pdateMapper( ) . getMappedObject ( update , bulkOperationContext . g etE ntity( ) ) ;
return bulkOperationContext . u pdateMapper( ) . getMappedObject ( update , bulkOperationContext . entity ( ) ) ;
}
private Bson getMappedQuery ( Bson query ) {
return bulkOperationContext . getQ ueryMapper( ) . getMappedObject ( query , bulkOperationContext . g etE ntity( ) ) ;
return bulkOperationContext . q ueryMapper( ) . getMappedObject ( query , bulkOperationContext . entity ( ) ) ;
}
private Document getMappedObject ( Object source ) {
@ -411,80 +401,57 @@ class DefaultBulkOperations implements BulkOperations {
@@ -411,80 +401,57 @@ class DefaultBulkOperations implements BulkOperations {
private void maybeEmitBeforeSaveEvent ( SourceAwareWriteModelHolder holder ) {
if ( holder . getM odel( ) instanceof InsertOneModel ) {
if ( holder . m odel( ) instanceof InsertOneModel ) {
Document target = ( ( InsertOneModel < Document > ) holder . getM odel( ) ) . getDocument ( ) ;
maybeEmitEvent ( new BeforeSaveEvent < > ( holder . getS ource( ) , target , collectionName ) ) ;
} else if ( holder . getM odel( ) instanceof ReplaceOneModel ) {
Document target = ( ( InsertOneModel < Document > ) holder . m odel( ) ) . getDocument ( ) ;
maybeEmitEvent ( new BeforeSaveEvent < > ( holder . s ource( ) , target , collectionName ) ) ;
} else if ( holder . m odel( ) instanceof ReplaceOneModel ) {
Document target = ( ( ReplaceOneModel < Document > ) holder . getM odel( ) ) . getReplacement ( ) ;
maybeEmitEvent ( new BeforeSaveEvent < > ( holder . getS ource( ) , target , collectionName ) ) ;
Document target = ( ( ReplaceOneModel < Document > ) holder . m odel( ) ) . getReplacement ( ) ;
maybeEmitEvent ( new BeforeSaveEvent < > ( holder . s ource( ) , target , collectionName ) ) ;
}
}
private void maybeEmitAfterSaveEvent ( SourceAwareWriteModelHolder holder ) {
if ( holder . getM odel( ) instanceof InsertOneModel ) {
if ( holder . m odel( ) instanceof InsertOneModel ) {
Document target = ( ( InsertOneModel < Document > ) holder . getM odel( ) ) . getDocument ( ) ;
maybeEmitEvent ( new AfterSaveEvent < > ( holder . getS ource( ) , target , collectionName ) ) ;
} else if ( holder . getM odel( ) instanceof ReplaceOneModel ) {
Document target = ( ( InsertOneModel < Document > ) holder . m odel( ) ) . getDocument ( ) ;
maybeEmitEvent ( new AfterSaveEvent < > ( holder . s ource( ) , target , collectionName ) ) ;
} else if ( holder . m odel( ) instanceof ReplaceOneModel ) {
Document target = ( ( ReplaceOneModel < Document > ) holder . getM odel( ) ) . getReplacement ( ) ;
maybeEmitEvent ( new AfterSaveEvent < > ( holder . getS ource( ) , target , collectionName ) ) ;
Document target = ( ( ReplaceOneModel < Document > ) holder . m odel( ) ) . getReplacement ( ) ;
maybeEmitEvent ( new AfterSaveEvent < > ( holder . s ource( ) , target , collectionName ) ) ;
}
}
private void maybeInvokeAfterSaveCallback ( SourceAwareWriteModelHolder holder ) {
if ( holder . getM odel( ) instanceof InsertOneModel ) {
if ( holder . m odel( ) instanceof InsertOneModel ) {
Document target = ( ( InsertOneModel < Document > ) holder . getM odel( ) ) . getDocument ( ) ;
maybeInvokeAfterSaveCallback ( holder . getS ource( ) , target ) ;
} else if ( holder . getM odel( ) instanceof ReplaceOneModel ) {
Document target = ( ( InsertOneModel < Document > ) holder . m odel( ) ) . getDocument ( ) ;
maybeInvokeAfterSaveCallback ( holder . s ource( ) , target ) ;
} else if ( holder . m odel( ) instanceof ReplaceOneModel ) {
Document target = ( ( ReplaceOneModel < Document > ) holder . getM odel( ) ) . getReplacement ( ) ;
maybeInvokeAfterSaveCallback ( holder . getS ource( ) , target ) ;
Document target = ( ( ReplaceOneModel < Document > ) holder . m odel( ) ) . getReplacement ( ) ;
maybeInvokeAfterSaveCallback ( holder . s ource( ) , target ) ;
}
}
private < E extends MongoMappingEvent < T > , T > E maybeEmitEvent ( E event ) {
if ( bulkOperationContext . getEventPublisher ( ) = = null ) {
return event ;
}
bulkOperationContext . getEventPublisher ( ) . publishEvent ( event ) ;
return event ;
private void maybeEmitEvent ( MongoMappingEvent < ? > event ) {
bulkOperationContext . publishEvent ( event ) ;
}
private Object maybeInvokeBeforeConvertCallback ( Object value ) {
if ( bulkOperationContext . getEntityCallbacks ( ) = = null ) {
return value ;
}
return bulkOperationContext . getEntityCallbacks ( ) . callback ( BeforeConvertCallback . class , value , collectionName ) ;
return bulkOperationContext . callback ( BeforeConvertCallback . class , value , collectionName ) ;
}
private Object maybeInvokeBeforeSaveCallback ( Object value , Document mappedDocument ) {
if ( bulkOperationContext . getEntityCallbacks ( ) = = null ) {
return value ;
}
return bulkOperationContext . getEntityCallbacks ( ) . callback ( BeforeSaveCallback . class , value , mappedDocument ,
collectionName ) ;
return bulkOperationContext . callback ( BeforeSaveCallback . class , value , mappedDocument , collectionName ) ;
}
private Object maybeInvokeAfterSaveCallback ( Object value , Document mappedDocument ) {
if ( bulkOperationContext . getEntityCallbacks ( ) = = null ) {
return value ;
}
return bulkOperationContext . getEntityCallbacks ( ) . callback ( AfterSaveCallback . class , value , mappedDocument ,
collectionName ) ;
return bulkOperationContext . callback ( AfterSaveCallback . class , value , mappedDocument , collectionName ) ;
}
private static BulkWriteOptions getBulkWriteOptions ( BulkMode bulkMode ) {
@ -525,152 +492,62 @@ class DefaultBulkOperations implements BulkOperations {
@@ -525,152 +492,62 @@ class DefaultBulkOperations implements BulkOperations {
}
/ * *
* { @link BulkOperationContext } holds information about
* { @link org . springframework . data . mongodb . core . BulkOperations . BulkMode } the entity in use as well as references to
* { @link BulkOperationContext } holds information about { @link BulkMode } the entity in use as well as references to
* { @link QueryMapper } and { @link UpdateMapper } .
*
* @author Christoph Strobl
* @since 2 . 0
* /
static final class BulkOperationContext {
private final BulkMode bulkMode ;
private final Optional < ? extends MongoPersistentEntity < ? > > entity ;
private final QueryMapper queryMapper ;
private final UpdateMapper updateMapper ;
private final ApplicationEventPublisher eventPublisher ;
private final EntityCallbacks entityCallbacks ;
BulkOperationContext ( BulkOperations . BulkMode bulkMode , Optional < ? extends MongoPersistentEntity < ? > > entity ,
QueryMapper queryMapper , UpdateMapper updateMapper , ApplicationEventPublisher eventPublisher ,
EntityCallbacks entityCallbacks ) {
this . bulkMode = bulkMode ;
this . entity = entity ;
this . queryMapper = queryMapper ;
this . updateMapper = updateMapper ;
this . eventPublisher = eventPublisher ;
this . entityCallbacks = entityCallbacks ;
}
record BulkOperationContext ( BulkMode bulkMode , Optional < ? extends MongoPersistentEntity < ? > > entity ,
QueryMapper queryMapper , UpdateMapper updateMapper , @Nullable ApplicationEventPublisher eventPublisher ,
@Nullable EntityCallbacks entityCallbacks ) {
public BulkMode getBulkMode ( ) {
return this . bulkMode ;
public boolean skipEventPublishing ( ) {
return eventPublisher = = null ;
}
public Optional < ? extends MongoPersistentEntity < ? > > getEntity ( ) {
return this . entity ;
public boolean skipEntityCallbacks ( ) {
return entityCallbacks = = null ;
}
public QueryMapper getQueryMapper ( ) {
return this . queryMapper ;
}
public UpdateMapper getUpdateMapper ( ) {
return this . updateMapper ;
}
@SuppressWarnings ( "rawtypes" )
public < T > T callback ( Class < ? extends EntityCallback > callbackType , T entity , String collectionName ) {
public ApplicationEventPublisher getEventPublisher ( ) {
return this . eventPublisher ;
}
if ( skipEntityCallbacks ( ) ) {
return entity ;
}
public EntityCallbacks getEntityCallbacks ( ) {
return this . entityCallbacks ;
return entityCallbacks . callback ( callbackType , entity , collectionName ) ;
}
@Override
public boolean equals ( @Nullable Object o ) {
if ( this = = o )
return true ;
if ( o = = null | | getClass ( ) ! = o . getClass ( ) )
return false ;
BulkOperationContext that = ( BulkOperationContext ) o ;
@SuppressWarnings ( "rawtypes" )
public < T > T callback ( Class < ? extends EntityCallback > callbackType , T entity , Document document ,
String collectionName ) {
if ( bulkMode ! = that . bulkMode )
return false ;
if ( ! ObjectUtils . nullSafeEquals ( this . entity , that . entity ) ) {
return false ;
if ( skipEntityCallbacks ( ) ) {
return entity ;
}
if ( ! ObjectUtils . nullSafeEquals ( this . queryMapper , that . queryMapper ) ) {
return false ;
}
if ( ! ObjectUtils . nullSafeEquals ( this . updateMapper , that . updateMapper ) ) {
return false ;
}
if ( ! ObjectUtils . nullSafeEquals ( this . eventPublisher , that . eventPublisher ) ) {
return false ;
}
return ObjectUtils . nullSafeEquals ( this . entityCallbacks , that . entityCallbacks ) ;
}
@Override
public int hashCode ( ) {
int result = bulkMode ! = null ? bulkMode . hashCode ( ) : 0 ;
result = 31 * result + ObjectUtils . nullSafeHashCode ( entity ) ;
result = 31 * result + ObjectUtils . nullSafeHashCode ( queryMapper ) ;
result = 31 * result + ObjectUtils . nullSafeHashCode ( updateMapper ) ;
result = 31 * result + ObjectUtils . nullSafeHashCode ( eventPublisher ) ;
result = 31 * result + ObjectUtils . nullSafeHashCode ( entityCallbacks ) ;
return result ;
return entityCallbacks . callback ( callbackType , entity , document , collectionName ) ;
}
public String toString ( ) {
return "DefaultBulkOperations.BulkOperationContext(bulkMode=" + this . getBulkMode ( ) + ", entity="
+ this . getEntity ( ) + ", queryMapper=" + this . getQueryMapper ( ) + ", updateMapper=" + this . getUpdateMapper ( )
+ ", eventPublisher=" + this . getEventPublisher ( ) + ", entityCallbacks=" + this . getEntityCallbacks ( ) + ")" ;
public void publishEvent ( ApplicationEvent event ) {
if ( skipEventPublishing ( ) ) {
return ;
}
eventPublisher . publishEvent ( event ) ;
}
}
/ * *
* Value object chaining together an actual source with its { @link WriteModel } representation .
*
* @since 2 . 2
* @author Christoph Strobl
* @since 2 . 2
* /
private static final class SourceAwareWriteModelHolder {
private final Object source ;
private final WriteModel < Document > model ;
SourceAwareWriteModelHolder ( Object source , WriteModel < Document > model ) {
this . source = source ;
this . model = model ;
}
record SourceAwareWriteModelHolder ( Object source , WriteModel < Document > model ) {
public Object getSource ( ) {
return this . source ;
}
public WriteModel < Document > getModel ( ) {
return this . model ;
}
@Override
public boolean equals ( @Nullable Object o ) {
if ( this = = o )
return true ;
if ( o = = null | | getClass ( ) ! = o . getClass ( ) )
return false ;
SourceAwareWriteModelHolder that = ( SourceAwareWriteModelHolder ) o ;
if ( ! ObjectUtils . nullSafeEquals ( this . source , that . source ) ) {
return false ;
}
return ObjectUtils . nullSafeEquals ( this . model , that . model ) ;
}
@Override
public int hashCode ( ) {
int result = ObjectUtils . nullSafeHashCode ( model ) ;
result = 31 * result + ObjectUtils . nullSafeHashCode ( source ) ;
return result ;
}
public String toString ( ) {
return "DefaultBulkOperations.SourceAwareWriteModelHolder(source=" + this . getSource ( ) + ", model="
+ this . getModel ( ) + ")" ;
}
}
}