diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java index fecaecd64..b25b3eb4f 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java @@ -18,7 +18,7 @@ package org.springframework.data.mongodb.core; import lombok.EqualsAndHashCode; import java.time.Instant; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.bson.BsonValue; import org.bson.Document; @@ -41,11 +41,17 @@ import com.mongodb.client.model.changestream.OperationType; @EqualsAndHashCode public class ChangeStreamEvent { + @SuppressWarnings("rawtypes") // + private static final AtomicReferenceFieldUpdater CONVERTED_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(ChangeStreamEvent.class, Object.class, "converted"); + private final @Nullable ChangeStreamDocument raw; private final Class targetType; private final MongoConverter converter; - private final AtomicReference converted = new AtomicReference<>(); + + // accessed through CONVERTED_UPDATER. + private volatile @Nullable T converted; /** * @param raw can be {@literal null}. @@ -77,7 +83,7 @@ public class ChangeStreamEvent { */ @Nullable public Instant getTimestamp() { - return raw != null ? Instant.ofEpochMilli(raw.getClusterTime().getValue()) : null; + return raw != null && raw.getClusterTime() != null ? Instant.ofEpochMilli(raw.getClusterTime().getValue()) : null; } /** @@ -133,36 +139,48 @@ public class ChangeStreamEvent { return null; } - if (raw.getFullDocument() == null) { - return targetType.cast(raw.getFullDocument()); + Document fullDocument = raw.getFullDocument(); + + if (fullDocument == null) { + return targetType.cast(fullDocument); } - return getConverted(); + return getConverted(fullDocument); } - private T getConverted() { + @SuppressWarnings("unchecked") + private T getConverted(Document fullDocument) { + return (T) doGetConverted(fullDocument); + } + + private Object doGetConverted(Document fullDocument) { + + Object result = CONVERTED_UPDATER.get(this); - T result = converted.get(); if (result != null) { return result; } - if (ClassUtils.isAssignable(Document.class, raw.getFullDocument().getClass())) { + if (ClassUtils.isAssignable(Document.class, fullDocument.getClass())) { - result = converter.read(targetType, raw.getFullDocument()); - return converted.compareAndSet(null, result) ? result : converted.get(); + result = converter.read(targetType, fullDocument); + return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this); } - if (converter.getConversionService().canConvert(raw.getFullDocument().getClass(), targetType)) { + if (converter.getConversionService().canConvert(fullDocument.getClass(), targetType)) { - result = converter.getConversionService().convert(raw.getFullDocument(), targetType); - return converted.compareAndSet(null, result) ? result : converted.get(); + result = converter.getConversionService().convert(fullDocument, targetType); + return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this); } throw new IllegalArgumentException(String.format("No converter found capable of converting %s to %s", - raw.getFullDocument().getClass(), targetType)); + fullDocument.getClass(), targetType)); } + /* + * (non-Javadoc) + * @see java.lang.Object#toString() + */ @Override public String toString() { return "ChangeStreamEvent {" + "raw=" + raw + ", targetType=" + targetType + '}'; diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java index 34683b45f..95b964e3d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java @@ -26,6 +26,7 @@ import org.bson.Document; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import org.springframework.data.geo.GeoResult; +import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.TypedAggregation; @@ -1356,9 +1357,9 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { Flux tail(Query query, Class entityClass, String collectionName); /** - * Subscribe to a MongoDB Change Streams for all events - * in the configured default database via the reactive infrastructure. Use the optional provided {@link Aggregation} - * to filter events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is + * Subscribe to a MongoDB Change Stream for all events in + * the configured default database via the reactive infrastructure. Use the optional provided {@link Aggregation} to + * filter events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is * {@link Subscription#cancel() canceled}. *

* The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the @@ -1372,14 +1373,16 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { * @param * @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive. * @since 2.1 + * @see ReactiveMongoDatabaseFactory#getMongoDatabase() + * @see ChangeStreamOptions#getFilter() */ default Flux> changeStream(ChangeStreamOptions options, Class targetType) { return changeStream(null, options, targetType); } /** - * Subscribe to a MongoDB Change Streams for all events - * in the given collection via the reactive infrastructure. Use the optional provided {@link Aggregation} to filter + * Subscribe to a MongoDB Change Stream for all events in + * the given collection via the reactive infrastructure. Use the optional provided {@link Aggregation} to filter * events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is * {@link Subscription#cancel() canceled}. *

@@ -1389,12 +1392,13 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { * Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumseToken} * for resuming change streams. * - * @param collectionName the collection to watch. Can be {@literal null}, watches all collections if so. + * @param collectionName the collection to watch. Can be {@literal null} to watch all collections. * @param options must not be {@literal null}. Use {@link ChangeStreamOptions#empty()}. * @param targetType the result type to use. * @param * @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive. * @since 2.1 + * @see ChangeStreamOptions#getFilter() */ default Flux> changeStream(@Nullable String collectionName, ChangeStreamOptions options, Class targetType) { @@ -1403,7 +1407,7 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { } /** - * Subscribe to a MongoDB Change Streams via the reactive + * Subscribe to a MongoDB Change Stream via the reactive * infrastructure. Use the optional provided {@link Aggregation} to filter events. The stream will not be completed * unless the {@link org.reactivestreams.Subscription} is {@link Subscription#cancel() canceled}. *

@@ -1420,6 +1424,7 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { * @param * @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive. * @since 2.1 + * @see ChangeStreamOptions#getFilter() */ Flux> changeStream(@Nullable String database, @Nullable String collectionName, ChangeStreamOptions options, Class targetType); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index c6e3bf918..060b14943 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -2028,11 +2028,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati List prepareFilter(ChangeStreamOptions options) { - if (!options.getFilter().isPresent()) { - return Collections.emptyList(); - } + Object filter = options.getFilter().orElse(Collections.emptyList()); - Object filter = options.getFilter().get(); if (filter instanceof Aggregation) { Aggregation agg = (Aggregation) filter; AggregationOperationContext context = agg instanceof TypedAggregation @@ -2042,12 +2039,14 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"))); - } else if (filter instanceof List) { + } + + if (filter instanceof List) { return (List) filter; - } else { - throw new IllegalArgumentException( - "ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents"); } + + throw new IllegalArgumentException( + "ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents"); } /* diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java index 0d86cc020..e6a17c705 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java @@ -46,13 +46,13 @@ import com.mongodb.client.model.changestream.FullDocument; * * or {@link com.mongodb.client.MongoDatabase} which receives events from all {@link com.mongodb.client.MongoCollection * collections} in that database. - * + * *

  * 
  *     ChangeStreamRequest request = new ChangeStreamRequest<>(System.out::println, RequestOptions.justDatabase("test"));
  * 
  * 
- * + * * For more advanced scenarios {@link ChangeStreamOptions} offers abstractions for options like filtering, resuming,... * *
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
index 38ca51ddf..6e887548a 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
@@ -58,6 +58,7 @@ import com.mongodb.client.model.changestream.FullDocument;
  * {@link Task} implementation for obtaining {@link ChangeStreamDocument ChangeStreamDocuments} from MongoDB.
  *
  * @author Christoph Strobl
+ * @author Mark Paluch
  * @since 2.1
  */
 class ChangeStreamTask extends CursorReadingTask, Object> {
@@ -77,7 +78,7 @@ class ChangeStreamTask extends CursorReadingTask,
 		mongoConverter = template.getConverter();
 	}
 
-	/* 
+	/*
 	 * (non-Javadoc)
 	 * @see org.springframework.data.mongodb.core.messaging.CursorReadingTask#initCursor(org.springframework.data.mongodb.core.MongoTemplate, org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions, java.lang.Class)
 	 */
@@ -114,9 +115,8 @@ class ChangeStreamTask extends CursorReadingTask,
 					.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
 							: FullDocument.UPDATE_LOOKUP);
 
-			if (changeStreamOptions.getResumeTimestamp().isPresent()) {
-				startAt = new BsonTimestamp(changeStreamOptions.getResumeTimestamp().get().toEpochMilli());
-			}
+			startAt = changeStreamOptions.getResumeTimestamp().map(Instant::toEpochMilli).map(BsonTimestamp::new)
+					.orElse(null);
 		}
 
 		MongoDatabase db = StringUtils.hasText(options.getDatabaseName())
@@ -149,13 +149,15 @@ class ChangeStreamTask extends CursorReadingTask,
 		return iterable.iterator();
 	}
 
+	@SuppressWarnings("unchecked")
 	List prepareFilter(MongoTemplate template, ChangeStreamOptions options) {
 
 		if (!options.getFilter().isPresent()) {
 			return Collections.emptyList();
 		}
 
-		Object filter = options.getFilter().get();
+		Object filter = options.getFilter().orElse(null);
+
 		if (filter instanceof Aggregation) {
 			Aggregation agg = (Aggregation) filter;
 			AggregationOperationContext context = agg instanceof TypedAggregation
@@ -164,14 +166,20 @@ class ChangeStreamTask extends CursorReadingTask,
 					: Aggregation.DEFAULT_CONTEXT;
 
 			return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", blacklist));
-		} else if (filter instanceof List) {
+		}
+
+		if (filter instanceof List) {
 			return (List) filter;
-		} else {
-			throw new IllegalArgumentException(
-					"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
 		}
+
+		throw new IllegalArgumentException(
+				"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
 	}
 
+	/*
+	 * (non-Javadoc)
+	 * @see org.springframework.data.mongodb.core.messaging.CursorReadingTask#createMessage(java.lang.Object, java.lang.Class, org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions)
+	 */
 	@Override
 	protected Message, Object> createMessage(ChangeStreamDocument source,
 			Class targetType, RequestOptions options) {
@@ -202,7 +210,7 @@ class ChangeStreamTask extends CursorReadingTask,
 		private final ChangeStreamEvent delegate;
 		private final MessageProperties messageProperties;
 
-		/* 
+		/*
 		 * (non-Javadoc)
 		 * @see org.springframework.data.mongodb.core.messaging.Message#getRaw()
 		 */
@@ -212,7 +220,7 @@ class ChangeStreamTask extends CursorReadingTask,
 			return delegate.getRaw();
 		}
 
-		/* 
+		/*
 		 * (non-Javadoc)
 		 * @see org.springframework.data.mongodb.core.messaging.Message#getBody()
 		 */
@@ -222,7 +230,7 @@ class ChangeStreamTask extends CursorReadingTask,
 			return delegate.getBody();
 		}
 
-		/* 
+		/*
 		 * (non-Javadoc)
 		 * @see org.springframework.data.mongodb.core.messaging.Message#getProperties()
 		 */
@@ -232,7 +240,7 @@ class ChangeStreamTask extends CursorReadingTask,
 		}
 
 		/**
-		 * @return the resume token or {@litearl null} if not set.
+		 * @return the resume token or {@literal null} if not set.
 		 * @see ChangeStreamEvent#getResumeToken()
 		 */
 		@Nullable
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java
index 307a0bc22..e19d94b5c 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java
@@ -57,7 +57,7 @@ public interface SubscriptionRequest {
 		 * Get the database name of the db.
 		 *
 		 * @return the name of the database to subscribe to. Can be {@literal null} in which case the default
-		 *         {@link MongoDbFactory#getDb()} is used.
+		 *         {@link MongoDbFactory#getDb() database} is used.
 		 */
 		@Nullable
 		default String getDatabaseName() {
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java
index c5edaf8b1..03dd7d3b0 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java
@@ -56,16 +56,16 @@ public class DefaultMessageListenerContainerTests {
 	public static final String DATABASE_NAME = "change-stream-events";
 	public static final String COLLECTION_NAME = "collection-1";
 	public static final String COLLECTION_2_NAME = "collection-2";
-	MongoDbFactory dbFactory;
 
+	public @Rule TestRule replSet = ReplicaSet.none();
+
+	MongoDbFactory dbFactory;
 	MongoCollection collection;
-	private MongoCollection collection2;
+	MongoCollection collection2;
 
 	private CollectingMessageListener messageListener;
 	private MongoTemplate template;
 
-	public @Rule TestRule replSet = ReplicaSet.none();
-
 	@Before
 	public void setUp() {
 
diff --git a/src/main/asciidoc/reference/change-streams.adoc b/src/main/asciidoc/reference/change-streams.adoc
index 945d6e876..a31aad2a4 100644
--- a/src/main/asciidoc/reference/change-streams.adoc
+++ b/src/main/asciidoc/reference/change-streams.adoc
@@ -5,16 +5,16 @@ As of MongoDB 3.6, https://docs.mongodb.com/manual/changeStreams/[Change Streams
 
 NOTE: Change Stream support is only possible for replica sets or for a sharded cluster.
 
-Change Streams can be subscribed to with both the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant, as it is less resource-intensive. However if you cannot use the reactive API, you can still obtain the change events by using the  messaging concept that is already prevalent in the Spring ecosystem.
+Change Streams can be consumed with both, the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant, as it is less resource-intensive. However, if you cannot use the reactive API, you can still obtain change events by using the messaging concept that is already prevalent in the Spring ecosystem.
 
 It is possible to watch both on a collection as well as database level, whereas the database level variant publishes
-changes from all collections within the database. So when subscribing to a database change stream, make sure to use a
- suitable type for the event type in use as conversion might not apply correctly when set to specificly. In doubt use
-  `Document`.
+changes from all collections within the database. When subscribing to a database change stream, make sure to use a
+ suitable type for the event type as conversion might not apply correctly across different entity types.
+In doubt, use `Document`.
 
 === Change Streams with `MessageListener`
 
-Listening to a https://docs.mongodb.com/manual/tutorial/change-streams-example/[Change Stream by using a Sync Driver] is a long running, blocking task that needs to be delegated to a separate component.
+Listening to a https://docs.mongodb.com/manual/tutorial/change-streams-example/[Change Stream by using a Sync Driver] creates a long running, blocking task that needs to be delegated to a separate component.
 In this case, we need to first create a `MessageListenerContainer`, which will be the main entry point for running the specific `SubscriptionRequest` tasks.
 Spring Data MongoDB already ships with a default implementation that operates on `MongoTemplate` and is capable of creating and executing `Task` instances for a `ChangeStreamRequest`.
 
@@ -25,34 +25,34 @@ The following example shows how to use Change Streams with `MessageListener` ins
 [source,java]
 ----
 MessageListenerContainer container = new DefaultMessageListenerContainer(template);
-container.start();    <1>
+container.start();                                                                                        <1>
 
-MessageListener, User> listener = System.out::println; <2>
+MessageListener, User> listener = System.out::println;                     <2>
 ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); <3>
 
 Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); <4>
 
 // ...
 
-container.stop(); <5>
+container.stop();                                                                                         <5>
 ----
-<1> Starting the container intializes the resources and starts the `Task` instances for the already registered `SubscriptionRequest` instances. Requests added after the startup are run immediately.
+<1> Starting the container intializes the resources and starts `Task` instances for already registered `SubscriptionRequest` instances. Requests added after startup are ran immediately.
 <2> Define the listener called when a `Message` is received. The `Message#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
 <3> Set the collection to listen to and provide additional options through `ChangeStreamOptions`.
 <4> Register the request. The returned `Subscription` can be used to check the current `Task` state and cancel its execution to free resources.
 <5> Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running `Task` instances within the container.
 ====
 
-=== Change Streams - Reactive
+=== Reactive Change Streams
 
-Subscribing to Change Stream with the reactive API is more straightforward. Still the essential building blocks, such as `ChangeStreamOptions`, remain the same. The following example shows how to use Change Streams with reactive `MessageListeners`:
+Subscribing to Change Streams with the reactive API is a more natural approach to work with streams. Still, the essential building blocks, such as `ChangeStreamOptions`, remain the same. The following example shows how to use Change Streams emitting ``ChangeStreamEvent``s:
 
-.Change Streams with `MessageListeners`
+.Change Streams emitting `ChangeStreamEvent`
 ====
 [source,java]
 ----
 ChangeStreamOptions options = ChangeStreamOptions.builder()
-    .filter(newAggregation(User.class, match(where("age").gte(38))) <1>
+    .filter(newAggregation(User.class, match(where("age").gte(38)))                              <1>
     .build();
 
 Flux> flux = reactiveTemplate.changeStream("user", options, User.class); <2>
@@ -63,20 +63,21 @@ Flux> flux = reactiveTemplate.changeStream("user", optio
 
 === Resuming Change Streams
 
-Change Streams can be resumed and will pick up emitting events where you left. To resume the stream either a resume
-token or the server time in UTC from where to resume is required. Use `ChangeStreamOptions` to set the value
-accordingly.
+Change Streams can be resumed and resume emitting events where you left. To resume the stream, you need to supply either a resume
+token or the last known server time (in UTC). Use `ChangeStreamOptions` to set the value accordingly.
+
+The following example shows how to set the resume offset using server time:
 
 .Resume a Change Stream
 ====
 [source,java]
 ----
 ChangeStreamOptions = ChangeStreamOptions.builder()
-    .resumeAt(Instant.now().minusSeconds(1)) <1>
+    .resumeAt(Instant.now().minusSeconds(1))        <1>
     .build()
 
 Flux> resumed = template.changeStream("person", options, User.class)
 ----
-<1> You may obtain the server time of an `ChangeStreamEvent` via the `getTimestamp` method or use the `resumeToken`
-exposed via `getResumeToken`.
+<1> You may obtain the server time of an `ChangeStreamEvent` through the `getTimestamp` method or use the `resumeToken`
+exposed through `getResumeToken`.
 ====