Browse Source

DATAMONGO-2012 - Upgrade drivers to 3.8 (sync) and 1.9 (reactive).

We still stick to count for non session operations as countDocuments does not allow geo operators like $near in the filter query. For now we will wait to see if this is resolved within the driver.

Added options to watch an entire database and resume the changestream from a given point in time (UTC).

Original pull request: #576.
pull/577/merge
Christoph Strobl 8 years ago committed by Mark Paluch
parent
commit
88150eca54
  1. 2
      .travis.yml
  2. 4
      pom.xml
  3. 53
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java
  4. 25
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java
  5. 15
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
  6. 64
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
  7. 120
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  8. 21
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java
  9. 70
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java
  10. 73
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
  11. 90
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java
  12. 100
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java
  13. 28
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java
  14. 134
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java
  15. 32
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java
  16. 82
      src/main/asciidoc/reference/change-streams.adoc
  17. 55
      src/main/asciidoc/reference/mongodb.adoc

2
.travis.yml

@ -18,7 +18,7 @@ env: @@ -18,7 +18,7 @@ env:
matrix:
- PROFILE=ci
global:
- MONGO_VERSION=4.0.0-rc4
- MONGO_VERSION=4.0.0
addons:
apt:

4
pom.xml

@ -28,8 +28,8 @@ @@ -28,8 +28,8 @@
<project.type>multi</project.type>
<dist.id>spring-data-mongodb</dist.id>
<springdata.commons>2.1.0.BUILD-SNAPSHOT</springdata.commons>
<mongo>3.8.0-beta2</mongo>
<mongo.reactivestreams>1.9.0-beta1</mongo.reactivestreams>
<mongo>3.8.0</mongo>
<mongo.reactivestreams>1.9.0</mongo.reactivestreams>
<jmh.version>1.19</jmh.version>
</properties>

53
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java

@ -17,8 +17,10 @@ package org.springframework.data.mongodb.core; @@ -17,8 +17,10 @@ package org.springframework.data.mongodb.core;
import lombok.EqualsAndHashCode;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.messaging.Message;
@ -26,6 +28,7 @@ import org.springframework.lang.Nullable; @@ -26,6 +28,7 @@ import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
/**
* {@link Message} implementation specific to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change
@ -67,6 +70,56 @@ public class ChangeStreamEvent<T> { @@ -67,6 +70,56 @@ public class ChangeStreamEvent<T> {
return raw;
}
/**
* Get the {@link ChangeStreamDocument#getClusterTime() cluster time} as {@link Instant} the event was emitted at.
*
* @return can be {@literal null}.
*/
@Nullable
public Instant getTimestamp() {
return raw != null ? Instant.ofEpochMilli(raw.getClusterTime().getValue()) : null;
}
/**
* Get the {@link ChangeStreamDocument#getResumeToken() resume token} for this event.
*
* @return can be {@literal null}.
*/
@Nullable
public BsonValue getResumeToken() {
return raw != null ? raw.getResumeToken() : null;
}
/**
* Get the {@link ChangeStreamDocument#getOperationType() operation type} for this event.
*
* @return can be {@literal null}.
*/
@Nullable
public OperationType getOperationType() {
return raw != null ? raw.getOperationType() : null;
}
/**
* Get the database name the event was originated at.
*
* @return can be {@literal null}.
*/
@Nullable
public String getDatabaseName() {
return raw != null ? raw.getNamespace().getDatabaseName() : null;
}
/**
* Get the collection name the event was originated at.
*
* @return can be {@literal null}.
*/
@Nullable
public String getCollectionName() {
return raw != null ? raw.getNamespace().getCollectionName() : null;
}
/**
* Get the potentially converted {@link ChangeStreamDocument#getFullDocument()}.
*

25
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

@ -17,6 +17,7 @@ package org.springframework.data.mongodb.core; @@ -17,6 +17,7 @@ package org.springframework.data.mongodb.core;
import lombok.EqualsAndHashCode;
import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
@ -46,6 +47,7 @@ public class ChangeStreamOptions { @@ -46,6 +47,7 @@ public class ChangeStreamOptions {
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
private @Nullable Instant resumeTimestamp;
protected ChangeStreamOptions() {}
@ -77,6 +79,13 @@ public class ChangeStreamOptions { @@ -77,6 +79,13 @@ public class ChangeStreamOptions {
return Optional.ofNullable(collation);
}
/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<Instant> getResumeTimestamp() {
return Optional.ofNullable(resumeTimestamp);
}
/**
* @return empty {@link ChangeStreamOptions}.
*/
@ -106,6 +115,7 @@ public class ChangeStreamOptions { @@ -106,6 +115,7 @@ public class ChangeStreamOptions {
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
private @Nullable Instant resumeTimestamp;
private ChangeStreamOptionsBuilder() {}
@ -200,6 +210,20 @@ public class ChangeStreamOptions { @@ -200,6 +210,20 @@ public class ChangeStreamOptions {
return this;
}
/**
* Set the cluster time to resume from.
*
* @param resumeTimestamp must not be {@literal null}.
* @return this.
*/
public ChangeStreamOptionsBuilder resumeAt(Instant resumeTimestamp) {
Assert.notNull(resumeTimestamp, "ResumeTimestamp must not be null!");
this.resumeTimestamp = resumeTimestamp;
return this;
}
/**
* @return the built {@link ChangeStreamOptions}
*/
@ -211,6 +235,7 @@ public class ChangeStreamOptions { @@ -211,6 +235,7 @@ public class ChangeStreamOptions {
options.resumeToken = resumeToken;
options.fullDocumentLookup = fullDocumentLookup;
options.collation = collation;
options.resumeTimestamp = resumeTimestamp;
return options;
}

15
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

@ -3534,18 +3534,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @@ -3534,18 +3534,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return super.count(query, entityClass, collectionName);
}
AggregationUtil aggregationUtil = new AggregationUtil(delegate.queryMapper, delegate.mappingContext);
Aggregation aggregation = aggregationUtil.createCountAggregation(query, entityClass);
AggregationResults<Document> aggregationResults = aggregate(aggregation, collectionName, Document.class);
CountOptions options = new CountOptions();
query.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
List<Document> result = (List<Document>) aggregationResults.getRawResults().getOrDefault("results",
Collections.emptyList());
Document document = delegate.queryMapper.getMappedObject(query.getQueryObject(),
Optional.ofNullable(entityClass).map(it -> delegate.mappingContext.getPersistentEntity(entityClass)));
if (result.isEmpty()) {
return 0;
}
return result.get(0).get("totalEntityCount", Number.class).longValue();
return execute(collectionName, collection -> collection.countDocuments(document, options));
}
}
}

