From d0ee280f095cbd01cab559ae7420bc7dbe9a8c10 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 31 Oct 2024 07:46:58 +0100 Subject: [PATCH] Retain order doing reactive save operations with multiple elements. Ensure subscription order on multi document operations. Original pull request: #4824 Closes #4804 --- .../data/mongodb/core/ReactiveMongoTemplate.java | 2 +- .../support/SimpleReactiveMongoRepository.java | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 10ae7a9ea..94d4c9cd1 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -1413,7 +1413,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati }); return Flux.fromIterable(elementsByCollection.keySet()) - .flatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer)); + .concatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer)); } protected Flux doInsertBatch(String collectionName, Collection batchToSave, diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index bb110c9d7..7bc4cbdf3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -112,8 +112,8 @@ public class SimpleReactiveMongoRepository implement Streamable source = Streamable.of(entities); return source.stream().allMatch(entityInformation::isNew) ? // - mongoOperations.insert(source.stream().collect(Collectors.toList()), entityInformation.getCollectionName()) : // - Flux.fromIterable(entities).flatMap(this::save); + insert(entities) : + Flux.fromIterable(entities).concatMap(this::save); } @Override @@ -121,7 +121,7 @@ public class SimpleReactiveMongoRepository implement Assert.notNull(entityStream, "The given Publisher of entities must not be null"); - return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? // + return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? // mongoOperations.insert(entity, entityInformation.getCollectionName()) : // mongoOperations.save(entity, entityInformation.getCollectionName())); } @@ -295,7 +295,7 @@ public class SimpleReactiveMongoRepository implement Optional readPreference = getReadPreference(); return Flux.from(entityStream)// .map(entityInformation::getRequiredId)// - .flatMap(id -> deleteById(id, readPreference))// + .concatMap(id -> deleteById(id, readPreference))// .then(); } @@ -336,8 +336,7 @@ public class SimpleReactiveMongoRepository implement Assert.notNull(entities, "The given Iterable of entities must not be null"); Collection source = toCollection(entities); - - return source.isEmpty() ? Flux.empty() : mongoOperations.insertAll(source); + return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName()); } @Override @@ -345,8 +344,7 @@ public class SimpleReactiveMongoRepository implement Assert.notNull(entities, "The given Publisher of entities must not be null"); - return Flux.from(entities) - .flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName())); + return Flux.from(entities).concatMap(this::insert); } // -------------------------------------------------------------------------