Browse Source

DATAMONGO-1803 - Add support for ChangeStreams.

As of MongoDB 3.6, Change Streams allow application to get notified about changes without having to tailing the oplog.

NOTE: Change Stream support is only available with replica sets or a sharded cluster.

Change Streams can be subscribed to with both the imperative and the reactive MongoDB java driver. It is highly recommended to use the reactive variant as it is less resource intensive. However if you do not feel comfortable using the reactive API for whatever reason, you can sill obtain the change events via a Messaging concept already common in the Spring ecosystem.

== Change Streams - Sync ==

Listening to a Change Stream using a Sync Driver is a long running, blocking task that needs to be delegated to a separate component.
In this case we need to create a MessageListenerContainer first which will be the main entry point for running the specific SubscriptionRequests.
Spring Data MongoDB already ships with a default implementation that operates upon MongoTemplate and is capable of creating and executing Tasks for a ChangeStreamRequest.

MessageListenerContainer container = MessageListenerContainer.create(template);
container.start();
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty());

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);

== Change Streams - Reactive ==

Subscribing to Change Stream via the reactive API is clearly more straight forward. Still the building blocks like ChangeStreamOptions remain the same.

Aggregation filter = newAggregation(User.class, match(where("age").gte(38));
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(filter), User.class, ChangeStreamOptions.empty());

== Tailable Cursors - Sync ==

This commit also adds support for tailable cursors using the synchronous driver to be used with capped collections:

MessageListenerContainer container = MessageListenerContainer.create(template);
container.start();
TailableCursorRequestOptions options = TailableCursorRequestOptions.builder()
  .collection("user")
  .filter(query(where("age").is(7)))
  .build()

container.register(new TailableCursorRequest<>(messageListener, options, User.class));

Original pull request: #528.
pull/528/merge
Christoph Strobl 8 years ago committed by Mark Paluch
parent
commit
162a936736
  1. 2
      pom.xml
  2. 8
      spring-data-mongodb/pom.xml
  3. 116
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java
  4. 199
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java
  5. 45
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
  6. 62
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  7. 2
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/ExposedFields.java
  8. 110
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java
  9. 34
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Cancelable.java
  10. 329
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java
  11. 304
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java
  12. 155
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java
  13. 33
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/MessageListener.java
  14. 145
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/MessageListenerContainer.java
  15. 78
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SimpleMessage.java
  16. 70
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Subscription.java
  17. 59
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java
  18. 166
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TailableCursorRequest.java
  19. 51
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Task.java
  20. 527
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TaskFactory.java
  21. 6
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/package-info.java
  22. 74
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/Message.java
  23. 53
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTests.java
  24. 201
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java
  25. 13
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
  26. 395
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java
  27. 227
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java
  28. 324
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java
  29. 285
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerUnitTests.java
  30. 165
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/SubscriptionUtils.java
  31. 195
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TailableCursorTests.java
  32. 80
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TaskFactoryUnitTests.java
  33. 27
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/monitor/Resumeable.java
  34. 106
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/ReplicaSet.java
  35. 1
      src/main/asciidoc/new-features.adoc
  36. 53
      src/main/asciidoc/reference/mongodb.adoc

2
pom.xml

@ -29,7 +29,7 @@ @@ -29,7 +29,7 @@
<dist.id>spring-data-mongodb</dist.id>
<springdata.commons>2.1.0.BUILD-SNAPSHOT</springdata.commons>
<mongo>3.6.0</mongo>
<mongo.reactivestreams>1.6.0</mongo.reactivestreams>
<mongo.reactivestreams>1.7.0</mongo.reactivestreams>
<jmh.version>1.19</jmh.version>
</properties>

8
spring-data-mongodb/pom.xml

@ -19,6 +19,7 @@ @@ -19,6 +19,7 @@
<objenesis>1.3</objenesis>
<equalsverifier>1.7.8</equalsverifier>
<java-module-name>spring.data.mongodb</java-module-name>
<multithreadedtc>1.01</multithreadedtc>
</properties>
<dependencies>
@ -245,6 +246,13 @@ @@ -245,6 +246,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>edu.umd.cs.mtc</groupId>
<artifactId>multithreadedtc</artifactId>
<version>${multithreadedtc}</version>
<scope>test</scope>
</dependency>
<!-- Kotlin extension -->
<dependency>
<groupId>org.jetbrains.kotlin</groupId>

116
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java

@ -0,0 +1,116 @@ @@ -0,0 +1,116 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import lombok.EqualsAndHashCode;
import java.util.concurrent.atomic.AtomicReference;
import org.bson.Document;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
/**
* {@link Message} implementation specific to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change
* Streams</a>.
*
* @author Christoph Strobl
* @since 2.1
*/
@EqualsAndHashCode
public class ChangeStreamEvent<T> {
@Nullable private final ChangeStreamDocument<Document> raw;
private final Class<T> targetType;
private final MongoConverter converter;
private final AtomicReference<T> converted = new AtomicReference<>();
/**
* @param raw can be {@literal null}.
* @param messageProperties must not be {@literal null}.
* @param targetType must not be {@literal null}.
* @param converter must not be {@literal null}.
*/
public ChangeStreamEvent(ChangeStreamDocument<Document> raw, Class<T> targetType, MongoConverter converter) {
this.raw = raw;
this.targetType = targetType;
this.converter = converter;
}
/**
* Get the raw {@link ChangeStreamDocument} as emitted by the driver.
*
* @return can be {@literal null}.
*/
@Nullable
public ChangeStreamDocument<Document> getRaw() {
return raw;
}
/**
* Get the potentially converted {@link ChangeStreamDocument#getFullDocument()}.
*
* @return {@literal null} when {@link #getRaw()} or {@link ChangeStreamDocument#getFullDocument()} is
* {@literal null}.
*/
@Nullable
public T getBody() {
if (raw == null) {
return targetType.cast(raw);
}
if (raw.getFullDocument() == null) {
return targetType.cast(raw.getFullDocument());
}
return getConverted();
}
private T getConverted() {
T result = converted.get();
if (result != null) {
return result;
}
if (ClassUtils.isAssignable(Document.class, raw.getFullDocument().getClass())) {
result = converter.read(targetType, raw.getFullDocument());
return converted.compareAndSet(null, result) ? result : converted.get();
}
if (converter.getConversionService().canConvert(raw.getFullDocument().getClass(), targetType)) {
result = converter.getConversionService().convert(raw.getFullDocument(), targetType);
return converted.compareAndSet(null, result) ? result : converted.get();
}
throw new IllegalArgumentException(String.format("No converter found capable of converting %s to %s",
raw.getFullDocument().getClass(), targetType));
}
@Override
public String toString() {
return "ChangeStreamEvent {" + "raw=" + raw + ", targetType=" + targetType + '}';
}
}

199
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

@ -0,0 +1,199 @@ @@ -0,0 +1,199 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import lombok.EqualsAndHashCode;
import java.util.Arrays;
import java.util.Optional;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
/**
* Options applicable to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a>. Intended
* to be used along with {@link org.springframework.data.mongodb.core.messaging.ChangeStreamRequest} in a sync world as
* well {@link ReactiveMongoOperations} if you prefer it that way.
*
* @author Christoph Strobl
* @since 2.1
*/
@EqualsAndHashCode
public class ChangeStreamOptions {
private @Nullable Object filter;
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<Object> getFilter() {
return Optional.ofNullable(filter);
}
/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<BsonValue> getResumeToken() {
return Optional.ofNullable(resumeToken);
}
/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<FullDocument> getFullDocumentLookup() {
return Optional.ofNullable(fullDocumentLookup);
}
/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<Collation> getCollation() {
return Optional.ofNullable(collation);
}
/**
* @return empty {@link ChangeStreamOptions}.
*/
public static ChangeStreamOptions empty() {
return ChangeStreamOptions.builder().build();
}
/**
* Obtain a shiny new {@link ChangeStreamRequestOptionsBuilder} and start defining options in this fancy fluent way.
* Just don't forget to call {@link ChangeStreamRequestOptionsBuilder#build() build()} when your're done.
*
* @return new instance of {@link ChangeStreamRequestOptionsBuilder}.
*/
public static ChangeStreamOptionsBuilder builder() {
return new ChangeStreamOptionsBuilder();
}
/**
* Builder for creating {@link ChangeStreamOptions}.
*
* @author Christoph Strobl
* @since 2.1
*/
public static class ChangeStreamOptionsBuilder {
private ChangeStreamOptions options = new ChangeStreamOptions();
/**
* Set the collation to use.
*
* @param collation must not be {@literal null} nor {@literal empty}.
* @return this.
*/
public ChangeStreamOptionsBuilder collation(Collation collation) {
Assert.notNull(collation, "Collation must not be null nor empty!");
options.collation = collation;
return this;
}
/**
* Set the filter to apply.
* <p/>
* Fields on aggregation expression root level are prefixed to map to fields contained in
* {@link ChangeStreamDocument#getFullDocument() fullDocument}. However {@literal operationType}, {@literal ns},
* {@literal documentKey} and {@literal fullDocument} are reserved words that will be omitted, and therefore taken
* as given, during the mapping procedure. You may want to have a look at the
* <a href="https://docs.mongodb.com/manual/reference/change-events/">structure of Change Events</a>.
* <p/>
* Use {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} to ensure filter expressions are
* mapped to domain type fields.
*
* @param filter the {@link Aggregation Aggregation pipeline} to apply for filtering events. Must not be
* {@literal null}.
* @return this.
*/
public ChangeStreamOptionsBuilder filter(Aggregation filter) {
Assert.notNull(filter, "Filter must not be null!");
options.filter = filter;
return this;
}
/**
* Set the plain filter chain to apply.
*
* @param filter must not be {@literal null} nor contain {@literal null} values.
* @return this.
*/
public ChangeStreamOptionsBuilder filter(Document... filter) {
Assert.noNullElements(filter, "Filter must not contain null values");
options.filter = Arrays.asList(filter);
return this;
}
/**
* Set the resume token (typically a {@link org.bson.BsonDocument} containing a {@link org.bson.BsonBinary binary
* token}) after which to start with listening.
*
* @param resumeToken must not be {@literal null}.
* @return this.
*/
public ChangeStreamOptionsBuilder resumeToken(BsonValue resumeToken) {
Assert.notNull(resumeToken, "ResumeToken must not be null!");
options.resumeToken = resumeToken;
return this;
}
/**
* Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}.
*
* @return this.
* @see #fullDocumentLookup(FullDocument)
*/
public ChangeStreamOptionsBuilder returnFullDocumentOnUpdate() {
return fullDocumentLookup(FullDocument.UPDATE_LOOKUP);
}
/**
* Set the {@link FullDocument} lookup to use.
*
* @param lookup must not be {@literal null}.
* @return this.
*/
public ChangeStreamOptionsBuilder fullDocumentLookup(FullDocument lookup) {
Assert.notNull(lookup, "Lookup must not be null!");
options.fullDocumentLookup = lookup;
return this;
}
public ChangeStreamOptions build() {
ChangeStreamOptions tmp = options;
options = new ChangeStreamOptions();
return tmp;
}
}
}

45
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java