64
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java

@ -19,7 +19,6 @@ import reactor.core.publisher.Flux; @@ -19,7 +19,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -1357,9 +1356,10 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { @@ -1357,9 +1356,10 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
<T> Flux<T> tail(Query query, Class<T> entityClass, String collectionName);
/**
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> via the reactive
* infrastructure. Use the optional provided {@link Aggregation} to filter events. The stream will not be completed
* unless the {@link org.reactivestreams.Subscription} is {@link Subscription#cancel() canceled}.
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> for all events
* in the configured default database via the reactive infrastructure. Use the optional provided {@link Aggregation}
* to filter events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is
* {@link Subscription#cancel() canceled}.
* <p />
* The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the
* {@link ChangeStreamEvent#getRaw()} contains the unmodified payload.
@ -1367,38 +1367,62 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { @@ -1367,38 +1367,62 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
* Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumseToken}
* for resuming change streams.
*
* @param filter can be {@literal null}.
* @param resultType must not be {@literal null}.
* @param options must not be {@literal null}.
* @param collectionName must not be {@literal null} nor empty.
* @param options must not be {@literal null}. Use {@link ChangeStreamOptions#empty()}.
* @param targetType the result type to use.
* @param <T>
* @return
* @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive.
* @since 2.1
*/
<T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable Aggregation filter, Class<T> resultType,
ChangeStreamOptions options, String collectionName);
default <T> Flux<ChangeStreamEvent<T>> changeStream(ChangeStreamOptions options, Class<T> targetType) {
return changeStream(null, options, targetType);
}
/**
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> for all events
* in the given collection via the reactive infrastructure. Use the optional provided {@link Aggregation} to filter
* events. The stream will not be completed unless the {@link org.reactivestreams.Subscription} is
* {@link Subscription#cancel() canceled}.
* <p />
* The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the
* {@link ChangeStreamEvent#getRaw()} contains the unmodified payload.
* <p />
* Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumseToken}
* for resuming change streams.
*
* @param collectionName the collection to watch. Can be {@literal null}, watches all collections if so.
* @param options must not be {@literal null}. Use {@link ChangeStreamOptions#empty()}.
* @param targetType the result type to use.
* @param <T>
* @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive.
* @since 2.1
*/
default <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String collectionName, ChangeStreamOptions options,
Class<T> targetType) {
return changeStream(null, collectionName, options, targetType);
}
/**
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> via the reactive
* infrastructure. Use the optional provided aggregation chain to filter events. The stream will not be completed
* infrastructure. Use the optional provided {@link Aggregation} to filter events. The stream will not be completed
* unless the {@link org.reactivestreams.Subscription} is {@link Subscription#cancel() canceled}.
* <p />
* The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the
* {@link ChangeStreamEvent#getRaw()} contains the unmodified payload.
* <p />
* Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumeToken}
* Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumseToken}
* for resuming change streams.
*
* @param filter can be empty, must not be {@literal null}.
* @param resultType must not be {@literal null}.
* @param options must not be {@literal null}.
* @param collectionName must not be {@literal null} nor empty.
* @param database the database to watch. Can be {@literal null}, uses configured default if so.
* @param collectionName the collection to watch. Can be {@literal null}, watches all collections if so.
* @param options must not be {@literal null}. Use {@link ChangeStreamOptions#empty()}.
* @param targetType the result type to use.
* @param <T>
* @return
* @return the {@link Flux} emitting {@link ChangeStreamEvent events} as they arrive.
* @since 2.1
*/
<T> Flux<ChangeStreamEvent<T>> changeStream(List<Document> filter, Class<T> resultType, ChangeStreamOptions options,
String collectionName);
<T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @Nullable String collectionName,
ChangeStreamOptions options, Class<T> targetType);
/**
* Execute a map-reduce operation. Use {@link MapReduceOptions} to optionally specify an output collection and other

120
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -34,6 +34,7 @@ import java.util.stream.Collectors; @@ -34,6 +34,7 @@ import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.codecs.Codec;
@ -71,10 +72,8 @@ import org.springframework.data.mapping.model.ConvertingPropertyAccessor; @@ -71,10 +72,8 @@ import org.springframework.data.mapping.model.ConvertingPropertyAccessor;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.CountOperation;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
@ -1998,56 +1997,57 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1998,56 +1997,57 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
new TailingQueryFindPublisherPreparer(query, entityClass));
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#tail(org.springframework.data.mongodb.core.aggregation.Aggregation, java.lang.Class, org.springframework.data.mongodb.core.ChangeStreamOptions, java.lang.String)
*/
@Override
public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable Aggregation filter, Class<T> resultType,
ChangeStreamOptions options, String collectionName) {
public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @Nullable String collectionName,
ChangeStreamOptions options, Class<T> targetType) {
Assert.notNull(resultType, "Result type must not be null!");
Assert.notNull(options, "ChangeStreamOptions must not be null!");
Assert.hasText(collectionName, "Collection name must not be null or empty!");
List<Document> filter = prepareFilter(options);
FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP;
if (filter == null) {
return changeStream(Collections.emptyList(), resultType, options, collectionName);
MongoDatabase db = StringUtils.hasText(database) ? mongoDatabaseFactory.getMongoDatabase(database)
: getMongoDatabase();
ChangeStreamPublisher<Document> publisher;
if (StringUtils.hasText(collectionName)) {
publisher = filter.isEmpty() ? db.getCollection(collectionName).watch(Document.class)
: db.getCollection(collectionName).watch(filter, Document.class);
} else {
publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
}
AggregationOperationContext context = filter instanceof TypedAggregation ? new TypeBasedAggregationOperationContext(
((TypedAggregation) filter).getInputType(), mappingContext, queryMapper) : Aggregation.DEFAULT_CONTEXT;
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp(it.toEpochMilli()))
.map(publisher::startAtOperationTime).orElse(publisher);
publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
return changeStream(
filter.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument",
Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"))),
resultType, options, collectionName);
return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, targetType, getConverter()));
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#tail(java.util.List, java.lang.Class, org.springframework.data.mongodb.core.ChangeStreamOptions, java.lang.String)
*/
@Override
public <T> Flux<ChangeStreamEvent<T>> changeStream(List<Document> filter, Class<T> resultType,
ChangeStreamOptions options, String collectionName) {
Assert.notNull(filter, "Filter must not be null!");
Assert.notNull(resultType, "Result type must not be null!");
Assert.notNull(options, "ChangeStreamOptions must not be null!");
Assert.hasText(collectionName, "Collection name must not be null or empty!");
List<Document> prepareFilter(ChangeStreamOptions options) {
ChangeStreamPublisher<Document> publisher = filter.isEmpty() ? getCollection(collectionName).watch()
: getCollection(collectionName).watch(filter);
if (!options.getFilter().isPresent()) {
return Collections.emptyList();
}
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
Object filter = options.getFilter().get();
if (filter instanceof Aggregation) {
Aggregation agg = (Aggregation) filter;
AggregationOperationContext context = agg instanceof TypedAggregation
? new TypeBasedAggregationOperationContext(((TypedAggregation<?>) agg).getInputType(),
getConverter().getMappingContext(), queryMapper)
: Aggregation.DEFAULT_CONTEXT;
if (options.getFullDocumentLookup().isPresent() || resultType != Document.class) {
publisher = publisher.fullDocument(options.getFullDocumentLookup().isPresent()
? options.getFullDocumentLookup().get() : FullDocument.UPDATE_LOOKUP);
return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument",
Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns")));
} else if (filter instanceof List) {
return (List<Document>) filter;
} else {
throw new IllegalArgumentException(
"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
}
return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, resultType, getConverter()));
}
/*
@ -3396,41 +3396,19 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -3396,41 +3396,19 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return super.count(query, entityClass, collectionName);
}
AggregationUtil aggregationUtil = new AggregationUtil(delegate.queryMapper, delegate.mappingContext);
Aggregation aggregation = aggregationUtil.createCountAggregation(query, entityClass);
return aggregate(aggregation, collectionName, Document.class) //
.next() //
.map(it -> it.get("totalEntityCount", Number.class).longValue()) //
.defaultIfEmpty(0L);
}
private List<AggregationOperation> computeCountAggregationPipeline(@Nullable Query query,
@Nullable Class<?> entityType) {
CountOperation count = Aggregation.count().as("totalEntityCount");
if (query == null || query.getQueryObject().isEmpty()) {
return Arrays.asList(count);
}
Document mappedQuery = delegate.queryMapper.getMappedObject(query.getQueryObject(),
delegate.getPersistentEntity(entityType));
CriteriaDefinition criteria = new CriteriaDefinition() {
return createMono(collectionName, collection -> {
@Override
public Document getCriteriaObject() {
return mappedQuery;
}
final Document Document = query == null ? null
: delegate.queryMapper.getMappedObject(query.getQueryObject(),
entityClass == null ? null : delegate.mappingContext.getPersistentEntity(entityClass));
@Nullable
@Override
public String getKey() {
return null;
CountOptions options = new CountOptions();
if (query != null) {
query.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
}
};
return Arrays.asList(Aggregation.match(criteria), count);
return collection.countDocuments(Document, options);
});
}
}

21
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java

@ -102,7 +102,7 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati @@ -102,7 +102,7 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati
}
private String prefixKey(String key) {
return (key.startsWith("$") || blacklist.contains(key)) ? key : (prefix + "." + key);
return (key.startsWith("$") || isBlacklisted(key)) ? key : (prefix + "." + key);
}
private Object prefixCollection(Collection<Object> sourceCollection) {
@ -119,4 +119,23 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati @@ -119,4 +119,23 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati
return prefixed;
}
private boolean isBlacklisted(String key) {
if (blacklist.contains(key)) {
return true;
}
if (!key.contains(".")) {
return false;
}
for (String blacklisted : blacklist) {
if (key.startsWith(blacklisted + ".")) {
return true;
}
}
return false;
}
}

70
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java

@ -15,6 +15,8 @@ @@ -15,6 +15,8 @@
*/
package org.springframework.data.mongodb.core.messaging;
import java.time.Instant;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
@ -34,7 +36,7 @@ import com.mongodb.client.model.changestream.FullDocument; @@ -34,7 +36,7 @@ import com.mongodb.client.model.changestream.FullDocument;
* using the synchronous MongoDB Java driver.
* <p/>
* The most trivial use case is subscribing to all events of a specific {@link com.mongodb.client.MongoCollection
* collection}.
* collection}
*
* <pre>
* <code>
@ -42,6 +44,15 @@ import com.mongodb.client.model.changestream.FullDocument; @@ -42,6 +44,15 @@ import com.mongodb.client.model.changestream.FullDocument;
* </code>
* </pre>
*
* or {@link com.mongodb.client.MongoDatabase} which receives events from all {@link com.mongodb.client.MongoCollection
* collections} in that database.
*
* <pre>
* <code>
* ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(System.out::println, RequestOptions.justDatabase("test"));
* </code>
* </pre>
*
* For more advanced scenarios {@link ChangeStreamOptions} offers abstractions for options like filtering, resuming,...
*
* <pre>
@ -154,21 +165,23 @@ public class ChangeStreamRequest<T> @@ -154,21 +165,23 @@ public class ChangeStreamRequest<T>
*/
public static class ChangeStreamRequestOptions implements SubscriptionRequest.RequestOptions {
private final String collectionName;
private final @Nullable String databaseName;
private final @Nullable String collectionName;
private final ChangeStreamOptions options;
/**
* Create new {@link ChangeStreamRequestOptions}.
*
* @param collectionName must not be {@literal null}.
* @param collectionName can be {@literal null}.
* @param options must not be {@literal null}.
*/
public ChangeStreamRequestOptions(String collectionName, ChangeStreamOptions options) {
public ChangeStreamRequestOptions(@Nullable String databaseName, @Nullable String collectionName,
ChangeStreamOptions options) {
Assert.notNull(collectionName, "CollectionName must not be null!");
Assert.notNull(options, "Options must not be null!");
this.collectionName = collectionName;
this.databaseName = databaseName;
this.options = options;
}
@ -176,7 +189,8 @@ public class ChangeStreamRequest<T> @@ -176,7 +189,8 @@ public class ChangeStreamRequest<T>
Assert.notNull(options, "Options must not be null!");
return new ChangeStreamRequestOptions(options.getCollectionName(), ChangeStreamOptions.builder().build());
return new ChangeStreamRequestOptions(options.getDatabaseName(), options.getCollectionName(),
ChangeStreamOptions.builder().build());
}
/**
@ -196,6 +210,15 @@ public class ChangeStreamRequest<T> @@ -196,6 +210,15 @@ public class ChangeStreamRequest<T>
public String getCollectionName() {
return collectionName;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.SubscriptionRequest.RequestOptions#getDatabaseName()
*/
@Override
public String getDatabaseName() {
return databaseName;
}
}
/**
@ -207,12 +230,27 @@ public class ChangeStreamRequest<T> @@ -207,12 +230,27 @@ public class ChangeStreamRequest<T>
*/
public static class ChangeStreamRequestBuilder<T> {
private @Nullable String databaseName;
private @Nullable String collectionName;
private @Nullable MessageListener<ChangeStreamDocument<Document>, ? super T> listener;
private ChangeStreamOptionsBuilder delegate = ChangeStreamOptions.builder();
private ChangeStreamRequestBuilder() {}
/**
* Set the name of the {@link com.mongodb.client.MongoDatabase} to listen to.
*
* @param databaseName must not be {@literal null} nor empty.
* @return this.
*/
public ChangeStreamRequestBuilder<T> database(String databaseName) {
Assert.hasText(databaseName, "DatabaseName must not be null!");
this.databaseName = databaseName;
return this;
}
/**
* Set the name of the {@link com.mongodb.client.MongoCollection} to listen to.
*
@ -317,6 +355,22 @@ public class ChangeStreamRequest<T> @@ -317,6 +355,22 @@ public class ChangeStreamRequest<T>
return this;
}
/**
* Set the cluster time at which to resume listening.
*
* @param clusterTime must not be {@literal null}.
* @return this.
* @see ChangeStreamOptions#getResumeTimestamp()
* @see ChangeStreamOptionsBuilder#resumeAt(java.time.Instant)
*/
public ChangeStreamRequestBuilder<T> resumeAt(Instant clusterTime) {
Assert.notNull(clusterTime, "ClusterTime must not be null!");
this.delegate.resumeAt(clusterTime);
return this;
}
/**
* Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}.
*
@ -339,9 +393,9 @@ public class ChangeStreamRequest<T> @@ -339,9 +393,9 @@ public class ChangeStreamRequest<T>
public ChangeStreamRequest<T> build() {
Assert.notNull(listener, "MessageListener must not be null!");
Assert.hasText(collectionName, "CollectionName must not be null!");
return new ChangeStreamRequest<>(listener, new ChangeStreamRequestOptions(collectionName, delegate.build()));
return new ChangeStreamRequest<>(listener,
new ChangeStreamRequestOptions(databaseName, collectionName, delegate.build()));
}
}
}

73
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java

@ -17,14 +17,16 @@ package org.springframework.data.mongodb.core.messaging; @@ -17,14 +17,16 @@ package org.springframework.data.mongodb.core.messaging;
import lombok.AllArgsConstructor;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
@ -42,10 +44,12 @@ import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.Reque @@ -42,10 +44,12 @@ import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.Reque
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.StringUtils;
import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
@ -84,7 +88,9 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, @@ -84,7 +88,9 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
List<Document> filter = Collections.emptyList();
BsonDocument resumeToken = new BsonDocument();
Collation collation = null;
FullDocument fullDocument = FullDocument.DEFAULT;
FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP;
BsonTimestamp startAt = null;
if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions) {
@ -107,16 +113,33 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, @@ -107,16 +113,33 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
fullDocument = changeStreamOptions.getFullDocumentLookup()
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);
if (changeStreamOptions.getResumeTimestamp().isPresent()) {
startAt = new BsonTimestamp(changeStreamOptions.getResumeTimestamp().get().toEpochMilli());
}
}
ChangeStreamIterable<Document> iterable = filter.isEmpty()
? template.getCollection(options.getCollectionName()).watch(Document.class)
: template.getCollection(options.getCollectionName()).watch(filter, Document.class);
MongoDatabase db = StringUtils.hasText(options.getDatabaseName())
? template.getMongoDbFactory().getDb(options.getDatabaseName()) : template.getDb();
ChangeStreamIterable<Document> iterable;
if (StringUtils.hasText(options.getCollectionName())) {
iterable = filter.isEmpty() ? db.getCollection(options.getCollectionName()).watch(Document.class)
: db.getCollection(options.getCollectionName()).watch(filter, Document.class);
} else {
iterable = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
}
if (!resumeToken.isEmpty()) {
iterable = iterable.resumeAfter(resumeToken);
}
if (startAt != null) {
iterable.startAtOperationTime(startAt);
}
if (collation != null) {
iterable = iterable.collation(collation);
}
@ -153,14 +176,21 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, @@ -153,14 +176,21 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
protected Message<ChangeStreamDocument<Document>, Object> createMessage(ChangeStreamDocument<Document> source,
Class<Object> targetType, RequestOptions options) {
// namespace might be null for eg. OperationType.INVALIDATE
MongoNamespace namespace = Optional.ofNullable(source.getNamespace())
.orElse(new MongoNamespace("unknown", options.getCollectionName()));
MongoNamespace namespace = source.getNamespace() != null ? source.getNamespace()
: createNamespaceFromOptions(options);
return new ChangeStreamEventMessage<>(new ChangeStreamEvent<>(source, targetType, mongoConverter), MessageProperties
.builder().databaseName(namespace.getDatabaseName()).collectionName(namespace.getCollectionName()).build());
}
MongoNamespace createNamespaceFromOptions(RequestOptions options) {
String collectionName = StringUtils.hasText(options.getCollectionName()) ? options.getCollectionName() : "unknown";
String databaseName = StringUtils.hasText(options.getDatabaseName()) ? options.getDatabaseName() : "unknown";
return new MongoNamespace(databaseName, collectionName);
}
/**
* {@link Message} implementation for ChangeStreams
*
@ -200,5 +230,32 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, @@ -200,5 +230,32 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
public MessageProperties getProperties() {
return this.messageProperties;
}
/**
* @return the resume token or {@litearl null} if not set.
* @see ChangeStreamEvent#getResumeToken()
*/
@Nullable
BsonValue getResumeToken() {
return delegate.getResumeToken();
}
/**
* @return the cluster time of the event or {@literal null}.
* @see ChangeStreamEvent#getTimestamp()
*/
@Nullable
Instant getTimestamp() {
return delegate.getTimestamp();
}
/**
* Get the {@link ChangeStreamEvent} from the message.
*
* @return never {@literal null}.
*/
ChangeStreamEvent<T> getChangeStreamEvent() {
return delegate;
}
}
}

