diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java index 10c806f55..e3e74c7d4 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java @@ -168,6 +168,7 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat } }); options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup); + options.getFullDocumentBeforeChangeLookup().ifPresent(builder::fullDocumentBeforeChangeLookup); options.getCollation().ifPresent(builder::collation); if (options.isResumeAfter()) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 9b03f0820..3e926d8f9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -2050,6 +2050,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation) .orElse(publisher); publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); + + if(options.getFullDocumentBeforeChangeLookup().isPresent()) { + publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get()); + } return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument)); }) // .flatMapMany(publisher -> Flux.from(publisher) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java index a19b65521..ebe61665a 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java @@ -37,7 +37,6 @@ import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOpe import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.QueryMapper; -import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions; import org.springframework.data.mongodb.core.messaging.Message.MessageProperties; import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions; import org.springframework.lang.Nullable; @@ -88,7 +87,7 @@ class ChangeStreamTask extends CursorReadingTask, Collation collation = null; FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP; - FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT; + FullDocumentBeforeChange fullDocumentBeforeChange = null; BsonTimestamp startAt = null; boolean resumeAfter = true; @@ -116,8 +115,9 @@ class ChangeStreamTask extends CursorReadingTask, .orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP); - fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup() - .orElse(FullDocumentBeforeChange.DEFAULT); + if(changeStreamOptions.getFullDocumentBeforeChangeLookup().isPresent()) { + fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().get(); + } startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null); } @@ -158,7 +158,9 @@ class ChangeStreamTask extends CursorReadingTask, } iterable = iterable.fullDocument(fullDocument); - iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); + if(fullDocumentBeforeChange != null) { + iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); + } return iterable.iterator(); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index c7b1e8f03..ddd71fca2 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -20,8 +20,6 @@ import static org.mockito.Mockito.*; import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; import static org.springframework.data.mongodb.test.util.Assertions.assertThat; -import com.mongodb.WriteConcern; -import org.springframework.data.mongodb.core.MongoTemplateUnitTests.Sith; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -101,6 +99,7 @@ import org.springframework.util.CollectionUtils; import com.mongodb.MongoClientSettings; import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; import com.mongodb.client.model.CountOptions; import com.mongodb.client.model.CreateCollectionOptions; import com.mongodb.client.model.DeleteOptions; @@ -110,6 +109,7 @@ import com.mongodb.client.model.FindOneAndUpdateOptions; import com.mongodb.client.model.ReplaceOptions; import com.mongodb.client.model.TimeSeriesGranularity; import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.InsertManyResult; import com.mongodb.client.result.InsertOneResult; @@ -1604,6 +1604,25 @@ public class ReactiveMongoTemplateUnitTests { verify(changeStreamPublisher).startAfter(eq(token)); } + @Test // GH-4495 + void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() { + + when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db)); + + when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocumentBeforeChange(any())).thenReturn(changeStreamPublisher); + + template + .changeStream("database", "collection", ChangeStreamOptions.builder().fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).build(), Object.class) + .subscribe(); + + verify(changeStreamPublisher).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED)); + + } + @Test // GH-4462 void replaceShouldUseCollationWhenPresent() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java index ae053277f..a629cc8de 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; - import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.convert.MongoConverter; @@ -39,6 +38,7 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.changestream.ChangeStreamDocument; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; /** * @author Christoph Strobl @@ -68,8 +68,6 @@ class ChangeStreamTaskUnitTests { when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable); - - when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable); } @Test // DATAMONGO-2258 @@ -125,6 +123,22 @@ class ChangeStreamTaskUnitTests { verify(changeStreamIterable).startAfter(eq(resumeToken)); } + @Test // GH-4495 + void shouldApplyFullDocumentBeforeChangeToChangeStream() { + + when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable); + + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("start-wars") // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED) // + .publishTo(message -> {}) // + .build(); + + initTask(request, Document.class); + + verify(changeStreamIterable).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED)); + } + private MongoCursor> initTask(ChangeStreamRequest request, Class targetType) { ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});