diff --git a/.travis.yml b/.travis.yml index c04cd83d3..604395caa 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,7 @@ env: matrix: - PROFILE=ci global: - - MONGO_VERSION=4.0.0-rc4 + - MONGO_VERSION=4.0.0 addons: apt: diff --git a/pom.xml b/pom.xml index 7227da358..0ff20c582 100644 --- a/pom.xml +++ b/pom.xml @@ -28,8 +28,8 @@ multi spring-data-mongodb 2.1.0.BUILD-SNAPSHOT - 3.8.0-beta2 - 1.9.0-beta1 + 3.8.0 + 1.9.0 1.19 diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java index a5930833f..fecaecd64 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java @@ -17,8 +17,10 @@ package org.springframework.data.mongodb.core; import lombok.EqualsAndHashCode; +import java.time.Instant; import java.util.concurrent.atomic.AtomicReference; +import org.bson.BsonValue; import org.bson.Document; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.messaging.Message; @@ -26,6 +28,7 @@ import org.springframework.lang.Nullable; import org.springframework.util.ClassUtils; import com.mongodb.client.model.changestream.ChangeStreamDocument; +import com.mongodb.client.model.changestream.OperationType; /** * {@link Message} implementation specific to MongoDB Change @@ -67,6 +70,56 @@ public class ChangeStreamEvent { return raw; } + /** + * Get the {@link ChangeStreamDocument#getClusterTime() cluster time} as {@link Instant} the event was emitted at. + * + * @return can be {@literal null}. + */ + @Nullable + public Instant getTimestamp() { + return raw != null ? Instant.ofEpochMilli(raw.getClusterTime().getValue()) : null; + } + + /** + * Get the {@link ChangeStreamDocument#getResumeToken() resume token} for this event. + * + * @return can be {@literal null}. + */ + @Nullable + public BsonValue getResumeToken() { + return raw != null ? raw.getResumeToken() : null; + } + + /** + * Get the {@link ChangeStreamDocument#getOperationType() operation type} for this event. + * + * @return can be {@literal null}. + */ + @Nullable + public OperationType getOperationType() { + return raw != null ? raw.getOperationType() : null; + } + + /** + * Get the database name the event was originated at. + * + * @return can be {@literal null}. + */ + @Nullable + public String getDatabaseName() { + return raw != null ? raw.getNamespace().getDatabaseName() : null; + } + + /** + * Get the collection name the event was originated at. + * + * @return can be {@literal null}. + */ + @Nullable + public String getCollectionName() { + return raw != null ? raw.getNamespace().getCollectionName() : null; + } + /** * Get the potentially converted {@link ChangeStreamDocument#getFullDocument()}. 
* diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java index e5c4fa5d8..42b0de8c1 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java @@ -17,6 +17,7 @@ package org.springframework.data.mongodb.core; import lombok.EqualsAndHashCode; +import java.time.Instant; import java.util.Arrays; import java.util.Optional; @@ -46,6 +47,7 @@ public class ChangeStreamOptions { private @Nullable BsonValue resumeToken; private @Nullable FullDocument fullDocumentLookup; private @Nullable Collation collation; + private @Nullable Instant resumeTimestamp; protected ChangeStreamOptions() {} @@ -77,6 +79,13 @@ public class ChangeStreamOptions { return Optional.ofNullable(collation); } + /** + * @return {@link Optional#empty()} if not set. + */ + public Optional getResumeTimestamp() { + return Optional.ofNullable(resumeTimestamp); + } + /** * @return empty {@link ChangeStreamOptions}. */ @@ -106,6 +115,7 @@ public class ChangeStreamOptions { private @Nullable BsonValue resumeToken; private @Nullable FullDocument fullDocumentLookup; private @Nullable Collation collation; + private @Nullable Instant resumeTimestamp; private ChangeStreamOptionsBuilder() {} @@ -200,6 +210,20 @@ public class ChangeStreamOptions { return this; } + /** + * Set the cluster time to resume from. + * + * @param resumeTimestamp must not be {@literal null}. + * @return this. + */ + public ChangeStreamOptionsBuilder resumeAt(Instant resumeTimestamp) { + + Assert.notNull(resumeTimestamp, "ResumeTimestamp must not be null!"); + + this.resumeTimestamp = resumeTimestamp; + return this; + } + /** * @return the built {@link ChangeStreamOptions} */ @@ -211,6 +235,7 @@ public class ChangeStreamOptions { options.resumeToken = resumeToken; options.fullDocumentLookup = fullDocumentLookup; options.collation = collation; + options.resumeTimestamp = resumeTimestamp; return options; } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index dd152a917..65f1ed71d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -3534,18 +3534,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, return super.count(query, entityClass, collectionName); } - AggregationUtil aggregationUtil = new AggregationUtil(delegate.queryMapper, delegate.mappingContext); - Aggregation aggregation = aggregationUtil.createCountAggregation(query, entityClass); - AggregationResults aggregationResults = aggregate(aggregation, collectionName, Document.class); + CountOptions options = new CountOptions(); + query.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation); - List result = (List) aggregationResults.getRawResults().getOrDefault("results", - Collections.emptyList()); + Document document = delegate.queryMapper.getMappedObject(query.getQueryObject(), + Optional.ofNullable(entityClass).map(it -> delegate.mappingContext.getPersistentEntity(entityClass))); - if (result.isEmpty()) { - return 0; - } - - return 
result.get(0).get("totalEntityCount", Number.class).longValue(); + return execute(collectionName, collection -> collection.countDocuments(document, options)); } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java index 63c1c21ea..34683b45f 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java @@ -19,7 +19,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Collection; -import java.util.List; import java.util.function.Consumer; import java.util.function.Supplier; @@ -1357,9 +1356,10 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { Flux tail(Query query, Class entityClass, String collectionName); /** - * Subscribe to a MongoDB Change Streams via the reactive - * infrastructure. Use the optional provided {@link Aggregation} to filter events. The stream will not be completed - * unless the {@link org.reactivestreams.Subscription} is {@link Subscription#cancel() canceled}. + * Subscribe to a MongoDB Change Streams for all events + * in the configured default database via the reactive infrastructure. Use the optional provided {@link Aggregation} + * to filter events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is + * {@link Subscription#cancel() canceled}. *

* The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the * {@link ChangeStreamEvent#getRaw()} contains the unmodified payload. @@ -1367,38 +1367,62 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { * Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumseToken} * for resuming change streams. * - * @param filter can be {@literal null}. - * @param resultType must not be {@literal null}. - * @param options must not be {@literal null}. - * @param collectionName must not be {@literal null} nor empty. + * @param options must not be {@literal null}. Use {@link ChangeStreamOptions#empty()}. + * @param targetType the result type to use. * @param - * @return + * @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive. * @since 2.1 */ - Flux> changeStream(@Nullable Aggregation filter, Class resultType, - ChangeStreamOptions options, String collectionName); + default Flux> changeStream(ChangeStreamOptions options, Class targetType) { + return changeStream(null, options, targetType); + } + + /** + * Subscribe to a MongoDB Change Streams for all events + * in the given collection via the reactive infrastructure. Use the optional provided {@link Aggregation} to filter + * events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is + * {@link Subscription#cancel() canceled}. + *

+ * The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the + * {@link ChangeStreamEvent#getRaw()} contains the unmodified payload. + *

+ * Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumseToken} + * for resuming change streams. + * + * @param collectionName the collection to watch. Can be {@literal null}, watches all collections if so. + * @param options must not be {@literal null}. Use {@link ChangeStreamOptions#empty()}. + * @param targetType the result type to use. + * @param + * @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive. + * @since 2.1 + */ + default Flux> changeStream(@Nullable String collectionName, ChangeStreamOptions options, + Class targetType) { + + return changeStream(null, collectionName, options, targetType); + } /** * Subscribe to a MongoDB Change Streams via the reactive - * infrastructure. Use the optional provided aggregation chain to filter events. The stream will not be completed + * infrastructure. Use the optional provided {@link Aggregation} to filter events. The stream will not be completed * unless the {@link org.reactivestreams.Subscription} is {@link Subscription#cancel() canceled}. *

* The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the * {@link ChangeStreamEvent#getRaw()} contains the unmodified payload. *

- * Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumeToken} + * Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumseToken} * for resuming change streams. * - * @param filter can be empty, must not be {@literal null}. - * @param resultType must not be {@literal null}. - * @param options must not be {@literal null}. - * @param collectionName must not be {@literal null} nor empty. + * @param database the database to watch. Can be {@literal null}, uses configured default if so. + * @param collectionName the collection to watch. Can be {@literal null}, watches all collections if so. + * @param options must not be {@literal null}. Use {@link ChangeStreamOptions#empty()}. + * @param targetType the result type to use. * @param - * @return + * @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive. * @since 2.1 */ - Flux> changeStream(List filter, Class resultType, ChangeStreamOptions options, - String collectionName); + Flux> changeStream(@Nullable String database, @Nullable String collectionName, + ChangeStreamOptions options, Class targetType); /** * Execute a map-reduce operation. Use {@link MapReduceOptions} to optionally specify an output collection and other diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 17bb1e20c..c6e3bf918 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; +import org.bson.BsonTimestamp; import org.bson.BsonValue; import org.bson.Document; import org.bson.codecs.Codec; @@ -71,10 +72,8 @@ import org.springframework.data.mapping.model.ConvertingPropertyAccessor; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.aggregation.Aggregation; -import org.springframework.data.mongodb.core.aggregation.AggregationOperation; import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext; import org.springframework.data.mongodb.core.aggregation.AggregationOptions; -import org.springframework.data.mongodb.core.aggregation.CountOperation; import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext; import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext; import org.springframework.data.mongodb.core.aggregation.TypedAggregation; @@ -1998,56 +1997,57 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati new TailingQueryFindPublisherPreparer(query, entityClass)); } - /* - * (non-Javadoc) - * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#tail(org.springframework.data.mongodb.core.aggregation.Aggregation, java.lang.Class, org.springframework.data.mongodb.core.ChangeStreamOptions, java.lang.String) - */ @Override - public Flux> changeStream(@Nullable Aggregation filter, Class resultType, - ChangeStreamOptions options, String collectionName) { + public Flux> changeStream(@Nullable String database, @Nullable String collectionName, + ChangeStreamOptions options, Class 
targetType) { - Assert.notNull(resultType, "Result type must not be null!"); - Assert.notNull(options, "ChangeStreamOptions must not be null!"); - Assert.hasText(collectionName, "Collection name must not be null or empty!"); + List filter = prepareFilter(options); + FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT + : FullDocument.UPDATE_LOOKUP; - if (filter == null) { - return changeStream(Collections.emptyList(), resultType, options, collectionName); + MongoDatabase db = StringUtils.hasText(database) ? mongoDatabaseFactory.getMongoDatabase(database) + : getMongoDatabase(); + + ChangeStreamPublisher publisher; + if (StringUtils.hasText(collectionName)) { + publisher = filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class) + : db.getCollection(collectionName).watch(filter, Document.class); + + } else { + publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class); } - AggregationOperationContext context = filter instanceof TypedAggregation ? new TypeBasedAggregationOperationContext( - ((TypedAggregation) filter).getInputType(), mappingContext, queryMapper) : Aggregation.DEFAULT_CONTEXT; + publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); + publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher); + publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp(it.toEpochMilli())) + .map(publisher::startAtOperationTime).orElse(publisher); + publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument)); - return changeStream( - filter.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", - Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"))), - resultType, options, collectionName); + return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, targetType, getConverter())); } - /* - * (non-Javadoc) - * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#tail(java.util.List, java.lang.Class, org.springframework.data.mongodb.core.ChangeStreamOptions, java.lang.String) - */ - @Override - public Flux> changeStream(List filter, Class resultType, - ChangeStreamOptions options, String collectionName) { - - Assert.notNull(filter, "Filter must not be null!"); - Assert.notNull(resultType, "Result type must not be null!"); - Assert.notNull(options, "ChangeStreamOptions must not be null!"); - Assert.hasText(collectionName, "Collection name must not be null or empty!"); + List prepareFilter(ChangeStreamOptions options) { - ChangeStreamPublisher publisher = filter.isEmpty() ? getCollection(collectionName).watch() - : getCollection(collectionName).watch(filter); + if (!options.getFilter().isPresent()) { + return Collections.emptyList(); + } - publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); - publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher); + Object filter = options.getFilter().get(); + if (filter instanceof Aggregation) { + Aggregation agg = (Aggregation) filter; + AggregationOperationContext context = agg instanceof TypedAggregation + ? 
new TypeBasedAggregationOperationContext(((TypedAggregation) agg).getInputType(), + getConverter().getMappingContext(), queryMapper) + : Aggregation.DEFAULT_CONTEXT; - if (options.getFullDocumentLookup().isPresent() || resultType != Document.class) { - publisher = publisher.fullDocument(options.getFullDocumentLookup().isPresent() - ? options.getFullDocumentLookup().get() : FullDocument.UPDATE_LOOKUP); + return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", + Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"))); + } else if (filter instanceof List) { + return (List) filter; + } else { + throw new IllegalArgumentException( + "ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents"); } - - return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, resultType, getConverter())); } /* @@ -3396,41 +3396,19 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return super.count(query, entityClass, collectionName); } - AggregationUtil aggregationUtil = new AggregationUtil(delegate.queryMapper, delegate.mappingContext); - Aggregation aggregation = aggregationUtil.createCountAggregation(query, entityClass); - - return aggregate(aggregation, collectionName, Document.class) // - .next() // - .map(it -> it.get("totalEntityCount", Number.class).longValue()) // - .defaultIfEmpty(0L); - } - - private List computeCountAggregationPipeline(@Nullable Query query, - @Nullable Class entityType) { - - CountOperation count = Aggregation.count().as("totalEntityCount"); - if (query == null || query.getQueryObject().isEmpty()) { - return Arrays.asList(count); - } - - Document mappedQuery = delegate.queryMapper.getMappedObject(query.getQueryObject(), - delegate.getPersistentEntity(entityType)); - - CriteriaDefinition criteria = new CriteriaDefinition() { + return createMono(collectionName, collection -> { - @Override - public Document getCriteriaObject() { - return mappedQuery; - } + final Document Document = query == null ? null + : delegate.queryMapper.getMappedObject(query.getQueryObject(), + entityClass == null ? null : delegate.mappingContext.getPersistentEntity(entityClass)); - @Nullable - @Override - public String getKey() { - return null; + CountOptions options = new CountOptions(); + if (query != null) { + query.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation); } - }; - return Arrays.asList(Aggregation.match(criteria), count); + return collection.countDocuments(Document, options); + }); } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java index 11edfb58f..8ebfd683d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java @@ -102,7 +102,7 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati } private String prefixKey(String key) { - return (key.startsWith("$") || blacklist.contains(key)) ? key : (prefix + "." + key); + return (key.startsWith("$") || isBlacklisted(key)) ? key : (prefix + "." 
+ key); } private Object prefixCollection(Collection sourceCollection) { @@ -119,4 +119,23 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati return prefixed; } + + private boolean isBlacklisted(String key) { + + if (blacklist.contains(key)) { + return true; + } + + if (!key.contains(".")) { + return false; + } + + for (String blacklisted : blacklist) { + if (key.startsWith(blacklisted + ".")) { + return true; + } + } + + return false; + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java index fd24ef31a..0d86cc020 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java @@ -15,6 +15,8 @@ */ package org.springframework.data.mongodb.core.messaging; +import java.time.Instant; + import org.bson.BsonValue; import org.bson.Document; import org.springframework.data.mongodb.core.ChangeStreamOptions; @@ -34,7 +36,7 @@ import com.mongodb.client.model.changestream.FullDocument; * using the synchronous MongoDB Java driver. *

* The most trivial use case is subscribing to all events of a specific {@link com.mongodb.client.MongoCollection - * collection}. + * collection} * *

  * 
@@ -42,6 +44,15 @@ import com.mongodb.client.model.changestream.FullDocument;
  * 
  * 
* + * or {@link com.mongodb.client.MongoDatabase} which receives events from all {@link com.mongodb.client.MongoCollection + * collections} in that database. + * + *
+ * 
+ *     ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(System.out::println, RequestOptions.justDatabase("test"));
+ * 
+ * 
+ * * For more advanced scenarios {@link ChangeStreamOptions} offers abstractions for options like filtering, resuming,... * *
@@ -154,21 +165,23 @@ public class ChangeStreamRequest
 	 */
 	public static class ChangeStreamRequestOptions implements SubscriptionRequest.RequestOptions {
 
-		private final String collectionName;
+		private final @Nullable String databaseName;
+		private final @Nullable String collectionName;
 		private final ChangeStreamOptions options;
 
 		/**
 		 * Create new {@link ChangeStreamRequestOptions}.
 		 *
-		 * @param collectionName must not be {@literal null}.
+		 * @param databaseName can be {@literal null}.
+		 * @param collectionName can be {@literal null}.
 		 * @param options must not be {@literal null}.
 		 */
-		public ChangeStreamRequestOptions(String collectionName, ChangeStreamOptions options) {
+		public ChangeStreamRequestOptions(@Nullable String databaseName, @Nullable String collectionName,
+				ChangeStreamOptions options) {
 
-			Assert.notNull(collectionName, "CollectionName must not be null!");
 			Assert.notNull(options, "Options must not be null!");
 
 			this.collectionName = collectionName;
+			this.databaseName = databaseName;
 			this.options = options;
 		}
 
@@ -176,7 +189,8 @@ public class ChangeStreamRequest
 
 			Assert.notNull(options, "Options must not be null!");
 
-			return new ChangeStreamRequestOptions(options.getCollectionName(), ChangeStreamOptions.builder().build());
+			return new ChangeStreamRequestOptions(options.getDatabaseName(), options.getCollectionName(),
+					ChangeStreamOptions.builder().build());
 		}
 
 		/**
@@ -196,6 +210,15 @@ public class ChangeStreamRequest
 		public String getCollectionName() {
 			return collectionName;
 		}
+
+		/*
+		 * (non-Javadoc)
+		 * @see org.springframework.data.mongodb.monitor.SubscriptionRequest.RequestOptions#getDatabaseName()
+		 */
+		@Override
+		public String getDatabaseName() {
+			return databaseName;
+		}
 	}
 
 	/**
@@ -207,12 +230,27 @@ public class ChangeStreamRequest
 	 */
 	public static class ChangeStreamRequestBuilder {
 
+		private @Nullable String databaseName;
 		private @Nullable String collectionName;
 		private @Nullable MessageListener, ? super T> listener;
 		private ChangeStreamOptionsBuilder delegate = ChangeStreamOptions.builder();
 
 		private ChangeStreamRequestBuilder() {}
 
+		/**
+		 * Set the name of the {@link com.mongodb.client.MongoDatabase} to listen to.
+		 *
+		 * @param databaseName must not be {@literal null} nor empty.
+		 * @return this.
+		 */
+		public ChangeStreamRequestBuilder database(String databaseName) {
+
+			Assert.hasText(databaseName, "DatabaseName must not be null!");
+
+			this.databaseName = databaseName;
+			return this;
+		}
+
 		/**
 		 * Set the name of the {@link com.mongodb.client.MongoCollection} to listen to.
 		 *
@@ -317,6 +355,22 @@ public class ChangeStreamRequest
 			return this;
 		}
 
+		/**
+		 * Set the cluster time at which to resume listening.
+		 *
+		 * @param clusterTime must not be {@literal null}.
+		 * @return this.
+		 * @see ChangeStreamOptions#getResumeTimestamp()
+		 * @see ChangeStreamOptionsBuilder#resumeAt(java.time.Instant)
+		 */
+		public ChangeStreamRequestBuilder resumeAt(Instant clusterTime) {
+
+			Assert.notNull(clusterTime, "ClusterTime must not be null!");
+
+			this.delegate.resumeAt(clusterTime);
+			return this;
+		}
+
 		/**
 		 * Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}.
 		 *
@@ -339,9 +393,9 @@ public class ChangeStreamRequest
 		public ChangeStreamRequest build() {
 
 			Assert.notNull(listener, "MessageListener must not be null!");
-			Assert.hasText(collectionName, "CollectionName must not be null!");
 
-			return new ChangeStreamRequest<>(listener, new ChangeStreamRequestOptions(collectionName, delegate.build()));
+			return new ChangeStreamRequest<>(listener,
+					new ChangeStreamRequestOptions(databaseName, collectionName, delegate.build()));
 		}
 	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
index 5c604fbfb..38ca51ddf 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
@@ -17,14 +17,16 @@ package org.springframework.data.mongodb.core.messaging;
 
 import lombok.AllArgsConstructor;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
 import org.bson.Document;
 import org.springframework.data.mongodb.core.ChangeStreamEvent;
 import org.springframework.data.mongodb.core.ChangeStreamOptions;
@@ -42,10 +44,12 @@ import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.Reque
 import org.springframework.lang.Nullable;
 import org.springframework.util.ClassUtils;
 import org.springframework.util.ErrorHandler;
+import org.springframework.util.StringUtils;
 
 import com.mongodb.MongoNamespace;
 import com.mongodb.client.ChangeStreamIterable;
 import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Collation;
 import com.mongodb.client.model.changestream.ChangeStreamDocument;
 import com.mongodb.client.model.changestream.FullDocument;
@@ -84,7 +88,9 @@ class ChangeStreamTask extends CursorReadingTask,
 		List filter = Collections.emptyList();
 		BsonDocument resumeToken = new BsonDocument();
 		Collation collation = null;
-		FullDocument fullDocument = FullDocument.DEFAULT;
+		FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
+				: FullDocument.UPDATE_LOOKUP;
+		BsonTimestamp startAt = null;
 
 		if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions) {
 
@@ -107,16 +113,33 @@ class ChangeStreamTask extends CursorReadingTask,
 			fullDocument = changeStreamOptions.getFullDocumentLookup()
 					.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
 							: FullDocument.UPDATE_LOOKUP);
+
+			if (changeStreamOptions.getResumeTimestamp().isPresent()) {
+				startAt = new BsonTimestamp(changeStreamOptions.getResumeTimestamp().get().toEpochMilli());
+			}
 		}
 
-		ChangeStreamIterable iterable = filter.isEmpty()
-				? template.getCollection(options.getCollectionName()).watch(Document.class)
-				: template.getCollection(options.getCollectionName()).watch(filter, Document.class);
+		MongoDatabase db = StringUtils.hasText(options.getDatabaseName())
+				? template.getMongoDbFactory().getDb(options.getDatabaseName()) : template.getDb();
+
+		ChangeStreamIterable iterable;
+
+		if (StringUtils.hasText(options.getCollectionName())) {
+			iterable = filter.isEmpty() ? db.getCollection(options.getCollectionName()).watch(Document.class)
+					: db.getCollection(options.getCollectionName()).watch(filter, Document.class);
+
+		} else {
+			iterable = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
+		}
 
 		if (!resumeToken.isEmpty()) {
 			iterable = iterable.resumeAfter(resumeToken);
 		}
 
+		if (startAt != null) {
+			iterable = iterable.startAtOperationTime(startAt);
+		}
+
 		if (collation != null) {
 			iterable = iterable.collation(collation);
 		}
@@ -153,14 +176,21 @@ class ChangeStreamTask extends CursorReadingTask,
 	protected Message, Object> createMessage(ChangeStreamDocument source,
 			Class targetType, RequestOptions options) {
 
-		// namespace might be null for eg. OperationType.INVALIDATE
-		MongoNamespace namespace = Optional.ofNullable(source.getNamespace())
-				.orElse(new MongoNamespace("unknown", options.getCollectionName()));
+		MongoNamespace namespace = source.getNamespace() != null ? source.getNamespace()
+				: createNamespaceFromOptions(options);
 
 		return new ChangeStreamEventMessage<>(new ChangeStreamEvent<>(source, targetType, mongoConverter), MessageProperties
 				.builder().databaseName(namespace.getDatabaseName()).collectionName(namespace.getCollectionName()).build());
 	}
 
+	MongoNamespace createNamespaceFromOptions(RequestOptions options) {
+
+		String collectionName = StringUtils.hasText(options.getCollectionName()) ? options.getCollectionName() : "unknown";
+		String databaseName = StringUtils.hasText(options.getDatabaseName()) ? options.getDatabaseName() : "unknown";
+
+		return new MongoNamespace(databaseName, collectionName);
+	}
+
 	/**
 	 * {@link Message} implementation for ChangeStreams
 	 *
@@ -200,5 +230,32 @@ class ChangeStreamTask extends CursorReadingTask,
 		public MessageProperties getProperties() {
 			return this.messageProperties;
 		}
+
+		/**
+		 * @return the resume token or {@literal null} if not set.
+		 * @see ChangeStreamEvent#getResumeToken()
+		 */
+		@Nullable
+		BsonValue getResumeToken() {
+			return delegate.getResumeToken();
+		}
+
+		/**
+		 * @return the cluster time of the event or {@literal null}.
+		 * @see ChangeStreamEvent#getTimestamp()
+		 */
+		@Nullable
+		Instant getTimestamp() {
+			return delegate.getTimestamp();
+		}
+
+		/**
+		 * Get the {@link ChangeStreamEvent} from the message.
+		 *
+		 * @return never {@literal null}.
+		 */
+		ChangeStreamEvent getChangeStreamEvent() {
+			return delegate;
+		}
 	}
 }
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java
index 854aa9248..307a0bc22 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java
@@ -15,7 +15,10 @@
  */
 package org.springframework.data.mongodb.core.messaging;
 
+import org.springframework.data.mongodb.MongoDbFactory;
 import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
 
 /**
  * The actual {@link SubscriptionRequest} sent to the {@link MessageListenerContainer}. This wrapper type allows passing
@@ -51,8 +54,93 @@ public interface SubscriptionRequest {
 	interface RequestOptions {
 
 		/**
-		 * @return the name of the collection to subscribe to. Never {@literal null}.
+		 * Get the database name.
+		 *
+		 * @return the name of the database to subscribe to. Can be {@literal null} in which case the default
+		 *         {@link MongoDbFactory#getDb()} is used.
 		 */
+		@Nullable
+		default String getDatabaseName() {
+			return null;
+		}
+
+		/**
+		 * Get the collection name.
+		 *
+		 * @return the name of the collection to subscribe to. Can be {@literal null}.
+		 */
+		@Nullable
 		String getCollectionName();
+
+		/**
+		 * Create empty options.
+		 *
+		 * @return new instance of empty {@link RequestOptions}.
+		 */
+		static RequestOptions none() {
+			return () -> null;
+		}
+
+		/**
+		 * Create options with the provided database.
+		 *
+		 * @param database must not be {@literal null}.
+		 * @return new instance of {@link RequestOptions}.
+		 */
+		static RequestOptions justDatabase(String database) {
+
+			Assert.notNull(database, "Database must not be null!");
+
+			return new RequestOptions() {
+
+				@Override
+				public String getCollectionName() {
+					return null;
+				}
+
+				@Override
+				public String getDatabaseName() {
+					return database;
+				}
+			};
+		}
+
+		/**
+		 * Create options with the provided collection.
+		 *
+		 * @param collection must not be {@literal null}.
+		 * @return new instance of {@link RequestOptions}.
+		 */
+		static RequestOptions justCollection(String collection) {
+
+			Assert.notNull(collection, "Collection must not be null!");
+			return () -> collection;
+		}
+
+		/**
+		 * Create options with the provided database and collection.
+		 *
+		 * @param database must not be {@literal null}.
+		 * @param collection must not be {@literal null}.
+		 * @return new instance of {@link RequestOptions}.
+		 */
+		static RequestOptions of(String database, String collection) {
+
+			Assert.notNull(database, "Database must not be null!");
+			Assert.notNull(collection, "Collection must not be null!");
+
+			return new RequestOptions() {
+
+				@Override
+				public String getCollectionName() {
+					return collection;
+				}
+
+				@Override
+				public String getDatabaseName() {
+					return database;
+				}
+			};
+		}
 	}
 }
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java
index 95b224b8d..124c311a4 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java
@@ -28,6 +28,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1115,8 +1116,7 @@ public class ReactiveMongoTemplateTests {
 		StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
 
 		BlockingQueue> documents = new LinkedBlockingQueue<>(100);
-		Disposable disposable = template
-				.changeStream(Collections.emptyList(), Document.class, ChangeStreamOptions.empty(), "person")
+		Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Document.class)
 				.doOnNext(documents::add).subscribe();
 
 		Thread.sleep(500); // just give it some time to link to the collection.
@@ -1147,8 +1147,7 @@ public class ReactiveMongoTemplateTests {
 		StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
 
 		BlockingQueue> documents = new LinkedBlockingQueue<>(100);
-		Disposable disposable = template
-				.changeStream(Collections.emptyList(), Person.class, ChangeStreamOptions.empty(), "person")
+		Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class)
 				.doOnNext(documents::add).subscribe();
 
 		Thread.sleep(500); // just give it some time to link to the collection.
@@ -1179,8 +1178,9 @@ public class ReactiveMongoTemplateTests {
 		StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
 
 		BlockingQueue> documents = new LinkedBlockingQueue<>(100);
-		Disposable disposable = template.changeStream(newAggregation(Person.class, match(where("age").gte(38))),
-				Person.class, ChangeStreamOptions.empty(), "person").doOnNext(documents::add).subscribe();
+		Disposable disposable = template.changeStream("person",
+				ChangeStreamOptions.builder().filter(newAggregation(Person.class, match(where("age").gte(38)))).build(),
+				Person.class).doOnNext(documents::add).subscribe();
 
 		Thread.sleep(500); // just give it some time to link to the collection.
 
@@ -1211,8 +1211,10 @@ public class ReactiveMongoTemplateTests {
 
 		BlockingQueue> documents = new LinkedBlockingQueue<>(100);
 		Disposable disposable = template
-				.changeStream(newAggregation(Person.class, match(where("operationType").is("replace"))), Person.class,
-						ChangeStreamOptions.empty(), "person")
+				.changeStream("person",
+						ChangeStreamOptions.builder()
+								.filter(newAggregation(Person.class, match(where("operationType").is("replace")))).build(),
+						Person.class)
 				.doOnNext(documents::add).subscribe();
 
 		Thread.sleep(500); // just give it some time to link to the collection.
@@ -1246,8 +1248,7 @@ public class ReactiveMongoTemplateTests {
 		StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
 
 		BlockingQueue> documents = new LinkedBlockingQueue<>(100);
-		Disposable disposable = template
-				.changeStream(Collections.emptyList(), Person.class, ChangeStreamOptions.empty(), "person")
+		Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class)
 				.doOnNext(documents::add).subscribe();
 
 		Thread.sleep(500); // just give it some time to link to the collection.
@@ -1267,9 +1268,7 @@ public class ReactiveMongoTemplateTests {
 		BsonDocument resumeToken = documents.take().getRaw().getResumeToken();
 
 		BlockingQueue> resumeDocuments = new LinkedBlockingQueue<>(100);
-		template
-				.changeStream(Collections.emptyList(), Person.class,
-						ChangeStreamOptions.builder().resumeToken(resumeToken).build(), "person")
+		template.changeStream("person", ChangeStreamOptions.builder().resumeToken(resumeToken).build(), Person.class)
 				.doOnNext(resumeDocuments::add).subscribe();
 
 		Thread.sleep(500); // just give it some time to link receive all events
@@ -1314,6 +1313,81 @@ public class ReactiveMongoTemplateTests {
 				.verifyComplete();
 	}
 
+	@Test // DATAMONGO-2012
+	public void watchesDatabaseCorrectly() throws InterruptedException {
+
+		Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
+
+		StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
+		StepVerifier.create(template.createCollection("personX")).expectNextCount(1).verifyComplete();
+
+		BlockingQueue> documents = new LinkedBlockingQueue<>(100);
+		Disposable disposable = template.changeStream(ChangeStreamOptions.empty(), Person.class).doOnNext(documents::add)
+				.subscribe();
+
+		Thread.sleep(500); // just give it some time to link to the collection.
+
+		Person person1 = new Person("Spring", 38);
+		Person person2 = new Person("Data", 37);
+		Person person3 = new Person("MongoDB", 39);
+
+		StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
+		StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
+		StepVerifier.create(template.save(person3, "personX")).expectNextCount(1).verifyComplete();
+
+		Thread.sleep(500); // just give it some time to link receive all events
+
+		try {
+			Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
+					.containsExactly(person1, person2, person3);
+		} finally {
+			disposable.dispose();
+		}
+	}
+
+	@Test // DATAMONGO-2012
+	public void resumesAtTimestampCorrectly() throws InterruptedException {
+
+		Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
+
+		StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
+
+		BlockingQueue> documents = new LinkedBlockingQueue<>(100);
+		Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class)
+				.doOnNext(documents::add).subscribe();
+
+		Thread.sleep(500); // just give it some time to link to the collection.
+
+		Person person1 = new Person("Spring", 38);
+		Person person2 = new Person("Data", 37);
+		Person person3 = new Person("MongoDB", 39);
+
+		StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
+		StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
+
+		Thread.sleep(500); // just give it some time to link receive all events
+
+		disposable.dispose();
+
+		documents.take(); // skip first
+		Instant resumeAt = documents.take().getTimestamp(); // take 2nd
+
+		StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();
+
+		BlockingQueue> resumeDocuments = new LinkedBlockingQueue<>(100);
+		template.changeStream("person", ChangeStreamOptions.builder().resumeAt(resumeAt).build(), Person.class)
+				.doOnNext(resumeDocuments::add).subscribe();
+
+		Thread.sleep(500); // just give it some time to link receive all events
+
+		try {
+			Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
+					.containsExactly(person2, person3);
+		} finally {
+			disposable.dispose();
+		}
+	}
+
 	private PersonWithAList createPersonWithAList(String firstname, int age) {
 
 		PersonWithAList p = new PersonWithAList();
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java
index 90e9d9749..5005af9f4 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java
@@ -40,6 +40,7 @@ import org.bson.Document;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -49,6 +50,7 @@ import org.springframework.aop.Advisor;
 import org.springframework.aop.framework.Advised;
 import org.springframework.dao.DataAccessException;
 import org.springframework.data.annotation.Id;
+import org.springframework.data.geo.Point;
 import org.springframework.data.mongodb.ClientSessionException;
 import org.springframework.data.mongodb.LazyLoadingException;
 import org.springframework.data.mongodb.MongoDbFactory;
@@ -60,6 +62,7 @@ import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
 import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
 import org.springframework.data.mongodb.core.convert.MongoConverter;
 import org.springframework.data.mongodb.core.convert.MongoCustomConversions;
+import org.springframework.data.mongodb.core.index.GeospatialIndex;
 import org.springframework.data.mongodb.core.mapping.DBRef;
 import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
 import org.springframework.data.mongodb.core.query.Query;
@@ -295,6 +298,31 @@ public class SessionBoundMongoTemplateTests {
 		session.close();
 	}
 
+	@Test // DATAMONGO-2012
+	@Ignore("error 2 (BadValue): $match does not support $geoNear, $near, and $nearSphere")
+	public void countWithGeoInTransaction() {
+
+		if (!template.collectionExists(Person.class)) {
+			template.createCollection(Person.class);
+			template.indexOps(Person.class).ensureIndex(new GeospatialIndex("location"));
+		} else {
+			template.remove(Person.class).all();
+		}
+
+		ClientSession session = client.startSession();
+		session.startTransaction();
+
+		MongoTemplate sessionBound = template.withSession(session);
+
+		sessionBound.save(new Person("Kylar Stern"));
+
+		assertThat(sessionBound.query(Person.class).matching(query(where("location").near(new Point(1, 0)))).count())
+				.isZero();
+
+		session.commitTransaction();
+		session.close();
+	}
+
 	@Test // DATAMONGO-2001
 	public void countShouldReturnIsolatedCount() throws InterruptedException {
 
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java
index 9408dbc21..c76db2b59 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java
@@ -21,8 +21,11 @@ import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils.
 import static org.springframework.data.mongodb.core.query.Criteria.*;
 import static org.springframework.data.mongodb.core.query.Query.*;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
+import java.time.Instant;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -39,17 +42,16 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.ChangeStreamOptions;
 import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.data.mongodb.core.mapping.Field;
-import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
+import org.springframework.data.mongodb.core.messaging.ChangeStreamTask.ChangeStreamEventMessage;
 import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
 import org.springframework.data.mongodb.core.messaging.SubscriptionUtils.*;
 import org.springframework.data.mongodb.core.query.Criteria;
 import org.springframework.data.mongodb.core.query.Update;
+import org.springframework.data.mongodb.test.util.MongoTestUtils;
 import org.springframework.data.mongodb.test.util.ReplicaSet;
 
-import com.mongodb.MongoClient;
 import com.mongodb.client.model.changestream.ChangeStreamDocument;
 import com.mongodb.client.model.changestream.FullDocument;
 
@@ -80,7 +82,7 @@ public class ChangeStreamTests {
 	@Before
 	public void setUp() {
 
-		template = new MongoTemplate(new MongoClient(), "change-stream-tests");
+		template = new MongoTemplate(MongoTestUtils.replSetClient(), "change-stream-tests");
 		template.dropCollection(User.class);
 
 		container = new DefaultMessageListenerContainer(template, executor);
@@ -361,8 +363,10 @@ public class ChangeStreamTests {
 	public void readsOnlyDiffForUpdateWhenOptionsDeclareDefaultExplicitly() throws InterruptedException {
 
 		CollectingMessageListener, User> messageListener = new CollectingMessageListener<>();
-		ChangeStreamRequest request = new ChangeStreamRequest<>(messageListener, new ChangeStreamRequestOptions(
-				"user", ChangeStreamOptions.builder().fullDocumentLookup(FullDocument.DEFAULT).build()));
+		ChangeStreamRequest request = ChangeStreamRequest.builder() //
+				.collection("user") //
+				.fullDocumentLookup(FullDocument.DEFAULT) //
+				.publishTo(messageListener).build();
 
 		Subscription subscription = container.register(request, User.class);
 		awaitSubscription(subscription);
@@ -381,8 +385,10 @@ public class ChangeStreamTests {
 	public void readsFullDocumentForUpdateWhenNotMappedToDomainTypeButLookupSpecified() throws InterruptedException {
 
 		CollectingMessageListener, Document> messageListener = new CollectingMessageListener<>();
-		ChangeStreamRequest request = new ChangeStreamRequest<>(messageListener,
-				new ChangeStreamRequestOptions("user", ChangeStreamOptions.builder().returnFullDocumentOnUpdate().build()));
+		ChangeStreamRequest request = ChangeStreamRequest.builder() //
+				.collection("user") //
+				.fullDocumentLookup(FullDocument.UPDATE_LOOKUP) //
+				.publishTo(messageListener).build();
 
 		Subscription subscription = container.register(request, Document.class);
 		awaitSubscription(subscription);
@@ -399,11 +405,123 @@ public class ChangeStreamTests {
 				.append("user_name", "jellyBelly").append("age", 8).append("_class", User.class.getName()));
 	}
 
+	@Test // DATAMONGO-2012
+	public void resumeAtTimestampCorrectly() throws InterruptedException {
+
+		CollectingMessageListener, User> messageListener1 = new CollectingMessageListener<>();
+		Subscription subscription1 = container.register(new ChangeStreamRequest<>(messageListener1, () -> "user"),
+				User.class);
+
+		awaitSubscription(subscription1);
+
+		template.save(jellyBelly);
+		template.save(sugarSplashy);
+
+		awaitMessages(messageListener1, 2);
+
+		Instant resumeAt = ((ChangeStreamEventMessage) messageListener1.getLastMessage()).getTimestamp();
+
+		template.save(huffyFluffy);
+
+		awaitMessages(messageListener1, 3);
+
+		CollectingMessageListener, User> messageListener2 = new CollectingMessageListener<>();
+		ChangeStreamRequest subSequentRequest = ChangeStreamRequest.builder() //
+				.collection("user") //
+				.resumeAt(resumeAt) //
+				.publishTo(messageListener2) //
+				.build();
+
+		Subscription subscription2 = container.register(subSequentRequest, User.class);
+		awaitSubscription(subscription2);
+
+		awaitMessages(messageListener2);
+
+		List messageBodies = messageListener2.getMessages().stream().map(Message::getBody)
+				.collect(Collectors.toList());
+
+		assertThat(messageBodies).hasSize(2).doesNotContain(jellyBelly);
+	}
+
+	@Test // DATAMONGO-1996
+	public void filterOnNestedElementWorksCorrectly() throws InterruptedException {
+
+		CollectingMessageListener, User> messageListener = new CollectingMessageListener<>();
+		ChangeStreamRequest request = ChangeStreamRequest.builder(messageListener) //
+				.collection("user") //
+				.filter(newAggregation(User.class, match(where("address.street").is("flower street")))) //
+				.build();
+
+		Subscription subscription = container.register(request, User.class);
+		awaitSubscription(subscription);
+
+		jellyBelly.address = new Address();
+		jellyBelly.address.street = "candy ave";
+
+		huffyFluffy.address = new Address();
+		huffyFluffy.address.street = "flower street";
+
+		template.save(jellyBelly);
+		template.save(sugarSplashy);
+		template.save(huffyFluffy);
+
+		awaitMessages(messageListener);
+
+		List messageBodies = messageListener.getMessages().stream().map(Message::getBody)
+				.collect(Collectors.toList());
+
+		assertThat(messageBodies).hasSize(1).contains(huffyFluffy);
+	}
+
+	@Test // DATAMONGO-1996
+	public void filterOnUpdateDescriptionElement() throws InterruptedException {
+
+		template.save(jellyBelly);
+		template.save(sugarSplashy);
+		template.save(huffyFluffy);
+
+		CollectingMessageListener, User> messageListener = new CollectingMessageListener<>();
+		ChangeStreamRequest request = ChangeStreamRequest.builder(messageListener) //
+				.collection("user") //
+				.filter(newAggregation(User.class, match(where("updateDescription.updatedFields.address").exists(true)))) //
+				.fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();
+
+		Subscription subscription = container.register(request, User.class);
+		awaitSubscription(subscription);
+
+		template.update(User.class).matching(query(where("id").is(jellyBelly.id)))
+				.apply(Update.update("address", new Address("candy ave"))).first();
+
+		template.update(User.class).matching(query(where("id").is(sugarSplashy.id))).apply(new Update().inc("age", 1))
+				.first();
+
+		template.update(User.class).matching(query(where("id").is(huffyFluffy.id)))
+				.apply(Update.update("address", new Address("flower street"))).first();
+
+		awaitMessages(messageListener);
+
+		List messageBodies = messageListener.getMessages().stream().map(Message::getBody)
+				.collect(Collectors.toList());
+
+		assertThat(messageBodies).hasSize(2);
+	}
+
 	@Data
 	static class User {
 
 		@Id String id;
 		@Field("user_name") String userName;
 		int age;
+
+		Address address;
 	}
+
+	@Data
+	@AllArgsConstructor
+	@NoArgsConstructor
+	static class Address {
+
+		@Field("s") String street;
+	}
+
 }
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java
index 4ed242342..c5edaf8b1 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java
@@ -36,6 +36,7 @@ import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.MongoDbFactory;
 import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
+import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
 import org.springframework.data.mongodb.test.util.ReplicaSet;
 import org.springframework.test.annotation.IfProfileValue;
 import org.springframework.util.ErrorHandler;
@@ -54,9 +55,12 @@ public class DefaultMessageListenerContainerTests {
 
 	public static final String DATABASE_NAME = "change-stream-events";
 	public static final String COLLECTION_NAME = "collection-1";
+	public static final String COLLECTION_2_NAME = "collection-2";
 	MongoDbFactory dbFactory;
 
 	MongoCollection collection;
+	private MongoCollection collection2;
+
 	private CollectingMessageListener messageListener;
 	private MongoTemplate template;
 
@@ -69,7 +73,10 @@ public class DefaultMessageListenerContainerTests {
 		template = new MongoTemplate(dbFactory);
 
 		template.dropCollection(COLLECTION_NAME);
+		template.dropCollection(COLLECTION_2_NAME);
+
 		collection = template.getCollection(COLLECTION_NAME);
+		collection2 = template.getCollection(COLLECTION_2_NAME);
 
 		messageListener = new CollectingMessageListener<>();
 	}
@@ -308,6 +315,31 @@ public class DefaultMessageListenerContainerTests {
 		assertThat(changeStreamListener.getFirstMessage().getRaw()).isInstanceOf(ChangeStreamDocument.class);
 	}
 
+	@Test // DATAMONGO-2012
+	@IfProfileValue(name = "replSet", value = "true")
+	public void databaseLevelWatch() throws InterruptedException {
+
+		MessageListenerContainer container = new DefaultMessageListenerContainer(template);
+		Subscription subscription = container.register(new ChangeStreamRequest(messageListener, RequestOptions.none()),
+				Person.class);
+
+		container.start();
+
+		awaitSubscription(subscription, Duration.ofMillis(500));
+
+		collection.insertOne(new Document("_id", "col-1-id-1").append("firstname", "foo"));
+		collection.insertOne(new Document("_id", "col-1-id-2").append("firstname", "bar"));
+
+		collection2.insertOne(new Document("_id", "col-2-id-1").append("firstname", "bar"));
+		collection2.insertOne(new Document("_id", "col-2-id-2").append("firstname", "foo"));
+
+		awaitMessages(messageListener, 4, Duration.ofMillis(500));
+
+		assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
+				.containsExactly(new Person("col-1-id-1", "foo"), new Person("col-1-id-2", "bar"),
+						new Person("col-2-id-1", "bar"), new Person("col-2-id-2", "foo"));
+	}
+
 	@Data
 	static class Person {
 		@Id String id;
diff --git a/src/main/asciidoc/reference/change-streams.adoc b/src/main/asciidoc/reference/change-streams.adoc
new file mode 100644
index 000000000..945d6e876
--- /dev/null
+++ b/src/main/asciidoc/reference/change-streams.adoc
@@ -0,0 +1,82 @@
+[[change-streams]]
+== Change Streams
+
+As of MongoDB 3.6, https://docs.mongodb.com/manual/changeStreams/[Change Streams] let applications get notified about changes without having to tail the oplog.
+
+NOTE: Change Stream support is only possible for replica sets or for a sharded cluster.
+
+Change Streams can be subscribed to with both the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant, as it is less resource-intensive. However, if you cannot use the reactive API, you can still obtain the change events by using the messaging concept that is already prevalent in the Spring ecosystem.
+
+It is possible to watch both at the collection and at the database level, whereas the database-level variant publishes
+changes from all collections within the database. When subscribing to a database change stream, make sure to use a
+suitable type for the event, as conversion might not apply correctly across the different entity types involved. If in
+doubt, use `Document`.
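+
+A minimal sketch of a database-level subscription, assuming the messaging API introduced in the next section and using `Document` as the event target type:
+
+.Database-level Change Stream subscription (sketch)
+====
+[source,java]
+----
+MessageListenerContainer container = new DefaultMessageListenerContainer(template);
+container.start();
+
+MessageListener<ChangeStreamDocument<Document>, Document> listener = System.out::println;
+
+// RequestOptions.none() does not bind the request to a collection,
+// so events from all collections of the database are published.
+Subscription subscription = container.register(
+    new ChangeStreamRequest<>(listener, RequestOptions.none()), Document.class);
+----
+====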
+
+=== Change Streams with `MessageListener`
+
+Listening to a https://docs.mongodb.com/manual/tutorial/change-streams-example/[Change Stream by using a Sync Driver] is a long-running, blocking task that needs to be delegated to a separate component.
+In this case, we need to first create a `MessageListenerContainer`, which will be the main entry point for running the specific `SubscriptionRequest` tasks.
+Spring Data MongoDB already ships with a default implementation that operates on `MongoTemplate` and is capable of creating and executing `Task` instances for a `ChangeStreamRequest`.
+
+The following example shows how to use Change Streams with `MessageListener` instances:
+
+.Change Streams with `MessageListener` instances
+====
+[source,java]
+----
+MessageListenerContainer container = new DefaultMessageListenerContainer(template);
+container.start();    <1>
+
+MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; <2>
+ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); <3>
+
+Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); <4>
+
+// ...
+
+container.stop(); <5>
+----
+<1> Starting the container initializes the resources and starts `Task` instances for the already registered `SubscriptionRequest` instances. Requests added after startup are run immediately.
+<2> Define the listener called when a `Message` is received. The `Message#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
+<3> Set the collection to listen to and provide additional options through `ChangeStreamOptions`.
+<4> Register the request. The returned `Subscription` can be used to check the current `Task` state and cancel its execution to free resources.
+<5> Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running `Task` instances within the container.
+====
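+
+As a usage note, the returned `Subscription` also allows checking and terminating the running `Task` programmatically; a minimal sketch, assuming the `isActive()` and `cancel()` methods on `Subscription`:
+
+[source,java]
+----
+if (subscription.isActive()) { // is the underlying Task still running?
+    subscription.cancel();     // stop the Task and free its resources
+}
+----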
+
+=== Change Streams - Reactive
+
+Subscribing to Change Streams with the reactive API is more straightforward. Still, the essential building blocks, such as `ChangeStreamOptions`, remain the same. The following example shows how to obtain a `Flux` of Change Stream events:
+
+.Consuming Change Streams with the reactive API
+====
+[source,java]
+----
+ChangeStreamOptions options = ChangeStreamOptions.builder()
+    .filter(newAggregation(User.class, match(where("age").gte(38)))) <1>
+    .build();
+
+Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream("user", options, User.class); <2>
+----
+<1> Use an aggregation pipeline to filter events.
+<2> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
+====
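+
+A short follow-up on consuming the resulting `Flux` (plain Project Reactor usage; only `ChangeStreamEvent#getBody()` from the example above is assumed):
+
+[source,java]
+----
+Disposable disposable = flux
+    .map(ChangeStreamEvent::getBody) // the body is already converted to the domain type
+    .subscribe(System.out::println);
+
+// ...
+
+disposable.dispose(); // stop consuming events when no longer needed
+----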
+
+=== Resuming Change Streams
+
+Change Streams can be resumed and pick up emitting events where you left off. To resume the stream, you need either a
+resume token or the server time (in UTC) from which to resume. Use `ChangeStreamOptions` to set the value
+accordingly.
+
+.Resume a Change Stream
+====
+[source,java]
+----
+ChangeStreamOptions options = ChangeStreamOptions.builder()
+    .resumeAt(Instant.now().minusSeconds(1)) <1>
+    .build();
+
+Flux<ChangeStreamEvent<User>> resumed = template.changeStream("person", options, User.class);
+----
+<1> You may obtain the server time of a `ChangeStreamEvent` via its `getTimestamp` method, or use the resume token
+exposed via `getResumeToken`.
+====
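+
+As an illustrative sketch (building only on the accessors and builder methods shown above, and assuming `event` refers to a previously received `ChangeStreamEvent`), you might capture the timestamp of the last processed event and resume from there:
+
+[source,java]
+----
+Instant lastSeen = event.getTimestamp(); // cluster time of a previously received event
+
+ChangeStreamOptions options = ChangeStreamOptions.builder()
+    .resumeAt(lastSeen)
+    .build();
+
+Flux<ChangeStreamEvent<User>> resumed = template.changeStream("person", options, User.class);
+----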
diff --git a/src/main/asciidoc/reference/mongodb.adoc b/src/main/asciidoc/reference/mongodb.adoc
index e40077741..cb530ce22 100644
--- a/src/main/asciidoc/reference/mongodb.adoc
+++ b/src/main/asciidoc/reference/mongodb.adoc
@@ -3072,57 +3072,4 @@ class GridFsClient {
 
 `GridFsOperations` extends `ResourcePatternResolver` and lets the `GridFsTemplate` (for example) to be plugged into an `ApplicationContext` to read Spring Config files from MongoDB database.
 
-[[change-streams]]
-== Change Streams
-
-As of MongoDB 3.6, https://docs.mongodb.com/manual/changeStreams/[Change Streams] let applications get notified about changes without having to tail the oplog.
-
-NOTE: Change Stream support is only possible for replica sets or for a sharded cluster.
-
-Change Streams can be subscribed to with both the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant, as it is less resource-intensive. However if you cannot use the reactive API, you can still obtain the change events by using the  messaging concept that is already prevalent in the Spring ecosystem.
-
-=== Change Streams with `MessageListener`
-
-Listening to a https://docs.mongodb.com/manual/tutorial/change-streams-example/[Change Stream by using a Sync Driver] is a long running, blocking task that needs to be delegated to a separate component.
-In this case, we need to first create a `MessageListenerContainer`, which will be the main entry point for running the specific `SubscriptionRequest` tasks.
-Spring Data MongoDB already ships with a default implementation that operates on `MongoTemplate` and is capable of creating and executing `Task` instances for a `ChangeStreamRequest`.
-
-The following example shows how to use Change Streams with `MessageListener` instances:
-
-.Change Streams with `MessageListener` instances
-====
-[source,java]
-----
-MessageListenerContainer container = new DefaultMessageListenerContainer(template);
-container.start();    <1>
-
-MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; <2>
-ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); <3>
-
-Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); <4>
-
-// ...
-
-container.stop(); <5>
-----
-<1> Starting the container intializes the resources and starts the `Task` instances for the already registered `SubscriptionRequest` instances. Requests added after the startup are run immediately.
-<2> Define the listener called when a `Message` is received. The `Message#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
-<3> Set the collection to listen to and provide additional options through `ChangeStreamOptions`.
-<4> Register the request. The returned `Subscription` can be used to check the current `Task` state and cancel its execution to free resources.
-<5> Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running `Task` instances within the container.
-====
-
-=== Change Streams - Reactive
-
-Subscribing to Change Stream with the reactive API is more straightforward. Still the essential building blocks, such as `ChangeStreamOptions`, remain the same. The following example shows how to use Change Streams with reactive `MessageListeners`:
-
-.Change Streams with `MessageListeners`
-====
-[source,java]
-----
-Aggregation filter = newAggregation(User.class, match(where("age").gte(38)); <1>
-Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(filter), User.class, ChangeStreamOptions.empty()); <2>
-----
-<1> Use an aggregation pipeline to filter events.
-<2> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
-====
+include::change-streams.adoc[]
\ No newline at end of file