90
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java

@ -15,7 +15,10 @@ @@ -15,7 +15,10 @@
*/
package org.springframework.data.mongodb.core.messaging;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* The actual {@link SubscriptionRequest} sent to the {@link MessageListenerContainer}. This wrapper type allows passing
@ -51,8 +54,93 @@ public interface SubscriptionRequest<S, T, O extends RequestOptions> { @@ -51,8 +54,93 @@ public interface SubscriptionRequest<S, T, O extends RequestOptions> {
interface RequestOptions {
/**
* @return the name of the collection to subscribe to. Never {@literal null}.
* Get the database name of the db.
*
* @return the name of the database to subscribe to. Can be {@literal null} in which case the default
* {@link MongoDbFactory#getDb()} is used.
*/
@Nullable
default String getDatabaseName() {
return null;
}
/**
* Get the collection name.
*
* @return the name of the collection to subscribe to. Can be {@literal null}.
*/
@Nullable
String getCollectionName();
/**
* Create empty options.
*
* @return new instance of empty {@link RequestOptions}.
*/
static RequestOptions none() {
return () -> null;
}
/**
* Create options with the provided database.
*
* @param database must not be {@literal null}.
* @return new instance of empty {@link RequestOptions}.
*/
static RequestOptions justDatabase(String database) {
Assert.notNull(database, "Database must not be null!");
return new RequestOptions() {
@Override
public String getCollectionName() {
return null;
}
@Override
public String getDatabaseName() {
return database;
}
};
}
/**
* Create options with the provided collection.
*
* @param collection must not be {@literal null}.
* @return new instance of empty {@link RequestOptions}.
*/
static RequestOptions justCollection(String collection) {
Assert.notNull(collection, "Collection must not be null!");
return () -> collection;
}
/**
* Create options with the provided database and collection.
*
* @param database must not be {@literal null}.
* @param collection must not be {@literal null}.
* @return new instance of empty {@link RequestOptions}.
*/
static RequestOptions of(String database, String collection) {
Assert.notNull(database, "Database must not be null!");
Assert.notNull(collection, "Collection must not be null!");
return new RequestOptions() {
@Override
public String getCollectionName() {
return collection;
}
@Override
public String getDatabaseName() {
return database;
}
};
}
}
}

100
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

@ -28,6 +28,7 @@ import reactor.core.publisher.Flux; @@ -28,6 +28,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -1115,8 +1116,7 @@ public class ReactiveMongoTemplateTests { @@ -1115,8 +1116,7 @@ public class ReactiveMongoTemplateTests {
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Document>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template
.changeStream(Collections.emptyList(), Document.class, ChangeStreamOptions.empty(), "person")
Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Document.class)
.doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
@ -1147,8 +1147,7 @@ public class ReactiveMongoTemplateTests { @@ -1147,8 +1147,7 @@ public class ReactiveMongoTemplateTests {
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template
.changeStream(Collections.emptyList(), Person.class, ChangeStreamOptions.empty(), "person")
Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class)
.doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
@ -1179,8 +1178,9 @@ public class ReactiveMongoTemplateTests { @@ -1179,8 +1178,9 @@ public class ReactiveMongoTemplateTests {
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template.changeStream(newAggregation(Person.class, match(where("age").gte(38))),
Person.class, ChangeStreamOptions.empty(), "person").doOnNext(documents::add).subscribe();
Disposable disposable = template.changeStream("person",
ChangeStreamOptions.builder().filter(newAggregation(Person.class, match(where("age").gte(38)))).build(),
Person.class).doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
@ -1211,8 +1211,10 @@ public class ReactiveMongoTemplateTests { @@ -1211,8 +1211,10 @@ public class ReactiveMongoTemplateTests {
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template
.changeStream(newAggregation(Person.class, match(where("operationType").is("replace"))), Person.class,
ChangeStreamOptions.empty(), "person")
.changeStream("person",
ChangeStreamOptions.builder()
.filter(newAggregation(Person.class, match(where("operationType").is("replace")))).build(),
Person.class)
.doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
@ -1246,8 +1248,7 @@ public class ReactiveMongoTemplateTests { @@ -1246,8 +1248,7 @@ public class ReactiveMongoTemplateTests {
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template
.changeStream(Collections.emptyList(), Person.class, ChangeStreamOptions.empty(), "person")
Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class)
.doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
@ -1267,9 +1268,7 @@ public class ReactiveMongoTemplateTests { @@ -1267,9 +1268,7 @@ public class ReactiveMongoTemplateTests {
BsonDocument resumeToken = documents.take().getRaw().getResumeToken();
BlockingQueue<ChangeStreamEvent<Person>> resumeDocuments = new LinkedBlockingQueue<>(100);
template
.changeStream(Collections.emptyList(), Person.class,
ChangeStreamOptions.builder().resumeToken(resumeToken).build(), "person")
template.changeStream("person", ChangeStreamOptions.builder().resumeToken(resumeToken).build(), Person.class)
.doOnNext(resumeDocuments::add).subscribe();
Thread.sleep(500); // just give it some time to link receive all events
@ -1314,6 +1313,81 @@ public class ReactiveMongoTemplateTests { @@ -1314,6 +1313,81 @@ public class ReactiveMongoTemplateTests {
.verifyComplete();
}
@Test // DATAMONGO-2012
public void watchesDatabaseCorrectly() throws InterruptedException {
Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.createCollection("personX")).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template.changeStream(ChangeStreamOptions.empty(), Person.class).doOnNext(documents::add)
.subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
Person person1 = new Person("Spring", 38);
Person person2 = new Person("Data", 37);
Person person3 = new Person("MongoDB", 39);
StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person3, "personX")).expectNextCount(1).verifyComplete();
Thread.sleep(500); // just give it some time to link receive all events
try {
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
.containsExactly(person1, person2, person3);
} finally {
disposable.dispose();
}
}
@Test // DATAMONGO-2012
public void resumesAtTimestampCorrectly() throws InterruptedException {
Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template.changeStream("person", ChangeStreamOptions.empty(), Person.class)
.doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
Person person1 = new Person("Spring", 38);
Person person2 = new Person("Data", 37);
Person person3 = new Person("MongoDB", 39);
StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
Thread.sleep(500); // just give it some time to link receive all events
disposable.dispose();
documents.take(); // skip first
Instant resumeAt = documents.take().getTimestamp(); // take 2nd
StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> resumeDocuments = new LinkedBlockingQueue<>(100);
template.changeStream("person", ChangeStreamOptions.builder().resumeAt(resumeAt).build(), Person.class)
.doOnNext(resumeDocuments::add).subscribe();
Thread.sleep(500); // just give it some time to link receive all events
try {
Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
.containsExactly(person2, person3);
} finally {
disposable.dispose();
}
}
private PersonWithAList createPersonWithAList(String firstname, int age) {
PersonWithAList p = new PersonWithAList();

28
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java

@ -40,6 +40,7 @@ import org.bson.Document; @@ -40,6 +40,7 @@ import org.bson.Document;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -49,6 +50,7 @@ import org.springframework.aop.Advisor; @@ -49,6 +50,7 @@ import org.springframework.aop.Advisor;
import org.springframework.aop.framework.Advised;
import org.springframework.dao.DataAccessException;
import org.springframework.data.annotation.Id;
import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.ClientSessionException;
import org.springframework.data.mongodb.LazyLoadingException;
import org.springframework.data.mongodb.MongoDbFactory;
@ -60,6 +62,7 @@ import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver; @@ -60,6 +62,7 @@ import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.MongoCustomConversions;
import org.springframework.data.mongodb.core.index.GeospatialIndex;
import org.springframework.data.mongodb.core.mapping.DBRef;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.query.Query;
@ -295,6 +298,31 @@ public class SessionBoundMongoTemplateTests { @@ -295,6 +298,31 @@ public class SessionBoundMongoTemplateTests {
session.close();
}
@Test // DATAMONGO-2012
@Ignore("error 2 (BadValue): $match does not support $geoNear, $near, and $nearSphere")
public void countWithGeoInTransaction() {
if (!template.collectionExists(Person.class)) {
template.createCollection(Person.class);
template.indexOps(Person.class).ensureIndex(new GeospatialIndex("location"));
} else {
template.remove(Person.class).all();
}
ClientSession session = client.startSession();
session.startTransaction();
MongoTemplate sessionBound = template.withSession(session);
sessionBound.save(new Person("Kylar Stern"));
assertThat(sessionBound.query(Person.class).matching(query(where("location").near(new Point(1, 0)))).count())
.isZero();
session.commitTransaction();
session.close();
}
@Test // DATAMONGO-2001
public void countShouldReturnIsolatedCount() throws InterruptedException {

134
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java

@ -21,8 +21,11 @@ import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils. @@ -21,8 +21,11 @@ import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils.
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
@ -39,17 +42,16 @@ import org.junit.ClassRule; @@ -39,17 +42,16 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.ChangeStreamTask.ChangeStreamEventMessage;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionUtils.*;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.test.util.MongoTestUtils;
import org.springframework.data.mongodb.test.util.ReplicaSet;
import com.mongodb.MongoClient;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
@ -80,7 +82,7 @@ public class ChangeStreamTests { @@ -80,7 +82,7 @@ public class ChangeStreamTests {
@Before
public void setUp() {
template = new MongoTemplate(new MongoClient(), "change-stream-tests");
template = new MongoTemplate(MongoTestUtils.replSetClient(), "change-stream-tests");
template.dropCollection(User.class);
container = new DefaultMessageListenerContainer(template, executor);
@ -361,8 +363,10 @@ public class ChangeStreamTests { @@ -361,8 +363,10 @@ public class ChangeStreamTests {
public void readsOnlyDiffForUpdateWhenOptionsDeclareDefaultExplicitly() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = new ChangeStreamRequest<>(messageListener, new ChangeStreamRequestOptions(
"user", ChangeStreamOptions.builder().fullDocumentLookup(FullDocument.DEFAULT).build()));
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.fullDocumentLookup(FullDocument.DEFAULT) //
.publishTo(messageListener).build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
@ -381,8 +385,10 @@ public class ChangeStreamTests { @@ -381,8 +385,10 @@ public class ChangeStreamTests {
public void readsFullDocumentForUpdateWhenNotMappedToDomainTypeButLookupSpecified() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, Document> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(messageListener,
new ChangeStreamRequestOptions("user", ChangeStreamOptions.builder().returnFullDocumentOnUpdate().build()));
ChangeStreamRequest<Document> request = ChangeStreamRequest.builder() //
.collection("user") //
.fullDocumentLookup(FullDocument.UPDATE_LOOKUP) //
.publishTo(messageListener).build();
Subscription subscription = container.register(request, Document.class);
awaitSubscription(subscription);
@ -399,11 +405,123 @@ public class ChangeStreamTests { @@ -399,11 +405,123 @@ public class ChangeStreamTests {
.append("user_name", "jellyBelly").append("age", 8).append("_class", User.class.getName()));
}
@Test // DATAMONGO-2012
public void resumeAtTimestampCorrectly() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener1 = new CollectingMessageListener<>();
Subscription subscription1 = container.register(new ChangeStreamRequest<>(messageListener1, () -> "user"),
User.class);
awaitSubscription(subscription1);
template.save(jellyBelly);
template.save(sugarSplashy);
awaitMessages(messageListener1, 12);
Instant resumeAt = ((ChangeStreamEventMessage) messageListener1.getLastMessage()).getTimestamp();
template.save(huffyFluffy);
awaitMessages(messageListener1, 3);
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener2 = new CollectingMessageListener<>();
ChangeStreamRequest<User> subSequentRequest = ChangeStreamRequest.builder() //
.collection("user") //
.resumeAt(resumeAt) //
.publishTo(messageListener2) //
.build();
Subscription subscription2 = container.register(subSequentRequest, User.class);
awaitSubscription(subscription2);
awaitMessages(messageListener2);
List<User> messageBodies = messageListener2.getMessages().stream().map(Message::getBody)
.collect(Collectors.toList());
assertThat(messageBodies).hasSize(2).doesNotContain(jellyBelly);
}
@Test // DATAMONGO-1996
public void filterOnNestedElementWorksCorrectly() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder(messageListener) //
.collection("user") //
.filter(newAggregation(User.class, match(where("address.street").is("flower street")))) //
.build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
jellyBelly.address = new Address();
jellyBelly.address.street = "candy ave";
huffyFluffy.address = new Address();
huffyFluffy.address.street = "flower street";
template.save(jellyBelly);
template.save(sugarSplashy);
template.save(huffyFluffy);
awaitMessages(messageListener);
List<User> messageBodies = messageListener.getMessages().stream().map(Message::getBody)
.collect(Collectors.toList());
assertThat(messageBodies).hasSize(1).contains(huffyFluffy);
}
@Test // DATAMONGO-1996
public void filterOnUpdateDescriptionElement() throws InterruptedException {
template.save(jellyBelly);
template.save(sugarSplashy);
template.save(huffyFluffy);
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder(messageListener) //
.collection("user") //
.filter(newAggregation(User.class, match(where("updateDescription.updatedFields.address").exists(true)))) //
.fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.update(User.class).matching(query(where("id").is(jellyBelly.id)))
.apply(Update.update("address", new Address("candy ave"))).first();
template.update(User.class).matching(query(where("id").is(sugarSplashy.id))).apply(new Update().inc("age", 1))
.first();
template.update(User.class).matching(query(where("id").is(huffyFluffy.id)))
.apply(Update.update("address", new Address("flower street"))).first();
awaitMessages(messageListener);
List<User> messageBodies = messageListener.getMessages().stream().map(Message::getBody)
.collect(Collectors.toList());
assertThat(messageBodies).hasSize(2);
}
@Data
static class User {
@Id String id;
@Field("user_name") String userName;
int age;
Address address;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
static class Address {
@Field("s") String street;
}
}

32
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java

@ -36,6 +36,7 @@ import org.springframework.data.annotation.Id; @@ -36,6 +36,7 @@ import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.test.util.ReplicaSet;
import org.springframework.test.annotation.IfProfileValue;
import org.springframework.util.ErrorHandler;
@ -54,9 +55,12 @@ public class DefaultMessageListenerContainerTests { @@ -54,9 +55,12 @@ public class DefaultMessageListenerContainerTests {
public static final String DATABASE_NAME = "change-stream-events";
public static final String COLLECTION_NAME = "collection-1";
public static final String COLLECTION_2_NAME = "collection-2";
MongoDbFactory dbFactory;
MongoCollection<Document> collection;
private MongoCollection<Document> collection2;
private CollectingMessageListener<Object, Object> messageListener;
private MongoTemplate template;
@ -69,7 +73,10 @@ public class DefaultMessageListenerContainerTests { @@ -69,7 +73,10 @@ public class DefaultMessageListenerContainerTests {
template = new MongoTemplate(dbFactory);
template.dropCollection(COLLECTION_NAME);
template.dropCollection(COLLECTION_2_NAME);
collection = template.getCollection(COLLECTION_NAME);
collection2 = template.getCollection(COLLECTION_2_NAME);
messageListener = new CollectingMessageListener<>();
}
@ -308,6 +315,31 @@ public class DefaultMessageListenerContainerTests { @@ -308,6 +315,31 @@ public class DefaultMessageListenerContainerTests {
assertThat(changeStreamListener.getFirstMessage().getRaw()).isInstanceOf(ChangeStreamDocument.class);
}
@Test // DATAMONGO-2012
@IfProfileValue(name = "replSet", value = "true")
public void databaseLevelWatch() throws InterruptedException {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
Subscription subscription = container.register(new ChangeStreamRequest(messageListener, RequestOptions.none()),
Person.class);
container.start();
awaitSubscription(subscription, Duration.ofMillis(500));
collection.insertOne(new Document("_id", "col-1-id-1").append("firstname", "foo"));
collection.insertOne(new Document("_id", "col-1-id-2").append("firstname", "bar"));
collection2.insertOne(new Document("_id", "col-2-id-1").append("firstname", "bar"));
collection2.insertOne(new Document("_id", "col-2-id-2").append("firstname", "foo"));
awaitMessages(messageListener, 4, Duration.ofMillis(500));
assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
.containsExactly(new Person("col-1-id-1", "foo"), new Person("col-1-id-2", "bar"),
new Person("col-2-id-1", "bar"), new Person("col-2-id-2", "foo"));
}
@Data
static class Person {
@Id String id;

82
src/main/asciidoc/reference/change-streams.adoc

@ -0,0 +1,82 @@ @@ -0,0 +1,82 @@
[[change-streams]]
== Change Streams
As of MongoDB 3.6, https://docs.mongodb.com/manual/changeStreams/[Change Streams] let applications get notified about changes without having to tail the oplog.
NOTE: Change Stream support is only possible for replica sets or for a sharded cluster.
Change Streams can be subscribed to with both the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant, as it is less resource-intensive. However if you cannot use the reactive API, you can still obtain the change events by using the messaging concept that is already prevalent in the Spring ecosystem.
It is possible to watch both on a collection as well as database level, whereas the database level variant publishes
changes from all collections within the database. So when subscribing to a database change stream, make sure to use a
suitable type for the event type in use as conversion might not apply correctly when set to specificly. In doubt use
`Document`.
=== Change Streams with `MessageListener`
Listening to a https://docs.mongodb.com/manual/tutorial/change-streams-example/[Change Stream by using a Sync Driver] is a long running, blocking task that needs to be delegated to a separate component.
In this case, we need to first create a `MessageListenerContainer`, which will be the main entry point for running the specific `SubscriptionRequest` tasks.
Spring Data MongoDB already ships with a default implementation that operates on `MongoTemplate` and is capable of creating and executing `Task` instances for a `ChangeStreamRequest`.
The following example shows how to use Change Streams with `MessageListener` instances:
.Change Streams with `MessageListener` instances
====
[source,java]
----
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); <1>
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; <2>
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); <3>
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); <4>
// ...
container.stop(); <5>
----
<1> Starting the container intializes the resources and starts the `Task` instances for the already registered `SubscriptionRequest` instances. Requests added after the startup are run immediately.
<2> Define the listener called when a `Message` is received. The `Message#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
<3> Set the collection to listen to and provide additional options through `ChangeStreamOptions`.
<4> Register the request. The returned `Subscription` can be used to check the current `Task` state and cancel its execution to free resources.
<5> Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running `Task` instances within the container.
====
=== Change Streams - Reactive
Subscribing to Change Stream with the reactive API is more straightforward. Still the essential building blocks, such as `ChangeStreamOptions`, remain the same. The following example shows how to use Change Streams with reactive `MessageListeners`:
.Change Streams with `MessageListeners`
====
[source,java]
----
ChangeStreamOptions options = ChangeStreamOptions.builder()
.filter(newAggregation(User.class, match(where("age").gte(38))) <1>
.build();
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream("user", options, User.class); <2>
----
<1> Use an aggregation pipeline to filter events.
<2> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
====
=== Resuming Change Streams
Change Streams can be resumed and will pick up emitting events where you left. To resume the stream either a resume
token or the server time in UTC from where to resume is required. Use `ChangeStreamOptions` to set the value
accordingly.
.Resume a Change Stream
====
[source,java]
----
ChangeStreamOptions = ChangeStreamOptions.builder()
.resumeAt(Instant.now().minusSeconds(1)) <1>
.build()
Flux<ChangeStreamEvent<Person>> resumed = template.changeStream("person", options, User.class)
----
<1> You may obtain the server time of an `ChangeStreamEvent` via the `getTimestamp` method or use the `resumeToken`
exposed via `getResumeToken`.
====

55
src/main/asciidoc/reference/mongodb.adoc

@ -3072,57 +3072,4 @@ class GridFsClient { @@ -3072,57 +3072,4 @@ class GridFsClient {
`GridFsOperations` extends `ResourcePatternResolver` and lets the `GridFsTemplate` (for example) to be plugged into an `ApplicationContext` to read Spring Config files from MongoDB database.
[[change-streams]]
== Change Streams
As of MongoDB 3.6, https://docs.mongodb.com/manual/changeStreams/[Change Streams] let applications get notified about changes without having to tail the oplog.
NOTE: Change Stream support is only possible for replica sets or for a sharded cluster.
Change Streams can be subscribed to with both the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant, as it is less resource-intensive. However if you cannot use the reactive API, you can still obtain the change events by using the messaging concept that is already prevalent in the Spring ecosystem.
=== Change Streams with `MessageListener`
Listening to a https://docs.mongodb.com/manual/tutorial/change-streams-example/[Change Stream by using a Sync Driver] is a long running, blocking task that needs to be delegated to a separate component.
In this case, we need to first create a `MessageListenerContainer`, which will be the main entry point for running the specific `SubscriptionRequest` tasks.
Spring Data MongoDB already ships with a default implementation that operates on `MongoTemplate` and is capable of creating and executing `Task` instances for a `ChangeStreamRequest`.
The following example shows how to use Change Streams with `MessageListener` instances:
.Change Streams with `MessageListener` instances
====
[source,java]
----
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); <1>
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; <2>
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); <3>
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); <4>
// ...
container.stop(); <5>
----
<1> Starting the container intializes the resources and starts the `Task` instances for the already registered `SubscriptionRequest` instances. Requests added after the startup are run immediately.
<2> Define the listener called when a `Message` is received. The `Message#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
<3> Set the collection to listen to and provide additional options through `ChangeStreamOptions`.
<4> Register the request. The returned `Subscription` can be used to check the current `Task` state and cancel its execution to free resources.
<5> Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running `Task` instances within the container.
====
=== Change Streams - Reactive
Subscribing to Change Stream with the reactive API is more straightforward. Still the essential building blocks, such as `ChangeStreamOptions`, remain the same. The following example shows how to use Change Streams with reactive `MessageListeners`:
.Change Streams with `MessageListeners`
====
[source,java]
----
Aggregation filter = newAggregation(User.class, match(where("age").gte(38)); <1>
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(filter), User.class, ChangeStreamOptions.empty()); <2>
----
<1> Use an aggregation pipeline to filter events.
<2> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
====
include::change-streams.adoc[]
Loading…
Cancel
Save