Browse Source

DATAMONGO-1803 - Polishing.

Allow reuse of builders instead of resetting state after MessagePropertiesBuilder.build(). Use Java streams where possible. Slightly reorder fields to match constructor argument order. Add generics to request builders and introduce typed builder(…) methods to retain builder generics. Add builder for TailableCursorRequest.

Introduce factory method on MessageListenerContainer for container creation. Change Subscription.await() to use CountDownLatch instead of polling to integrate better with ManagedBlocker.

Add protected constructors to options and builder classes. Add assertions where appropriate. Move task classes into top-level types. Extract methods. Typo fixes in reference docs.

Original pull request: #528.
pull/528/merge
Mark Paluch 8 years ago
parent
commit
8745518131
  1. 9
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java
  2. 43
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java
  3. 2
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
  4. 19
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  5. 44
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java
  6. 98
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java
  7. 204
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
  8. 251
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java
  9. 46
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java
  10. 85
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/LazyMappingDelegatingMessage.java
  11. 24
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java
  12. 2
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/MessageListener.java
  13. 17
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/MessageListenerContainer.java
  14. 36
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Subscription.java
  15. 9
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java
  16. 135
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TailableCursorRequest.java
  17. 80
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TailableCursorTask.java
  18. 11
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Task.java
  19. 464
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TaskFactory.java
  20. 26
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java
  21. 4
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java
  22. 2
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java
  23. 11
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerUnitTests.java
  24. 45
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TailableCursorRequestUnitTests.java
  25. 21
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TailableCursorTests.java
  26. 2
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TaskFactoryUnitTests.java
  27. 3
      src/main/asciidoc/new-features.adoc
  28. 20
      src/main/asciidoc/reference/mongodb.adoc

9
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java

