From 8b438b4ecfbcdfa6ad6cd03e163a8c20370a6d38 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 2 Nov 2023 11:41:19 +0100 Subject: [PATCH] Pass on FullDocumentBeforeChange option to change stream iterable/publisher. This commit ensures to pass on a potentially set FullDocumentBeforeChange option to the change stream iterable/publisher. It also corrects false optional behavior within the change stream task which did some defaulting though the actual value is an optional one that must not be present. Original pull request: #4541 Closes #4495 --- .../ReactiveChangeStreamOperationSupport.java | 1 + .../mongodb/core/ReactiveMongoTemplate.java | 4 ++++ .../core/messaging/ChangeStreamTask.java | 12 ++++++----- .../core/ReactiveMongoTemplateUnitTests.java | 21 +++++++++++++++++++ .../messaging/ChangeStreamTaskUnitTests.java | 20 +++++++++++++++--- 5 files changed, 50 insertions(+), 8 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java index 0e57a920b..d4e672427 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java @@ -168,6 +168,7 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat } }); options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup); + options.getFullDocumentBeforeChangeLookup().ifPresent(builder::fullDocumentBeforeChangeLookup); options.getCollation().ifPresent(builder::collation); if (options.isResumeAfter()) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 723372bc9..58d5e44dc 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -1906,6 +1906,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation) .orElse(publisher); publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); + + if(options.getFullDocumentBeforeChangeLookup().isPresent()) { + publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get()); + } return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument)); }) // .flatMapMany(publisher -> Flux.from(publisher) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java index 637a4db54..2733c0f32 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java @@ -37,7 +37,6 @@ import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOpe import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.QueryMapper; -import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions; import org.springframework.data.mongodb.core.messaging.Message.MessageProperties; import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions; import org.springframework.lang.Nullable; @@ -88,7 +87,7 @@ class ChangeStreamTask extends CursorReadingTask, Collation collation = null; FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP; - FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT; + FullDocumentBeforeChange fullDocumentBeforeChange = null; BsonTimestamp startAt = null; boolean resumeAfter = true; @@ -116,8 +115,9 @@ class ChangeStreamTask extends CursorReadingTask, .orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP); - fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup() - .orElse(FullDocumentBeforeChange.DEFAULT); + if(changeStreamOptions.getFullDocumentBeforeChangeLookup().isPresent()) { + fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().get(); + } startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null); } @@ -158,7 +158,9 @@ class ChangeStreamTask extends CursorReadingTask, } iterable = iterable.fullDocument(fullDocument); - iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); + if(fullDocumentBeforeChange != null) { + iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); + } return iterable.iterator(); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index c2543ade8..9453adca5 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -100,6 +100,7 @@ import org.springframework.util.CollectionUtils; import com.mongodb.MongoClientSettings; import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; import com.mongodb.client.model.CountOptions; import com.mongodb.client.model.CreateCollectionOptions; import com.mongodb.client.model.DeleteOptions; @@ -109,6 +110,7 @@ import com.mongodb.client.model.FindOneAndUpdateOptions; import com.mongodb.client.model.ReplaceOptions; import com.mongodb.client.model.TimeSeriesGranularity; import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.InsertManyResult; import com.mongodb.client.result.InsertOneResult; @@ -1528,6 +1530,25 @@ public class ReactiveMongoTemplateUnitTests { verify(changeStreamPublisher).startAfter(eq(token)); } + @Test // GH-4495 + void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() { + + when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db)); + + when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocumentBeforeChange(any())).thenReturn(changeStreamPublisher); + + template + .changeStream("database", "collection", ChangeStreamOptions.builder().fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).build(), Object.class) + .subscribe(); + + verify(changeStreamPublisher).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED)); + + } + private void stubFindSubscribe(Document document) { Publisher realPublisher = Flux.just(document); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java index ae053277f..a629cc8de 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; - import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.convert.MongoConverter; @@ -39,6 +38,7 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.changestream.ChangeStreamDocument; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; /** * @author Christoph Strobl @@ -68,8 +68,6 @@ class ChangeStreamTaskUnitTests { when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable); - - when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable); } @Test // DATAMONGO-2258 @@ -125,6 +123,22 @@ class ChangeStreamTaskUnitTests { verify(changeStreamIterable).startAfter(eq(resumeToken)); } + @Test // GH-4495 + void shouldApplyFullDocumentBeforeChangeToChangeStream() { + + when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable); + + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("start-wars") // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED) // + .publishTo(message -> {}) // + .build(); + + initTask(request, Document.class); + + verify(changeStreamIterable).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED)); + } + private MongoCursor> initTask(ChangeStreamRequest request, Class targetType) { ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});