Browse Source

DATAMONGO-2080 - Support tailable cursors with the fluent reactive API.

We now support queries to return a tailable cursor using the fluent reactive API.

 query(Human.class)
     .inCollection("star-wars")
     .as(Jedi.class)
     .matching(query(where("firstname").is("luke")))
     .tail();

Original Pull Request: #608
pull/610/head
Mark Paluch 7 years ago committed by Christoph Strobl
parent
commit
fe90950880
  1. 15
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFindOperation.java
  2. 9
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFindOperationSupport.java
  3. 92
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveFindOperationSupportTests.java

15
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFindOperation.java

@ -85,6 +85,21 @@ public interface ReactiveFindOperation { @@ -85,6 +85,21 @@ public interface ReactiveFindOperation {
*/
Flux<T> all();
/**
* Get all matching elements using a tailable cursor.
* <p />
* The stream may become dead, or invalid, if either the query returns no match or the cursor returns the document
* at the "end" of the collection and then the application deletes that document.
* <p/>
* A stream that is no longer in use must be {@link reactor.core.Disposable#dispose()} disposed} otherwise the
* streams will linger and exhaust resources. <br/>
* <strong>NOTE:</strong> Requires a capped collection.
*
* @return never {@literal null}.
* @since 2.1
*/
Flux<T> tail();
/**
* Get the number of matching elements.
*

9
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveFindOperationSupport.java

@ -169,6 +169,15 @@ class ReactiveFindOperationSupport implements ReactiveFindOperation { @@ -169,6 +169,15 @@ class ReactiveFindOperationSupport implements ReactiveFindOperation {
return doFind(null);
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveFindOperation.TerminatingFind#tail()
*/
@Override
public Flux<T> tail() {
return doFind(template.new TailingQueryFindPublisherPreparer(query, domainType));
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveFindOperation.FindWithQuery#near(org.springframework.data.mongodb.core.query.NearQuery)

92
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveFindOperationSupportTests.java

@ -22,9 +22,14 @@ import static org.springframework.data.mongodb.core.query.Query.*; @@ -22,9 +22,14 @@ import static org.springframework.data.mongodb.core.query.Query.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.bson.BsonString;
@ -41,6 +46,7 @@ import org.springframework.data.mongodb.core.index.GeoSpatialIndexType; @@ -41,6 +46,7 @@ import org.springframework.data.mongodb.core.index.GeoSpatialIndexType;
import org.springframework.data.mongodb.core.index.GeospatialIndex;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.NearQuery;
import com.mongodb.MongoClient;
@ -67,6 +73,12 @@ public class ReactiveFindOperationSupportTests { @@ -67,6 +73,12 @@ public class ReactiveFindOperationSupportTests {
blocking = new MongoTemplate(new SimpleMongoDbFactory(new MongoClient(), "ExecutableFindOperationSupportTests"));
blocking.dropCollection(STAR_WARS);
insertObjects();
template = new ReactiveMongoTemplate(MongoClients.create(), "ExecutableFindOperationSupportTests");
}
void insertObjects() {
han = new Person();
han.firstname = "han";
han.lastname = "solo";
@ -79,8 +91,6 @@ public class ReactiveFindOperationSupportTests { @@ -79,8 +91,6 @@ public class ReactiveFindOperationSupportTests {
blocking.save(han);
blocking.save(luke);
template = new ReactiveMongoTemplate(MongoClients.create(), "ExecutableFindOperationSupportTests");
}
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1719
@ -291,6 +301,84 @@ public class ReactiveFindOperationSupportTests { @@ -291,6 +301,84 @@ public class ReactiveFindOperationSupportTests {
.verifyComplete();
}
@Test // DATAMONGO-2080
public void tail() throws InterruptedException {
blocking.dropCollection(STAR_WARS);
blocking.createCollection(STAR_WARS, CollectionOptions.empty().capped().size(1024 * 1024));
insertObjects();
BlockingQueue<Person> collector = new LinkedBlockingQueue<>();
Flux<Person> tail = template.query(Person.class)
.matching(query(new Criteria().orOperator(where("firstname").is("chewbacca"), where("firstname").is("luke"))))
.tail().doOnNext(collector::add);
Disposable subscription = tail.subscribe();
assertThat(collector.poll(1, TimeUnit.SECONDS)).isEqualTo(luke);
assertThat(collector).isEmpty();
Person chewbacca = new Person();
chewbacca.firstname = "chewbacca";
chewbacca.lastname = "chewie";
chewbacca.id = "id-3";
blocking.save(chewbacca);
assertThat(collector.poll(10, TimeUnit.SECONDS)).isEqualTo(chewbacca);
subscription.dispose();
}
@Test // DATAMONGO-2080
public void tailWithProjection() {
blocking.dropCollection(STAR_WARS);
blocking.createCollection(STAR_WARS, CollectionOptions.empty().capped().size(1024 * 1024));
insertObjects();
StepVerifier
.create(template.query(Person.class).as(Jedi.class).matching(query(where("firstname").is("luke"))).tail())
.consumeNextWith(it -> assertThat(it).isInstanceOf(Jedi.class)) //
.thenCancel() //
.verify();
}
@Test // DATAMONGO-2080
public void tailWithClosedInterfaceProjection() {
blocking.dropCollection(STAR_WARS);
blocking.createCollection(STAR_WARS, CollectionOptions.empty().capped().size(1024 * 1024));
insertObjects();
StepVerifier.create(
template.query(Person.class).as(PersonProjection.class).matching(query(where("firstname").is("luke"))).all())
.consumeNextWith(it -> {
assertThat(it).isInstanceOf(PersonProjection.class);
assertThat(it.getFirstname()).isEqualTo("luke");
}) //
.thenCancel() //
.verify();
}
@Test // DATAMONGO-2080
public void tailWithOpenInterfaceProjection() {
blocking.dropCollection(STAR_WARS);
blocking.createCollection(STAR_WARS, CollectionOptions.empty().capped().size(1024 * 1024));
insertObjects();
StepVerifier.create(template.query(Person.class).as(PersonSpELProjection.class)
.matching(query(where("firstname").is("luke"))).tail()).consumeNextWith(it -> {
assertThat(it).isInstanceOf(PersonSpELProjection.class);
assertThat(it.getName()).isEqualTo("luke");
}) //
.thenCancel() //
.verify();
}
@Test // DATAMONGO-1719
public void firstShouldReturnFirstEntryInCollection() {
StepVerifier.create(template.query(Person.class).first()).expectNextCount(1).verifyComplete();

Loading…
Cancel
Save