@ -32,12 +32,13 @@ import com.mongodb.client.model.changestream.ChangeStreamDocument; @@ -32,12 +32,13 @@ import com.mongodb.client.model.changestream.ChangeStreamDocument;
* Streams</a>.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
@EqualsAndHashCode
public class ChangeStreamEvent<T> {
@Nullable private final ChangeStreamDocument<Document> raw;
private final @Nullable ChangeStreamDocument<Document> raw;
private final Class<T> targetType;
private final MongoConverter converter;
@ -45,11 +46,11 @@ public class ChangeStreamEvent<T> { @@ -45,11 +46,11 @@ public class ChangeStreamEvent<T> {
/**
* @param raw can be {@literal null}.
* @param messageProperties must not be {@literal null}.
* @param targetType must not be {@literal null}.
* @param converter must not be {@literal null}.
*/
public ChangeStreamEvent(ChangeStreamDocument<Document> raw, Class<T> targetType, MongoConverter converter) {
public ChangeStreamEvent(@Nullable ChangeStreamDocument<Document> raw, Class<T> targetType,
MongoConverter converter) {
this.raw = raw;
this.targetType = targetType;
@ -76,7 +77,7 @@ public class ChangeStreamEvent<T> { @@ -76,7 +77,7 @@ public class ChangeStreamEvent<T> {
public T getBody() {
if (raw == null) {
return targetType.cast(raw);
return null;
}
if (raw.getFullDocument() == null) {

43
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

@ -36,6 +36,7 @@ import com.mongodb.client.model.changestream.FullDocument; @@ -36,6 +36,7 @@ import com.mongodb.client.model.changestream.FullDocument;
* well {@link ReactiveMongoOperations} if you prefer it that way.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
@EqualsAndHashCode
@ -46,6 +47,8 @@ public class ChangeStreamOptions { @@ -46,6 +47,8 @@ public class ChangeStreamOptions {
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
protected ChangeStreamOptions() {}
/**
* @return {@link Optional#empty()} if not set.
*/
@ -82,10 +85,10 @@ public class ChangeStreamOptions { @@ -82,10 +85,10 @@ public class ChangeStreamOptions {
}
/**
* Obtain a shiny new {@link ChangeStreamRequestOptionsBuilder} and start defining options in this fancy fluent way.
* Just don't forget to call {@link ChangeStreamRequestOptionsBuilder#build() build()} when your're done.
* Obtain a shiny new {@link ChangeStreamOptionsBuilder} and start defining options in this fancy fluent way. Just
* don't forget to call {@link ChangeStreamOptionsBuilder#build() build()} when your're done.
*
* @return new instance of {@link ChangeStreamRequestOptionsBuilder}.
* @return new instance of {@link ChangeStreamOptionsBuilder}.
*/
public static ChangeStreamOptionsBuilder builder() {
return new ChangeStreamOptionsBuilder();
@ -99,7 +102,12 @@ public class ChangeStreamOptions { @@ -99,7 +102,12 @@ public class ChangeStreamOptions {
*/
public static class ChangeStreamOptionsBuilder {
private ChangeStreamOptions options = new ChangeStreamOptions();
private @Nullable Object filter;
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable Collation collation;
private ChangeStreamOptionsBuilder() {}
/**
* Set the collation to use.
@ -111,7 +119,7 @@ public class ChangeStreamOptions { @@ -111,7 +119,7 @@ public class ChangeStreamOptions {
Assert.notNull(collation, "Collation must not be null nor empty!");
options.collation = collation;
this.collation = collation;
return this;
}
@ -135,7 +143,7 @@ public class ChangeStreamOptions { @@ -135,7 +143,7 @@ public class ChangeStreamOptions {
Assert.notNull(filter, "Filter must not be null!");
options.filter = filter;
this.filter = filter;
return this;
}
@ -148,7 +156,8 @@ public class ChangeStreamOptions { @@ -148,7 +156,8 @@ public class ChangeStreamOptions {
public ChangeStreamOptionsBuilder filter(Document... filter) {
Assert.noNullElements(filter, "Filter must not contain null values");
options.filter = Arrays.asList(filter);
this.filter = Arrays.asList(filter);
return this;
}
@ -162,7 +171,8 @@ public class ChangeStreamOptions { @@ -162,7 +171,8 @@ public class ChangeStreamOptions {
public ChangeStreamOptionsBuilder resumeToken(BsonValue resumeToken) {
Assert.notNull(resumeToken, "ResumeToken must not be null!");
options.resumeToken = resumeToken;
this.resumeToken = resumeToken;
return this;
}
@ -185,15 +195,24 @@ public class ChangeStreamOptions { @@ -185,15 +195,24 @@ public class ChangeStreamOptions {
public ChangeStreamOptionsBuilder fullDocumentLookup(FullDocument lookup) {
Assert.notNull(lookup, "Lookup must not be null!");
options.fullDocumentLookup = lookup;
this.fullDocumentLookup = lookup;
return this;
}
/**
* @return the built {@link ChangeStreamOptions}
*/
public ChangeStreamOptions build() {
ChangeStreamOptions tmp = options;
options = new ChangeStreamOptions();
return tmp;
ChangeStreamOptions options = new ChangeStreamOptions();
options.filter = filter;
options.resumeToken = resumeToken;
options.fullDocumentLookup = fullDocumentLookup;
options.collation = collation;
return options;
}
}
}

2
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java

@ -1142,7 +1142,7 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { @@ -1142,7 +1142,7 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
* Use {@link ChangeStreamOptions} to set arguments like {@link ChangeStreamOptions#getResumeToken() the resumeToken}
* for resuming change streams.
*
* @param filter can be {@literal null}.
* @param filter can be empty, must not be {@literal null}.
* @param resultType must not be {@literal null}.
* @param options must not be {@literal null}.
* @param collectionName must not be {@literal null} nor empty.

19
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -1788,6 +1788,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1788,6 +1788,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable Aggregation filter, Class<T> resultType,
ChangeStreamOptions options, String collectionName) {
Assert.notNull(resultType, "Result type must not be null!");
Assert.notNull(options, "ChangeStreamOptions must not be null!");
Assert.hasText(collectionName, "Collection name must not be null or empty!");
if (filter == null) {
return changeStream(Collections.emptyList(), resultType, options, collectionName);
}
@ -1809,22 +1813,23 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1809,22 +1813,23 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
public <T> Flux<ChangeStreamEvent<T>> changeStream(List<Document> filter, Class<T> resultType,
ChangeStreamOptions options, String collectionName) {
Assert.notNull(filter, "Filter must not be null!");
Assert.notNull(resultType, "Result type must not be null!");
Assert.notNull(options, "ChangeStreamOptions must not be null!");
Assert.hasText(collectionName, "Collection name must not be null or empty!");
ChangeStreamPublisher<Document> publisher = filter.isEmpty() ? getCollection(collectionName).watch()
: getCollection(collectionName).watch(filter);
if (options.getResumeToken().isPresent()) {
publisher = publisher.resumeAfter(options.getResumeToken().get().asDocument());
}
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
if (options.getFullDocumentLookup().isPresent() || resultType != Document.class) {
publisher = publisher.fullDocument(options.getFullDocumentLookup().isPresent()
? options.getFullDocumentLookup().get() : FullDocument.UPDATE_LOOKUP);
}
if (options.getCollation().isPresent()) {
publisher = publisher.collation(options.getCollation().map(Collation::toMongoCollation).get());
}
return Flux.from(publisher).map(document -> new ChangeStreamEvent(document, resultType, getConverter()));
return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, resultType, getConverter()));
}
/*

44
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/PrefixingDelegatingAggregationOperationContext.java

@ -27,12 +27,13 @@ import org.bson.Document; @@ -27,12 +27,13 @@ import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.ExposedFields.FieldReference;
/**
* {@link AggregationOperationContext} implementation prefixing non command keys on root level with the given prefix.
* {@link AggregationOperationContext} implementation prefixing non-command keys on root level with the given prefix.
* Useful when mapping fields to domain specific types while having to prefix keys for query purpose.
* <p />
* Fields to be excluded from prefixing my be added to a {@literal blacklist}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
public class PrefixingDelegatingAggregationOperationContext implements AggregationOperationContext {
@ -59,7 +60,7 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati @@ -59,7 +60,7 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati
*/
@Override
public Document getMappedObject(Document document) {
return prefix(delegate.getMappedObject(document));
return doPrefix(delegate.getMappedObject(document));
}
/*
@ -80,31 +81,42 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati @@ -80,31 +81,42 @@ public class PrefixingDelegatingAggregationOperationContext implements Aggregati
return delegate.getReference(name);
}
private Document prefix(Document source) {
@SuppressWarnings("unchecked")
private Document doPrefix(Document source) {
Document result = new Document();
for (Map.Entry<String, Object> entry : source.entrySet()) {
String key = (entry.getKey().startsWith("$") || blacklist.contains(entry.getKey())) ? entry.getKey()
: (prefix + "." + entry.getKey());
String key = prefixKey(entry.getKey());
Object value = entry.getValue();
if (entry.getValue() instanceof Collection) {
List tmp = new ArrayList();
for (Object o : (Collection) entry.getValue()) {
if (o instanceof Document) {
tmp.add(prefix((Document) o));
} else {
tmp.add(o);
}
}
value = tmp;
} else if (entry.getValue() instanceof Document) {
value = entry.getValue();
Collection<Object> sourceCollection = (Collection<Object>) entry.getValue();
value = prefixCollection(sourceCollection);
}
result.append(key, value);
}
return result;
}
private String prefixKey(String key) {
return (key.startsWith("$") || blacklist.contains(key)) ? key : (prefix + "." + key);
}
private Object prefixCollection(Collection<Object> sourceCollection) {
List<Object> prefixed = new ArrayList<>(sourceCollection.size());
for (Object o : sourceCollection) {
if (o instanceof Document) {
prefixed.add(doPrefix((Document) o));
} else {
prefixed.add(o);
}
}
return prefixed;
}
}

98
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java

@ -1,19 +1,3 @@ @@ -1,19 +1,3 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 2018 the original author or authors.
*
@ -63,10 +47,10 @@ import com.mongodb.client.model.changestream.FullDocument; @@ -63,10 +47,10 @@ import com.mongodb.client.model.changestream.FullDocument;
* <pre>
* <code>
* ChangeStreamOptions options = ChangeStreamOptions.builder()
* .filter(newAggregation(match(where("age").is(7))))
* .returnFullDocumentOnUpdate()
* .build();
*
* .filter(newAggregation(match(where("age").is(7))))
* .returnFullDocumentOnUpdate()
* .build();
*
* ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(System.out::println, new ChangeStreamRequestOptions("collection-name", options));
* </code>
* </pre>
@ -77,11 +61,11 @@ import com.mongodb.client.model.changestream.FullDocument; @@ -77,11 +61,11 @@ import com.mongodb.client.model.changestream.FullDocument;
* <pre>
* <code>
* ChangeStreamRequest<Document> request = ChangeStreamRequest.builder()
* .collectionName("collection-name")
* .publishTo(System.out::println)
* .filter(newAggregation(match(where("age").is(7))))
* .fullDocumentLookup(UPDATE_LOOKUP)
* .build();
* .collection("collection-name")
* .publishTo(System.out::println)
* .filter(newAggregation(match(where("age").is(7))))
* .fullDocumentLookup(UPDATE_LOOKUP)
* .build();
* </code>
* </pre>
*
@ -93,12 +77,13 @@ import com.mongodb.client.model.changestream.FullDocument; @@ -93,12 +77,13 @@ import com.mongodb.client.model.changestream.FullDocument;
* {@link FullDocument#UPDATE_LOOKUP}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
public class ChangeStreamRequest<T>
implements SubscriptionRequest<ChangeStreamDocument<Document>, T, ChangeStreamRequestOptions> {
private final MessageListener<ChangeStreamDocument<Document>, T> messageListener;
private final MessageListener<ChangeStreamDocument<Document>, ? super T> messageListener;
private final ChangeStreamRequestOptions options;
/**
@ -108,7 +93,7 @@ public class ChangeStreamRequest<T> @@ -108,7 +93,7 @@ public class ChangeStreamRequest<T>
* @param messageListener must not be {@literal null}.
* @param options must not be {@literal null}.
*/
public ChangeStreamRequest(MessageListener<ChangeStreamDocument<Document>, T> messageListener,
public ChangeStreamRequest(MessageListener<ChangeStreamDocument<Document>, ? super T> messageListener,
RequestOptions options) {
Assert.notNull(messageListener, "MessageListener must not be null!");
@ -125,7 +110,7 @@ public class ChangeStreamRequest<T> @@ -125,7 +110,7 @@ public class ChangeStreamRequest<T>
* @see org.springframework.data.mongodb.monitor.SubscriptionRequest#getMessageListener()
*/
@Override
public MessageListener<ChangeStreamDocument<Document>, T> getMessageListener() {
public MessageListener<ChangeStreamDocument<Document>, ? super T> getMessageListener() {
return messageListener;
}
@ -148,6 +133,19 @@ public class ChangeStreamRequest<T> @@ -148,6 +133,19 @@ public class ChangeStreamRequest<T>
return new ChangeStreamRequestBuilder();
}
/**
* Obtain a shiny new {@link ChangeStreamRequestBuilder} and start defining your {@link ChangeStreamRequest} in this
* fancy fluent way. Just don't forget to call {@link ChangeStreamRequestBuilder#build() build()} when your're done.
*
* @return new instance of {@link ChangeStreamRequest}.
*/
public static <T> ChangeStreamRequestBuilder<T> builder(
MessageListener<ChangeStreamDocument<Document>, ? super T> listener) {
ChangeStreamRequestBuilder<T> builder = new ChangeStreamRequestBuilder<>();
return builder.publishTo(listener);
}
/**
* {@link SubscriptionRequest.RequestOptions} implementation specific to a {@link ChangeStreamRequest}.
*
@ -174,7 +172,7 @@ public class ChangeStreamRequest<T> @@ -174,7 +172,7 @@ public class ChangeStreamRequest<T>
this.options = options;
}
static ChangeStreamRequestOptions of(RequestOptions options) {
public static ChangeStreamRequestOptions of(RequestOptions options) {
Assert.notNull(options, "Options must not be null!");
@ -210,16 +208,18 @@ public class ChangeStreamRequest<T> @@ -210,16 +208,18 @@ public class ChangeStreamRequest<T>
public static class ChangeStreamRequestBuilder<T> {
private @Nullable String collectionName;
private @Nullable MessageListener<ChangeStreamDocument<Document>, T> listener;
private @Nullable MessageListener<ChangeStreamDocument<Document>, ? super T> listener;
private ChangeStreamOptionsBuilder delegate = ChangeStreamOptions.builder();
private ChangeStreamRequestBuilder() {}
/**
* Set the name of the {@link com.mongodb.client.MongoCollection} to listen to.
*
* @param collectionName must not be {@literal null} nor empty.
* @return this.
*/
public ChangeStreamRequestBuilder collection(String collectionName) {
public ChangeStreamRequestBuilder<T> collection(String collectionName) {
Assert.hasText(collectionName, "CollectionName must not be null!");
@ -233,7 +233,8 @@ public class ChangeStreamRequest<T> @@ -233,7 +233,8 @@ public class ChangeStreamRequest<T>
* @param messageListener must not be {@literal null}.
* @return this.
*/
public ChangeStreamRequestBuilder publishTo(MessageListener<ChangeStreamDocument<Document>, T> messageListener) {
public ChangeStreamRequestBuilder<T> publishTo(
MessageListener<ChangeStreamDocument<Document>, ? super T> messageListener) {
Assert.notNull(messageListener, "MessageListener must not be null!");
@ -253,13 +254,15 @@ public class ChangeStreamRequest<T> @@ -253,13 +254,15 @@ public class ChangeStreamRequest<T>
* Use {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} to ensure filter expressions are
* mapped to domain type fields.
*
* @param filter the {@link Aggregation Aggregation pipeline} to apply for filtering events. Must not be
* @param aggregation the {@link Aggregation Aggregation pipeline} to apply for filtering events. Must not be
* {@literal null}.
* @return this.
* @see ChangeStreamOptions#getFilter()
* @see ChangeStreamOptionsBuilder#filter(Aggregation)
*/
public ChangeStreamRequestBuilder filter(Aggregation aggregation) {
public ChangeStreamRequestBuilder<T> filter(Aggregation aggregation) {
Assert.notNull(aggregation, "Aggregation must not be null!");
this.delegate.filter(aggregation);
return this;
@ -268,12 +271,14 @@ public class ChangeStreamRequest<T> @@ -268,12 +271,14 @@ public class ChangeStreamRequest<T>
/**
* Set the plain filter chain to apply.
*
* @param filter must not be {@literal null} nor contain {@literal null} values.
* @param pipeline must not be {@literal null} nor contain {@literal null} values.
* @return this.
* @see ChangeStreamOptions#getFilter()
* @see ChangeStreamOptionsBuilder#filter(java.util.List)
*/
public ChangeStreamRequestBuilder filter(Document... pipeline) {
public ChangeStreamRequestBuilder<T> filter(Document... pipeline) {
Assert.notNull(pipeline, "Aggregation pipeline must not be null!");
Assert.noNullElements(pipeline, "Aggregation pipeline must not contain null elements!");
this.delegate.filter(pipeline);
return this;
@ -289,6 +294,8 @@ public class ChangeStreamRequest<T> @@ -289,6 +294,8 @@ public class ChangeStreamRequest<T>
*/
public ChangeStreamRequestBuilder collation(Collation collation) {
Assert.notNull(collation, "Collation must not be null!");
this.delegate.collation(collation);
return this;
}
@ -302,7 +309,9 @@ public class ChangeStreamRequest<T> @@ -302,7 +309,9 @@ public class ChangeStreamRequest<T>
* @see ChangeStreamOptions#getResumeToken()
* @see ChangeStreamOptionsBuilder#resumeToken(org.bson.BsonValue)
*/
public ChangeStreamRequestBuilder resumeToken(BsonValue resumeToken) {
public ChangeStreamRequestBuilder<T> resumeToken(BsonValue resumeToken) {
Assert.notNull(resumeToken, "Resume token not be null!");
this.delegate.resumeToken(resumeToken);
return this;
@ -316,14 +325,23 @@ public class ChangeStreamRequest<T> @@ -316,14 +325,23 @@ public class ChangeStreamRequest<T>
* @see ChangeStreamOptions#getFullDocumentLookup()
* @see ChangeStreamOptionsBuilder#fullDocumentLookup(FullDocument)
*/
public ChangeStreamRequestBuilder fullDocumentLookup(FullDocument lookup) {
public ChangeStreamRequestBuilder<T> fullDocumentLookup(FullDocument lookup) {
Assert.notNull(lookup, "FullDocument not be null!");
this.delegate.fullDocumentLookup(lookup);
return this;
}
/**
* @return the build {@link ChangeStreamRequest}.
*/
public ChangeStreamRequest<T> build() {
return new ChangeStreamRequest(listener, new ChangeStreamRequestOptions(collectionName, delegate.build()));
Assert.notNull(listener, "MessageListener must not be null!");
Assert.hasText(collectionName, "CollectionName must not be null!");
return new ChangeStreamRequest<>(listener, new ChangeStreamRequestOptions(collectionName, delegate.build()));
}
}
}

204
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java

@ -0,0 +1,204 @@ @@ -0,0 +1,204 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import lombok.AllArgsConstructor;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.bson.BsonDocument;
import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
/**
* {@link Task} implementation for obtaining {@link ChangeStreamDocument ChangeStreamDocuments} from MongoDB.
*
* @author Christoph Strobl
* @since 2.1
*/
class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, Object> {
private final Set<String> blacklist = new HashSet<>(
Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"));
private final QueryMapper queryMapper;
private final MongoConverter mongoConverter;
@SuppressWarnings({ "unchecked", "rawtypes" })
ChangeStreamTask(MongoTemplate template, ChangeStreamRequest<?> request, Class<?> targetType,
ErrorHandler errorHandler) {
super(template, (ChangeStreamRequest) request, (Class) targetType, errorHandler);
queryMapper = new QueryMapper(template.getConverter());
mongoConverter = template.getConverter();
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.CursorReadingTask#initCursor(org.springframework.data.mongodb.core.MongoTemplate, org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions, java.lang.Class)
*/
@Override
protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate template, RequestOptions options,
Class<?> targetType) {
List<Document> filter = Collections.emptyList();
BsonDocument resumeToken = new BsonDocument();
Collation collation = null;
FullDocument fullDocument = FullDocument.DEFAULT;
if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions) {
ChangeStreamOptions changeStreamOptions = ((ChangeStreamRequestOptions) options).getChangeStreamOptions();
filter = prepareFilter(template, changeStreamOptions);
if (changeStreamOptions.getFilter().isPresent()) {
Object val = changeStreamOptions.getFilter().get();
if (val instanceof Aggregation) {
collation = ((Aggregation) val).getOptions().getCollation()
.map(org.springframework.data.mongodb.core.query.Collation::toMongoCollation).orElse(null);
}
}
if (changeStreamOptions.getResumeToken().isPresent()) {
resumeToken = changeStreamOptions.getResumeToken().get().asDocument();
}
fullDocument = changeStreamOptions.getFullDocumentLookup()
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);
}
ChangeStreamIterable<Document> iterable = filter.isEmpty()
? template.getCollection(options.getCollectionName()).watch(Document.class)
: template.getCollection(options.getCollectionName()).watch(filter, Document.class);
if (!resumeToken.isEmpty()) {
iterable = iterable.resumeAfter(resumeToken);
}
if (collation != null) {
iterable = iterable.collation(collation);
}
iterable = iterable.fullDocument(fullDocument);
return iterable.iterator();
}
List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options) {
if (!options.getFilter().isPresent()) {
return Collections.emptyList();
}
Object filter = options.getFilter().get();
if (filter instanceof Aggregation) {
Aggregation agg = (Aggregation) filter;
AggregationOperationContext context = agg instanceof TypedAggregation
? new TypeBasedAggregationOperationContext(((TypedAggregation<?>) agg).getInputType(),
template.getConverter().getMappingContext(), queryMapper)
: Aggregation.DEFAULT_CONTEXT;
return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", blacklist));
} else if (filter instanceof List) {
return (List<Document>) filter;
} else {
throw new IllegalArgumentException(
"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
}
}
@Override
protected Message<ChangeStreamDocument<Document>, Object> createMessage(ChangeStreamDocument<Document> source,
Class<Object> targetType, RequestOptions options) {
// namespace might be null for eg. OperationType.INVALIDATE
MongoNamespace namespace = Optional.ofNullable(source.getNamespace())
.orElse(new MongoNamespace("unknown", options.getCollectionName()));
return new ChangeStreamEventMessage<>(new ChangeStreamEvent<>(source, targetType, mongoConverter), MessageProperties
.builder().databaseName(namespace.getDatabaseName()).collectionName(namespace.getCollectionName()).build());
}
/**
* {@link Message} implementation for ChangeStreams
*
* @since 2.1
*/
@AllArgsConstructor
static class ChangeStreamEventMessage<T> implements Message<ChangeStreamDocument<Document>, T> {
private final ChangeStreamEvent<T> delegate;
private final MessageProperties messageProperties;
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Message#getRaw()
*/
@Nullable
@Override
public ChangeStreamDocument<Document> getRaw() {
return delegate.getRaw();
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Message#getBody()
*/
@Nullable
@Override
public T getBody() {
return delegate.getBody();
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Message#getProperties()
*/
@Override
public MessageProperties getProperties() {
return this.messageProperties;
}
}
}

251
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java

@ -0,0 +1,251 @@ @@ -0,0 +1,251 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.lang.Nullable;
import org.springframework.util.ErrorHandler;
import com.mongodb.client.MongoCursor;
import com.mysema.commons.lang.Assert;
/**
* @author Christoph Strobl
* @author Mark Paluch
* @param <T> type of objects returned by the cursor.
* @param <R> conversion target type.
* @since 2.1
*/
abstract class CursorReadingTask<T, R> implements Task {
private final Object lifecycleMonitor = new Object();
private final MongoTemplate template;
private final SubscriptionRequest<T, R, RequestOptions> request;
private final Class<R> targetType;
private final ErrorHandler errorHandler;
private final CountDownLatch awaitStart = new CountDownLatch(1);
private State state = State.CREATED;
private MongoCursor<T> cursor;
/**
* @param template must not be {@literal null}.
* @param request must not be {@literal null}.
* @param targetType must not be {@literal null}.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
CursorReadingTask(MongoTemplate template, SubscriptionRequest<?, ? super T, ? extends RequestOptions> request,
Class<R> targetType, ErrorHandler errorHandler) {
this.template = template;
this.request = (SubscriptionRequest) request;
this.targetType = targetType;
this.errorHandler = errorHandler;
}
/*
* (non-Javadoc)
* @see java.lang.Runnable
*/
@Override
public void run() {
start();
while (isRunning()) {
try {
T next = getNext();
if (next != null) {
emitMessage(createMessage(next, targetType, request.getRequestOptions()));
} else {
Thread.sleep(10);
}
} catch (InterruptedException e) {
synchronized (lifecycleMonitor) {
state = State.CANCELLED;
}
Thread.interrupted();
} catch (RuntimeException e) {
Exception translated = template.getExceptionTranslator().translateExceptionIfPossible(e);
Exception toHandle = translated != null ? translated : e;
errorHandler.handleError(toHandle);
}
}
}
/**
* Initialize the Task by 1st setting the current state to {@link State#STARTING starting} indicating the
* initialization procedure. <br />
* Moving on the underlying {@link MongoCursor} gets {@link #initCursor(MongoTemplate, RequestOptions) created} and is
* {@link #isValidCursor(MongoCursor) health checked}. Once a valid {@link MongoCursor} is created the {@link #state}
* is set to {@link State#RUNNING running}. If the health check is not passed the {@link MongoCursor} is immediately
* {@link MongoCursor#close() closed} and a new {@link MongoCursor} is requested until a valid one is retrieved or the
* {@link #state} changes.
*/
private void start() {
synchronized (lifecycleMonitor) {
if (!State.RUNNING.equals(state)) {
state = State.STARTING;
}
}
do {
boolean valid = false;
synchronized (lifecycleMonitor) {
if (State.STARTING.equals(state)) {
MongoCursor<T> cursor = initCursor(template, request.getRequestOptions(), targetType);
valid = isValidCursor(cursor);
if (valid) {
this.cursor = cursor;
state = State.RUNNING;
} else {
cursor.close();
}
}
}
if (!valid) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
synchronized (lifecycleMonitor) {
state = State.CANCELLED;
}
Thread.interrupted();
}
}
} while (State.STARTING.equals(getState()));
if (awaitStart.getCount() == 1) {
awaitStart.countDown();
}
}
protected abstract MongoCursor<T> initCursor(MongoTemplate template, RequestOptions options, Class<?> targetType);
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Cancelable#cancel()
*/
@Override
public void cancel() throws DataAccessResourceFailureException {
synchronized (lifecycleMonitor) {
if (State.RUNNING.equals(state) || State.STARTING.equals(state)) {
this.state = State.CANCELLED;
if (cursor != null) {
cursor.close();
}
}
}
}
/*
* (non-Javadoc)
* @see org.springframework.scheduling.SchedulingAwareRunnable#isLongLived()
*/
@Override
public boolean isLongLived() {
return true;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Task#getState()
*/
@Override
public State getState() {
synchronized (lifecycleMonitor) {
return state;
}
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Task#awaitStart(java.time.Duration)
*/
@Override
public boolean awaitStart(Duration timeout) throws InterruptedException {
Assert.notNull(timeout, "Timeout must not be null!");
Assert.isFalse(timeout.isNegative(), "Timeout must not be negative!");
return awaitStart.await(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
protected Message<T, R> createMessage(T source, Class<R> targetType, RequestOptions options) {
SimpleMessage<T, T> message = new SimpleMessage<>(source, source, MessageProperties.builder()
.databaseName(template.getDb().getName()).collectionName(options.getCollectionName()).build());
return new LazyMappingDelegatingMessage<>(message, targetType, template.getConverter());
}
private boolean isRunning() {
return State.RUNNING.equals(getState());
}
@SuppressWarnings("unchecked")
private void emitMessage(Message<T, R> message) {
request.getMessageListener().onMessage((Message) message);
}
@Nullable
private T getNext() {
synchronized (lifecycleMonitor) {
if (State.RUNNING.equals(state)) {
return cursor.tryNext();
}
}
throw new IllegalStateException(String.format("Cursor %s is not longer open.", cursor));
}
private static boolean isValidCursor(@Nullable MongoCursor<?> cursor) {
if (cursor == null) {
return false;
}
if (cursor.getServerCursor() == null || cursor.getServerCursor().getId() == 0) {
return false;
}
return true;
}
}

46
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java

@ -19,6 +19,7 @@ import lombok.AccessLevel; @@ -19,6 +19,7 @@ import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
@ -38,23 +39,24 @@ import org.springframework.util.ErrorHandler; @@ -38,23 +39,24 @@ import org.springframework.util.ErrorHandler;
* Simple {@link Executor} based {@link MessageListenerContainer} implementation for running {@link Task tasks} like
* listening to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> and tailable
* cursors.
* <p />
* This message container creates long-running tasks that are executed on {@link Executor}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
public class DefaultMessageListenerContainer implements MessageListenerContainer {
private final Executor taskExecutor;
private final TaskFactory taskFactory;
private final Optional<ErrorHandler> errorHandler;
private final Object lifecycleMonitor = new Object();
private final Map<SubscriptionRequest, Subscription> subscriptions = new LinkedHashMap<>();
private int phase = Integer.MAX_VALUE;
private boolean running = false;
private final Map<SubscriptionRequest, Subscription> subscriptions = new LinkedHashMap<>();
private final TaskFactory taskFactory;
private final Optional<ErrorHandler> errorHandler;
/**
* Create a new {@link DefaultMessageListenerContainer}.
*
@ -124,18 +126,18 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -124,18 +126,18 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
synchronized (lifecycleMonitor) {
if (!this.running) {
if (this.running) {
return;
}
for (Subscription subscription : subscriptions.values()) {
subscriptions.values().stream() //
.filter(it -> !it.isActive()) //
.filter(it -> it instanceof TaskSubscription) //
.map(TaskSubscription.class::cast) //
.map(TaskSubscription::getTask) //
.forEach(taskExecutor::execute);
if (!subscription.isActive()) {
if (subscription instanceof TaskSubscription) {
taskExecutor.execute(((TaskSubscription) subscription).getTask());
}
}
}
running = true;
}
running = true;
}
}
@ -149,9 +151,9 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -149,9 +151,9 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
synchronized (lifecycleMonitor) {
if (this.running) {
for (Subscription subscription : subscriptions.values()) {
subscription.cancel();
}
subscriptions.values().forEach(Cancelable::cancel);
running = false;
}
}
@ -175,7 +177,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -175,7 +177,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
*/
@Override
public int getPhase() {
return this.phase;
return Integer.MAX_VALUE;
}
/*
@ -207,6 +209,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -207,6 +209,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
*/
@Override
public Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> request) {
synchronized (lifecycleMonitor) {
return Optional.ofNullable(subscriptions.get(request));
}
@ -274,6 +277,11 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -274,6 +277,11 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
return task.isActive();
}
@Override
public boolean await(Duration timeout) throws InterruptedException {
return task.awaitStart(timeout);
}
@Override
public void cancel() throws DataAccessResourceFailureException {
task.cancel();

85
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/LazyMappingDelegatingMessage.java

@ -0,0 +1,85 @@ @@ -0,0 +1,85 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import lombok.ToString;
import org.bson.Document;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.util.ClassUtils;
/**
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
@ToString(of = { "delegate", "targetType" })
class LazyMappingDelegatingMessage<S, T> implements Message<S, T> {
private final Message<S, ?> delegate;
private final Class<T> targetType;
private final MongoConverter converter;
LazyMappingDelegatingMessage(Message<S, ?> delegate, Class<T> targetType, MongoConverter converter) {
this.delegate = delegate;
this.targetType = targetType;
this.converter = converter;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Message#getRaw()
*/
@Override
public S getRaw() {
return delegate.getRaw();
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Message#getBody()
*/
@Override
public T getBody() {
if (delegate.getBody() == null || targetType.equals(delegate.getBody().getClass())) {
return targetType.cast(delegate.getBody());
}
Object messageBody = delegate.getBody();
if (ClassUtils.isAssignable(Document.class, messageBody.getClass())) {
return converter.read(targetType, (Document) messageBody);
}
if (converter.getConversionService().canConvert(messageBody.getClass(), targetType)) {
return converter.getConversionService().convert(messageBody, targetType);
}
throw new IllegalArgumentException(
String.format("No converter found capable of converting %s to %s", messageBody.getClass(), targetType));
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.Message#getProperties()
*/
@Override
public MessageProperties getProperties() {
return delegate.getProperties();
}
}

24
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java

@ -32,6 +32,7 @@ import org.springframework.util.Assert; @@ -32,6 +32,7 @@ import org.springframework.util.Assert;
* using the mapping infrastructure.
*
* @author Christoph Strobl
* @author Mark Paluch
* @see MessageProperties
* @since 2.1
*/
@ -66,7 +67,7 @@ public interface Message<S, T> { @@ -66,7 +67,7 @@ public interface Message<S, T> {
*/
@ToString
@EqualsAndHashCode
static class MessageProperties {
class MessageProperties {
private static final MessageProperties EMPTY = new MessageProperties();
@ -118,7 +119,8 @@ public interface Message<S, T> { @@ -118,7 +119,8 @@ public interface Message<S, T> {
*/
public static class MessagePropertiesBuilder {
private MessageProperties properties = new MessageProperties();
private @Nullable String databaseName;
private @Nullable String collectionName;
/**
* @param dbName must not be {@literal null}.
@ -126,9 +128,9 @@ public interface Message<S, T> { @@ -126,9 +128,9 @@ public interface Message<S, T> {
*/
public MessagePropertiesBuilder databaseName(String dbName) {
Assert.notNull(dbName, "DbName must not be null!");
Assert.notNull(dbName, "Database name must not be null!");
properties.databaseName = dbName;
this.databaseName = dbName;
return this;
}
@ -138,16 +140,22 @@ public interface Message<S, T> { @@ -138,16 +140,22 @@ public interface Message<S, T> {
*/
public MessagePropertiesBuilder collectionName(String collectionName) {
Assert.notNull(collectionName, "CollectionName must not be null!");
Assert.notNull(collectionName, "Collection name must not be null!");
properties.collectionName = collectionName;
this.collectionName = collectionName;
return this;
}
/**
* @return the built {@link MessageProperties}.
*/
public MessageProperties build() {
MessageProperties properties = this.properties;
this.properties = new MessageProperties();
MessageProperties properties = new MessageProperties();
properties.collectionName = collectionName;
properties.databaseName = databaseName;
return properties;
}
}

2
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/MessageListener.java

@ -19,6 +19,8 @@ package org.springframework.data.mongodb.core.messaging; @@ -19,6 +19,8 @@ package org.springframework.data.mongodb.core.messaging;
* Listener interface to receive delivery of {@link Message Messages}.
*
* @author Christoph Strobl
* @param <S> source message type.
* @param <T> target message type.
* @since 2.1
*/
@FunctionalInterface

17
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/MessageListenerContainer.java

@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core.messaging; @@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core.messaging;
import java.util.Optional;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.util.ErrorHandler;
@ -28,7 +29,17 @@ import org.springframework.util.ErrorHandler; @@ -28,7 +29,17 @@ import org.springframework.util.ErrorHandler;
* @author Christoph Strobl
* @since 2.1
*/
interface MessageListenerContainer extends SmartLifecycle {
public interface MessageListenerContainer extends SmartLifecycle {
/**
* Create a new {@link MessageListenerContainer} given {@link MongoTemplate}.
*
* @param template must not be {@literal null}.
* @return a new {@link MessageListenerContainer} using {@link MongoTemplate}.
*/
static MessageListenerContainer create(MongoTemplate template) {
return new DefaultMessageListenerContainer(template);
}
/**
* Register a new {@link SubscriptionRequest} in the container. If the {@link MessageListenerContainer#isRunning() is
@ -63,10 +74,10 @@ interface MessageListenerContainer extends SmartLifecycle { @@ -63,10 +74,10 @@ interface MessageListenerContainer extends SmartLifecycle {
* <pre>
* <code>
* MessageListenerContainer container = ...
*
*
* MessageListener<ChangeStreamDocument<Document>, Document> messageListener = (message) -> message.getBody().toJson();
* ChangeStreamRequest<Document> request = new ChangeStreamRequest<>(messageListener, () -> "collection-name");
*
*
* Subscription subscription = container.register(request, Document.class);
* </code>
* </pre>

36
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Subscription.java

@ -17,8 +17,6 @@ package org.springframework.data.mongodb.core.messaging; @@ -17,8 +17,6 @@ package org.springframework.data.mongodb.core.messaging;
import java.time.Duration;
import org.springframework.util.Assert;
/**
* The {@link Subscription} is the link between the {@link SubscriptionRequest} and the actual running {@link Task}.
* <p />
@ -27,6 +25,7 @@ import org.springframework.util.Assert; @@ -27,6 +25,7 @@ import org.springframework.util.Assert;
* <p />
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
public interface Subscription extends Cancelable {
@ -37,34 +36,13 @@ public interface Subscription extends Cancelable { @@ -37,34 +36,13 @@ public interface Subscription extends Cancelable {
boolean isActive();
/**
* Synchronous, <strong>blocking</strong> call that polls the current state and returns once the {@link Subscription}
* becomes {@link #isActive() active}.
* <p />
* If interrupted while waiting the current Subscription state is returned.
* Synchronous, <strong>blocking</strong> call returns once the {@link Subscription} becomes {@link #isActive()
* active} or {@link Duration timeout} exceeds.
*
* @param timeout must not be {@literal null}.
* @return {@code true} if the subscription was activated. {@code false} if the waiting time elapsed before task was
* activated.
* @throws InterruptedException if the current thread is interrupted while waiting.
*/
default boolean await(Duration timeout) {
Assert.notNull(timeout, "Timeout must not be null!");
long sleepTime = 25;
long currentMs = System.currentTimeMillis();
long targetMs = currentMs + timeout.toMillis();
while (currentMs < targetMs && !isActive()) {
try {
Thread.sleep(sleepTime);
currentMs += sleepTime;
} catch (InterruptedException e) {
Thread.interrupted();
break;
}
}
return isActive();
}
boolean await(Duration timeout) throws InterruptedException;
}

9
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/SubscriptionRequest.java

@ -21,19 +21,19 @@ import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.Reque @@ -21,19 +21,19 @@ import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.Reque
* The actual {@link SubscriptionRequest} sent to the {@link MessageListenerContainer}. This wrapper type allows passing
* in {@link RequestOptions additional information} to the Container which can be used for creating the actual
* {@link Task} running. <br />
* The {@link MessageListener} provides the callback interface when pushing {@link Message massages}.
* The {@link MessageListener} provides the callback interface when pushing {@link Message messages}.
*
* @author Christoph Strobl
* @since 2.1
*/
interface SubscriptionRequest<S, T, O extends RequestOptions> {
public interface SubscriptionRequest<S, T, O extends RequestOptions> {
/**
* Obtain the {@link MessageListener} to publish {@link Message messages} to.
*
* @return never {@literal null}.
*/
MessageListener<S, T> getMessageListener();
MessageListener<S, ? super T> getMessageListener();
/**
* Get the {@link RequestOptions} specifying the requests behaviour.
@ -48,12 +48,11 @@ interface SubscriptionRequest<S, T, O extends RequestOptions> { @@ -48,12 +48,11 @@ interface SubscriptionRequest<S, T, O extends RequestOptions> {
* @author Christoph Strobl
* @since 2.1
*/
static interface RequestOptions {
interface RequestOptions {
/**
* @return the name of the collection to subscribe to. Never {@literal null}.
*/
String getCollectionName();
}
}

135
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TailableCursorRequest.java

@ -19,6 +19,7 @@ import java.util.Optional; @@ -19,6 +19,7 @@ import java.util.Optional;
import org.bson.Document;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.core.messaging.TailableCursorRequest.TailableCursorRequestOptions.TailableCursorRequestOptionsBuilder;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -37,12 +38,25 @@ import org.springframework.util.Assert; @@ -37,12 +38,25 @@ import org.springframework.util.Assert;
* </code>
* </pre>
*
* {@link TailableCursorRequestBuilder} offers a fluent API for creating {@link TailableCursorRequest} with
* {@link TailableCursorRequestOptions} in one go.
*
* <pre>
* <code>
* TailableCursorRequest<Document> request = TailableCursorRequest.builder()
* .collection("collection-name")
* .publishTo(System.out::println)
* .build();
* </code>
* </pre>
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T, RequestOptions> {
private final MessageListener<Document, T> messageListener;
private final MessageListener<Document, ? super T> messageListener;
private final TailableCursorRequestOptions options;
/**
@ -52,7 +66,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T @@ -52,7 +66,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T
* @param messageListener must not be {@literal null}.
* @param options must not be {@literal null}.
*/
public TailableCursorRequest(MessageListener<Document, T> messageListener, RequestOptions options) {
public TailableCursorRequest(MessageListener<Document, ? super T> messageListener, RequestOptions options) {
Assert.notNull(messageListener, "MessageListener must not be null!");
Assert.notNull(options, "Options must not be null!");
@ -67,7 +81,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T @@ -67,7 +81,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T
* @see org.springframework.data.mongodb.monitor.SubscriptionRequest#getMessageListener()
*/
@Override
public MessageListener<Document, T> getMessageListener() {
public MessageListener<Document, ? super T> getMessageListener() {
return messageListener;
}
@ -80,6 +94,28 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T @@ -80,6 +94,28 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T
return options;
}
/**
* Obtain a shiny new {@link TailableCursorRequestBuilder} and start defining options in this fancy fluent way. Just
* don't forget to call {@link TailableCursorRequestBuilder#build() build()} when your're done.
*
* @return new instance of {@link TailableCursorRequestBuilder}.
*/
public static TailableCursorRequestBuilder builder() {
return new TailableCursorRequestBuilder();
}
/**
* Obtain a shiny new {@link TailableCursorRequestBuilder} and start defining options in this fancy fluent way. Just
* don't forget to call {@link TailableCursorRequestBuilder#build() build()} when your're done.
*
* @return new instance of {@link TailableCursorRequestBuilder}.
*/
public static <T> TailableCursorRequestBuilder<T> builder(MessageListener<Document, ? super T> listener) {
TailableCursorRequestBuilder<T> builder = new TailableCursorRequestBuilder<>();
return builder.publishTo(listener);
}
/**
* {@link SubscriptionRequest.RequestOptions} implementation specific to a {@link TailableCursorRequest}.
*
@ -93,7 +129,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T @@ -93,7 +129,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T
TailableCursorRequestOptions() {}
static TailableCursorRequestOptions of(RequestOptions options) {
public static TailableCursorRequestOptions of(RequestOptions options) {
return builder().collection(options.getCollectionName()).build();
}
@ -101,7 +137,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T @@ -101,7 +137,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T
* Obtain a shiny new {@link TailableCursorRequestOptionsBuilder} and start defining options in this fancy fluent
* way. Just don't forget to call {@link TailableCursorRequestOptionsBuilder#build() build()} when your're done.
*
* @return new instance of {@link ChangeStreamRequestOptionsBuilder}.
* @return new instance of {@link TailableCursorRequestOptionsBuilder}.
*/
public static TailableCursorRequestOptionsBuilder builder() {
return new TailableCursorRequestOptionsBuilder();
@ -124,10 +160,13 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T @@ -124,10 +160,13 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T
*/
public static class TailableCursorRequestOptionsBuilder {
TailableCursorRequestOptions options = new TailableCursorRequestOptions();
private @Nullable String collectionName;
private @Nullable Query query;
private TailableCursorRequestOptionsBuilder() {}
/**
* Set the collection name to listen to.
* Set the collection name to tail.
*
* @param collection must not be {@literal null} nor {@literal empty}.
* @return this.
@ -136,7 +175,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T @@ -136,7 +175,7 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T
Assert.hasText(collection, "Collection must not be null nor empty!");
options.collectionName = collection;
this.collectionName = collection;
return this;
}
@ -150,17 +189,89 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T @@ -150,17 +189,89 @@ public class TailableCursorRequest<T> implements SubscriptionRequest<Document, T
Assert.notNull(filter, "Filter must not be null!");
options.query = filter;
this.query = filter;
return this;
}
/**
* @return the built {@link TailableCursorRequestOptions}.
*/
public TailableCursorRequestOptions build() {
TailableCursorRequestOptions tmp = options;
options = new TailableCursorRequestOptions();
return tmp;
TailableCursorRequestOptions options = new TailableCursorRequestOptions();
options.collectionName = collectionName;
options.query = query;
return options;
}
}
}
/**
* Builder for creating {@link TailableCursorRequest}.
*
* @author Mark Paluch
* @since 2.1
* @see TailableCursorRequestOptions
*/
public static class TailableCursorRequestBuilder<T> {
private @Nullable MessageListener<Document, ? super T> listener;
private TailableCursorRequestOptionsBuilder delegate = TailableCursorRequestOptions.builder();
private TailableCursorRequestBuilder() {}
/**
* Set the name of the {@link com.mongodb.client.MongoCollection} to listen to.
*
* @param collectionName must not be {@literal null} nor empty.
* @return this.
*/
public TailableCursorRequestBuilder<T> collection(String collectionName) {
Assert.hasText(collectionName, "CollectionName must not be null!");
delegate.collection(collectionName);
return this;
}
/**
* Set the {@link MessageListener} event {@link Message messages} will be published to.
*
* @param messageListener must not be {@literal null}.
* @return this.
*/
public TailableCursorRequestBuilder<T> publishTo(MessageListener<Document, ? super T> messageListener) {
Assert.notNull(messageListener, "MessageListener must not be null!");
this.listener = messageListener;
return this;
}
/**
* Set the filter to apply.
*
* @param filter the {@link Query } to apply for filtering events. Must not be {@literal null}.
* @return this.
*/
public TailableCursorRequestBuilder<T> filter(Query filter) {
Assert.notNull(filter, "Filter must not be null!");
delegate.filter(filter);
return this;
}
/**
* @return the build {@link ChangeStreamRequest}.
*/
public TailableCursorRequest<T> build() {
Assert.notNull(listener, "MessageListener must not be null!");
return new TailableCursorRequest<>(listener, delegate.build());
}
}
}

80
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TailableCursorTask.java

@ -0,0 +1,80 @@ @@ -0,0 +1,80 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import org.bson.Document;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.core.messaging.TailableCursorRequest.TailableCursorRequestOptions;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.util.ErrorHandler;
import com.mongodb.CursorType;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Collation;
/**
* @author Christoph Strobl
* @since 2.1
*/
class TailableCursorTask extends CursorReadingTask<Document, Object> {
private QueryMapper queryMapper;
@SuppressWarnings({ "unchecked", "rawtypes" })
public TailableCursorTask(MongoTemplate template, TailableCursorRequest<?> request, Class<?> targetType,
ErrorHandler errorHandler) {
super(template, (TailableCursorRequest) request, (Class) targetType, errorHandler);
queryMapper = new QueryMapper(template.getConverter());
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.messaging.CursorReadingTask#initCursor(org.springframework.data.mongodb.core.MongoTemplate, org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions, java.lang.Class)
*/
@Override
protected MongoCursor<Document> initCursor(MongoTemplate template, RequestOptions options, Class<?> targetType) {
Document filter = new Document();
Collation collation = null;
if (options instanceof TailableCursorRequest.TailableCursorRequestOptions) {
TailableCursorRequestOptions requestOptions = (TailableCursorRequestOptions) options;
if (requestOptions.getQuery().isPresent()) {
Query query = requestOptions.getQuery().get();
filter.putAll(queryMapper.getMappedObject(query.getQueryObject(), template.getConverter().getMappingContext()
.getPersistentEntity(targetType.equals(Document.class) ? Object.class : targetType)));
collation = query.getCollation().map(org.springframework.data.mongodb.core.query.Collation::toMongoCollation)
.orElse(null);
}
}
FindIterable<Document> iterable = template.getCollection(options.getCollectionName()).find(filter)
.cursorType(CursorType.TailableAwait).noCursorTimeout(true);
if (collation != null) {
iterable = iterable.collation(collation);
}
return iterable.iterator();
}
}

11
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Task.java

@ -15,6 +15,8 @@ @@ -15,6 +15,8 @@
*/
package org.springframework.data.mongodb.core.messaging;
import java.time.Duration;
import org.springframework.scheduling.SchedulingAwareRunnable;
/**
@ -39,6 +41,15 @@ public interface Task extends SchedulingAwareRunnable, Cancelable { @@ -39,6 +41,15 @@ public interface Task extends SchedulingAwareRunnable, Cancelable {
*/
State getState();
/**
* Synchronous, <strong>blocking</strong> call that awaits until this {@link Task} becomes active.
*
* @param timeout must not be {@literal null}.
* @return {@code true} if the task was started. {@code false} if the waiting time elapsed before task was started.
* @throws InterruptedException if the current thread is interrupted while waiting.
*/
boolean awaitStart(Duration timeout) throws InterruptedException;
/**
* The {@link Task.State} defining the lifecycle phase the actual {@link Task}.
*

464
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/TaskFactory.java

@ -15,45 +15,11 @@ @@ -15,45 +15,11 @@
*/
package org.springframework.data.mongodb.core.messaging;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.bson.BsonDocument;
import org.bson.Document;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.core.messaging.TailableCursorRequest.TailableCursorRequestOptions;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ErrorHandler;
import com.mongodb.CursorType;
import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
/**
* A simple factory for creating {@link Task} for a given {@link SubscriptionRequest}.
*
@ -83,7 +49,8 @@ class TaskFactory { @@ -83,7 +49,8 @@ class TaskFactory {
* @return must not be {@literal null}. Consider {@code Object.class}.
* @throws IllegalArgumentException in case the {@link SubscriptionRequest} is unknown.
*/
Task forRequest(SubscriptionRequest<?, ?, ?> request, Class<?> targetType, ErrorHandler errorHandler) {
<S, T> Task forRequest(SubscriptionRequest<S, ? super T, ? extends RequestOptions> request, Class<T> targetType,
ErrorHandler errorHandler) {
Assert.notNull(request, "Request must not be null!");
Assert.notNull(targetType, "TargetType must not be null!");
@ -97,431 +64,4 @@ class TaskFactory { @@ -97,431 +64,4 @@ class TaskFactory {
throw new IllegalArgumentException(
"oh wow - seems you're using some fancy new feature we do not support. Please be so kind and leave us a note in the issue tracker so we can get this fixed.\nThank you!");
}
/**
* @author Christoph Strobl
* @since 2.1
*/
abstract static class CursorReadingTask<T> implements Task {
private final Object lifecycleMonitor = new Object();
private final SubscriptionRequest request;
private final MongoTemplate template;
private final Class<?> targetType;
private State state = State.CREATED;
private MongoCursor<T> cursor;
private final ErrorHandler errorHandler;
/**
* @param template must not be {@literal null}.
* @param request must not be {@literal null}.
* @param targetType must not be {@literal null}.
*/
public CursorReadingTask(MongoTemplate template, SubscriptionRequest request, Class<?> targetType,
ErrorHandler errorHandler) {
this.template = template;
this.request = request;
this.targetType = targetType;
this.errorHandler = errorHandler;
}
/*
* (non-Javadoc)
* @see java.lang.Runnable
*/
@Override
public void run() {
start();
while (isRunning()) {
try {
T next = getNext();
if (next != null) {
emitMessage(createMessage(next, targetType, request.getRequestOptions()));
} else {
Thread.sleep(10);
}
} catch (InterruptedException e) {
synchronized (lifecycleMonitor) {
state = State.CANCELLED;
}
Thread.interrupted();
} catch (Exception e) {
Exception toHandle = e;
if (e instanceof RuntimeException) {
Exception translated = template.getExceptionTranslator().translateExceptionIfPossible((RuntimeException) e);
toHandle = translated != null ? translated : e;
}
errorHandler.handleError(toHandle);
}
}
}
/**
* Initialize the Task by 1st setting the current state to {@link Task.State#STARTING starting} indicating the
* initialization procedure. <br />
* Moving on the underlying {@link MongoCursor} gets {@link #initCursor(MongoTemplate, RequestOptions) created} and
* is {@link #isValidCursor(MongoCursor) health checked}. Once a valid {@link MongoCursor} is created the
* {@link #state} is set to {@link Task.State#RUNNING running}. If the health check is not passed the
* {@link MongoCursor} is immediately {@link MongoCursor#close() closed} and a new {@link MongoCursor} is requested
* until a valid one is retrieved or the {@link #state} changes.
*/
private void start() {
synchronized (lifecycleMonitor) {
if (!State.RUNNING.equals(state)) {
state = State.STARTING;
}
}
do {
boolean valid = false;
synchronized (lifecycleMonitor) {
if (State.STARTING.equals(state)) {
MongoCursor<T> tmp = initCursor(template, request.getRequestOptions(), targetType);
valid = isValidCursor(tmp);
if (valid) {
cursor = tmp;
state = State.RUNNING;
} else {
tmp.close();
}
}
}
if (!valid) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
synchronized (lifecycleMonitor) {
state = State.CANCELLED;
}
Thread.interrupted();
}
}
} while (State.STARTING.equals(getState()));
}
protected abstract MongoCursor<T> initCursor(MongoTemplate template, RequestOptions options, Class<?> targetType);
@Override
public void cancel() throws DataAccessResourceFailureException {
synchronized (lifecycleMonitor) {
if (State.RUNNING.equals(state) || State.STARTING.equals(state)) {
this.state = State.CANCELLED;
if (cursor != null) {
cursor.close();
}
}
}
}
@Override
public boolean isLongLived() {
return true;
}
@Override
public State getState() {
synchronized (lifecycleMonitor) {
return state;
}
}
protected Message createMessage(T source, Class targetType, RequestOptions options) {
return new LazyMappingDelegatingMessage(new SimpleMessage(source, source, MessageProperties.builder()
.databaseName(template.getDb().getName()).collectionName(options.getCollectionName()).build()), targetType,
template.getConverter());
}
private boolean isRunning() {
return State.RUNNING.equals(getState());
}
private void emitMessage(Message message) {
request.getMessageListener().onMessage(message);
}
private T getNext() {
synchronized (lifecycleMonitor) {
if (State.RUNNING.equals(state)) {
return cursor.tryNext();
}
}
throw new IllegalStateException(String.format("Cursor %s is not longer open.", cursor));
}
private boolean isValidCursor(MongoCursor<?> cursor) {
if (cursor == null) {
return false;
}
if (cursor.getServerCursor() == null || cursor.getServerCursor().getId() == 0) {
return false;
}
return true;
}
}
/**
* {@link Task} implementation for obtaining {@link ChangeStreamDocument ChangeStreamDocuments} from MongoDB.
*
* @author Christoph Strobl
* @since 2.1
*/
static class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>> {
private final Set<String> blacklist = new HashSet<>(
Arrays.asList("operationType", "fullDocument", "documentKey", "updateDescription", "ns"));
private final QueryMapper queryMapper;
private final MongoConverter mongoConverter;
ChangeStreamTask(MongoTemplate template, ChangeStreamRequest request, Class<?> targetType,
ErrorHandler errorHandler) {
super(template, request, targetType, errorHandler);
queryMapper = new QueryMapper(template.getConverter());
mongoConverter = template.getConverter();
}
@Override
protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate template, RequestOptions options,
Class<?> targetType) {
List<Document> filter = Collections.emptyList();
BsonDocument resumeToken = new BsonDocument();
Collation collation = null;
FullDocument fullDocument = FullDocument.DEFAULT;
if (options instanceof ChangeStreamRequestOptions) {
ChangeStreamOptions changeStreamOptions = ((ChangeStreamRequestOptions) options).getChangeStreamOptions();
filter = prepareFilter(template, changeStreamOptions);
if (changeStreamOptions.getFilter().isPresent()) {
Object val = changeStreamOptions.getFilter().get();
if (val instanceof Aggregation) {
collation = ((Aggregation) val).getOptions().getCollation()
.map(org.springframework.data.mongodb.core.query.Collation::toMongoCollation).orElse(null);
}
}
if (changeStreamOptions.getResumeToken().isPresent()) {
resumeToken = changeStreamOptions.getResumeToken().get().asDocument();
}
fullDocument = changeStreamOptions.getFullDocumentLookup()
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);
}
ChangeStreamIterable<Document> iterable = filter.isEmpty()
? template.getCollection(options.getCollectionName()).watch(Document.class)
: template.getCollection(options.getCollectionName()).watch(filter, Document.class);
if (!resumeToken.isEmpty()) {
iterable = iterable.resumeAfter(resumeToken);
}
if (collation != null) {
iterable = iterable.collation(collation);
}
iterable = iterable.fullDocument(fullDocument);
return iterable.iterator();
}
List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options) {
if (!options.getFilter().isPresent()) {
return Collections.emptyList();
}
Object filter = options.getFilter().get();
if (filter instanceof Aggregation) {
Aggregation agg = (Aggregation) filter;
AggregationOperationContext context = agg instanceof TypedAggregation
? new TypeBasedAggregationOperationContext(((TypedAggregation) agg).getInputType(),
template.getConverter().getMappingContext(), queryMapper)
: Aggregation.DEFAULT_CONTEXT;
return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", blacklist));
} else if (filter instanceof List) {
return (List<Document>) filter;
} else {
throw new IllegalArgumentException(
"ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
}
}
@Override
protected Message createMessage(ChangeStreamDocument<Document> source, Class targetType, RequestOptions options) {
// namespace might be null for eg. OperationType.INVALIDATE
MongoNamespace namespace = Optional.ofNullable(source.getNamespace())
.orElse(new MongoNamespace("unknown", options.getCollectionName()));
return new ChangeStreamEventMessage(new ChangeStreamEvent(source, targetType, mongoConverter), MessageProperties
.builder().databaseName(namespace.getDatabaseName()).collectionName(namespace.getCollectionName()).build());
}
/**
* {@link Message} implementation for ChangeStreams
*
* @since 2.1
*/
static class ChangeStreamEventMessage<T> implements Message<ChangeStreamDocument<Document>, T> {
private final ChangeStreamEvent<T> delegate;
private final MessageProperties messageProperties;
public ChangeStreamEventMessage(ChangeStreamEvent<T> event, MessageProperties messageProperties) {
this.delegate = event;
this.messageProperties = messageProperties;
}
@Nullable
@Override
public ChangeStreamDocument<Document> getRaw() {
return delegate.getRaw();
}
@Nullable
@Override
public T getBody() {
return delegate.getBody();
}
@Override
public MessageProperties getProperties() {
return this.messageProperties;
}
}
}
/**
* @author Christoph Strobl
* @since 2.1
*/
static class TailableCursorTask extends CursorReadingTask<Document> {
private QueryMapper queryMapper;
public TailableCursorTask(MongoTemplate template, TailableCursorRequest request, Class<?> targetType,
ErrorHandler errorHandler) {
super(template, request, targetType, errorHandler);
queryMapper = new QueryMapper(template.getConverter());
}
@Override
protected MongoCursor<Document> initCursor(MongoTemplate template, RequestOptions options, Class<?> targetType) {
Document filter = new Document();
Collation collation = null;
if (options instanceof TailableCursorRequestOptions) {
TailableCursorRequestOptions requestOptions = (TailableCursorRequestOptions) options;
if (requestOptions.getQuery().isPresent()) {
Query query = requestOptions.getQuery().get();
filter.putAll(queryMapper.getMappedObject(query.getQueryObject(), template.getConverter().getMappingContext()
.getPersistentEntity(targetType.equals(Document.class) ? Object.class : targetType)));
collation = query.getCollation().map(org.springframework.data.mongodb.core.query.Collation::toMongoCollation)
.orElse(null);
}
}
FindIterable<Document> iterable = template.getCollection(options.getCollectionName()).find(filter)
.cursorType(CursorType.TailableAwait).noCursorTimeout(true);
if (collation != null) {
iterable = iterable.collation(collation);
}
return iterable.iterator();
}
}
static class LazyMappingDelegatingMessage<S, T> implements Message<S, T> {
private final Message<S, ?> delegate;
private final Class<T> targetType;
private final MongoConverter converter;
public LazyMappingDelegatingMessage(Message<S, ?> delegate, Class<T> targetType, MongoConverter converter) {
this.delegate = delegate;
this.targetType = targetType;
this.converter = converter;
}
@Nullable
@Override
public S getRaw() {
return delegate.getRaw();
}
@Override
public T getBody() {
if (delegate.getBody() == null || targetType.equals(delegate.getBody().getClass())) {
return targetType.cast(delegate.getBody());
}
Object messageBody = delegate.getBody();
if (ClassUtils.isAssignable(Document.class, messageBody.getClass())) {
return converter.read(targetType, (Document) messageBody);
}
if (converter.getConversionService().canConvert(messageBody.getClass(), targetType)) {
return converter.getConversionService().convert(messageBody, targetType);
}
throw new IllegalArgumentException(
String.format("No converter found capable of converting %s to %s", messageBody.getClass(), targetType));
}
@Override
public MessageProperties getProperties() {
return delegate.getProperties();
}
@Override
public String toString() {
return "LazyMappingDelegatingMessage {" + "delegate=" + delegate + ", targetType=" + targetType + '}';
}
}
}

26
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java

@ -24,12 +24,17 @@ import static org.springframework.data.mongodb.core.query.Query.*; @@ -24,12 +24,17 @@ import static org.springframework.data.mongodb.core.query.Query.*;
import lombok.Data;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TestRule;
@ -53,11 +58,13 @@ import com.mongodb.client.model.changestream.FullDocument; @@ -53,11 +58,13 @@ import com.mongodb.client.model.changestream.FullDocument;
* {@link DefaultMessageListenerContainer} using {@link ChangeStreamRequest}.
*
* @author Christoph Strobl
* @author Mark Paluch
*/
public class ChangeStreamTests {
public static @ClassRule TestRule replSet = ReplicaSet.required();
static ThreadPoolExecutor executor;
MongoTemplate template;
MessageListenerContainer container;
@ -65,13 +72,18 @@ public class ChangeStreamTests { @@ -65,13 +72,18 @@ public class ChangeStreamTests {
User huffyFluffy;
User sugarSplashy;
@BeforeClass
public static void beforeClass() {
executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
}
@Before
public void setUp() {
template = new MongoTemplate(new MongoClient(), "change-stream-tests");
template.dropCollection(User.class);
container = new DefaultMessageListenerContainer(template);
container = new DefaultMessageListenerContainer(template, executor);
container.start();
jellyBelly = new User();
@ -95,6 +107,11 @@ public class ChangeStreamTests { @@ -95,6 +107,11 @@ public class ChangeStreamTests {
container.stop();
}
@AfterClass
public static void afterClass() {
executor.shutdown();
}
@Test // DATAMONGO-1803
public void readsPlainDocumentMessageCorrectly() throws InterruptedException {
@ -121,9 +138,8 @@ public class ChangeStreamTests { @@ -121,9 +138,8 @@ public class ChangeStreamTests {
public void useSimpleAggregationToFilterMessages() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
ChangeStreamRequest<User> request = ChangeStreamRequest.builder(messageListener) //
.collection("user") //
.publishTo(messageListener) //
.filter(newAggregation(match(where("age").is(7)))) //
.build();
@ -146,9 +162,8 @@ public class ChangeStreamTests { @@ -146,9 +162,8 @@ public class ChangeStreamTests {
public void useAggregationToFilterMessages() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
ChangeStreamRequest<User> request = ChangeStreamRequest.builder(messageListener) //
.collection("user") //
.publishTo(messageListener) //
.filter(newAggregation(match(
new Criteria().orOperator(where("user_name").is("huffyFluffy"), where("user_name").is("jellyBelly"))))) //
.build();
@ -391,5 +406,4 @@ public class ChangeStreamTests { @@ -391,5 +406,4 @@ public class ChangeStreamTests {
@Field("user_name") String userName;
int age;
}
}

4
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java

@ -32,7 +32,6 @@ import org.mockito.junit.MockitoJUnitRunner; @@ -32,7 +32,6 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.core.messaging.Task.State;
import org.springframework.data.mongodb.core.messaging.TaskFactory.CursorReadingTask;
import org.springframework.util.ErrorHandler;
import com.mongodb.ServerAddress;
@ -198,7 +197,7 @@ public class CursorReadingTaskUnitTests { @@ -198,7 +197,7 @@ public class CursorReadingTaskUnitTests {
static class ValueCapturingTaskStub extends CursorReadingTask {
final MongoCursor cursor;
final List<Object> values = new CopyOnWriteArrayList();
final List<Object> values = new CopyOnWriteArrayList<>();
public ValueCapturingTaskStub(MongoTemplate template, SubscriptionRequest request, Class<?> targetType,
MongoCursor cursor, ErrorHandler errorHandler) {
@ -223,5 +222,4 @@ public class CursorReadingTaskUnitTests { @@ -223,5 +222,4 @@ public class CursorReadingTaskUnitTests {
return values;
}
}
}

2
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerTests.java

@ -71,7 +71,7 @@ public class DefaultMessageListenerContainerTests { @@ -71,7 +71,7 @@ public class DefaultMessageListenerContainerTests {
template.dropCollection(COLLECTION_NAME);
collection = template.getCollection(COLLECTION_NAME);
messageListener = new CollectingMessageListener();
messageListener = new CollectingMessageListener<>();
}
@Test // DATAMONGO-1803

11
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainerUnitTests.java

@ -20,6 +20,8 @@ import static org.assertj.core.api.Assertions.*; @@ -20,6 +20,8 @@ import static org.assertj.core.api.Assertions.*;
import edu.umd.cs.mtc.MultithreadedTestCase;
import java.time.Duration;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -268,6 +270,15 @@ public class DefaultMessageListenerContainerUnitTests { @@ -268,6 +270,15 @@ public class DefaultMessageListenerContainerUnitTests {
this.error = error;
}
@Override
public boolean awaitStart(Duration timeout) throws InterruptedException {
while (getState() == State.STARTING) {
Thread.sleep(10);
}
return true;
}
}
static class MockSubscriptionRequest implements SubscriptionRequest {

45
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TailableCursorRequestUnitTests.java

@ -0,0 +1,45 @@ @@ -0,0 +1,45 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.messaging;
import static org.assertj.core.api.Assertions.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
import org.bson.Document;
import org.junit.Test;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainerTests.Person;
import org.springframework.data.mongodb.core.query.Query;
/**
* Unit tests for {@link TailableCursorRequest}.
*
* @author Mark Paluch
*/
public class TailableCursorRequestUnitTests {
@Test // DATAMONGO-1803
public void shouldBuildRequest() {
MessageListener<Document, Person> listener = System.out::println;
TailableCursorRequest<Person> request = TailableCursorRequest.builder(listener).collection("foo")
.filter(Query.query(where("firstname").is("bar"))).build();
assertThat(request.getRequestOptions().getCollectionName()).isEqualTo("foo");
assertThat(request.getRequestOptions().getQuery()).isPresent();
assertThat(request.getMessageListener()).isEqualTo(listener);
}
}

21
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TailableCursorTests.java

@ -22,9 +22,15 @@ import static org.springframework.data.mongodb.test.util.Assertions.*; @@ -22,9 +22,15 @@ import static org.springframework.data.mongodb.test.util.Assertions.*;
import lombok.Data;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.bson.Document;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.CollectionOptions;
@ -40,11 +46,13 @@ import com.mongodb.MongoClient; @@ -40,11 +46,13 @@ import com.mongodb.MongoClient;
* {@link DefaultMessageListenerContainer} using {@link TailableCursorRequest}.
*
* @author Christoph Strobl
* @author Mark Paluch
*/
public class TailableCursorTests {
static final String COLLECTION_NAME = "user";
static ThreadPoolExecutor executor;
MongoTemplate template;
MessageListenerContainer container;
@ -52,6 +60,11 @@ public class TailableCursorTests { @@ -52,6 +60,11 @@ public class TailableCursorTests {
User huffyFluffy;
User sugarSplashy;
@BeforeClass
public static void beforeClass() {
executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
}
@Before
public void setUp() {
@ -60,7 +73,7 @@ public class TailableCursorTests { @@ -60,7 +73,7 @@ public class TailableCursorTests {
template.dropCollection(User.class);
template.createCollection(User.class, CollectionOptions.empty().capped().maxDocuments(10000).size(10000));
container = new DefaultMessageListenerContainer(template);
container = new DefaultMessageListenerContainer(template, executor);
container.start();
jellyBelly = new User();
@ -84,6 +97,11 @@ public class TailableCursorTests { @@ -84,6 +97,11 @@ public class TailableCursorTests {
container.stop();
}
@AfterClass
public static void afterClass() {
executor.shutdown();
}
@Test // DATAMONGO-1803
public void readsDocumentMessageCorrectly() throws InterruptedException {
@ -191,5 +209,4 @@ public class TailableCursorTests { @@ -191,5 +209,4 @@ public class TailableCursorTests {
@Field("user_name") String userName;
int age;
}
}

2
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/TaskFactoryUnitTests.java

@ -28,8 +28,6 @@ import org.springframework.data.mongodb.core.MongoTemplate; @@ -28,8 +28,6 @@ import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.core.messaging.TaskFactory.ChangeStreamTask;
import org.springframework.data.mongodb.core.messaging.TaskFactory.TailableCursorTask;
import org.springframework.util.ErrorHandler;
/**

3
src/main/asciidoc/new-features.adoc

@ -7,7 +7,8 @@ @@ -7,7 +7,8 @@
* <<mongo-template.query.distinct,Distinct queries>> for imperative and reactive Template API.
* <<mongo.mongo-3.validation,`validator` support for collections>>.
* <<mongo.jsonSchema,`$jsonSchema` support>> for queries and collection creation.
* <<changes-streams, Change Stream support>> for imperative and reactive drivers.
* <<change-streams, Change Stream support>> for imperative and reactive drivers.
* Tailable cursors for imperative driver.
[[new-features.2-0-0]]
== What's new in Spring Data MongoDB 2.0

20
src/main/asciidoc/reference/mongodb.adoc

@ -2939,16 +2939,16 @@ class GridFsClient { @@ -2939,16 +2939,16 @@ class GridFsClient {
`GridFsOperations` extending `ResourcePatternResolver` allows the `GridFsTemplate` e.g. to be plugged into an `ApplicationContext` to read Spring Config files from a MongoDB.
[[changes-streams]]
[[change-streams]]
== Change Streams
As of MongoDB 3.6, https://docs.mongodb.com/manual/changeStreams/[Change Streams] allow application to get notified about changes without having to tailing the oplog.
As of MongoDB 3.6, https://docs.mongodb.com/manual/changeStreams/[Change Streams] allow application to get notified about changes without having to tail the oplog.
NOTE: Change Stream support is only possible for replica sets or a sharded cluster.
Change Streams can be subscribed to with both the imperative and the reactive MongoDB java driver. It is highly recommended to use the reactive variant as it is less resource intensive. However if you do not feel comfortable using the reactive API for whatever reason, you can sill obtain the change events via a Messaging concept already common in the Spring ecosystem.
Change Streams can be subscribed to with both the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant as it is less resource intensive. However if you do not feel comfortable using the reactive API for whatever reason, you can sill obtain the change events via a Messaging concept already common in the Spring ecosystem.
=== Change Streams - Sync
=== Change Streams using MessageListener
Listening to a https://docs.mongodb.com/manual/tutorial/change-streams-example/[Change Stream using a Sync Driver] is a long running, blocking task that needs to be delegated to a separate component.
In this case we need to create a `MessageListenerContainer` first which will be the main entry point for running the specific ``SubscriptionRequest``s.
@ -2959,9 +2959,9 @@ Spring Data MongoDB already ships with a default implementation that operates up @@ -2959,9 +2959,9 @@ Spring Data MongoDB already ships with a default implementation that operates up
[source,java]
----
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); <1>
container.start(); <1>
MessageListener<Message<ChangeStreamDocument<Document>, User>> listener = System.out::println; <2>
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; <2>
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); <3>
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); <4>
@ -2970,8 +2970,8 @@ Subscription subscription = container.register(new ChangeStreamRequest<>(listene @@ -2970,8 +2970,8 @@ Subscription subscription = container.register(new ChangeStreamRequest<>(listene
container.stop(); <5>
----
<1> Starting the container intializes the resources and starts the ``Task``s for allready registered ``SubscriptionRequest``s. Requests added after the statup are run immeadeately.
<2> Define the listener called when a `Message` is reveived. The `Message#getBody()` is be converted to the domain type registered. Use `Document.class` to just get the raw results.
<1> Starting the container intializes the resources and starts the ``Task``s for already registered ``SubscriptionRequest``s. Requests added after the statup are run immediately.
<2> Define the listener called when a `Message` is received. The `Message#getBody()` is converted to the requested domain type. Use `Document` to receive raw results without conversion.
<3> Set the collection to listen to and provide additional options via `ChangeStreamOptions`.
<4> Register the request. The returned `Subscription` can be used to check the current `Task` state and cancel the execution to free resources.
<5> Do not forget to stop the container once you're sure you won't need it any more. This will stop all running ``Task``s within the container.
@ -2989,5 +2989,5 @@ Aggregation filter = newAggregation(User.class, match(where("age").gte(38)); <1> @@ -2989,5 +2989,5 @@ Aggregation filter = newAggregation(User.class, match(where("age").gte(38)); <1>
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(filter), User.class, ChangeStreamOptions.empty()); <2>
----
<1> Use an aggregation pipeline to filter events.
<2> Obtain the flux emitting change stream events. The `ChangeStreamEvent#getBody()` is be converted to the domain type registered. Use `Document.class` to just get the raw results.
====
<2> Obtain a `Flux` of change stream events. The `ChangeStreamEvent#getBody()` is converted to requested domain type. Use `Document` to receive raw results without conversion.
====

Loading…
Cancel
Save