diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java index 521cd9569..89ca7f117 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperation.java @@ -28,7 +28,7 @@ import org.springframework.data.mongodb.core.query.CriteriaDefinition; /** * {@link ReactiveChangeStreamOperation} allows creation and execution of reactive MongoDB - * Change Stream operations in a fluent API * style.
+ * Change Stream operations in a fluent API style.
* The starting {@literal domainType} is used for mapping a potentially given * {@link org.springframework.data.mongodb.core.aggregation.TypedAggregation} used for filtering. By default, the * originating {@literal domainType} is also used for mapping back the result from the {@link org.bson.Document}. @@ -38,15 +38,14 @@ import org.springframework.data.mongodb.core.query.CriteriaDefinition; * *
  *     
- *         changeStream(Human.class)
+ *         changeStream(Jedi.class)
  *             .watchCollection("star-wars")
  *             .filter(where("operationType").is("insert"))
- *             .as(Jedi.class)
  *             .resumeAt(Instant.now())
  *             .listen();
  *     
  * 
- * + * * @author Christoph Strobl * @since 2.2 */ @@ -88,10 +87,20 @@ public interface ReactiveChangeStreamOperation { * Skip this step to watch all collections within the database. * * @param collection must not be {@literal null} nor {@literal empty}. - * @return new instance of {@link ChangeStreamWithCollection}. - * @throws IllegalArgumentException if collection is {@literal null}. + * @return new instance of {@link ChangeStreamWithFilterAndProjection}. + * @throws IllegalArgumentException if {@code collection} is {@literal null}. */ ChangeStreamWithFilterAndProjection watchCollection(String collection); + + /** + * Set the the collection to watch. Collection name is derived from the {@link Class entityClass}.
+ * Skip this step to watch all collections within the database. + * + * @param entityClass must not be {@literal null}. + * @return new instance of {@link ChangeStreamWithFilterAndProjection}. + * @throws IllegalArgumentException if {@code entityClass} is {@literal null}. + */ + ChangeStreamWithFilterAndProjection watchCollection(Class entityClass); } /** diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java index 1090f8e41..e042b606d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java @@ -54,14 +54,14 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat public ReactiveChangeStream changeStream(Class domainType) { Assert.notNull(domainType, "DomainType must not be null!"); - return new ReactiveChangeStreamSupport(template, domainType, domainType, null, null); + return new ReactiveChangeStreamSupport<>(template, domainType, domainType, null, null); } static class ReactiveChangeStreamSupport implements ReactiveChangeStream, ChangeStreamWithFilterAndProjection { private final ReactiveMongoTemplate template; - private final @Nullable Class domainType; + private final Class domainType; private final Class returnType; private final @Nullable String collection; private final @Nullable ChangeStreamOptions options; @@ -84,9 +84,22 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat public ChangeStreamWithFilterAndProjection watchCollection(String collection) { Assert.hasText(collection, "Collection name must not be null nor empty!"); + return new ReactiveChangeStreamSupport<>(template, domainType, returnType, collection, options); } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ChangeStreamWithCollection#watchCollection(java.lang.Class) + */ + @Override + public ChangeStreamWithFilterAndProjection watchCollection(Class entityClass) { + + Assert.notNull(entityClass, "Collection type not be null!"); + + return watchCollection(template.getCollectionName(entityClass)); + } + /* * (non-Javadoc) * @see org.springframework.data.mongodb.core.ReactiveChangeStreamOperation.ResumingChangeStream#resumeAt(java.lang.Object) @@ -112,6 +125,7 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat public TerminatingChangeStream resumeAfter(Object token) { Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue"); + return withOptions(builder -> builder.resumeAfter((BsonValue) token)); } @@ -123,6 +137,7 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat public TerminatingChangeStream startAfter(Object token) { Assert.isInstanceOf(BsonValue.class, token, "Token must be a BsonValue"); + return withOptions(builder -> builder.startAfter((BsonValue) token)); } @@ -147,6 +162,7 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat public ChangeStreamWithFilterAndProjection as(Class resultType) { Assert.notNull(resultType, "ResultType must not be null!"); + return new ReactiveChangeStreamSupport<>(template, domainType, resultType, collection, options); } @@ -167,8 +183,7 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat public ChangeStreamWithFilterAndProjection filter(CriteriaDefinition by) { MatchOperation $match = Aggregation.match(by); - Aggregation aggregation = domainType != null && !Document.class.equals(domainType) - ? Aggregation.newAggregation(domainType, $match) + Aggregation aggregation = !Document.class.equals(domainType) ? Aggregation.newAggregation(domainType, $match) : Aggregation.newAggregation($match); return filter(aggregation); } @@ -208,8 +223,8 @@ class ReactiveChangeStreamOperationSupport implements ReactiveChangeStreamOperat options.getResumeTimestamp().ifPresent(builder::resumeAt); options.getResumeBsonTimestamp().ifPresent(builder::resumeAt); } - return builder; + return builder; } } } diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt index bb89842fd..35a6b97cf 100644 --- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt +++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensions.kt @@ -19,9 +19,8 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.flow.asFlow - /** - * Extension for [RactiveChangeStreamOperation. changeStream] leveraging reified type parameters. + * Extension for [RactiveChangeStreamOperation.changeStream] leveraging reified type parameters. * * @author Christoph Strobl * @since 2.2 @@ -30,7 +29,7 @@ inline fun ReactiveChangeStreamOperation.changeStream(): React changeStream(T::class.java) /** - * Extension for [ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection. as] leveraging reified type parameters. + * Extension for [ReactiveChangeStreamOperation.ChangeStreamWithFilterAndProjection.as] leveraging reified type parameters. * * @author Christoph Strobl * @since 2.2 @@ -39,7 +38,7 @@ inline fun ReactiveChangeStreamOperation.ChangeStreamWithFilte `as`(T::class.java) /** - * Coroutines [Flow] variant of [ReactiveChangeStreamOperation.TerminatingChangeStream. listen]. + * Coroutines [Flow] variant of [ReactiveChangeStreamOperation.TerminatingChangeStream.listen]. * * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements * and [org.reactivestreams.Subscription.request] size. diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java index abaf1141f..37eeab78e 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportTests.java @@ -15,6 +15,7 @@ */ package org.springframework.data.mongodb.core; +import static org.assertj.core.api.Assertions.*; import static org.springframework.data.mongodb.core.query.Criteria.*; import lombok.SneakyThrows; @@ -27,18 +28,20 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; -import org.assertj.core.api.Assertions; import org.bson.Document; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; + import org.springframework.data.mongodb.test.util.MongoTestUtils; import org.springframework.data.mongodb.test.util.ReplicaSet; import com.mongodb.reactivestreams.client.MongoClient; /** + * Tests for {@link ReactiveChangeStreamOperation}. + * * @author Christoph Strobl * @currentRead Dawn Cook - The Decoy Princess */ @@ -91,8 +94,8 @@ public class ReactiveChangeStreamOperationSupportTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3) - .allMatch(val -> val instanceof Document); + assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3) + .allMatch(Document.class::isInstance); } finally { disposable.dispose(); } @@ -122,7 +125,7 @@ public class ReactiveChangeStreamOperationSupportTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) .containsExactly(person1, person2, person3); } finally { disposable.dispose(); @@ -135,7 +138,7 @@ public class ReactiveChangeStreamOperationSupportTests { BlockingQueue> documents = new LinkedBlockingQueue<>(100); Disposable disposable = template.changeStream(Person.class) // - .watchCollection("person") // + .watchCollection(Person.class) // .filter(where("age").gte(38)) // .listen() // .doOnNext(documents::add).subscribe(); @@ -146,9 +149,8 @@ public class ReactiveChangeStreamOperationSupportTests { Person person2 = new Person("Data", 37); Person person3 = new Person("MongoDB", 39); - Flux.merge(template.save(person1).delayElement(Duration.ofMillis(2)), - template.save(person2).delayElement(Duration.ofMillis(2)), - template.save(person3).delayElement(Duration.ofMillis(2))) // + Flux.merge(template.save(person1), template.save(person2).delayElement(Duration.ofMillis(50)), + template.save(person3).delayElement(Duration.ofMillis(100))) // .as(StepVerifier::create) // .expectNextCount(3) // .verifyComplete(); @@ -156,8 +158,8 @@ public class ReactiveChangeStreamOperationSupportTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) - .containsExactly(person1, person3); + assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).containsOnly(person1, + person3); } finally { disposable.dispose(); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java index d42cb7568..0340a1ceb 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupportUnitTests.java @@ -16,7 +16,7 @@ package org.springframework.data.mongodb.core; import static org.assertj.core.api.Assertions.*; -import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.isNull; @@ -36,11 +36,14 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; + import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.query.Criteria; /** + * Unit tests for {@link ReactiveChangeStreamOperationSupport}. + * * @author Christoph Strobl * @currentRead Dawn Cook - The Decoy Princess */ diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java index 0d8c12345..c9dd95e4b 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java @@ -1390,7 +1390,7 @@ public class ReactiveMongoTemplateTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3) + assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())).hasSize(3) .allMatch(val -> val instanceof Document); } finally { disposable.dispose(); @@ -1422,7 +1422,7 @@ public class ReactiveMongoTemplateTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) .containsExactly(person1, person2, person3); } finally { disposable.dispose(); @@ -1455,7 +1455,7 @@ public class ReactiveMongoTemplateTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) .containsExactly(person1, person3); } finally { disposable.dispose(); @@ -1499,7 +1499,7 @@ public class ReactiveMongoTemplateTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) .containsExactly(replacement); } finally { disposable.dispose(); @@ -1541,7 +1541,7 @@ public class ReactiveMongoTemplateTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) .containsExactly(person2, person3); } finally { disposable.dispose(); @@ -1563,7 +1563,7 @@ public class ReactiveMongoTemplateTests { template.remove(query(where("field").is("lannister")).limit(25), Sample.class) // .as(StepVerifier::create) // - .assertNext(wr -> Assertions.assertThat(wr.getDeletedCount()).isEqualTo(25L)).verifyComplete(); + .assertNext(wr -> assertThat(wr.getDeletedCount()).isEqualTo(25L)).verifyComplete(); } @Test // DATAMONGO-1870 @@ -1577,7 +1577,7 @@ public class ReactiveMongoTemplateTests { template.remove(new Query().skip(25).with(Sort.by("field")), Sample.class) // .as(StepVerifier::create) // - .assertNext(wr -> Assertions.assertThat(wr.getDeletedCount()).isEqualTo(75L)).verifyComplete(); + .assertNext(wr -> assertThat(wr.getDeletedCount()).isEqualTo(75L)).verifyComplete(); template.count(query(where("field").is("lannister")), Sample.class).as(StepVerifier::create).expectNext(25L) .verifyComplete(); @@ -1651,7 +1651,7 @@ public class ReactiveMongoTemplateTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + assertThat(documents.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) .containsExactly(person1, person2, person3); } finally { disposable.dispose(); @@ -1701,7 +1701,7 @@ public class ReactiveMongoTemplateTests { Thread.sleep(500); // just give it some time to link receive all events try { - Assertions.assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) + assertThat(resumeDocuments.stream().map(ChangeStreamEvent::getBody).collect(Collectors.toList())) .containsExactly(person2, person3); } finally { disposable.dispose(); diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt index f6c67780d..b88d263ca 100644 --- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt +++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationExtensionsTests.kt @@ -34,6 +34,7 @@ import reactor.core.publisher.Flux class ReactiveChangeStreamOperationExtensionsTests { val operation = mockk(relaxed = true) + val changestream = mockk>(relaxed = true) @Test // DATAMONGO-2089 fun `ReactiveChangeStreamOperation#changeStream() with reified type parameter extension should call its Java counterpart`() { @@ -61,4 +62,6 @@ class ReactiveChangeStreamOperationExtensionsTests { spec.listen() } } + + data class Last(val id: String) } diff --git a/src/main/asciidoc/reference/change-streams.adoc b/src/main/asciidoc/reference/change-streams.adoc index e794b388d..6b60ed7f2 100644 --- a/src/main/asciidoc/reference/change-streams.adoc +++ b/src/main/asciidoc/reference/change-streams.adoc @@ -52,7 +52,7 @@ Subscribing to Change Streams with the reactive API is a more natural approach t [source,java] ---- Flux> flux = reactiveTemplate.changeStream(User.class) <1> - .watchCollection("persons") + .watchCollection("people") .filter(where("age").gte(38)) <2> .listen(); <3> ---- @@ -72,9 +72,8 @@ The following example shows how to set the resume offset using server time: ==== [source,java] ---- -Flux> resumed = template.changeStream() - .watchCollection("persons") - .as(User.class) +Flux> resumed = template.changeStream(User.class) + .watchCollection("people") .resumeAt(Instant.now().minusSeconds(1)) <1> .listen(); ----