@ -19,6 +19,7 @@ import reactor.core.publisher.Flux; @@ -19,6 +19,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.List;
import org.bson.Document;
import org.reactivestreams.Publisher;
@ -1108,6 +1109,50 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { @@ -1108,6 +1109,50 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
*/
<T> Flux<T> tail(Query query, Class<T> entityClass, String collectionName);
/**
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> via the reactive
* infrastructure. Use the optional provided {@link Aggregation} to filter events. The stream will not be completed
* unless the {@link org.reactivestreams.Subscription} is {@link Subscription#cancel() canceled}.
* <p />
* The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the
* {@link ChangeStreamEvent#getRaw()} contains the unmodified payload.
* <p />
* Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumseToken}
* for resuming change streams.
*
* @param filter can be {@literal null}.
* @param resultType must not be {@literal null}.
* @param options must not be {@literal null}.
* @param collectionName must not be {@literal null} nor empty.
* @param <T>
* @return
* @since 2.1
*/
<T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable Aggregation filter, Class<T> resultType,
ChangeStreamOptions options, String collectionName);
/**
* Subscribe to a MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> via the reactive
* infrastructure. Use the optional provided aggregation chain to filter events. The stream will not be completed
* unless the {@link org.reactivestreams.Subscription} is {@link Subscription#cancel() canceled}.
* <p />
* The {@link ChangeStreamEvent#getBody()} is mapped to the {@literal resultType} while the
* {@link ChangeStreamEvent#getRaw()} contains the unmodified payload.
* <p />
* Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumeToken}
* for resuming change streams.
*
* @param filter can be {@literal null}.
* @param resultType must not be {@literal null}.
* @param options must not be {@literal null}.
* @param collectionName must not be {@literal null} nor empty.
* @param <T>
* @return
* @since 2.1
*/
<T> Flux<ChangeStreamEvent<T>> changeStream(List<Document> filter, Class<T> resultType, ChangeStreamOptions options,
String collectionName);
/**
* Returns the underlying {@link MongoConverter}.
*

62
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -24,17 +24,8 @@ import reactor.core.publisher.Flux; @@ -24,17 +24,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -76,6 +67,7 @@ import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; @@ -76,6 +67,7 @@ import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.*;
@ -128,9 +120,11 @@ import com.mongodb.client.model.FindOneAndUpdateOptions; @@ -128,9 +120,11 @@ import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.ValidationOptions;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.AggregatePublisher;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.DistinctPublisher;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.MongoClient;
@ -1786,6 +1780,53 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1786,6 +1780,53 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
new TailingQueryFindPublisherPreparer(query, entityClass));
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#tail(org.springframework.data.mongodb.core.aggregation.Aggregation, java.lang.Class, org.springframework.data.mongodb.core.ChangeStreamOptions, java.lang.String)
*/
@Override
public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable Aggregation filter, Class<T> resultType,
ChangeStreamOptions options, String collectionName) {
if (filter == null) {
return changeStream(Collections.emptyList(), resultType, options, collectionName);
}
AggregationOperationContext context = filter instanceof TypedAggregation ? new TypeBasedAggregationOperationContext(
((TypedAggregation) filter).getInputType(), mappingContext, queryMapper) : Aggregation.DEFAULT_CONTEXT;
return changeStream(
filter.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument",
Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"))),
resultType, options, collectionName);
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#tail(java.util.List, java.lang.Class, org.springframework.data.mongodb.core.ChangeStreamOptions, java.lang.String)
*/
@Override
public <T> Flux<ChangeStreamEvent<T>> changeStream(List<Document> filter, Class<T> resultType,
ChangeStreamOptions options, String collectionName) {
ChangeStreamPublisher<Document> publisher = filter.isEmpty() ? getCollection(collectionName).watch()
: getCollection(collectionName).watch(filter);
if (options.getResumeToken().isPresent()) {
publisher = publisher.resumeAfter(options.getResumeToken().get().asDocument());
}
if (options.getFullDocumentLookup().isPresent() || resultType != Document.class) {
publisher = publisher.fullDocument(options.getFullDocumentLookup().isPresent()
? options.getFullDocumentLookup().get() : FullDocument.UPDATE_LOOKUP);
}
if (options.getCollation().isPresent()) {
publisher = publisher.collation(options.getCollation().map(Collation::toMongoCollation).get());
}
return Flux.from(publisher).map(document -> new ChangeStreamEvent(document, resultType, getConverter()));
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveFindOperation#query(java.lang.Class)
@ -2605,7 +2646,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -2605,7 +2646,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
*
* @author Mark Paluch
*/
interface ReactiveCollectionQueryCallback<T> extends ReactiveCollectionCallback<T> {
FindPublisher<T> doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException;

2
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/ExposedFields.java

@ -355,7 +355,7 @@ public final class ExposedFields implements Iterable<ExposedField> { @@ -355,7 +355,7 @@ public final class ExposedFields implements Iterable<ExposedField> {
* @author Christoph Strobl
* @since 1.10
*/
interface FieldReference {
public interface FieldReference {
/**
* Returns the raw, unqualified reference, i.e. the field reference without a {@literal $} prefix.

110
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java

@ -0,0 +1,110 @@ @@ -0,0 +1,110 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.ExposedFields.FieldReference;
/**
* {@link AggregationOperationContext} implementation prefixing non command keys on root level with the given prefix.
* Useful when mapping fields to domain specific types while having to prefix keys for query purpose.
* <p />
* Fields to be excluded from prefixing my be added to a {@literal blacklist}.
*
* @author Christoph Strobl
* @since 2.1
*/
public class PrefixingDelegatingAggregationOperationContext implements AggregationOperationContext {
private final AggregationOperationContext delegate;
private final String prefix;
private final Set<String> blacklist;
public PrefixingDelegatingAggregationOperationContext(AggregationOperationContext delegate, String prefix) {
this(delegate, prefix, Collections.emptySet());
}
public PrefixingDelegatingAggregationOperationContext(AggregationOperationContext delegate, String prefix,
Collection<String> blacklist) {
this.delegate = delegate;
this.prefix = prefix;
this.blacklist = new HashSet<>(blacklist);
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.aggregation.AggregationOperationContext#getMappedObject(org.bson.Document)
*/
@Override
public Document getMappedObject(Document document) {
return prefix(delegate.getMappedObject(document));
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.aggregation.AggregationOperationContext#getReference(org.springframework.data.mongodb.core.aggregation.Field)
*/
@Override
public FieldReference getReference(Field field) {
return delegate.getReference(field);
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.aggregation.AggregationOperationContext#getReference(java.lang.String)
*/
@Override
public FieldReference getReference(String name) {
return delegate.getReference(name);
}
private Document prefix(Document source) {
Document result = new Document();
for (Map.Entry<String, Object> entry : source.entrySet()) {
String key = (entry.getKey().startsWith("$") || blacklist.contains(entry.getKey())) ? entry.getKey()
: (prefix + "." + entry.getKey());
Object value = entry.getValue();
if (entry.getValue() instanceof Collection) {
List tmp = new ArrayList();
for (Object o : (Collection) entry.getValue()) {
if (o instanceof Document) {
tmp.add(prefix((Document) o));
} else {
tmp.add(o);
}
}
value = tmp;
} else if (entry.getValue() instanceof Document) {
value = entry.getValue();
}
result.append(key, value);
}
return result;
}
}

34
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Cancelable.java

@ -0,0 +1,34 @@ @@ -0,0 +1,34 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import org.springframework.dao.DataAccessResourceFailureException;
/**
* Cancelable allows stopping long running tasks and freeing underlying resources.
*
* @author Christoph Strobl
* @since 2.1
*/
public interface Cancelable {
/**
* Abort and free resources.
*
* @throws DataAccessResourceFailureException
*/
void cancel() throws DataAccessResourceFailureException;
}

329
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java

@ -0,0 +1,329 @@ @@ -0,0 +1,329 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ChangeStreamOptions.ChangeStreamOptionsBuilder;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
/**
* {@link SubscriptionRequest} implementation to be used for listening to
* <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> via a {@link MessageListenerContainer}
* using the synchronous MongoDB Java driver.
* <p/>
* The most trivial use case is subscribing to all events of a specific {@link com.mongodb.client.MongoCollection
* collection}.
*
* <pre>
* <code>
* ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(System.out::println, () -> "collection-name");
* </code>
* </pre>
*
* For more advanced scenarios {@link ChangeStreamOptions} offers abstractions for options like filtering, resuming,...
*
* <pre>
* <code>
* ChangeStreamOptions options = ChangeStreamOptions.builder()
* .filter(newAggregation(match(where("age").is(7))))
* .returnFullDocumentOnUpdate()
* .build();
*
* ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(System.out::println, new ChangeStreamRequestOptions("collection-name", options));
* </code>
* </pre>
*
* {@link ChangeStreamRequestBuilder} offers a fluent API for creating {@link ChangeStreamRequest} with
* {@link ChangeStreamOptions} in one go.
*
* <pre>
* <code>
* ChangeStreamRequest<Document> request = ChangeStreamRequest.builder()
* .collectionName("collection-name")
* .publishTo(System.out::println)
* .filter(newAggregation(match(where("age").is(7))))
* .fullDocumentLookup(UPDATE_LOOKUP)
* .build();
* </code>
* </pre>
*
* {@link Message Messges} passed to the {@link MessageListener} contain the {@link ChangeStreamDocument} within their
* {@link Message#getRaw() raw value} while the {@code fullDocument} is extracted into the {@link Message#getBody()
* messages body}. Unless otherwise specified (via {@link ChangeStreamOptions#getFullDocumentLookup()} the
* {@link Message#getBody() message body} for {@code update events} will be empty for a {@link Document} target type.
* {@link Message#getBody()} Message bodies} that map to a different target type automatically enforce an
* {@link FullDocument#UPDATE_LOOKUP}.
*
* @author Christoph Strobl
* @since 2.1
*/
public class ChangeStreamRequest<T>
implements SubscriptionRequest<ChangeStreamDocument<Document>, T, ChangeStreamRequestOptions> {
private final MessageListener<ChangeStreamDocument<Document>, T> messageListener;
private final ChangeStreamRequestOptions options;
/**
* Create a new {@link ChangeStreamRequest} with options, passing {@link Message messages} to the given
* {@link MessageListener}.
*
* @param messageListener must not be {@literal null}.
* @param options must not be {@literal null}.
*/
public ChangeStreamRequest(MessageListener<ChangeStreamDocument<Document>, T> messageListener,
RequestOptions options) {
Assert.notNull(messageListener, "MessageListener must not be null!");
Assert.notNull(options, "Options must not be null!");
this.options = options instanceof ChangeStreamRequestOptions ? (ChangeStreamRequestOptions) options
: ChangeStreamRequestOptions.of(options);
this.messageListener = messageListener;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.SubscriptionRequest#getMessageListener()
*/
@Override
public MessageListener<ChangeStreamDocument<Document>, T> getMessageListener() {
return messageListener;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.SubscriptionRequest#getRequestOptions()
*/
@Override
public ChangeStreamRequestOptions getRequestOptions() {
return options;
}
/**
* Obtain a shiny new {@link ChangeStreamRequestBuilder} and start defining your {@link ChangeStreamRequest} in this
* fancy fluent way. Just don't forget to call {@link ChangeStreamRequestBuilder#build() build()} when your're done.
*
* @return new instance of {@link ChangeStreamRequest}.
*/
public static ChangeStreamRequestBuilder builder() {
return new ChangeStreamRequestBuilder();
}
/**
* {@link SubscriptionRequest.RequestOptions} implementation specific to a {@link ChangeStreamRequest}.
*
* @author Christoph Strobl
* @since 2.1
*/
public static class ChangeStreamRequestOptions implements SubscriptionRequest.RequestOptions {
private final String collectionName;
private final ChangeStreamOptions options;
/**
* Create new {@link ChangeStreamRequestOptions}.
*
* @param collectionName must not be {@literal null}.
* @param options must not be {@literal null}.
*/
public ChangeStreamRequestOptions(String collectionName, ChangeStreamOptions options) {
Assert.notNull(collectionName, "CollectionName must not be null!");
Assert.notNull(options, "Options must not be null!");
this.collectionName = collectionName;
this.options = options;
}
static ChangeStreamRequestOptions of(RequestOptions options) {
Assert.notNull(options, "Options must not be null!");
return new ChangeStreamRequestOptions(options.getCollectionName(), ChangeStreamOptions.builder().build());
}
/**
* Get the {@link ChangeStreamOptions} defined.
*
* @return never {@literal null}.
*/
public ChangeStreamOptions getChangeStreamOptions() {
return options;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.SubscriptionRequest.RequestOptions#getCollectionName()
*/
@Override
public String getCollectionName() {
return collectionName;
}
}
/**
* Builder for creating {@link ChangeStreamRequest}.
*
* @author Christoph Strobl
* @since 2.1
* @see ChangeStreamOptions
*/
public static class ChangeStreamRequestBuilder<T> {
private @Nullable String collectionName;
private @Nullable MessageListener<ChangeStreamDocument<Document>, T> listener;
private ChangeStreamOptionsBuilder delegate = ChangeStreamOptions.builder();
/**
* Set the name of the {@link com.mongodb.client.MongoCollection} to listen to.
*
* @param collectionName must not be {@literal null} nor empty.
* @return this.
*/
public ChangeStreamRequestBuilder collection(String collectionName) {
Assert.hasText(collectionName, "CollectionName must not be null!");
this.collectionName = collectionName;
return this;
}
/**
* Set the {@link MessageListener} event {@link Message messages} will be published to.
*
* @param messageListener must not be {@literal null}.
* @return this.
*/
public ChangeStreamRequestBuilder publishTo(MessageListener<ChangeStreamDocument<Document>, T> messageListener) {
Assert.notNull(messageListener, "MessageListener must not be null!");
this.listener = messageListener;
return this;
}
/**
* Set the filter to apply.
* <p/>
* Fields on aggregation expression root level are prefixed to map to fields contained in
* {@link ChangeStreamDocument#getFullDocument() fullDocument}. However {@literal operationType}, {@literal ns},
* {@literal documentKey} and {@literal fullDocument} are reserved words that will be omitted, and therefore taken
* as given, during the mapping procedure. You may want to have a look at the
* <a href="https://docs.mongodb.com/manual/reference/change-events/">structure of Change Events</a>.
* <p/>
* Use {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} to ensure filter expressions are
* mapped to domain type fields.
*
* @param filter the {@link Aggregation Aggregation pipeline} to apply for filtering events. Must not be
* {@literal null}.
* @return this.
* @see ChangeStreamOptions#getFilter()
* @see ChangeStreamOptionsBuilder#filter(Aggregation)
*/
public ChangeStreamRequestBuilder filter(Aggregation aggregation) {
this.delegate.filter(aggregation);
return this;
}
/**
* Set the plain filter chain to apply.
*
* @param filter must not be {@literal null} nor contain {@literal null} values.
* @return this.
* @see ChangeStreamOptions#getFilter()
* @see ChangeStreamOptionsBuilder#filter(java.util.List)
*/
public ChangeStreamRequestBuilder filter(Document... pipeline) {
this.delegate.filter(pipeline);
return this;
}
/**
* Set the collation to use.
*
* @param collation must not be {@literal null} nor {@literal empty}.
* @return this.
* @see ChangeStreamOptions#getCollation()
* @see ChangeStreamOptionsBuilder#collation(Collation)
*/
public ChangeStreamRequestBuilder collation(Collation collation) {
this.delegate.collation(collation);
return this;
}
/**
* Set the resume token (typically a {@link org.bson.BsonDocument} containing a {@link org.bson.BsonBinary binary
* token}) after which to start with listening.
*
* @param resumeToken must not be {@literal null}.
* @return this.
* @see ChangeStreamOptions#getResumeToken()
* @see ChangeStreamOptionsBuilder#resumeToken(org.bson.BsonValue)
*/
public ChangeStreamRequestBuilder resumeToken(BsonValue resumeToken) {
this.delegate.resumeToken(resumeToken);
return this;
}
/**
* Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}.
*
* @return this.
* @see #fullDocumentLookup(FullDocument)
* @see ChangeStreamOptions#getFullDocumentLookup()
* @see ChangeStreamOptionsBuilder#fullDocumentLookup(FullDocument)
*/
public ChangeStreamRequestBuilder fullDocumentLookup(FullDocument lookup) {
this.delegate.fullDocumentLookup(lookup);
return this;
}
public ChangeStreamRequest<T> build() {
return new ChangeStreamRequest(listener, new ChangeStreamRequestOptions(collectionName, delegate.build()));
}
}
}

304
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java

@ -0,0 +1,304 @@ @@ -0,0 +1,304 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
/**
* Simple {@link Executor} based {@link MessageListenerContainer} implementation for running {@link Task tasks} like
* listening to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> and tailable
* cursors.
*
* @author Christoph Strobl
* @since 2.1
*/
public class DefaultMessageListenerContainer implements MessageListenerContainer {
private final Executor taskExecutor;
private final Object lifecycleMonitor = new Object();
private int phase = Integer.MAX_VALUE;
private boolean running = false;
private final Map<SubscriptionRequest, Subscription> subscriptions = new LinkedHashMap<>();
private final TaskFactory taskFactory;
private final Optional<ErrorHandler> errorHandler;
/**
* Create a new {@link DefaultMessageListenerContainer}.
*
* @param template must not be {@literal null}.
*/
public DefaultMessageListenerContainer(MongoTemplate template) {
this(template, new SimpleAsyncTaskExecutor());
}
/**
* Create a new {@link DefaultMessageListenerContainer} running {@link Task tasks} via the given
* {@literal taskExecutor}.
*
* @param template must not be {@literal null}.
* @param taskExecutor must not be {@literal null}.
*/
public DefaultMessageListenerContainer(MongoTemplate template, Executor taskExecutor) {
this(template, taskExecutor, null);
}
/**
* Create a new {@link DefaultMessageListenerContainer} running {@link Task tasks} via the given
* {@literal taskExecutor} delegating {@link Exception errors} to the given {@link ErrorHandler}.
*
* @param template must not be {@literal null}. Used by the {@link TaskFactory}.
* @param taskExecutor must not be {@literal null}.
* @param errorHandler the default {@link ErrorHandler} to be used by tasks inside the container. Can be
* {@literal null}.
*/
public DefaultMessageListenerContainer(MongoTemplate template, Executor taskExecutor,
@Nullable ErrorHandler errorHandler) {
Assert.notNull(template, "Template must not be null!");
Assert.notNull(taskExecutor, "TaskExecutor must not be null!");
this.taskExecutor = taskExecutor;
this.taskFactory = new TaskFactory(template);
this.errorHandler = Optional.ofNullable(errorHandler);
}
/*
* (non-Javadoc)
* @see org.springframework.context.SmartLifecycle#isAutoStartup()
*/
@Override
public boolean isAutoStartup() {
return false;
}
/*
* (non-Javadoc)
* @see org.springframework.context.SmartLifecycle#stop(java.lang.Runnable)
*/
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
/*
* (non-Javadoc)
* @see org.springframework.context.Lifecycle#start()
*/
@Override
public void start() {
synchronized (lifecycleMonitor) {
if (!this.running) {
for (Subscription subscription : subscriptions.values()) {
if (!subscription.isActive()) {
if (subscription instanceof TaskSubscription) {
taskExecutor.execute(((TaskSubscription) subscription).getTask());
}
}
}
running = true;
}
}
}
/*
* (non-Javadoc)
* @see org.springframework.context.Lifecycle#stop()
*/
@Override
public void stop() {
synchronized (lifecycleMonitor) {
if (this.running) {
for (Subscription subscription : subscriptions.values()) {
subscription.cancel();
}
running = false;
}
}
}
/*
* (non-Javadoc)
* @see org.springframework.context.Lifecycle#isRunning()
*/
@Override
public boolean isRunning() {
synchronized (this.lifecycleMonitor) {
return running;
}
}
/*
* (non-Javadoc)
* @see org.springframework.context.Phased#getPhase()
*/
@Override
public int getPhase() {
return this.phase;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.MessageListenerContainer#register(org.springframework.data.mongodb.monitor.SubscriptionRequest, java.lang.Class)
*/
@Override
public <S, T> Subscription register(SubscriptionRequest<S, ? super T, ? extends RequestOptions> request,
Class<T> bodyType) {
return register(request, bodyType, errorHandler.orElseGet(
() -> new DecoratingLoggingErrorHandler((exception) -> lookup(request).ifPresent(Subscription::cancel))));
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.MessageListenerContainer#register(org.springframework.data.mongodb.monitor.SubscriptionRequest, java.lang.Class, org.springframework.util.ErrorHandler)
*/
@Override
public <S, T> Subscription register(SubscriptionRequest<S, ? super T, ? extends RequestOptions> request,
Class<T> bodyType, ErrorHandler errorHandler) {
return register(request, taskFactory.forRequest(request, bodyType, errorHandler));
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.MessageListenerContainer#lookup(org.springframework.data.mongodb.monitor.SubscriptionRequest)
*/
@Override
public Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> request) {
synchronized (lifecycleMonitor) {
return Optional.ofNullable(subscriptions.get(request));
}
}
public Subscription register(SubscriptionRequest request, Task task) {
Subscription subscription = new TaskSubscription(task);
synchronized (lifecycleMonitor) {
if (subscriptions.containsKey(request)) {
return subscriptions.get(request);
}
this.subscriptions.put(request, subscription);
if (this.running) {
taskExecutor.execute(task);
}
}
return subscription;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.MessageListenerContainer#remove(org.springframework.data.mongodb.monitor.Subscription)
*/
@Override
public void remove(Subscription subscription) {
synchronized (lifecycleMonitor) {
if (subscriptions.containsValue(subscription)) {
if (subscription.isActive()) {
subscription.cancel();
}
subscriptions.values().remove(subscription);
}
}
}
/**
* @author Christoph Strobl
* @since 2.1
*/
@EqualsAndHashCode
static class TaskSubscription implements Subscription {
private final Task task;
TaskSubscription(Task task) {
this.task = task;
}
Task getTask() {
return task;
}
@Override
public boolean isActive() {
return task.isActive();
}
@Override
public void cancel() throws DataAccessResourceFailureException {
task.cancel();
}
}
/**
* @author Christoph Strobl
* @since 2.1
*/
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
private static class DecoratingLoggingErrorHandler implements ErrorHandler {
private final Log logger = LogFactory.getLog(DecoratingLoggingErrorHandler.class);
private final ErrorHandler delegate;
@Override
public void handleError(Throwable t) {
if (logger.isErrorEnabled()) {
logger.error("Unexpected error occurred while listening to MongoDB.", t);
}
delegate.handleError(t);
}
}
}

155
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java

@ -0,0 +1,155 @@ @@ -0,0 +1,155 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* General message abstraction for any type of Event / Message published by MongoDB server to the client. This might be
* <a href="https://docs.mongodb.com/manual/reference/change-events/">Change Stream Events</a>, or
* {@link org.bson.Document Documents} published by a
* <a href="https://docs.mongodb.com/manual/core/tailable-cursors/">tailable cursor</a>. The original message received
* is preserved in the raw parameter. Additional information about the origin of the {@link Message} is contained in
* {@link MessageProperties}. <br />
* For convenience the {@link #getBody()} of the message gets lazily converted into the target domain type if necessary
* using the mapping infrastructure.
*
* @author Christoph Strobl
* @see MessageProperties
* @since 2.1
*/
public interface Message<S, T> {
/**
* The raw message source as emitted by the origin.
*
* @return can be {@literal null}.
*/
@Nullable
S getRaw();
/**
* The converted message body if available.
*
* @return can be {@literal null}.
*/
@Nullable
T getBody();
/**
* {@link MessageProperties} containing information about the {@link Message} origin and other metadata.
*
* @return never {@literal null}.
*/
MessageProperties getProperties();
/**
* @author Christoph Strobl
* @since 2.1
*/
@ToString
@EqualsAndHashCode
static class MessageProperties {
private static final MessageProperties EMPTY = new MessageProperties();
private @Nullable String databaseName;
private @Nullable String collectionName;
/**
* The database name the message originates from.
*
* @return
*/
@Nullable
public String getDatabaseName() {
return databaseName;
}
/**
* The collection name the message originates from.
*
* @return
*/
@Nullable
public String getCollectionName() {
return collectionName;
}
/**
* @return empty {@link MessageProperties}.
*/
public static MessageProperties empty() {
return EMPTY;
}
/**
* Obtain a shiny new {@link MessagePropertiesBuilder} and start defining options in this fancy fluent way. Just
* don't forget to call {@link MessagePropertiesBuilder#build() build()} when your're done.
*
* @return new instance of {@link MessagePropertiesBuilder}.
*/
public static MessagePropertiesBuilder builder() {
return new MessagePropertiesBuilder();
}
/**
* Builder for {@link MessageProperties}.
*
* @author Christoph Strobl
* @since 2.1
*/
public static class MessagePropertiesBuilder {
private MessageProperties properties = new MessageProperties();
/**
* @param dbName must not be {@literal null}.
* @return this.
*/
public MessagePropertiesBuilder databaseName(String dbName) {
Assert.notNull(dbName, "DbName must not be null!");
properties.databaseName = dbName;
return this;
}
/**
* @param collectionName must not be {@literal null}.
* @return this
*/
public MessagePropertiesBuilder collectionName(String collectionName) {
Assert.notNull(collectionName, "CollectionName must not be null!");
properties.collectionName = collectionName;
return this;
}
public MessageProperties build() {
MessageProperties properties = this.properties;
this.properties = new MessageProperties();
return properties;
}
}
}
}

33
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/MessageListener.java

@ -0,0 +1,33 @@ @@ -0,0 +1,33 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
/**
* Listener interface to receive delivery of {@link Message Messages}.
*
* @author Christoph Strobl
* @since 2.1
*/
@FunctionalInterface
public interface MessageListener<S, T> {
/**
* Callback invoked on receiving {@link Message}.
*
* @param message never {@literal null}.
*/
void onMessage(Message<S, T> message);
}

145
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/MessageListenerContainer.java

@ -0,0 +1,145 @@ @@ -0,0 +1,145 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import java.util.Optional;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.util.ErrorHandler;
/**
* Internal abstraction used by the framework representing a message listener container. <strong>Not</strong> meant to
* be implemented externally.
*
* @author Christoph Strobl
* @since 2.1
*/
interface MessageListenerContainer extends SmartLifecycle {
/**
* Register a new {@link SubscriptionRequest} in the container. If the {@link MessageListenerContainer#isRunning() is
* already running} the {@link Subscription} will be added and run immediately, otherwise it'll be scheduled and
* started once the container is actually {@link MessageListenerContainer#start() started}.
*
* <pre>
* <code>
* MessageListenerContainer container = ...
*
* MessageListener<ChangeStreamDocument<Document>, Object> messageListener = (message) -> message....
* ChangeStreamRequest<Object> request = new ChangeStreamRequest<>(messageListener, () -> "collection-name");
*
* Subscription subscription = container.register(request);
* </code>
* </pre>
*
* Errors during {@link Message} retrieval lead to {@link Subscription#cancel() cannelation} of the underlying task.
*
* @param request must not be {@literal null}.
* @return never {@literal null}.
*/
default <T> Subscription register(SubscriptionRequest<T, Object, ? extends RequestOptions> request) {
return register(request, Object.class);
}
/**
* Register a new {@link SubscriptionRequest} in the container. If the {@link MessageListenerContainer#isRunning() is
* already running} the {@link Subscription} will be added and run immediately, otherwise it'll be scheduled and
* started once the container is actually {@link MessageListenerContainer#start() started}.
*
* <pre>
* <code>
* MessageListenerContainer container = ...
*
* MessageListener<ChangeStreamDocument<Document>, Document> messageListener = (message) -> message.getBody().toJson();
* ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(messageListener, () -> "collection-name");
*
* Subscription subscription = container.register(request, Document.class);
* </code>
* </pre>
*
* On {@link MessageListenerContainer#stop()} all {@link Subscription subscriptions} are cancelled prior to shutting
* down the container itself.
* <p />
* Registering the very same {@link SubscriptionRequest} more than once simply returns the already existing
* {@link Subscription}.
* <p />
* Unless a {@link Subscription} is {@link #remove(Subscription) removed} form the container, the {@link Subscription}
* is restarted once the container itself is restarted.
* <p />
* Errors during {@link Message} retrieval lead to {@link Subscription#cancel() cannelation} of the underlying task.
*
* @param request must not be {@literal null}.
* @param type the exact target or a more concrete type of the {@link Message#getBody()}.
* @return never {@literal null}.
*/
<S, T> Subscription register(SubscriptionRequest<S, ? super T, ? extends RequestOptions> request, Class<T> bodyType);
/**
* Register a new {@link SubscriptionRequest} in the container. If the {@link MessageListenerContainer#isRunning() is
* already running} the {@link Subscription} will be added and run immediately, otherwise it'll be scheduled and
* started once the container is actually {@link MessageListenerContainer#start() started}.
*
* <pre>
* <code>
* MessageListenerContainer container = ...
*
* MessageListener<ChangeStreamDocument<Document>, Document> messageListener = (message) -> message.getBody().toJson();
* ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(messageListener, () -> "collection-name");
*
* Subscription subscription = container.register(request, Document.class);
* </code>
* </pre>
*
* On {@link MessageListenerContainer#stop()} all {@link Subscription subscriptions} are cancelled prior to shutting
* down the container itself.
* <p />
* Registering the very same {@link SubscriptionRequest} more than once simply returns the already existing
* {@link Subscription}.
* <p />
* Unless a {@link Subscription} is {@link #remove(Subscription) removed} form the container, the {@link Subscription}
* is restarted once the container itself is restarted.
* <p />
* Errors during {@link Message} retrieval are delegated to the given {@link ErrorHandler}.
*
* @param request must not be {@literal null}.
* @param type the exact target or a more concrete type of the {@link Message#getBody()}. Must not be {@literal null}.
* @param errorHandler the callback to invoke when retrieving the {@link Message} from the data source fails for some
* reason.
* @return never {@literal null}.
*/
<S, T> Subscription register(SubscriptionRequest<S, ? super T, ? extends RequestOptions> request, Class<T> bodyType,
ErrorHandler errorHandler);
/**
* Unregister a given {@link Subscription} from the container. This prevents the {@link Subscription} to be restarted
* in a potential {@link SmartLifecycle#stop() stop}/{@link SmartLifecycle#start() start} scenario.<br />
* An {@link Subscription#isActive() active} {@link Subscription subcription} is {@link Subscription#cancel()
* cancelled} prior to removal.
*
* @param subscription must not be {@literal null}.
*/
void remove(Subscription subscription);
/**
* Lookup the given {@link SubscriptionRequest} within the container and return the associated {@link Subscription} if
* present.
*
* @param request must not be {@literal null}.
* @return {@link Optional#empty()} if not set.
*/
Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> request);
}

78
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SimpleMessage.java

@ -0,0 +1,78 @@ @@ -0,0 +1,78 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Trivial {@link Message} implementation.
*
* @author Christoph Strobl
* @since 2.1
*/
@EqualsAndHashCode
@ToString
class SimpleMessage<S, T> implements Message<S, T> {
private @Nullable final S raw;
private @Nullable final T body;
private final MessageProperties properties;
/**
* @param raw
* @param body
* @param properties must not be {@literal null}. Use {@link MessageProperties#empty()} instead.
*/
SimpleMessage(@Nullable S raw, @Nullable T body, MessageProperties properties) {
Assert.notNull(properties, "Properties must not be null! Use MessageProperties.empty() instead.");
this.raw = raw;
this.body = body;
this.properties = properties;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Message#getRaw()
*/
@Override
public S getRaw() {
return raw;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Message#getBody()
*/
@Override
public T getBody() {
return body;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Message#getProperties()
*/
@Override
public MessageProperties getProperties() {
return properties;
}
}

70
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Subscription.java

@ -0,0 +1,70 @@ @@ -0,0 +1,70 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import java.time.Duration;
import org.springframework.util.Assert;
/**
* The {@link Subscription} is the link between the {@link SubscriptionRequest} and the actual running {@link Task}.
* <p />
* Due to the asynchronous nature of the {@link Task} execution a {@link Subscription} might not immediately become
* active. {@link #isActive()} provides an answer if the underlying {@link Task} is already running.
* <p />
*
* @author Christoph Strobl
* @since 2.1
*/
public interface Subscription extends Cancelable {
/**
* @return {@literal true} if the subscription is currently executed.
*/
boolean isActive();
/**
* Synchronous, <strong>blocking</strong> call that polls the current state and returns once the {@link Subscription}
* becomes {@link #isActive() active}.
* <p />
* If interrupted while waiting the current Subscription state is returned.
*
* @param timeout must not be {@literal null}.
*/
default boolean await(Duration timeout) {
Assert.notNull(timeout, "Timeout must not be null!");
long sleepTime = 25;
long currentMs = System.currentTimeMillis();
long targetMs = currentMs + timeout.toMillis();
while (currentMs < targetMs && !isActive()) {
try {
Thread.sleep(sleepTime);
currentMs += sleepTime;
} catch (InterruptedException e) {
Thread.interrupted();
break;
}
}
return isActive();
}
}

59
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java

@ -0,0 +1,59 @@ @@ -0,0 +1,59 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
/**
* The actual {@link SubscriptionRequest} sent to the {@link MessageListenerContainer}. This wrapper type allows passing
* in {@link RequestOptions additional information} to the Container which can be used for creating the actual
* {@link Task} running. <br />
* The {@link MessageListener} provides the callback interface when pushing {@link Message massages}.
*
* @author Christoph Strobl
* @since 2.1
*/
interface SubscriptionRequest<S, T, O extends RequestOptions> {
/**
* Obtain the {@link MessageListener} to publish {@link Message messages} to.
*
* @return never {@literal null}.
*/
MessageListener<S, T> getMessageListener();
/**
* Get the {@link RequestOptions} specifying the requests behaviour.
*
* @return never {@literal null}.
*/
O getRequestOptions();
/**
* Options for specifying the behaviour of the {@link SubscriptionRequest}.
*
* @author Christoph Strobl
* @since 2.1
*/
static interface RequestOptions {
/**
* @return the name of the collection to subscribe to. Never {@literal null}.
*/
String getCollectionName();
}
}

166
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TailableCursorRequest.java

@ -0,0 +1,166 @@ @@ -0,0 +1,166 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import java.util.Optional;
import org.bson.Document;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* {@link SubscriptionRequest} implementation to be used to listen to query results in a
* <a href="https://docs.mongodb.com/manual/core/capped-collections/">Capped Collection</a> using a
* <a href="https://docs.mongodb.com/manual/core/tailable-cursors/">Tailable Cursor</a>.
* <p />
* The most trivial use case is subscribing to all events of a specific {@link com.mongodb.client.MongoCollection
* collection}.
*
* <pre>
* <code>
* TailableCursorRequest<Document> request = new TailableCursorRequest<>(System.out::println, () -> "collection-name");
* </code>
* </pre>
*
* @author Christoph Strobl
* @since 2.1
*/
public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T, RequestOptions> {
private final MessageListener<Document, T> messageListener;
private final TailableCursorRequestOptions options;
/**
* Create a new {@link TailableCursorRequest} with options, passing {@link Message messages} to the given
* {@link MessageListener}.
*
* @param messageListener must not be {@literal null}.
* @param options must not be {@literal null}.
*/
public TailableCursorRequest(MessageListener<Document, T> messageListener, RequestOptions options) {
Assert.notNull(messageListener, "MessageListener must not be null!");
Assert.notNull(options, "Options must not be null!");
this.messageListener = messageListener;
this.options = options instanceof TailableCursorRequestOptions ? (TailableCursorRequestOptions) options
: TailableCursorRequestOptions.of(options);
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.SubscriptionRequest#getMessageListener()
*/
@Override
public MessageListener<Document, T> getMessageListener() {
return messageListener;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.monitor.SubscriptionRequest#getRequestOptions()
*/
@Override
public TailableCursorRequestOptions getRequestOptions() {
return options;
}
/**
* {@link SubscriptionRequest.RequestOptions} implementation specific to a {@link TailableCursorRequest}.
*
* @author Christoph Strobl
* @since 2.1
*/
public static class TailableCursorRequestOptions implements SubscriptionRequest.RequestOptions {
private @Nullable String collectionName;
private @Nullable Query query;
TailableCursorRequestOptions() {}
static TailableCursorRequestOptions of(RequestOptions options) {
return builder().collection(options.getCollectionName()).build();
}
/**
* Obtain a shiny new {@link TailableCursorRequestOptionsBuilder} and start defining options in this fancy fluent
* way. Just don't forget to call {@link TailableCursorRequestOptionsBuilder#build() build()} when your're done.
*
* @return new instance of {@link ChangeStreamRequestOptionsBuilder}.
*/
public static TailableCursorRequestOptionsBuilder builder() {
return new TailableCursorRequestOptionsBuilder();
}
@Override
public String getCollectionName() {
return collectionName;
}
public Optional<Query> getQuery() {
return Optional.ofNullable(query);
}
/**
* Builder for creating {@link TailableCursorRequestOptions}.
*
* @author Christoph Strobl
* @since 2.1
*/
public static class TailableCursorRequestOptionsBuilder {
TailableCursorRequestOptions options = new TailableCursorRequestOptions();
/**
* Set the collection name to listen to.
*
* @param collection must not be {@literal null} nor {@literal empty}.
* @return this.
*/
public TailableCursorRequestOptionsBuilder collection(String collection) {
Assert.hasText(collection, "Collection must not be null nor empty!");
options.collectionName = collection;
return this;
}
/**
* Set the filter to apply.
*
* @param filter the {@link Query } to apply for filtering events. Must not be {@literal null}.
* @return this.
*/
public TailableCursorRequestOptionsBuilder filter(Query filter) {
Assert.notNull(filter, "Filter must not be null!");
options.query = filter;
return this;
}
public TailableCursorRequestOptions build() {
TailableCursorRequestOptions tmp = options;
options = new TailableCursorRequestOptions();
return tmp;
}
}
}
}

51
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Task.java

@ -0,0 +1,51 @@ @@ -0,0 +1,51 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import org.springframework.scheduling.SchedulingAwareRunnable;
/**
* The actual {@link Task} to run within the {@link MessageListenerContainer}.
*
* @author Christoph Strobl
* @since 2.1
*/
public interface Task extends SchedulingAwareRunnable, Cancelable {
/**
* @return {@literal true} if the task is currently {@link State#RUNNING running}.
*/
default boolean isActive() {
return State.RUNNING.equals(getState());
}
/**
* Get the current lifecycle phase.
*
* @return never {@literal null}.
*/
State getState();
/**
* The {@link Task.State} defining the lifecycle phase the actual {@link Task}.
*
* @author Christoph Strobl
* @since 2.1
*/
enum State {
CREATED, STARTING, RUNNING, CANCELLED;
}
}

527
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TaskFactory.java

@ -0,0 +1,527 @@ @@ -0,0 +1,527 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.bson.BsonDocument;
import org.bson.Document;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.core.messaging.TailableCursorRequest.TailableCursorRequestOptions;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import com.mongodb.CursorType;
import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
/**
* A simple factory for creating {@link Task} for a given {@link SubscriptionRequest}.
*
* @author Christoph Strobl
* @since 2.1
*/
class TaskFactory {
private final MongoTemplate tempate;
/**
* @param template must not be {@literal null}.
*/
TaskFactory(MongoTemplate template) {
Assert.notNull(template, "Template must not be null!");
this.tempate = template;
}
/**
* Create a {@link Task} for the given {@link SubscriptionRequest}.
*
* @param request must not be {@literal null}.
* @param targetType must not be {@literal null}.
* @param errorHandler must not be {@literal null}.
* @return must not be {@literal null}. Consider {@code Object.class}.
* @throws IllegalArgumentException in case the {@link SubscriptionRequest} is unknown.
*/
Task forRequest(SubscriptionRequest<?, ?, ?> request, Class<?> targetType, ErrorHandler errorHandler) {
Assert.notNull(request, "Request must not be null!");
Assert.notNull(targetType, "TargetType must not be null!");
if (request instanceof ChangeStreamRequest) {
return new ChangeStreamTask(tempate, (ChangeStreamRequest) request, targetType, errorHandler);
} else if (request instanceof TailableCursorRequest) {
return new TailableCursorTask(tempate, (TailableCursorRequest) request, targetType, errorHandler);
}
throw new IllegalArgumentException(
"oh wow - seems you're using some fancy new feature we do not support. Please be so kind and leave us a note in the issue tracker so we can get this fixed.\nThank you!");
}
/**
* @author Christoph Strobl
* @since 2.1
*/
abstract static class CursorReadingTask<T> implements Task {
private final Object lifecycleMonitor = new Object();
private final SubscriptionRequest request;
private final MongoTemplate template;
private final Class<?> targetType;
private State state = State.CREATED;
private MongoCursor<T> cursor;
private final ErrorHandler errorHandler;
/**
* @param template must not be {@literal null}.
* @param request must not be {@literal null}.
* @param targetType must not be {@literal null}.
*/
public CursorReadingTask(MongoTemplate template, SubscriptionRequest request, Class<?> targetType,
ErrorHandler errorHandler) {
this.template = template;
this.request = request;
this.targetType = targetType;
this.errorHandler = errorHandler;
}
/*
* (non-Javadoc)
* @see java.lang.Runnable
*/
@Override
public void run() {
start();
while (isRunning()) {
try {
T next = getNext();
if (next != null) {
emitMessage(createMessage(next, targetType, request.getRequestOptions()));
} else {
Thread.sleep(10);
}
} catch (InterruptedException e) {
synchronized (lifecycleMonitor) {
state = State.CANCELLED;
}
Thread.interrupted();
} catch (Exception e) {
Exception toHandle = e;
if (e instanceof RuntimeException) {
Exception translated = template.getExceptionTranslator().translateExceptionIfPossible((RuntimeException) e);
toHandle = translated != null ? translated : e;
}
errorHandler.handleError(toHandle);
}
}
}
/**
* Initialize the Task by 1st setting the current state to {@link Task.State#STARTING starting} indicating the
* initialization procedure. <br />
* Moving on the underlying {@link MongoCursor} gets {@link #initCursor(MongoTemplate, RequestOptions) created} and
* is {@link #isValidCursor(MongoCursor) health checked}. Once a valid {@link MongoCursor} is created the
* {@link #state} is set to {@link Task.State#RUNNING running}. If the health check is not passed the
* {@link MongoCursor} is immediately {@link MongoCursor#close() closed} and a new {@link MongoCursor} is requested
* until a valid one is retrieved or the {@link #state} changes.
*/
private void start() {
synchronized (lifecycleMonitor) {
if (!State.RUNNING.equals(state)) {
state = State.STARTING;
}
}
do {
boolean valid = false;
synchronized (lifecycleMonitor) {
if (State.STARTING.equals(state)) {
MongoCursor<T> tmp = initCursor(template, request.getRequestOptions(), targetType);
valid = isValidCursor(tmp);
if (valid) {
cursor = tmp;
state = State.RUNNING;
} else {
tmp.close();
}
}
}
if (!valid) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
synchronized (lifecycleMonitor) {
state = State.CANCELLED;
}
Thread.interrupted();
}
}
} while (State.STARTING.equals(getState()));
}
protected abstract MongoCursor<T> initCursor(MongoTemplate template, RequestOptions options, Class<?> targetType);
@Override
public void cancel() throws DataAccessResourceFailureException {
synchronized (lifecycleMonitor) {
if (State.RUNNING.equals(state) || State.STARTING.equals(state)) {
this.state = State.CANCELLED;
if (cursor != null) {
cursor.close();
}
}
}
}
@Override
public boolean isLongLived() {
return true;
}
@Override
public State getState() {
synchronized (lifecycleMonitor) {
return state;
}
}
protected Message createMessage(T source, Class targetType, RequestOptions options) {
return new LazyMappingDelegatingMessage(new SimpleMessage(source, source, MessageProperties.builder()
.databaseName(template.getDb().getName()).collectionName(options.getCollectionName()).build()), targetType,
template.getConverter());
}
private boolean isRunning() {
return State.RUNNING.equals(getState());
}
private void emitMessage(Message message) {
request.getMessageListener().onMessage(message);
}
private T getNext() {
synchronized (lifecycleMonitor) {
if (State.RUNNING.equals(state)) {
return cursor.tryNext();
}
}
throw new IllegalStateException(String.format("Cursor %s is not longer open.", cursor));
}
private boolean isValidCursor(MongoCursor<?> cursor) {
if (cursor == null) {
return false;
}
if (cursor.getServerCursor() == null || cursor.getServerCursor().getId() == 0) {
return false;
}
return true;
}
}
/**
* {@link Task} implementation for obtaining {@link ChangeStreamDocument ChangeStreamDocuments} from MongoDB.
*
* @author Christoph Strobl
* @since 2.1
*/
static class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>> {
private final Set<String> blacklist = new HashSet<>(
Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"));
private final QueryMapper queryMapper;
private final MongoConverter mongoConverter;
ChangeStreamTask(MongoTemplate template, ChangeStreamRequest request, Class<?> targetType,
ErrorHandler errorHandler) {
super(template, request, targetType, errorHandler);
queryMapper = new QueryMapper(template.getConverter());
mongoConverter = template.getConverter();
}
@Override
protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate template, RequestOptions options,
Class<?> targetType) {
List<Document> filter = Collections.emptyList();
BsonDocument resumeToken = new BsonDocument();
Collation collation = null;
FullDocument fullDocument = FullDocument.DEFAULT;
if (options instanceof ChangeStreamRequestOptions) {
ChangeStreamOptions changeStreamOptions = ((ChangeStreamRequestOptions) options).getChangeStreamOptions();
filter = prepareFilter(template, changeStreamOptions);
if (changeStreamOptions.getFilter().isPresent()) {
Object val = changeStreamOptions.getFilter().get();
if (val instanceof Aggregation) {
collation = ((Aggregation) val).getOptions().getCollation()
.map(org.springframework.data.mongodb.core.query.Collation::toMongoCollation).orElse(null);
}
}
if (changeStreamOptions.getResumeToken().isPresent()) {
resumeToken = changeStreamOptions.getResumeToken().get().asDocument();
}
fullDocument = changeStreamOptions.getFullDocumentLookup()
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);
}
ChangeStreamIterable<Document> iterable = filter.isEmpty()
? template.getCollection(options.getCollectionName()).watch(Document.class)
: template.getCollection(options.getCollectionName()).watch(filter, Document.class);
if (!resumeToken.isEmpty()) {
iterable = iterable.resumeAfter(resumeToken);
}
if (collation != null) {
iterable = iterable.collation(collation);
}
iterable = iterable.fullDocument(fullDocument);
return iterable.iterator();
}
List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options) {
if (!options.getFilter().isPresent()) {
return Collections.emptyList();
}
Object filter = options.getFilter().get();
if (filter instanceof Aggregation) {
Aggregation agg = (Aggregation) filter;
AggregationOperationContext context = agg instanceof TypedAggregation
? new TypeBasedAggregationOperationContext(((TypedAggregation) agg).getInputType(),
template.getConverter().getMappingContext(), queryMapper)
: Aggregation.DEFAULT_CONTEXT;
return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", blacklist));
} else if (filter instanceof List) {
return (List<Document>) filter;
} else {
throw new IllegalArgumentException(
"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
}
}
@Override
protected Message createMessage(ChangeStreamDocument<Document> source, Class targetType, RequestOptions options) {
// namespace might be null for eg. OperationType.INVALIDATE
MongoNamespace namespace = Optional.ofNullable(source.getNamespace())
.orElse(new MongoNamespace("unknown", options.getCollectionName()));
return new ChangeStreamEventMessage(new ChangeStreamEvent(source, targetType, mongoConverter), MessageProperties
.builder().databaseName(namespace.getDatabaseName()).collectionName(namespace.getCollectionName()).build());
}
/**
* {@link Message} implementation for ChangeStreams
*
* @since 2.1
*/
static class ChangeStreamEventMessage<T> implements Message<ChangeStreamDocument<Document>, T> {
private final ChangeStreamEvent<T> delegate;
private final MessageProperties messageProperties;
public ChangeStreamEventMessage(ChangeStreamEvent<T> event, MessageProperties messageProperties) {
this.delegate = event;
this.messageProperties = messageProperties;
}
@Nullable
@Override
public ChangeStreamDocument<Document> getRaw() {
return delegate.getRaw();
}
@Nullable
@Override
public T getBody() {
return delegate.getBody();
}
@Override
public MessageProperties getProperties() {
return this.messageProperties;
}
}
}
/**
* @author Christoph Strobl
* @since 2.1
*/
static class TailableCursorTask extends CursorReadingTask<Document> {
private QueryMapper queryMapper;
public TailableCursorTask(MongoTemplate template, TailableCursorRequest request, Class<?> targetType,
ErrorHandler errorHandler) {
super(template, request, targetType, errorHandler);
queryMapper = new QueryMapper(template.getConverter());
}
@Override
protected MongoCursor<Document> initCursor(MongoTemplate template, RequestOptions options, Class<?> targetType) {
Document filter = new Document();
Collation collation = null;
if (options instanceof TailableCursorRequestOptions) {
TailableCursorRequestOptions requestOptions = (TailableCursorRequestOptions) options;
if (requestOptions.getQuery().isPresent()) {
Query query = requestOptions.getQuery().get();
filter.putAll(queryMapper.getMappedObject(query.getQueryObject(), template.getConverter().getMappingContext()
.getPersistentEntity(targetType.equals(Document.class) ? Object.class : targetType)));
collation = query.getCollation().map(org.springframework.data.mongodb.core.query.Collation::toMongoCollation)
.orElse(null);
}
}
FindIterable<Document> iterable = template.getCollection(options.getCollectionName()).find(filter)
.cursorType(CursorType.TailableAwait).noCursorTimeout(true);
if (collation != null) {
iterable = iterable.collation(collation);
}
return iterable.iterator();
}
}
static class LazyMappingDelegatingMessage<S, T> implements Message<S, T> {
private final Message<S, ?> delegate;
private final Class<T> targetType;
private final MongoConverter converter;
public LazyMappingDelegatingMessage(Message<S, ?> delegate, Class<T> targetType, MongoConverter converter) {
this.delegate = delegate;
this.targetType = targetType;
this.converter = converter;
}
@Nullable
@Override
public S getRaw() {
return delegate.getRaw();
}
@Override
public T getBody() {
if (delegate.getBody() == null || targetType.equals(delegate.getBody().getClass())) {
return targetType.cast(delegate.getBody());
}
Object messageBody = delegate.getBody();
if (ClassUtils.isAssignable(Document.class, messageBody.getClass())) {
return converter.read(targetType, (Document) messageBody);
}
if (converter.getConversionService().canConvert(messageBody.getClass(), targetType)) {
return converter.getConversionService().convert(messageBody, targetType);
}
throw new IllegalArgumentException(
String.format("No converter found capable of converting %s to %s", messageBody.getClass(), targetType));
}
@Override
public MessageProperties getProperties() {
return delegate.getProperties();
}
@Override
public String toString() {
return "LazyMappingDelegatingMessage {" + "delegate=" + delegate + ", targetType=" + targetType + '}';
}
}
}

6
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/package-info.java

@ -0,0 +1,6 @@ @@ -0,0 +1,6 @@
/**
* MongoDB specific messaging support for listening to eg.
* <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a>.
*/
@org.springframework.lang.NonNullApi
package org.springframework.data.mongodb.core.messaging;

74
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/Message.java

@ -1,74 +0,0 @@ @@ -1,74 +0,0 @@
/*
* Copyright 2010-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import java.util.Date;
import org.bson.types.ObjectId;
public class Message {
private ObjectId id;
private String text;
private Date timestamp;
public Message() {
}
public Message(String text) {
super();
this.text = text;
this.timestamp = new Date();
}
public Message(String text, Date timestamp) {
super();
this.text = text;
this.timestamp = timestamp;
}
public ObjectId getId() {
return id;
}
public void setId(ObjectId id) {
this.id = id;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Message [id=" + id + ", text=" + text + ", timestamp=" + timestamp + "]";
}
}

53
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTests.java

@ -3721,4 +3721,57 @@ public class MongoTemplateTests { @@ -3721,4 +3721,57 @@ public class MongoTemplateTests {
person.setId(UUID.randomUUID());
}
}
public static class Message {
private ObjectId id;
private String text;
private Date timestamp;
public Message() {}
public Message(String text) {
super();
this.text = text;
this.timestamp = new Date();
}
public Message(String text, Date timestamp) {
super();
this.text = text;
this.timestamp = timestamp;
}
public ObjectId getId() {
return id;
}
public void setId(ObjectId id) {
this.id = id;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Message [id=" + id + ", text=" + text + ", timestamp=" + timestamp + "]";
}
}
}

201
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java

@ -17,6 +17,7 @@ package org.springframework.data.mongodb.core; @@ -17,6 +17,7 @@ package org.springframework.data.mongodb.core;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;
@ -34,7 +35,10 @@ import java.util.Map; @@ -34,7 +35,10 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.assertj.core.api.Assumptions;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.After;
@ -62,6 +66,8 @@ import org.springframework.data.mongodb.core.query.Criteria; @@ -62,6 +66,8 @@ import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.test.util.Assertions;
import org.springframework.data.mongodb.test.util.ReplicaSet;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@ -298,9 +304,12 @@ public class ReactiveMongoTemplateTests { @@ -298,9 +304,12 @@ public class ReactiveMongoTemplateTests {
public void updateFirstByEntityTypeShouldUpdateObject() {
Person person = new Person("Oliver2", 25);
StepVerifier.create(template.insert(person) //
.then(template.updateFirst(new Query(where("age").is(25)), new Update().set("firstName", "Sven"), Person.class)) //
.flatMapMany(p -> template.find(new Query(where("age").is(25)), Person.class))).consumeNextWith(actual -> {
StepVerifier
.create(template.insert(person) //
.then(template.updateFirst(new Query(where("age").is(25)), new Update().set("firstName", "Sven"),
Person.class)) //
.flatMapMany(p -> template.find(new Query(where("age").is(25)), Person.class)))
.consumeNextWith(actual -> {
assertThat(actual.getFirstName(), is(equalTo("Sven")));
}).verifyComplete();
@ -324,7 +333,7 @@ public class ReactiveMongoTemplateTests { @@ -324,7 +333,7 @@ public class ReactiveMongoTemplateTests {
public void updateMultiByEntityTypeShouldUpdateObjects() {
Query query = new Query(
new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter")));
new Criteria().orOperator(where("firstName").is("Walter Jr"), where("firstName").is("Walter")));
StepVerifier
.create(template
@ -340,7 +349,7 @@ public class ReactiveMongoTemplateTests { @@ -340,7 +349,7 @@ public class ReactiveMongoTemplateTests {
public void updateMultiByCollectionNameShouldUpdateObject() {
Query query = new Query(
new Criteria().orOperator(where("firstName").is("Walter Jr"), Criteria.where("firstName").is("Walter")));
new Criteria().orOperator(where("firstName").is("Walter Jr"), where("firstName").is("Walter")));
List<Person> people = Arrays.asList(new Person("Walter", 50), //
new Person("Skyler", 43), //
@ -414,7 +423,7 @@ public class ReactiveMongoTemplateTests { @@ -414,7 +423,7 @@ public class ReactiveMongoTemplateTests {
.expectNextCount(3) //
.verifyComplete();
Query query = new Query(Criteria.where("firstName").is("Harry"));
Query query = new Query(where("firstName").is("Harry"));
Update update = new Update().inc("age", 1);
Person p = template.findAndModify(query, update, Person.class).block(); // return old
@ -436,7 +445,7 @@ public class ReactiveMongoTemplateTests { @@ -436,7 +445,7 @@ public class ReactiveMongoTemplateTests {
p = template.findOne(query, Person.class).block();
assertThat(p.getAge(), is(27));
Query query2 = new Query(Criteria.where("firstName").is("Mary"));
Query query2 = new Query(where("firstName").is("Mary"));
p = template.findAndModify(query2, update, new FindAndModifyOptions().returnNew(true).upsert(true), Person.class)
.block();
assertThat(p.getFirstName(), is("Mary"));
@ -481,7 +490,7 @@ public class ReactiveMongoTemplateTests { @@ -481,7 +490,7 @@ public class ReactiveMongoTemplateTests {
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1774
public void removeWithNullShouldThrowError() {
template.remove((Object)null).subscribe();
template.remove((Object) null).subscribe();
}
@Test // DATAMONGO-1774
@ -939,6 +948,182 @@ public class ReactiveMongoTemplateTests { @@ -939,6 +948,182 @@ public class ReactiveMongoTemplateTests {
.verifyComplete();
}
@Test // DATAMONGO-1803
public void changeStreamEventsShouldBeEmittedCorrectly() throws InterruptedException {
Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Document>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template
.changeStream(Collections.emptyList(), Document.class, ChangeStreamOptions.empty(), "person")
.doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
Person person1 = new Person("Spring", 38);
Person person2 = new Person("Data", 39);
Person person3 = new Person("MongoDB", 37);
StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();
Thread.sleep(500); // just give it some time to link receive all events
try {
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3)
.allMatch(val -> val instanceof Document);
} finally {
disposable.dispose();
}
}
@Test // DATAMONGO-1803
public void changeStreamEventsShouldBeConvertedCorrectly() throws InterruptedException {
Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template
.changeStream(Collections.emptyList(), Person.class, ChangeStreamOptions.empty(), "person")
.doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
Person person1 = new Person("Spring", 38);
Person person2 = new Person("Data", 39);
Person person3 = new Person("MongoDB", 37);
StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();
Thread.sleep(500); // just give it some time to link receive all events
try {
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
.containsExactly(person1, person2, person3);
} finally {
disposable.dispose();
}
}
@Test // DATAMONGO-1803
public void changeStreamEventsShouldBeFilteredCorrectly() throws InterruptedException {
Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template.changeStream(newAggregation(Person.class, match(where("age").gte(38))),
Person.class, ChangeStreamOptions.empty(), "person").doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
Person person1 = new Person("Spring", 38);
Person person2 = new Person("Data", 37);
Person person3 = new Person("MongoDB", 39);
StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();
Thread.sleep(500); // just give it some time to link receive all events
try {
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
.containsExactly(person1, person3);
} finally {
disposable.dispose();
}
}
@Test // DATAMONGO-1803
public void mapsReservedWordsCorrectly() throws InterruptedException {
Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template
.changeStream(newAggregation(Person.class, match(where("operationType").is("replace"))), Person.class,
ChangeStreamOptions.empty(), "person")
.doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
Person person1 = new Person("Spring", 38);
Person person2 = new Person("Data", 37);
StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
Person replacement = new Person(person2.getId(), "BDognoM");
replacement.setAge(person2.getAge());
StepVerifier.create(template.save(replacement)).expectNextCount(1).verifyComplete();
Thread.sleep(500); // just give it some time to link receive all events
try {
Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
.containsExactly(replacement);
} finally {
disposable.dispose();
}
}
@Test // DATAMONGO-1803
public void changeStreamEventsShouldBeResumedCorrectly() throws InterruptedException {
Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue();
StepVerifier.create(template.createCollection(Person.class)).expectNextCount(1).verifyComplete();
BlockingQueue<ChangeStreamEvent<Person>> documents = new LinkedBlockingQueue<>(100);
Disposable disposable = template
.changeStream(Collections.emptyList(), Person.class, ChangeStreamOptions.empty(), "person")
.doOnNext(documents::add).subscribe();
Thread.sleep(500); // just give it some time to link to the collection.
Person person1 = new Person("Spring", 38);
Person person2 = new Person("Data", 37);
Person person3 = new Person("MongoDB", 39);
StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete();
StepVerifier.create(template.save(person3)).expectNextCount(1).verifyComplete();
Thread.sleep(500); // just give it some time to link receive all events
disposable.dispose();
BsonDocument resumeToken = documents.take().getRaw().getResumeToken();
BlockingQueue<ChangeStreamEvent<Person>> resumeDocuments = new LinkedBlockingQueue<>(100);
template
.changeStream(Collections.emptyList(), Person.class,
ChangeStreamOptions.builder().resumeToken(resumeToken).build(), "person")
.doOnNext(resumeDocuments::add).subscribe();
Thread.sleep(500); // just give it some time to link receive all events
try {
Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList()))
.containsExactly(person2, person3);
} finally {
disposable.dispose();
}
}
private PersonWithAList createPersonWithAList(String firstname, int age) {
PersonWithAList p = new PersonWithAList();

13
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

@ -25,6 +25,7 @@ import lombok.Data; @@ -25,6 +25,7 @@ import lombok.Data;
import reactor.core.publisher.Mono;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -144,7 +145,7 @@ public class ReactiveMongoTemplateUnitTests { @@ -144,7 +145,7 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1518
public void findAndModfiyShoudUseCollationWhenPresent() {
when(collection.findOneAndUpdate(any(), any(), any())).thenReturn(Mono.empty());
when(collection.findOneAndUpdate(any(Bson.class), any(), any())).thenReturn(Mono.empty());
template.findAndModify(new BasicQuery("{}").collation(Collation.of("fr")), new Update(), AutogenerateableId.class)
.subscribe();
@ -158,7 +159,7 @@ public class ReactiveMongoTemplateUnitTests { @@ -158,7 +159,7 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1518
public void findAndRemoveShouldUseCollationWhenPresent() {
when(collection.findOneAndDelete(any(), any())).thenReturn(Mono.empty());
when(collection.findOneAndDelete(any(Bson.class), any())).thenReturn(Mono.empty());
template.findAndRemove(new BasicQuery("{}").collation(Collation.of("fr")), AutogenerateableId.class).subscribe();
@ -185,7 +186,7 @@ public class ReactiveMongoTemplateUnitTests { @@ -185,7 +186,7 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1518
public void updateOneShouldUseCollationWhenPresent() {
when(collection.updateOne(any(), any(), any())).thenReturn(Mono.empty());
when(collection.updateOne(any(Bson.class), any(), any())).thenReturn(Mono.empty());
template.updateFirst(new BasicQuery("{}").collation(Collation.of("fr")), new Update().set("foo", "bar"),
AutogenerateableId.class).subscribe();
@ -199,7 +200,7 @@ public class ReactiveMongoTemplateUnitTests { @@ -199,7 +200,7 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1518
public void updateManyShouldUseCollationWhenPresent() {
when(collection.updateMany(any(), any(), any())).thenReturn(Mono.empty());
when(collection.updateMany(any(Bson.class), any(), any())).thenReturn(Mono.empty());
template.updateMulti(new BasicQuery("{}").collation(Collation.of("fr")), new Update().set("foo", "bar"),
AutogenerateableId.class).subscribe();
@ -214,13 +215,13 @@ public class ReactiveMongoTemplateUnitTests { @@ -214,13 +215,13 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1518
public void replaceOneShouldUseCollationWhenPresent() {
when(collection.replaceOne(any(), any(), any())).thenReturn(Mono.empty());
when(collection.replaceOne(any(Bson.class), any(), any())).thenReturn(Mono.empty());
template.updateFirst(new BasicQuery("{}").collation(Collation.of("fr")), new Update(), AutogenerateableId.class)
.subscribe();
ArgumentCaptor<UpdateOptions> options = ArgumentCaptor.forClass(UpdateOptions.class);
verify(collection).replaceOne(Mockito.any(), Mockito.any(), options.capture());
verify(collection).replaceOne(Mockito.any(Bson.class), Mockito.any(), options.capture());
assertThat(options.getValue().getCollation().getLocale(), is("fr"));
}

395
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java

@ -0,0 +1,395 @@ @@ -0,0 +1,395 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import static org.assertj.core.api.Assertions.*;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;
import lombok.Data;
import java.util.List;
import java.util.stream.Collectors;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionUtils.*;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.test.util.ReplicaSet;
import com.mongodb.MongoClient;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
/**
* Integration test for subscribing to a {@link com.mongodb.operation.ChangeStreamBatchCursor} inside the
* {@link DefaultMessageListenerContainer} using {@link ChangeStreamRequest}.
*
* @author Christoph Strobl
*/
public class ChangeStreamTests {
public static @ClassRule TestRule replSet = ReplicaSet.required();
MongoTemplate template;
MessageListenerContainer container;
User jellyBelly;
User huffyFluffy;
User sugarSplashy;
@Before
public void setUp() {
template = new MongoTemplate(new MongoClient(), "change-stream-tests");
template.dropCollection(User.class);
container = new DefaultMessageListenerContainer(template);
container.start();
jellyBelly = new User();
jellyBelly.id = "id-1";
jellyBelly.userName = "jellyBelly";
jellyBelly.age = 7;
huffyFluffy = new User();
huffyFluffy.id = "id-2";
huffyFluffy.userName = "huffyFluffy";
huffyFluffy.age = 7;
sugarSplashy = new User();
sugarSplashy.id = "id-3";
sugarSplashy.userName = "sugarSplashy";
sugarSplashy.age = 5;
}
@After
public void tearDown() {
container.stop();
}
@Test // DATAMONGO-1803
public void readsPlainDocumentMessageCorrectly() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, Document> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(messageListener, () -> "user");
Subscription subscription = container.register(request, Document.class);
awaitSubscription(subscription);
template.save(jellyBelly);
awaitMessages(messageListener, 1);
Message<ChangeStreamDocument<Document>, Document> message1 = messageListener.getFirstMessage();
assertThat(message1.getRaw()).isNotNull();
assertThat(message1.getProperties())
.isEqualTo(MessageProperties.builder().collectionName("user").databaseName("change-stream-tests").build());
assertThat(message1.getBody()).isEqualTo(new Document("_id", "id-1").append("user_name", "jellyBelly")
.append("age", 7).append("_class", User.class.getName()));
}
@Test // DATAMONGO-1803
public void useSimpleAggregationToFilterMessages() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.publishTo(messageListener) //
.filter(newAggregation(match(where("age").is(7)))) //
.build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.save(sugarSplashy);
template.save(huffyFluffy);
awaitMessages(messageListener);
List<User> messageBodies = messageListener.getMessages().stream().map(Message::getBody)
.collect(Collectors.toList());
assertThat(messageBodies).hasSize(2).doesNotContain(sugarSplashy);
}
@Test // DATAMONGO-1803
public void useAggregationToFilterMessages() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.publishTo(messageListener) //
.filter(newAggregation(match(
new Criteria().orOperator(where("user_name").is("huffyFluffy"), where("user_name").is("jellyBelly"))))) //
.build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.save(sugarSplashy);
template.save(huffyFluffy);
awaitMessages(messageListener);
List<User> messageBodies = messageListener.getMessages().stream().map(Message::getBody)
.collect(Collectors.toList());
assertThat(messageBodies).hasSize(2).doesNotContain(sugarSplashy);
}
@Test // DATAMONGO-1803
public void mapsTypedAggregationToFilterMessages() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.publishTo(messageListener) //
.filter(newAggregation(User.class,
match(new Criteria().orOperator(where("userName").is("huffyFluffy"), where("userName").is("jellyBelly"))))) //
.build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.save(sugarSplashy);
template.save(huffyFluffy);
awaitMessages(messageListener);
List<User> messageBodies = messageListener.getMessages().stream().map(Message::getBody)
.collect(Collectors.toList());
assertThat(messageBodies).hasSize(2).doesNotContain(sugarSplashy);
}
@Test // DATAMONGO-1803
public void mapsReservedWordsCorrectly() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.publishTo(messageListener) //
.filter(newAggregation(User.class, match(where("operationType").is("replace")))) //
.build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.save(sugarSplashy);
User replacement = new User();
replacement.id = jellyBelly.id;
replacement.userName = new StringBuilder(jellyBelly.userName).reverse().toString();
replacement.age = jellyBelly.age;
template.save(replacement);
awaitMessages(messageListener);
List<User> messageBodies = messageListener.getMessages().stream().map(Message::getBody)
.collect(Collectors.toList());
assertThat(messageBodies).hasSize(1).containsExactly(replacement);
}
@Test // DATAMONGO-1803
public void plainAggregationPipelineToFilterMessages() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.publishTo(messageListener) //
.filter(new Document("$match", new Document("fullDocument.user_name", "sugarSplashy"))) //
.build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.save(sugarSplashy);
template.save(huffyFluffy);
awaitMessages(messageListener);
List<User> messageBodies = messageListener.getMessages().stream().map(Message::getBody)
.collect(Collectors.toList());
assertThat(messageBodies).hasSize(1).containsExactly(sugarSplashy);
}
@Test // DATAMONGO-1803
public void resumesCorrectly() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener1 = new CollectingMessageListener<>();
Subscription subscription1 = container.register(new ChangeStreamRequest<>(messageListener1, () -> "user"),
User.class);
awaitSubscription(subscription1);
template.save(jellyBelly);
template.save(sugarSplashy);
template.save(huffyFluffy);
awaitMessages(messageListener1, 3);
BsonDocument resumeToken = messageListener1.getFirstMessage().getRaw().getResumeToken();
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener2 = new CollectingMessageListener<>();
ChangeStreamRequest<User> subSequentRequest = ChangeStreamRequest.builder().collection("user")
.publishTo(messageListener2).resumeToken(resumeToken).build();
Subscription subscription2 = container.register(subSequentRequest, User.class);
awaitSubscription(subscription2);
awaitMessages(messageListener2);
List<User> messageBodies = messageListener2.getMessages().stream().map(Message::getBody)
.collect(Collectors.toList());
assertThat(messageBodies).hasSize(2).doesNotContain(jellyBelly);
}
@Test // DATAMONGO-1803
public void readsAndConvertsMessageBodyCorrectly() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = new ChangeStreamRequest<>(messageListener, () -> "user");
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
awaitMessages(messageListener, 1);
Message<ChangeStreamDocument<Document>, User> message1 = messageListener.getFirstMessage();
assertThat(message1.getRaw()).isNotNull();
assertThat(message1.getProperties())
.isEqualTo(MessageProperties.builder().collectionName("user").databaseName("change-stream-tests").build());
assertThat(message1.getBody()).isEqualTo(jellyBelly);
}
@Test // DATAMONGO-1803
public void readsAndConvertsUpdateMessageBodyCorrectly() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = new ChangeStreamRequest<>(messageListener, () -> "user");
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(jellyBelly);
assertThat(messageListener.getLastMessage().getBody()).isNotNull().hasFieldOrPropertyWithValue("age", 8);
}
@Test // DATAMONGO-1803
public void readsOnlyDiffForUpdateWhenNotMappedToDomainType() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, Document> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(messageListener, () -> "user");
Subscription subscription = container.register(request, Document.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(new Document("_id", "id-1")
.append("user_name", "jellyBelly").append("age", 7).append("_class", User.class.getName()));
assertThat(messageListener.getLastMessage().getBody()).isNull();
}
@Test // DATAMONGO-1803
public void readsOnlyDiffForUpdateWhenOptionsDeclareDefaultExplicitly() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = new ChangeStreamRequest<>(messageListener, new ChangeStreamRequestOptions(
"user", ChangeStreamOptions.builder().fullDocumentLookup(FullDocument.DEFAULT).build()));
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(jellyBelly);
assertThat(messageListener.getLastMessage().getBody()).isNull();
}
@Test // DATAMONGO-1803
public void readsFullDocumentForUpdateWhenNotMappedToDomainTypeButLookupSpecified() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, Document> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(messageListener,
new ChangeStreamRequestOptions("user", ChangeStreamOptions.builder().returnFullDocumentOnUpdate().build()));
Subscription subscription = container.register(request, Document.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(new Document("_id", "id-1")
.append("user_name", "jellyBelly").append("age", 7).append("_class", User.class.getName()));
assertThat(messageListener.getLastMessage().getBody()).isEqualTo(new Document("_id", "id-1")
.append("user_name", "jellyBelly").append("age", 8).append("_class", User.class.getName()));
}
@Data
static class User {
@Id String id;
@Field("user_name") String userName;
int age;
}
}

227
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java

@ -0,0 +1,227 @@ @@ -0,0 +1,227 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import static edu.umd.cs.mtc.TestFramework.*;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import edu.umd.cs.mtc.MultithreadedTestCase;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.core.messaging.Task.State;
import org.springframework.data.mongodb.core.messaging.TaskFactory.CursorReadingTask;
import org.springframework.util.ErrorHandler;
import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
/**
* Unit test for mainly lifecycle issues of {@link CursorReadingTask}.
*
* @author Christoph Strobl
*/
@RunWith(MockitoJUnitRunner.class)
public class CursorReadingTaskUnitTests {
@Mock MongoDatabase db;
@Mock MongoCursor cursor;
@Mock SubscriptionRequest request;
@Mock MessageListener listener;
@Mock RequestOptions options;
@Mock MongoTemplate template;
@Mock ErrorHandler errorHandler;
ValueCapturingTaskStub task;
@Before
public void setUp() {
when(request.getRequestOptions()).thenReturn(options);
when(request.getMessageListener()).thenReturn(listener);
when(options.getCollectionName()).thenReturn("collection-name");
when(template.getDb()).thenReturn(db);
when(db.getName()).thenReturn("mock-db");
task = new ValueCapturingTaskStub(template, request, Object.class, cursor, errorHandler);
}
@Test // DATAMONGO-1803
public void stopTaskWhileStarting() throws Throwable {
runOnce(new MultithreadedStopDuringStartupInitialization(task, cursor));
}
@Test // DATAMONGO-1803
public void stopRunningTask() throws Throwable {
when(cursor.getServerCursor()).thenReturn(new ServerCursor(10, new ServerAddress("mock")));
runOnce(new MultithreadedStopRunning(task, cursor));
}
@Test // DATAMONGO-1803
public void stopTaskWhileEmittingMessages() throws Throwable {
when(cursor.getServerCursor()).thenReturn(new ServerCursor(10, new ServerAddress("mock")));
when(cursor.tryNext()).thenReturn("hooyah");
runOnce(new MultithreadedStopRunningWhileEmittingMessages(task, cursor));
verify(listener, times(task.getValues().size())).onMessage(any());
}
private static class MultithreadedStopRunningWhileEmittingMessages extends MultithreadedTestCase {
CursorReadingTask task;
MongoCursor cursor;
public MultithreadedStopRunningWhileEmittingMessages(CursorReadingTask task, MongoCursor cursor) {
this.task = task;
this.cursor = cursor;
}
public void thread1() {
assertTick(0);
assertThat(task.getState()).isEqualTo(State.CREATED);
task.run();
waitForTick(1);
assertThat(task.isActive()).isFalse();
assertThat(task.getState()).isEqualTo(State.CANCELLED);
verify(cursor).close();
}
public void thread2() throws InterruptedException {
while (!task.isActive()) {
Thread.sleep(20);
}
verify(cursor, never()).close();
task.cancel();
}
}
private static class MultithreadedStopRunning extends MultithreadedTestCase {
CursorReadingTask task;
MongoCursor cursor;
public MultithreadedStopRunning(CursorReadingTask task, MongoCursor cursor) {
this.task = task;
this.cursor = cursor;
}
public void thread1() {
assertTick(0);
assertThat(task.getState()).isEqualTo(State.CREATED);
task.run();
waitForTick(2);
assertThat(task.isActive()).isFalse();
assertThat(task.getState()).isEqualTo(State.CANCELLED);
verify(cursor).close();
}
public void thread2() throws InterruptedException {
waitForTick(1);
assertThat(task.isActive()).isTrue();
assertThat(task.getState()).isEqualTo(State.RUNNING);
verify(cursor, never()).close();
task.cancel();
}
}
private static class MultithreadedStopDuringStartupInitialization extends MultithreadedTestCase {
CursorReadingTask task;
MongoCursor cursor;
public MultithreadedStopDuringStartupInitialization(CursorReadingTask task, MongoCursor cursor) {
this.task = task;
this.cursor = cursor;
}
public void thread1() {
assertTick(0);
task.run();
waitForTick(2);
assertThat(task.isActive()).isFalse();
assertThat(task.getState()).isEqualTo(State.CANCELLED);
verify(cursor).close();
}
public void thread2() throws InterruptedException {
waitForTick(1);
assertThat(task.isActive()).isFalse();
assertThat(task.getState()).isEqualTo(State.STARTING);
task.cancel();
}
}
static class ValueCapturingTaskStub extends CursorReadingTask {
final MongoCursor cursor;
final List<Object> values = new CopyOnWriteArrayList();
public ValueCapturingTaskStub(MongoTemplate template, SubscriptionRequest request, Class<?> targetType,
MongoCursor cursor, ErrorHandler errorHandler) {
super(template, request, targetType, errorHandler);
this.cursor = cursor;
}
@Override
protected MongoCursor initCursor(MongoTemplate dbFactory, RequestOptions options, Class targetType) {
return cursor;
}
@Override
protected Message createMessage(Object source, Class targetType, RequestOptions options) {
values.add(source);
return super.createMessage(source, targetType, options);
}
public List<Object> getValues() {
return values;
}
}
}

324
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java

@ -0,0 +1,324 @@ @@ -0,0 +1,324 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils.*;
import lombok.Data;
import java.time.Duration;
import java.util.stream.Collectors;
import org.bson.Document;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.dao.DataAccessException;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import org.springframework.data.mongodb.test.util.ReplicaSet;
import org.springframework.test.annotation.IfProfileValue;
import org.springframework.util.ErrorHandler;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
/**
* Integration tests for {@link DefaultMessageListenerContainer}.
*
* @author Christoph Strobl
*/
public class DefaultMessageListenerContainerTests {
public static final String DATABASE_NAME = "change-stream-events";
public static final String COLLECTION_NAME = "collection-1";
MongoDbFactory dbFactory;
MongoCollection<Document> collection;
private CollectingMessageListener<Object, Object> messageListener;
private MongoTemplate template;
public @Rule TestRule replSet = ReplicaSet.none();
@Before
public void setUp() {
dbFactory = new SimpleMongoDbFactory(new MongoClient(), DATABASE_NAME);
template = new MongoTemplate(dbFactory);
template.dropCollection(COLLECTION_NAME);
collection = template.getCollection(COLLECTION_NAME);
messageListener = new CollectingMessageListener();
}
@Test // DATAMONGO-1803
@IfProfileValue(name = "replSet", value = "true")
public void shouldCollectMappedChangeStreamMessagesCorrectly() throws InterruptedException {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
Subscription subscription = container.register(new ChangeStreamRequest(messageListener, () -> COLLECTION_NAME),
Person.class);
container.start();
awaitSubscription(subscription, Duration.ofMillis(500));
collection.insertOne(new Document("_id", "id-1").append("firstname", "foo"));
collection.insertOne(new Document("_id", "id-2").append("firstname", "bar"));
awaitMessages(messageListener, 2, Duration.ofMillis(500));
assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
.containsExactly(new Person("id-1", "foo"), new Person("id-2", "bar"));
}
@Test // DATAMONGO-1803
@IfProfileValue(name = "replSet", value = "true")
public void shouldNoLongerReceiveMessagesWhenConainerStopped() throws InterruptedException {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
Subscription subscription = container.register(new ChangeStreamRequest(messageListener, () -> COLLECTION_NAME),
Document.class);
container.start();
awaitSubscription(subscription, Duration.ofMillis(500));
collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
collection.insertOne(new Document("_id", "id-2").append("value", "bar"));
awaitMessages(messageListener, 2, Duration.ofMillis(500));
container.stop();
collection.insertOne(new Document("_id", "id-3").append("value", "bar"));
Thread.sleep(200);
assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);
}
@Test // DATAMONGO-1803
@IfProfileValue(name = "replSet", value = "true")
public void shouldReceiveMessagesWhenAddingRequestToAlreadyStartedContainer() throws InterruptedException {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
Document unexpected = new Document("_id", "id-1").append("value", "foo");
collection.insertOne(unexpected);
Subscription subscription = container.register(new ChangeStreamRequest(messageListener, () -> COLLECTION_NAME),
Document.class);
awaitSubscription(subscription, Duration.ofMillis(500));
Document expected = new Document("_id", "id-2").append("value", "bar");
collection.insertOne(expected);
awaitMessages(messageListener, 1, Duration.ofMillis(500));
container.stop();
assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
.containsExactly(expected);
}
@Test // DATAMONGO-1803
@IfProfileValue(name = "replSet", value = "true")
public void shouldStartReceivingMessagesWhenContainerStarts() throws InterruptedException {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
Subscription subscription = container.register(new ChangeStreamRequest(messageListener, () -> COLLECTION_NAME),
Document.class);
collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
Thread.sleep(200);
container.start();
awaitSubscription(subscription);
Document expected = new Document("_id", "id-2").append("value", "bar");
collection.insertOne(expected);
awaitMessages(messageListener);
container.stop();
assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
.containsExactly(expected);
}
@Test // DATAMONGO-1803
public void tailableCursor() throws InterruptedException {
dbFactory.getDb().createCollection(COLLECTION_NAME,
new CreateCollectionOptions().capped(true).maxDocuments(10000).sizeInBytes(10000));
collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
awaitSubscription(
container.register(new TailableCursorRequest(messageListener, () -> COLLECTION_NAME), Document.class),
Duration.ofMillis(500));
collection.insertOne(new Document("_id", "id-2").append("value", "bar"));
awaitMessages(messageListener, 2, Duration.ofSeconds(2));
container.stop();
assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);
}
@Test // DATAMONGO-1803
public void tailableCursorOnEmptyCollection() throws InterruptedException {
dbFactory.getDb().createCollection(COLLECTION_NAME,
new CreateCollectionOptions().capped(true).maxDocuments(10000).sizeInBytes(10000));
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
awaitSubscription(
container.register(new TailableCursorRequest(messageListener, () -> COLLECTION_NAME), Document.class),
Duration.ofMillis(500));
collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
collection.insertOne(new Document("_id", "id-2").append("value", "bar"));
awaitMessages(messageListener, 2, Duration.ofSeconds(2));
container.stop();
assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);
}
@Test // DATAMONGO-1803
public void abortsSubscriptionOnError() throws InterruptedException {
dbFactory.getDb().createCollection(COLLECTION_NAME,
new CreateCollectionOptions().capped(true).maxDocuments(10000).sizeInBytes(10000));
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
Subscription subscription = container.register(new TailableCursorRequest(messageListener, () -> COLLECTION_NAME),
Document.class);
awaitSubscription(subscription);
assertThat(subscription.isActive()).isTrue();
collection.insertOne(new Document("_id", "id-2").append("value", "bar"));
collection.drop();
awaitMessages(messageListener);
assertThat(subscription.isActive()).isFalse();
container.stop();
}
@Test // DATAMONGO-1803
public void callsDefaultErrorHandlerOnError() throws InterruptedException {
dbFactory.getDb().createCollection(COLLECTION_NAME,
new CreateCollectionOptions().capped(true).maxDocuments(10000).sizeInBytes(10000));
collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
ErrorHandler errorHandler = mock(ErrorHandler.class);
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(template,
new SimpleAsyncTaskExecutor(), errorHandler);
try {
container.start();
Subscription subscription = container.register(new TailableCursorRequest(messageListener, () -> COLLECTION_NAME),
Document.class);
SubscriptionUtils.awaitSubscription(subscription);
template.dropCollection(COLLECTION_NAME);
Thread.sleep(20);
verify(errorHandler, atLeast(1)).handleError(any(DataAccessException.class));
} finally {
container.stop();
}
}
@Test // DATAMONGO-1803
@IfProfileValue(name = "replSet", value = "true")
public void runsMoreThanOneTaskAtOnce() throws InterruptedException {
dbFactory.getDb().createCollection(COLLECTION_NAME,
new CreateCollectionOptions().capped(true).maxDocuments(10000).sizeInBytes(10000));
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
CollectingMessageListener<Document, Document> tailableListener = new CollectingMessageListener<>();
Subscription tailableSubscription = container
.register(new TailableCursorRequest(tailableListener, () -> COLLECTION_NAME), Document.class);
CollectingMessageListener<ChangeStreamDocument<Document>, Document> changeStreamListener = new CollectingMessageListener<>();
Subscription changeStreamSubscription = container
.register(new ChangeStreamRequest(changeStreamListener, () -> COLLECTION_NAME), Document.class);
awaitSubscriptions(tailableSubscription, changeStreamSubscription);
collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
awaitMessages(tailableListener);
awaitMessages(changeStreamListener);
assertThat(tailableListener.getTotalNumberMessagesReceived()).isEqualTo(1);
assertThat(tailableListener.getFirstMessage().getRaw()).isInstanceOf(Document.class);
assertThat(changeStreamListener.getTotalNumberMessagesReceived()).isEqualTo(1);
assertThat(changeStreamListener.getFirstMessage().getRaw()).isInstanceOf(ChangeStreamDocument.class);
}
@Data
static class Person {
@Id String id;
private String firstname;
private String lastname;
public Person() {}
public Person(String id, String firstname) {
this.id = id;
this.firstname = firstname;
}
}
}

285
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerUnitTests.java

@ -0,0 +1,285 @@ @@ -0,0 +1,285 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import static edu.umd.cs.mtc.TestFramework.*;
import static org.assertj.core.api.Assertions.*;
import edu.umd.cs.mtc.MultithreadedTestCase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.util.ErrorHandler;
/**
* Unit tests for {@link DefaultMessageListenerContainer}.
*
* @author Christoph Strobl
*/
@RunWith(MockitoJUnitRunner.class)
public class DefaultMessageListenerContainerUnitTests {
@Mock MongoTemplate template;
@Mock ErrorHandler errorHandler;
DefaultMessageListenerContainer container;
@Before
public void setUp() {
container = new DefaultMessageListenerContainer(template);
}
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1803
public void throwsErrorOnNullTemplate() {
new DefaultMessageListenerContainer(null);
}
@Test // DATAMONGO-1803
public void startStopContainer() throws Throwable {
runOnce(new MultithreadedStartStopContainer(container));
}
@Test // DATAMONGO-1803
public void subscribeToContainerBeforeStartup() throws Throwable {
runOnce(new MultithreadedSubscribeBeforeStartup(container));
}
@Test // DATAMONGO-1803
public void subscribeToContainerAfterStartup() throws Throwable {
runOnce(new MultithreadedSubscribeAfterStartup(container));
}
@Test // DATAMONGO-1803
public void stopSubscriptionWhileRunning() throws Throwable {
runOnce(new StopSubscriptionWhileRunning(container));
}
@Test // DATAMONGO-1803
public void removeSubscriptionWhileRunning() throws Throwable {
runOnce(new RemoveSubscriptionWhileRunning(container));
}
private static class RemoveSubscriptionWhileRunning extends MultithreadedTestCase {
DefaultMessageListenerContainer container;
Subscription subscription;
public RemoveSubscriptionWhileRunning(DefaultMessageListenerContainer container) {
this.container = container;
subscription = container.register(new MockSubscriptionRequest(), new MockTask());
}
public void thread1() {
assertTick(0);
container.start();
waitForTick(2);
assertThat(container.isRunning());
container.stop();
}
public void thread2() throws InterruptedException {
waitForTick(1);
assertThat(subscription.isActive()).isTrue();
container.remove(subscription);
assertThat(subscription.isActive()).isFalse();
}
}
private static class StopSubscriptionWhileRunning extends MultithreadedTestCase {
DefaultMessageListenerContainer container;
Subscription subscription;
public StopSubscriptionWhileRunning(DefaultMessageListenerContainer container) {
this.container = container;
subscription = container.register(new MockSubscriptionRequest(), new MockTask());
}
public void thread1() {
assertTick(0);
container.start();
waitForTick(2);
assertThat(container.isRunning());
container.stop();
}
public void thread2() throws InterruptedException {
waitForTick(1);
assertThat(subscription.isActive()).isTrue();
subscription.cancel();
assertThat(subscription.isActive()).isFalse();
}
}
private static class MultithreadedSubscribeAfterStartup extends MultithreadedTestCase {
DefaultMessageListenerContainer container;
public MultithreadedSubscribeAfterStartup(DefaultMessageListenerContainer container) {
this.container = container;
}
public void thread1() {
assertTick(0);
container.start();
waitForTick(2);
container.stop();
}
public void thread2() throws InterruptedException {
waitForTick(1);
Subscription subscription = container.register(new MockSubscriptionRequest(), new MockTask());
Thread.sleep(10);
assertThat(subscription.isActive()).isTrue();
waitForTick(3);
assertThat(subscription.isActive()).isFalse();
}
}
private static class MultithreadedSubscribeBeforeStartup extends MultithreadedTestCase {
DefaultMessageListenerContainer container;
public MultithreadedSubscribeBeforeStartup(DefaultMessageListenerContainer container) {
this.container = container;
}
public void thread1() {
assertTick(0);
Subscription subscription = container.register(new MockSubscriptionRequest(), new MockTask());
assertThat(subscription.isActive()).isFalse();
waitForTick(2);
assertThat(subscription.isActive()).isTrue();
waitForTick(4);
assertThat(subscription.isActive()).isFalse();
}
public void thread2() {
waitForTick(1);
container.start();
waitForTick(3);
container.stop();
}
}
private static class MultithreadedStartStopContainer extends MultithreadedTestCase {
DefaultMessageListenerContainer container;
public MultithreadedStartStopContainer(DefaultMessageListenerContainer container) {
this.container = container;
}
public void thread1() {
assertTick(0);
container.start();
waitForTick(2);
assertThat(container.isRunning()).isFalse();
}
public void thread2() {
waitForTick(1);
assertThat(container.isRunning()).isTrue();
container.stop();
}
}
static class MockTask implements Task {
volatile State state;
volatile RuntimeException error;
@Override
public void cancel() throws DataAccessResourceFailureException {
state = State.CANCELLED;
}
@Override
public boolean isLongLived() {
return true;
}
@Override
public State getState() {
return state;
}
@Override
public void run() {
state = State.RUNNING;
while (isActive()) {
if (error != null) {
throw error;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.interrupted();
}
}
}
void emitError(RuntimeException error) {
this.error = error;
}
}
static class MockSubscriptionRequest implements SubscriptionRequest {
@Override
public MessageListener getMessageListener() {
return message -> {};
}
@Override
public RequestOptions getRequestOptions() {
return () -> "foo";
}
}
}

165
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/SubscriptionUtils.java

@ -0,0 +1,165 @@ @@ -0,0 +1,165 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
* Utilities for testing long running asnyc message retrieval.
*
* @author Christoph Strobl
*/
class SubscriptionUtils {
static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(1);
/**
* Wait for {@link Subscription#isActive() to become active} but not longer than {@link #DEFAULT_TIMEOUT}.
*
* @param subscription
* @throws InterruptedException
*/
static void awaitSubscription(Subscription subscription) throws InterruptedException {
awaitSubscription(subscription, DEFAULT_TIMEOUT);
}
/**
* Wait for all {@link Subscription Subscriptions} to {@link Subscription#isActive() become active} but not longer
* than {@link #DEFAULT_TIMEOUT}.
*
* @param subscription
* @throws InterruptedException
*/
static void awaitSubscriptions(Subscription... subscriptions) throws InterruptedException {
awaitSubscriptions(DEFAULT_TIMEOUT, subscriptions);
}
/**
* Wait for all {@link Subscription Subscriptions} to {@link Subscription#isActive() become active} but not longer
* than {@literal timeout}.
*
* @param timeout
* @param subscriptions
* @throws InterruptedException
*/
static void awaitSubscriptions(Duration timeout, Subscription... subscriptions) throws InterruptedException {
long passedMs = 0;
long maxMs = timeout.toMillis();
Collection<Subscription> subscriptionList = Arrays.asList(subscriptions);
while (!subscriptionList.stream().allMatch(Subscription::isActive) && passedMs < maxMs) {
Thread.sleep(10);
passedMs += 10;
}
}
/**
* Wait for {@link Subscription#isActive() to become active} but not longer than {@literal timeout}.
*
* @param subscription
* @param timeout
* @throws InterruptedException
*/
static void awaitSubscription(Subscription subscription, Duration timeout) throws InterruptedException {
subscription.await(timeout);
}
/**
* Wait for {@link CollectingMessageListener} to receive messages but not longer than {@link #DEFAULT_TIMEOUT}.
*
* @param listener
* @throws InterruptedException
*/
static void awaitMessages(CollectingMessageListener listener) throws InterruptedException {
awaitMessages(listener, Integer.MAX_VALUE);
}
/**
* Wait for {@link CollectingMessageListener} to receive exactly {@literal nrMessages} messages but not longer than
* {@link #DEFAULT_TIMEOUT}.
*
* @param listener
* @param nrMessages
* @throws InterruptedException
*/
static void awaitMessages(CollectingMessageListener listener, int nrMessages) throws InterruptedException {
awaitMessages(listener, nrMessages, DEFAULT_TIMEOUT);
}
/**
* Wait for {@link CollectingMessageListener} to receive exactly {@literal nrMessages} messages but not longer than
* {@literal timeout}.
*
* @param listener
* @param nrMessages
* @param timeout
* @throws InterruptedException
*/
static void awaitMessages(CollectingMessageListener listener, int nrMessages, Duration timeout)
throws InterruptedException {
long passedMs = 0;
long maxMs = timeout.toMillis();
while (listener.getTotalNumberMessagesReceived() < nrMessages && passedMs < maxMs) {
Thread.sleep(10);
passedMs += 10;
}
}
/**
* {@link MessageListener} implementation collecting received {@link Message messages}.
*
* @param <M>
*/
static class CollectingMessageListener<S, T> implements MessageListener<S, T> {
private volatile List<Message<S, T>> messages = new ArrayList<>();
@Override
public void onMessage(Message<S, T> message) {
messages.add(message);
}
int getTotalNumberMessagesReceived() {
return messages.size();
}
public List<Message<S, T>> getMessages() {
return messages;
}
public Message<S, T> getMessage(int nr) {
return messages.get(nr);
}
public Message<S, T> getFirstMessage() {
return messages.get(0);
}
public Message<S, T> getLastMessage() {
return messages.get(messages.size() - 1);
}
}
}

195
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TailableCursorTests.java

@ -0,0 +1,195 @@ @@ -0,0 +1,195 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;
import static org.springframework.data.mongodb.test.util.Assertions.*;
import lombok.Data;
import org.bson.Document;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.CollectionOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.TailableCursorRequest.TailableCursorRequestOptions;
import com.mongodb.MongoClient;
/**
* Integration test for subscribing to a capped {@link com.mongodb.client.MongoCollection} inside the
* {@link DefaultMessageListenerContainer} using {@link TailableCursorRequest}.
*
* @author Christoph Strobl
*/
public class TailableCursorTests {
static final String COLLECTION_NAME = "user";
MongoTemplate template;
MessageListenerContainer container;
User jellyBelly;
User huffyFluffy;
User sugarSplashy;
@Before
public void setUp() {
template = new MongoTemplate(new MongoClient(), "tailable-cursor-tests");
template.dropCollection(User.class);
template.createCollection(User.class, CollectionOptions.empty().capped().maxDocuments(10000).size(10000));
container = new DefaultMessageListenerContainer(template);
container.start();
jellyBelly = new User();
jellyBelly.id = "id-1";
jellyBelly.userName = "jellyBelly";
jellyBelly.age = 7;
huffyFluffy = new User();
huffyFluffy.id = "id-2";
huffyFluffy.userName = "huffyFluffy";
huffyFluffy.age = 7;
sugarSplashy = new User();
sugarSplashy.id = "id-3";
sugarSplashy.userName = "sugarSplashy";
sugarSplashy.age = 5;
}
@After
public void tearDown() {
container.stop();
}
@Test // DATAMONGO-1803
public void readsDocumentMessageCorrectly() throws InterruptedException {
CollectingMessageListener<Document, Document> messageListener = new CollectingMessageListener<>();
awaitSubscription(
container.register(new TailableCursorRequest<>(messageListener, () -> COLLECTION_NAME), Document.class));
template.save(jellyBelly);
awaitMessages(messageListener, 1);
Document expected = new Document("_id", "id-1").append("user_name", "jellyBelly").append("age", 7).append("_class",
TailableCursorTests.User.class.getName());
assertThat(messageListener.getFirstMessage().getProperties())
.isEqualTo(MessageProperties.builder().collectionName("user").databaseName("tailable-cursor-tests").build());
assertThat(messageListener.getFirstMessage().getRaw()).isEqualTo(expected);
assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(expected);
}
@Test // DATAMONGO-1803
public void convertsMessageCorrectly() throws InterruptedException {
CollectingMessageListener<Document, User> messageListener = new CollectingMessageListener<>();
awaitSubscription(
container.register(new TailableCursorRequest<>(messageListener, () -> COLLECTION_NAME), User.class));
template.save(jellyBelly);
awaitMessages(messageListener, 1);
Document expected = new Document("_id", "id-1").append("user_name", "jellyBelly").append("age", 7).append("_class",
TailableCursorTests.User.class.getName());
assertThat(messageListener.getFirstMessage().getProperties())
.isEqualTo(MessageProperties.builder().collectionName("user").databaseName("tailable-cursor-tests").build());
assertThat(messageListener.getFirstMessage().getRaw()).isEqualTo(expected);
assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(jellyBelly);
}
@Test // DATAMONGO-1803
public void filtersMessagesCorrectly() throws InterruptedException {
CollectingMessageListener<Document, User> messageListener = new CollectingMessageListener<>();
awaitSubscription(container.register(new TailableCursorRequest<>(messageListener,
TailableCursorRequestOptions.builder().collection(COLLECTION_NAME).filter(query(where("age").is(7))).build()),
User.class));
template.save(jellyBelly);
template.save(sugarSplashy);
template.save(huffyFluffy);
awaitMessages(messageListener);
assertThat(messageListener.getMessages().stream().map(Message::getBody)).hasSize(2).doesNotContain(sugarSplashy);
}
@Test // DATAMONGO-1803
public void mapsFilterToDomainType() throws InterruptedException {
CollectingMessageListener<Document, User> messageListener = new CollectingMessageListener<>();
awaitSubscription(
container
.register(
new TailableCursorRequest<>(messageListener, TailableCursorRequestOptions.builder()
.collection(COLLECTION_NAME).filter(query(where("userName").is("sugarSplashy"))).build()),
User.class));
template.save(jellyBelly);
template.save(sugarSplashy);
template.save(huffyFluffy);
awaitMessages(messageListener);
assertThat(messageListener.getMessages().stream().map(Message::getBody)).hasSize(1).containsExactly(sugarSplashy);
}
@Test // DATAMONGO-1803
public void emitsFromStart() throws InterruptedException {
template.save(jellyBelly);
template.save(huffyFluffy);
CollectingMessageListener<Document, User> messageListener = new CollectingMessageListener<>();
awaitSubscription(
container.register(new TailableCursorRequest<>(messageListener, () -> COLLECTION_NAME), User.class));
template.save(sugarSplashy);
awaitMessages(messageListener);
assertThat(messageListener.getMessages().stream().map(Message::getBody)).hasSize(3).containsExactly(jellyBelly,
huffyFluffy, sugarSplashy);
}
@Data
static class User {
@Id String id;
@Field("user_name") String userName;
int age;
}
}

80
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TaskFactoryUnitTests.java

@ -0,0 +1,80 @@ @@ -0,0 +1,80 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.core.messaging.TaskFactory.ChangeStreamTask;
import org.springframework.data.mongodb.core.messaging.TaskFactory.TailableCursorTask;
import org.springframework.util.ErrorHandler;
/**
* Unit tests for {@link TaskFactory}.
*
* @author Christoph Strobl
*/
@RunWith(MockitoJUnitRunner.class)
public class TaskFactoryUnitTests {
@Mock MongoConverter converter;
@Mock MongoTemplate template;
@Mock MessageListener<Object, Object> messageListener;
@Mock ErrorHandler errorHandler;
TaskFactory factory;
@Before
public void setUp() {
when(template.getConverter()).thenReturn(converter);
factory = new TaskFactory(template);
}
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1803
public void requestMustNotBeNull() {
factory.forRequest(null, Object.class, errorHandler);
}
@Test // DATAMONGO-1803
public void createsChangeStreamRequestCorrectly() {
ChangeStreamRequestOptions options = Mockito.mock(ChangeStreamRequestOptions.class);
Task task = factory.forRequest(new ChangeStreamRequest(messageListener, options), Object.class, errorHandler);
assertThat(task).isInstanceOf(ChangeStreamTask.class);
}
@Test // DATAMONGO-1803
public void createsTailableRequestCorrectly() {
RequestOptions options = Mockito.mock(RequestOptions.class);
when(options.getCollectionName()).thenReturn("collection-1");
Task task = factory.forRequest(new TailableCursorRequest(messageListener, options), Object.class, errorHandler);
assertThat(task).isInstanceOf(TailableCursorTask.class);
}
}

27
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/monitor/Resumeable.java

@ -0,0 +1,27 @@ @@ -0,0 +1,27 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.monitor;
import java.util.function.Supplier;
/**
* @author Christoph Strobl
* @since 2018/01
*/
interface Resumeable<T> {
void resumeAt(Supplier<T> token);
}

106
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/ReplicaSet.java

@ -0,0 +1,106 @@ @@ -0,0 +1,106 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.test.util;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.bson.Document;
import org.junit.AssumptionViolatedException;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.springframework.test.annotation.IfProfileValue;
import com.mongodb.MongoClient;
/**
* {@link TestRule} evaluating if MongoDB Server is running with {@code --replSet} flag.
*
* @author Christoph Strobl
*/
public class ReplicaSet implements TestRule {
boolean required = false;
AtomicReference<Boolean> runsAsReplicaSet = new AtomicReference<>();
private ReplicaSet(boolean required) {
this.required = required;
}
/**
* A MongoDB server running with {@code --replSet} flag is required to execute tests.
*
* @return new instance of {@link ReplicaSet}.
*/
public static ReplicaSet required() {
return new ReplicaSet(true);
}
/**
* A MongoDB server running with {@code --replSet} flag might be required to execute some tests. Those tests are
* marked with {@code @IfProfileValue(name="replSet", value="true")}.
*
* @return new instance of {@link ReplicaSet}.
*/
public static ReplicaSet none() {
return new ReplicaSet(false);
}
@Override
public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
if (!required) {
IfProfileValue profileValue = description.getAnnotation(IfProfileValue.class);
if (profileValue == null || !profileValue.name().equalsIgnoreCase("replSet")) {
base.evaluate();
return;
}
if (!Boolean.valueOf(profileValue.value())) {
base.evaluate();
return;
}
}
if (!runsAsReplicaSet()) {
throw new AssumptionViolatedException("Not runnig in repl set mode");
}
base.evaluate();
}
};
}
public boolean runsAsReplicaSet() {
if (runsAsReplicaSet.get() == null) {
try (MongoClient client = new MongoClient()) {
boolean tmp = client.getDatabase("admin").runCommand(new Document("getCmdLineOpts", "1"))
.get("argv", List.class).contains("--replSet");
runsAsReplicaSet.compareAndSet(null, tmp);
}
}
return runsAsReplicaSet.get();
}
}

1
src/main/asciidoc/new-features.adoc

@ -7,6 +7,7 @@ @@ -7,6 +7,7 @@
* <<mongo-template.query.distinct,Distinct queries>> for imperative and reactive Template API.
* <<mongo.mongo-3.validation,`validator` support for collections>>.
* <<mongo.jsonSchema,`$jsonSchema` support>> for queries and collection creation.
* <<changes-streams, Change Stream support>> for imperative and reactive drivers.
[[new-features.2-0-0]]
== What's new in Spring Data MongoDB 2.0

53
src/main/asciidoc/reference/mongodb.adoc

@ -2938,3 +2938,56 @@ class GridFsClient { @@ -2938,3 +2938,56 @@ class GridFsClient {
====
`GridFsOperations` extending `ResourcePatternResolver` allows the `GridFsTemplate` e.g. to be plugged into an `ApplicationContext` to read Spring Config files from a MongoDB.
[[changes-streams]]
== Change Streams
As of MongoDB 3.6, https://docs.mongodb.com/manual/changeStreams/[Change Streams] allow application to get notified about changes without having to tailing the oplog.
NOTE: Change Stream support is only possible for replica sets or a sharded cluster.
Change Streams can be subscribed to with both the imperative and the reactive MongoDB java driver. It is highly recommended to use the reactive variant as it is less resource intensive. However if you do not feel comfortable using the reactive API for whatever reason, you can sill obtain the change events via a Messaging concept already common in the Spring ecosystem.
=== Change Streams - Sync
Listening to a https://docs.mongodb.com/manual/tutorial/change-streams-example/[Change Stream using a Sync Driver] is a long running, blocking task that needs to be delegated to a separate component.
In this case we need to create a `MessageListenerContainer` first which will be the main entry point for running the specific ``SubscriptionRequest``s.
Spring Data MongoDB already ships with a default implementation that operates upon `MongoTemplate` and is capable of creating and executing ``Task``s for a `ChangeStreamRequest`.
.Change Streams with `MessageListeners`
====
[source,java]
----
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); <1>
MessageListener<Message<ChangeStreamDocument<Document>, User>> listener = System.out::println; <2>
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); <3>
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); <4>
// ...
container.stop(); <5>
----
<1> Starting the container intializes the resources and starts the ``Task``s for allready registered ``SubscriptionRequest``s. Requests added after the statup are run immeadeately.
<2> Define the listener called when a `Message` is reveived. The `Message#getBody()` is be converted to the domain type registered. Use `Document.class` to just get the raw results.
<3> Set the collection to listen to and provide additional options via `ChangeStreamOptions`.
<4> Register the request. The returned `Subscription` can be used to check the current `Task` state and cancel the execution to free resources.
<5> Do not forget to stop the container once you're sure you won't need it any more. This will stop all running ``Task``s within the container.
====
=== Change Streams - Reactive
Subscribing to Change Stream via the reactive API is clearly more straight forward. Still the building blocks like `ChangeStreamOptions` remain the same.
.Change Streams with `MessageListeners`
====
[source,java]
----
Aggregation filter = newAggregation(User.class, match(where("age").gte(38)); <1>
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(filter), User.class, ChangeStreamOptions.empty()); <2>
----
<1> Use an aggregation pipeline to filter events.
<2> Obtain the flux emitting change stream events. The `ChangeStreamEvent#getBody()` is be converted to the domain type registered. Use `Document.class` to just get the raw results.
====
Loading…
Cancel
Save