diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java index 45d6709c9..893958988 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java @@ -216,7 +216,7 @@ class DefaultReactiveBulkOperations extends BulkOperationsSupport implements Rea collection = collection.withWriteConcern(defaultWriteConcern); } - Flux concat = Flux.concat(models).flatMap(it -> { + Flux concat = Flux.concat(models).flatMapSequential(it -> { if (it.model()instanceof InsertOneModel iom) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index a0f4312ca..1e2c834a3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -1051,7 +1051,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return (isOutOrMerge ? Flux.from(cursor.toCollection()) : Flux.from(cursor.first())).thenMany(Mono.empty()); } - return Flux.from(cursor).concatMap(readCallback::doWith); + return Flux.from(cursor).flatMapSequential(readCallback::doWith); } @Override @@ -1098,7 +1098,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati .withOptions(optionsBuilder.build()); return aggregate($geoNear, collection, Document.class) // - .concatMap(callback::doWith); + .flatMapSequential(callback::doWith); } @Override @@ -1324,7 +1324,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati Assert.notNull(batchToSave, "Batch to insert must not be null"); - return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName)); + return Flux.from(batchToSave).flatMapSequential(collection -> insert(collection, collectionName)); } @Override @@ -1392,7 +1392,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @Override public Flux insertAll(Mono> objectsToSave) { - return Flux.from(objectsToSave).flatMap(this::insertAll); + return Flux.from(objectsToSave).flatMapSequential(this::insertAll); } protected Flux doInsertAll(Collection listToSave, MongoWriter writer) { @@ -1443,7 +1443,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples)); }); - return insertDocuments.flatMap(tuple -> { + return insertDocuments.flatMapSequential(tuple -> { Document document = tuple.getT2(); Object id = MappedDocument.of(document).getId(); @@ -1600,7 +1600,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return collectionToUse.insertMany(documents); - }).flatMap(s -> { + }).flatMapSequential(s -> { return Flux.fromStream(documents.stream() // .map(MappedDocument::of) // @@ -2187,7 +2187,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher); return Flux.from(publisher) - .concatMap(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith); + .flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith); }); } @@ -2255,7 +2255,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return Flux.from(flux).collectList().filter(it -> !it.isEmpty()) .flatMapMany(list -> Flux.from(remove(operations.getByIdInQuery(list), entityClass, collectionName)) - .flatMap(deleteResult -> Flux.fromIterable(list))); + .flatMapSequential(deleteResult -> Flux.fromIterable(list))); } /** @@ -2729,7 +2729,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return createFlux(collectionName, collection -> { return Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection)) - .concatMap(objectCallback::doWith); + .flatMapSequential(objectCallback::doWith); }); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index 1c84e5fde..7cf6de352 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -121,7 +121,7 @@ public class SimpleReactiveMongoRepository implement Assert.notNull(entityStream, "The given Publisher of entities must not be null"); - return Flux.from(entityStream).flatMap(entity -> entityInformation.isNew(entity) ? // + return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? // mongoOperations.insert(entity, entityInformation.getCollectionName()) : // mongoOperations.save(entity, entityInformation.getCollectionName())); } @@ -191,7 +191,7 @@ public class SimpleReactiveMongoRepository implement Assert.notNull(ids, "The given Publisher of Id's must not be null"); Optional readPreference = getReadPreference(); - return Flux.from(ids).buffer().flatMap(listOfIds -> { + return Flux.from(ids).buffer().flatMapSequential(listOfIds -> { Query query = getIdQuery(listOfIds); readPreference.ifPresent(query::withReadPreference); return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName()); @@ -345,7 +345,8 @@ public class SimpleReactiveMongoRepository implement Assert.notNull(entities, "The given Publisher of entities must not be null"); - return Flux.from(entities).flatMap(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName())); + return Flux.from(entities) + .flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName())); } // ------------------------------------------------------------------------- diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index 79fc1869f..8e6173f5a 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; @@ -697,6 +698,28 @@ public class ReactiveMongoTemplateUnitTests { verify(collection).withReadPreference(ReadPreference.primaryPreferred()); } + @Test // GH-4543 + void aggregateDoesNotLimitBackpressure() { + + reset(collection); + + AtomicLong request = new AtomicLong(); + Publisher realPublisher = Flux.just(new Document()).doOnRequest(request::addAndGet); + + doAnswer(invocation -> { + Subscriber subscriber = invocation.getArgument(0); + realPublisher.subscribe(subscriber); + return null; + }).when(aggregatePublisher).subscribe(any()); + + when(collection.aggregate(anyList())).thenReturn(aggregatePublisher); + when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher); + + template.aggregate(newAggregation(Sith.class, project("id")), AutogenerateableId.class, Document.class).subscribe(); + + assertThat(request).hasValueGreaterThan(128); + } + @Test // DATAMONGO-1854 void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() { @@ -1261,6 +1284,17 @@ public class ReactiveMongoTemplateUnitTests { assertThat(results.get(0).id).isEqualTo("after-convert"); } + @Test // GH-4543 + void findShouldNotLimitBackpressure() { + + AtomicLong request = new AtomicLong(); + stubFindSubscribe(new Document(), request); + + template.find(new Query(), Person.class).subscribe(); + + assertThat(request).hasValueGreaterThan(128); + } + @Test // DATAMONGO-2479 void findByIdShouldInvokeAfterConvertCallbacks() { @@ -1706,8 +1740,12 @@ public class ReactiveMongoTemplateUnitTests { } private void stubFindSubscribe(Document document) { + stubFindSubscribe(document, new AtomicLong()); + } + + private void stubFindSubscribe(Document document, AtomicLong request) { - Publisher realPublisher = Flux.just(document); + Publisher realPublisher = Flux.just(document).doOnRequest(request::addAndGet); doAnswer(invocation -> { Subscriber subscriber = invocation.getArgument(0);