From bf54054ad57ecc003cb9b3dc0bbac43f7549cd8e Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 2 Nov 2023 11:41:19 +0100 Subject: [PATCH] Pass on FullDocumentBeforeChange option to change stream iterable/publisher. This commit ensures to pass on a potentially set FullDocumentBeforeChange option to the change stream iterable/publisher. It also corrects false optional behavior within the change stream task which did some defaulting though the actual value is an optional one that must not be present. Original pull request: #4541 Closes #4495 --- .../ReactiveChangeStreamOperationSupport.java | 1 + .../mongodb/core/ReactiveMongoTemplate.java | 4 ++++ .../core/messaging/ChangeStreamTask.java | 12 ++++++---- .../core/ReactiveMongoTemplateUnitTests.java | 23 +++++++++++++++++-- .../messaging/ChangeStreamTaskUnitTests.java | 20 +++++++++++++--- 5 files changed, 50 insertions(+), 10 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java index 10c806f55..e3e74c7d4 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java @@ -168,6 +168,7 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat } }); options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup); + options.getFullDocumentBeforeChangeLookup().ifPresent(builder::fullDocumentBeforeChangeLookup); options.getCollation().ifPresent(builder::collation); if (options.isResumeAfter()) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 9b03f0820..3e926d8f9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -2050,6 +2050,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation) .orElse(publisher); publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher); + + if(options.getFullDocumentBeforeChangeLookup().isPresent()) { + publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get()); + } return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument)); }) // .flatMapMany(publisher -> Flux.from(publisher) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java index a19b65521..ebe61665a 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java @@ -37,7 +37,6 @@ import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOpe import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.QueryMapper; -import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions; import org.springframework.data.mongodb.core.messaging.Message.MessageProperties; import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions; import org.springframework.lang.Nullable; @@ -88,7 +87,7 @@ class ChangeStreamTask extends CursorReadingTask, Collation collation = null; FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP; - FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT; + FullDocumentBeforeChange fullDocumentBeforeChange = null; BsonTimestamp startAt = null; boolean resumeAfter = true; @@ -116,8 +115,9 @@ class ChangeStreamTask extends CursorReadingTask, .orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP); - fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup() - .orElse(FullDocumentBeforeChange.DEFAULT); + if(changeStreamOptions.getFullDocumentBeforeChangeLookup().isPresent()) { + fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().get(); + } startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null); } @@ -158,7 +158,9 @@ class ChangeStreamTask extends CursorReadingTask, } iterable = iterable.fullDocument(fullDocument); - iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); + if(fullDocumentBeforeChange != null) { + iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); + } return iterable.iterator(); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index c7b1e8f03..ddd71fca2 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -20,8 +20,6 @@ import static org.mockito.Mockito.*; import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; import static org.springframework.data.mongodb.test.util.Assertions.assertThat; -import com.mongodb.WriteConcern; -import org.springframework.data.mongodb.core.MongoTemplateUnitTests.Sith; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -101,6 +99,7 @@ import org.springframework.util.CollectionUtils; import com.mongodb.MongoClientSettings; import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; import com.mongodb.client.model.CountOptions; import com.mongodb.client.model.CreateCollectionOptions; import com.mongodb.client.model.DeleteOptions; @@ -110,6 +109,7 @@ import com.mongodb.client.model.FindOneAndUpdateOptions; import com.mongodb.client.model.ReplaceOptions; import com.mongodb.client.model.TimeSeriesGranularity; import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.InsertManyResult; import com.mongodb.client.result.InsertOneResult; @@ -1604,6 +1604,25 @@ public class ReactiveMongoTemplateUnitTests { verify(changeStreamPublisher).startAfter(eq(token)); } + @Test // GH-4495 + void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() { + + when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db)); + + when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher); + when(changeStreamPublisher.fullDocumentBeforeChange(any())).thenReturn(changeStreamPublisher); + + template + .changeStream("database", "collection", ChangeStreamOptions.builder().fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).build(), Object.class) + .subscribe(); + + verify(changeStreamPublisher).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED)); + + } + @Test // GH-4462 void replaceShouldUseCollationWhenPresent() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java index ae053277f..a629cc8de 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; - import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.convert.MongoConverter; @@ -39,6 +38,7 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.changestream.ChangeStreamDocument; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; /** * @author Christoph Strobl @@ -68,8 +68,6 @@ class ChangeStreamTaskUnitTests { when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable); - - when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable); } @Test // DATAMONGO-2258 @@ -125,6 +123,22 @@ class ChangeStreamTaskUnitTests { verify(changeStreamIterable).startAfter(eq(resumeToken)); } + @Test // GH-4495 + void shouldApplyFullDocumentBeforeChangeToChangeStream() { + + when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable); + + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("start-wars") // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED) // + .publishTo(message -> {}) // + .build(); + + initTask(request, Document.class); + + verify(changeStreamIterable).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED)); + } + private MongoCursor> initTask(ChangeStreamRequest request, Class targetType) { ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});