Browse Source

Switch to `Flux.flatMapSequential(…)` to prevent backpressure shaping.

We now use Flux.flatMapSequential(…) instead of concatMap as concatMap reduces the request size to 1. The change in backpressure/request size reduces parallelism and impacts the batch size by fetching 2 documents instead of considering the actual backpressure.

flatMapSequential doesn't tamper the requested amount while retaining the sequence order.

Closes: #4543
Original Pull Request: #4550
pull/4402/merge
Mark Paluch 2 years ago committed by Christoph Strobl
parent
commit
fcc222244b
No known key found for this signature in database
GPG Key ID: 8CC1AB53391458C8
  1. 2
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
  2. 18
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  3. 7
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
  4. 40
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

2
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java

@ -216,7 +216,7 @@ class DefaultReactiveBulkOperations extends BulkOperationsSupport implements Rea @@ -216,7 +216,7 @@ class DefaultReactiveBulkOperations extends BulkOperationsSupport implements Rea
collection = collection.withWriteConcern(defaultWriteConcern);
}
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMap(it -> {
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMapSequential(it -> {
if (it.model()instanceof InsertOneModel<Document> iom) {

18
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -1051,7 +1051,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1051,7 +1051,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return (isOutOrMerge ? Flux.from(cursor.toCollection()) : Flux.from(cursor.first())).thenMany(Mono.empty());
}
return Flux.from(cursor).concatMap(readCallback::doWith);
return Flux.from(cursor).flatMapSequential(readCallback::doWith);
}
@Override
@ -1098,7 +1098,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1098,7 +1098,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
.withOptions(optionsBuilder.build());
return aggregate($geoNear, collection, Document.class) //
.concatMap(callback::doWith);
.flatMapSequential(callback::doWith);
}
@Override
@ -1324,7 +1324,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1324,7 +1324,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
Assert.notNull(batchToSave, "Batch to insert must not be null");
return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName));
return Flux.from(batchToSave).flatMapSequential(collection -> insert(collection, collectionName));
}
@Override
@ -1392,7 +1392,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1392,7 +1392,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
@Override
public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> objectsToSave) {
return Flux.from(objectsToSave).flatMap(this::insertAll);
return Flux.from(objectsToSave).flatMapSequential(this::insertAll);
}
protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {
@ -1443,7 +1443,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1443,7 +1443,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
});
return insertDocuments.flatMap(tuple -> {
return insertDocuments.flatMapSequential(tuple -> {
Document document = tuple.getT2();
Object id = MappedDocument.of(document).getId();
@ -1600,7 +1600,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1600,7 +1600,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return collectionToUse.insertMany(documents);
}).flatMap(s -> {
}).flatMapSequential(s -> {
return Flux.fromStream(documents.stream() //
.map(MappedDocument::of) //
@ -2187,7 +2187,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -2187,7 +2187,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
return Flux.from(publisher)
.concatMap(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
.flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
});
}
@ -2255,7 +2255,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -2255,7 +2255,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return Flux.from(flux).collectList().filter(it -> !it.isEmpty())
.flatMapMany(list -> Flux.from(remove(operations.getByIdInQuery(list), entityClass, collectionName))
.flatMap(deleteResult -> Flux.fromIterable(list)));
.flatMapSequential(deleteResult -> Flux.fromIterable(list)));
}
/**
@ -2729,7 +2729,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -2729,7 +2729,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return createFlux(collectionName, collection -> {
return Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection))
.concatMap(objectCallback::doWith);
.flatMapSequential(objectCallback::doWith);
});
}

7
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

@ -121,7 +121,7 @@ public class SimpleReactiveMongoRepository<T, ID extends Serializable> implement @@ -121,7 +121,7 @@ public class SimpleReactiveMongoRepository<T, ID extends Serializable> implement
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
return Flux.from(entityStream).flatMap(entity -> entityInformation.isNew(entity) ? //
return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
mongoOperations.save(entity, entityInformation.getCollectionName()));
}
@ -191,7 +191,7 @@ public class SimpleReactiveMongoRepository<T, ID extends Serializable> implement @@ -191,7 +191,7 @@ public class SimpleReactiveMongoRepository<T, ID extends Serializable> implement
Assert.notNull(ids, "The given Publisher of Id's must not be null");
Optional<ReadPreference> readPreference = getReadPreference();
return Flux.from(ids).buffer().flatMap(listOfIds -> {
return Flux.from(ids).buffer().flatMapSequential(listOfIds -> {
Query query = getIdQuery(listOfIds);
readPreference.ifPresent(query::withReadPreference);
return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
@ -345,7 +345,8 @@ public class SimpleReactiveMongoRepository<T, ID extends Serializable> implement @@ -345,7 +345,8 @@ public class SimpleReactiveMongoRepository<T, ID extends Serializable> implement
Assert.notNull(entities, "The given Publisher of entities must not be null");
return Flux.from(entities).flatMap(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
return Flux.from(entities)
.flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
}
// -------------------------------------------------------------------------

40
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

@ -34,6 +34,7 @@ import java.util.List; @@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
@ -697,6 +698,28 @@ public class ReactiveMongoTemplateUnitTests { @@ -697,6 +698,28 @@ public class ReactiveMongoTemplateUnitTests {
verify(collection).withReadPreference(ReadPreference.primaryPreferred());
}
@Test // GH-4543
void aggregateDoesNotLimitBackpressure() {
reset(collection);
AtomicLong request = new AtomicLong();
Publisher<Document> realPublisher = Flux.just(new Document()).doOnRequest(request::addAndGet);
doAnswer(invocation -> {
Subscriber<Document> subscriber = invocation.getArgument(0);
realPublisher.subscribe(subscriber);
return null;
}).when(aggregatePublisher).subscribe(any());
when(collection.aggregate(anyList())).thenReturn(aggregatePublisher);
when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher);
template.aggregate(newAggregation(Sith.class, project("id")), AutogenerateableId.class, Document.class).subscribe();
assertThat(request).hasValueGreaterThan(128);
}
@Test // DATAMONGO-1854
void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() {
@ -1261,6 +1284,17 @@ public class ReactiveMongoTemplateUnitTests { @@ -1261,6 +1284,17 @@ public class ReactiveMongoTemplateUnitTests {
assertThat(results.get(0).id).isEqualTo("after-convert");
}
@Test // GH-4543
void findShouldNotLimitBackpressure() {
AtomicLong request = new AtomicLong();
stubFindSubscribe(new Document(), request);
template.find(new Query(), Person.class).subscribe();
assertThat(request).hasValueGreaterThan(128);
}
@Test // DATAMONGO-2479
void findByIdShouldInvokeAfterConvertCallbacks() {
@ -1706,8 +1740,12 @@ public class ReactiveMongoTemplateUnitTests { @@ -1706,8 +1740,12 @@ public class ReactiveMongoTemplateUnitTests {
}
private void stubFindSubscribe(Document document) {
stubFindSubscribe(document, new AtomicLong());
}
private void stubFindSubscribe(Document document, AtomicLong request) {
Publisher<Document> realPublisher = Flux.just(document);
Publisher<Document> realPublisher = Flux.just(document).doOnRequest(request::addAndGet);
doAnswer(invocation -> {
Subscriber<Document> subscriber = invocation.getArgument(0);

Loading…
Cancel
Save