diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java index 4a92c68e8..99e25d96b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java @@ -162,6 +162,13 @@ public class ChangeStreamEvent { return getConvertedFullDocument(raw.getFullDocument()); } + /** + * Get the potentially converted {@link ChangeStreamDocument#getFullDocumentBeforeChange() document} before being changed. + * + * @return {@literal null} when {@link #getRaw()} or {@link ChangeStreamDocument#getFullDocumentBeforeChange()} is + * {@literal null}. + * @since 4.0 + */ @Nullable public T getBodyBeforeChange() { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java index 3e5ba7a0d..672945f33 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java @@ -19,7 +19,6 @@ import java.time.Instant; import java.util.Arrays; import java.util.Optional; -import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import org.bson.BsonDocument; import org.bson.BsonTimestamp; import org.bson.BsonValue; @@ -33,6 +32,7 @@ import org.springframework.util.ObjectUtils; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; /** * Options applicable to MongoDB Change Streams. Intended @@ -79,6 +79,7 @@ public class ChangeStreamOptions { /** * @return {@link Optional#empty()} if not set. + * @since 4.0 */ public Optional getFullDocumentBeforeChangeLookup() { return Optional.ofNullable(fullDocumentBeforeChangeLookup); @@ -342,6 +343,7 @@ public class ChangeStreamOptions { * * @param lookup must not be {@literal null}. * @return this. + * @since 4.0 */ public ChangeStreamOptionsBuilder fullDocumentBeforeChangeLookup(FullDocumentBeforeChange lookup) { @@ -351,6 +353,17 @@ public class ChangeStreamOptions { return this; } + /** + * Return the full document before being changed if it is available. + * + * @return this. + * @since 4.0 + * @see #fullDocumentBeforeChangeLookup(FullDocumentBeforeChange) + */ + public ChangeStreamOptionsBuilder returnFullDocumentBeforeChange() { + return fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.WHEN_AVAILABLE); + } + /** * Set the cluster time to resume from. * diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java index c9e0e065c..2aa24a646 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java @@ -18,7 +18,6 @@ package org.springframework.data.mongodb.core.messaging; import java.time.Duration; import java.time.Instant; -import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import org.bson.BsonValue; import org.bson.Document; import org.springframework.data.mongodb.core.ChangeStreamOptions; @@ -31,6 +30,7 @@ import org.springframework.util.Assert; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; /** * {@link SubscriptionRequest} implementation to be used for listening to @@ -415,7 +415,6 @@ public class ChangeStreamRequest * Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}. * * @return this. - * @see #fullDocumentLookup(FullDocument) * @see ChangeStreamOptions#getFullDocumentLookup() * @see ChangeStreamOptionsBuilder#fullDocumentLookup(FullDocument) */ @@ -428,8 +427,10 @@ public class ChangeStreamRequest } /** + * Set the {@link FullDocumentBeforeChange} lookup to the given value. + * * @return this. - * @see #fullDocumentBeforeChangeLookup(FullDocumentBeforeChange) (FullDocumentBeforeChange) + * @since 4.0 * @see ChangeStreamOptions#getFullDocumentBeforeChangeLookup() * @see ChangeStreamOptionsBuilder#fullDocumentBeforeChangeLookup(FullDocumentBeforeChange) */ diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java index 7ea4da2b4..e1059f512 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import org.bson.BsonDocument; import org.bson.BsonTimestamp; import org.bson.BsonValue; @@ -53,6 +52,7 @@ import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Collation; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; /** * {@link Task} implementation for obtaining {@link ChangeStreamDocument ChangeStreamDocuments} from MongoDB. @@ -150,7 +150,7 @@ class ChangeStreamTask extends CursorReadingTask, } if (startAt != null) { - iterable.startAtOperationTime(startAt); + iterable = iterable.startAtOperationTime(startAt); } if (collation != null) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java index 8749a7609..0ca9332b6 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java @@ -57,6 +57,7 @@ public interface Message { * The converted message body before change if available. * * @return can be {@literal null}. + * @since 4.0 */ @Nullable default T getBodyBeforeChange() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java index eda6f4834..889b66269 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java @@ -541,7 +541,7 @@ class ChangeStreamTests { assertThat(messageBodies).hasSize(2); } - @Test // issue/41087 + @Test // GH-4187 @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") void readsFullDocumentBeforeChangeWhenOptionDeclaredWhenAvailable() throws InterruptedException { @@ -571,7 +571,7 @@ class ChangeStreamTests { assertThat(messageListener.getLastMessage().getBody()).isEqualTo(jellyBelly.withAge(8)); } - @Test // issue/41087 + @Test // GH-4187 @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") void readsFullDocumentBeforeChangeWhenOptionDeclaredRequired() throws InterruptedException { @@ -601,7 +601,7 @@ class ChangeStreamTests { assertThat(messageListener.getLastMessage().getBody()).isEqualTo(jellyBelly.withAge(8)); } - @Test // issue/41087 + @Test // GH-4187 @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") void readsFullDocumentBeforeChangeWhenOptionIsNotDeclared() throws InterruptedException { @@ -626,7 +626,7 @@ class ChangeStreamTests { assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull(); } - @Test // issue/41087 + @Test // GH-4187 @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") void readsFullDocumentBeforeChangeWhenOptionDeclaredDefault() throws InterruptedException { @@ -651,7 +651,7 @@ class ChangeStreamTests { assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull(); } - @Test // issue/41087 + @Test // GH-4187 @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") void readsFullDocumentBeforeChangeWhenOptionDeclaredOff() throws InterruptedException { @@ -676,7 +676,7 @@ class ChangeStreamTests { assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull(); } - @Test // issue/41087 + @Test // GH-4187 @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") void readsFullDocumentBeforeChangeWhenOptionDeclaredWhenAvailableAndChangeStreamPreAndPostImagesDisabled() throws InterruptedException { @@ -700,7 +700,7 @@ class ChangeStreamTests { assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull(); } - @Test // issue/41087 + @Test // GH-4187 @EnableIfMongoServerVersion(isLessThan = "6.0") void readsFullDocumentBeforeChangeWhenOptionDeclaredRequiredAndMongoVersionIsLessThan6() throws InterruptedException { @@ -739,6 +739,7 @@ class ChangeStreamTests { Address address; User withAge(int age) { + User user = new User(); user.id = id; user.userName = userName;