@ -37,7 +37,6 @@ import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOpe
import org.springframework.data.mongodb.core.aggregation.TypedAggregation ;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation ;
import org.springframework.data.mongodb.core.convert.MongoConverter ;
import org.springframework.data.mongodb.core.convert.MongoConverter ;
import org.springframework.data.mongodb.core.convert.QueryMapper ;
import org.springframework.data.mongodb.core.convert.QueryMapper ;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions ;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties ;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties ;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions ;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions ;
import org.springframework.lang.Nullable ;
import org.springframework.lang.Nullable ;
@ -88,7 +87,7 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
Collation collation = null ;
Collation collation = null ;
FullDocument fullDocument = ClassUtils . isAssignable ( Document . class , targetType ) ? FullDocument . DEFAULT
FullDocument fullDocument = ClassUtils . isAssignable ( Document . class , targetType ) ? FullDocument . DEFAULT
: FullDocument . UPDATE_LOOKUP ;
: FullDocument . UPDATE_LOOKUP ;
FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange . DEFAULT ;
FullDocumentBeforeChange fullDocumentBeforeChange = null ;
BsonTimestamp startAt = null ;
BsonTimestamp startAt = null ;
boolean resumeAfter = true ;
boolean resumeAfter = true ;
@ -116,8 +115,9 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
. orElseGet ( ( ) - > ClassUtils . isAssignable ( Document . class , targetType ) ? FullDocument . DEFAULT
. orElseGet ( ( ) - > ClassUtils . isAssignable ( Document . class , targetType ) ? FullDocument . DEFAULT
: FullDocument . UPDATE_LOOKUP ) ;
: FullDocument . UPDATE_LOOKUP ) ;
fullDocumentBeforeChange = changeStreamOptions . getFullDocumentBeforeChangeLookup ( )
if ( changeStreamOptions . getFullDocumentBeforeChangeLookup ( ) . isPresent ( ) ) {
. orElse ( FullDocumentBeforeChange . DEFAULT ) ;
fullDocumentBeforeChange = changeStreamOptions . getFullDocumentBeforeChangeLookup ( ) . get ( ) ;
}
startAt = changeStreamOptions . getResumeBsonTimestamp ( ) . orElse ( null ) ;
startAt = changeStreamOptions . getResumeBsonTimestamp ( ) . orElse ( null ) ;
}
}
@ -158,7 +158,9 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
}
}
iterable = iterable . fullDocument ( fullDocument ) ;
iterable = iterable . fullDocument ( fullDocument ) ;
if ( fullDocumentBeforeChange ! = null ) {
iterable = iterable . fullDocumentBeforeChange ( fullDocumentBeforeChange ) ;
iterable = iterable . fullDocumentBeforeChange ( fullDocumentBeforeChange ) ;
}
return iterable . iterator ( ) ;
return iterable . iterator ( ) ;
}
}