From 05f325687c1424ef613a7f98bed628cd5e3322ec Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 7 Jun 2018 10:24:18 +0200 Subject: [PATCH] DATAMONGO-2001 - Count within transaction should return only the total count of documents visible to the specific session. We now delegate count operations within an active transaction to an aggregation. Once `MongoTemplate` detects an active transaction, all exposed `count()` methods are converted and delegated to the aggregation framework using `$match` and `$count` operators, preserving `Query` settings, such as `collation`. The following snippet of `count` inside the session bound closure session.startTransaction(); template.withSession(session) .execute(action -> { action.count(query(where("state").is("active")), Step.class) ... runs: db.collection.aggregate( [ { $match: { state: "active" } }, { $count: "totalEntityCount" } ] ) instead of: db.collection.find( { state: "active" } ).count() Original pull request: #568. --- .travis.yml | 2 +- .../data/mongodb/core/MongoTemplate.java | 59 +++++++- .../mongodb/core/ReactiveMongoTemplate.java | 58 +++++++- .../core/ReactiveClientSessionTests.java | 22 ++- .../core/SessionBoundMongoTemplateTests.java | 61 +++++++- .../client-session-transactions.adoc | 130 +++++++++++++----- 6 files changed, 287 insertions(+), 45 deletions(-) diff --git a/.travis.yml b/.travis.yml index 471e86e73..0a76c027e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,7 @@ env: matrix: - PROFILE=ci global: - - MONGO_VERSION=4.0.0-rc0 + - MONGO_VERSION=4.0.0-rc3 addons: apt: diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index 4d0d92d58..4771eea07 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -67,9 +67,11 @@ import org.springframework.data.mongodb.SessionSynchronization; import org.springframework.data.mongodb.core.BulkOperations.BulkMode; import org.springframework.data.mongodb.core.DefaultBulkOperations.BulkOperationContext; import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.aggregation.AggregationOperation; import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext; import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.AggregationResults; +import org.springframework.data.mongodb.core.aggregation.CountOperation; import org.springframework.data.mongodb.core.aggregation.Fields; import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext; import org.springframework.data.mongodb.core.aggregation.TypedAggregation; @@ -105,6 +107,7 @@ import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions; import org.springframework.data.mongodb.core.mapreduce.MapReduceResults; import org.springframework.data.mongodb.core.query.Collation; import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.CriteriaDefinition; import org.springframework.data.mongodb.core.query.Meta; import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; @@ -2116,7 +2119,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, Assert.notNull(aggregation, "Aggregation pipeline must not be null!"); Assert.notNull(outputType, "Output type must not be null!"); - AggregationOperationContext contextToUse = new AggregationUtil(queryMapper, mappingContext).prepareAggregationContext(aggregation, context); + AggregationOperationContext contextToUse = new AggregationUtil(queryMapper, mappingContext) + .prepareAggregationContext(aggregation, context); return doAggregate(aggregation, collectionName, outputType, contextToUse); } @@ -2847,7 +2851,6 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, return fields; } - /** * Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original * exception if the conversation failed. Thus allows safe re-throwing of the return value. @@ -3498,6 +3501,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, static class SessionBoundMongoTemplate extends MongoTemplate { private final MongoTemplate delegate; + private final ClientSession session; /** * @param session must not be {@literal null}. @@ -3508,6 +3512,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, super(that.getMongoDbFactory().withSession(session), that); this.delegate = that; + this.session = session; } /* @@ -3531,5 +3536,55 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, // native MongoDB objects that offer methods with ClientSession must not be proxied. return delegate.getDb(); } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.MongoTemplate#count(org.springframework.data.mongodb.core.query.Query, java.lang.Class, java.lang.String) + */ + @Override + public long count(Query query, @Nullable Class entityClass, String collection) { + + if (!session.hasActiveTransaction()) { + return super.count(query, entityClass, collection); + } + + List pipeline = computeCountAggregationPipeline(query, entityClass); + + Aggregation aggregation = entityClass != null ? Aggregation.newAggregation(entityClass, pipeline) + : Aggregation.newAggregation(pipeline); + aggregation.withOptions(AggregationOptions.builder().collation(query.getCollation().orElse(null)).build()); + + AggregationResults aggregationResults = aggregate(aggregation, collection, Document.class); + return ((List) aggregationResults.getRawResults().getOrDefault("results", + Collections.singletonList(new Document("totalEntityCount", 0)))).get(0).get("totalEntityCount", Number.class) + .longValue(); + } + + private List computeCountAggregationPipeline(Query query, @Nullable Class entityType) { + + CountOperation count = Aggregation.count().as("totalEntityCount"); + if (query.getQueryObject().isEmpty()) { + return Arrays.asList(count); + } + + Document mappedQuery = delegate.queryMapper.getMappedObject(query.getQueryObject(), + delegate.getPersistentEntity(entityType)); + + CriteriaDefinition criteria = new CriteriaDefinition() { + + @Override + public Document getCriteriaObject() { + return mappedQuery; + } + + @Nullable + @Override + public String getKey() { + return null; + } + }; + + return Arrays.asList(Aggregation.match(criteria), count); + } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 8d071525d..4b303faad 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -70,8 +70,10 @@ import org.springframework.data.mapping.model.ConvertingPropertyAccessor; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.aggregation.AggregationOperation; import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext; import org.springframework.data.mongodb.core.aggregation.AggregationOptions; +import org.springframework.data.mongodb.core.aggregation.CountOperation; import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext; import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext; import org.springframework.data.mongodb.core.aggregation.TypedAggregation; @@ -94,6 +96,7 @@ import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent; import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions; import org.springframework.data.mongodb.core.query.Collation; import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.CriteriaDefinition; import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; @@ -2639,7 +2642,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati } /** - * Exception translation {@link Function} intended for {@link Flux#mapError(Function)}} usage. + * Exception translation {@link Function} intended for {@link Flux#onErrorMap(Function)} usage. * * @return the exception translation {@link Function} */ @@ -3231,6 +3234,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati static class ReactiveSessionBoundMongoTemplate extends ReactiveMongoTemplate { private final ReactiveMongoTemplate delegate; + private final ClientSession session; /** * @param session must not be {@literal null}. @@ -3241,6 +3245,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati super(that.mongoDatabaseFactory.withSession(session), that); this.delegate = that; + this.session = session; } /* @@ -3264,6 +3269,57 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati // native MongoDB objects that offer methods with ClientSession must not be proxied. return delegate.getMongoDatabase(); } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#count(org.springframework.data.mongodb.core.query.Query, java.lang.Class, java.lang.String) + */ + @Override + public Mono count(@Nullable Query query, @Nullable Class entityClass, String collectionName) { + + if (!session.hasActiveTransaction()) { + return super.count(query, entityClass, collectionName); + } + + List pipeline = computeCountAggregationPipeline(query, entityClass); + + Aggregation aggregation = entityClass != null ? Aggregation.newAggregation(entityClass, pipeline) + : Aggregation.newAggregation(pipeline); + aggregation.withOptions(AggregationOptions.builder().collation(query.getCollation().orElse(null)).build()); + + return aggregate(aggregation, collectionName, Document.class) // + .defaultIfEmpty(new Document("totalEntityCount", 0)) // + .next() // + .map(it -> it.get("totalEntityCount", Number.class).longValue()); + } + + private List computeCountAggregationPipeline(@Nullable Query query, + @Nullable Class entityType) { + + CountOperation count = Aggregation.count().as("totalEntityCount"); + if (query == null || query.getQueryObject().isEmpty()) { + return Arrays.asList(count); + } + + Document mappedQuery = delegate.queryMapper.getMappedObject(query.getQueryObject(), + delegate.getPersistentEntity(entityType)); + + CriteriaDefinition criteria = new CriteriaDefinition() { + + @Override + public Document getCriteriaObject() { + return mappedQuery; + } + + @Nullable + @Override + public String getKey() { + return null; + } + }; + + return Arrays.asList(Aggregation.match(criteria), count); + } } @RequiredArgsConstructor diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveClientSessionTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveClientSessionTests.java index f78301265..a1922f8d8 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveClientSessionTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveClientSessionTests.java @@ -100,8 +100,7 @@ public class ReactiveClientSessionTests { assertThat(session.getOperationTime()).isNull(); - template.withSession(() -> session) - .execute(action -> action.findOne(new Query(), Document.class, COLLECTION_NAME)) // + template.withSession(() -> session).execute(action -> action.findOne(new Query(), Document.class, COLLECTION_NAME)) // .as(StepVerifier::create) // .expectNextCount(1) // .verifyComplete(); @@ -138,6 +137,25 @@ public class ReactiveClientSessionTests { .verifyComplete(); } + @Test // DATAMONGO-2001 + public void countShouldOnlyReturnCorrectly() { + + ClientSession session = Mono + .from(client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())).block(); + + template.withSession(() -> session).execute(action -> { + + session.startTransaction(); + + return action.insert(new Document("_id", "id-2").append("value", "in transaction"), COLLECTION_NAME) // + .then(action.count(new Query(), Document.class, COLLECTION_NAME)) // + .flatMap(it -> Mono.from(session.commitTransaction()).then(Mono.just(it))); + + }).as(StepVerifier::create) // + .expectNext(2L) // + .verifyComplete(); + } + static class CountingSessionSupplier implements Supplier { AtomicInteger invocationCount = new AtomicInteger(0); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java index 7029a7eb5..02035ac95 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java @@ -16,7 +16,8 @@ package org.springframework.data.mongodb.core; import static org.assertj.core.api.Assertions.*; -import static org.mockito.ArgumentMatchers.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; import lombok.Data; @@ -26,6 +27,9 @@ import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.aopalliance.aop.Advice; import org.bson.Document; @@ -55,6 +59,7 @@ import org.springframework.data.mongodb.core.convert.MongoCustomConversions; import org.springframework.data.mongodb.core.mapping.DBRef; import org.springframework.data.mongodb.core.mapping.MongoMappingContext; import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.test.util.MongoTestUtils; import org.springframework.data.mongodb.test.util.MongoVersionRule; import org.springframework.data.mongodb.test.util.ReplicaSet; import org.springframework.data.util.Version; @@ -78,6 +83,7 @@ public class SessionBoundMongoTemplateTests { public @Rule ExpectedException exception = ExpectedException.none(); + MongoClient client; MongoTemplate template; SessionBoundMongoTemplate sessionBoundTemplate; ClientSession session; @@ -87,7 +93,7 @@ public class SessionBoundMongoTemplateTests { @Before public void setUp() { - MongoClient client = new MongoClient(); + client = MongoTestUtils.replSetClient(); MongoDbFactory factory = new SimpleMongoDbFactory(client, "session-bound-mongo-template-tests") { @@ -139,7 +145,9 @@ public class SessionBoundMongoTemplateTests { @After public void tearDown() { + session.close(); + client.close(); } @Test // DATAMONGO-1880 @@ -259,6 +267,55 @@ public class SessionBoundMongoTemplateTests { assertThat(result.getPersonRef()).isEqualTo(person); // resolve the lazy loading proxy } + @Test // DATAMONGO-2001 + public void countShouldOnlyReturnCorrectly() throws InterruptedException { + + if (!template.collectionExists(Person.class)) { + template.createCollection(Person.class); + } else { + template.remove(Person.class).all(); + } + + List resultList = new CopyOnWriteArrayList<>(); + + int nrThreads = 2; + CountDownLatch countDownLatch = new CountDownLatch(nrThreads); + + for (int i = 0; i < nrThreads; i++) { + + new Thread(() -> { + + ClientSession session = client.startSession(); + session.startTransaction(); + + try { + + MongoTemplate sessionBound = template.withSession(session); + + try { + sessionBound.save(new Person("Kylar Stern")); + } finally { + countDownLatch.countDown(); + } + + countDownLatch.await(1, TimeUnit.SECONDS); + + resultList.add(Long.valueOf(sessionBound.query(Person.class).count())); + } catch (Exception e) { + resultList.add(e); + } + + session.commitTransaction(); + session.close(); + }).start(); + } + + countDownLatch.await(); + + assertThat(template.query(Person.class).count()).isEqualTo(2L); + assertThat(resultList).allMatch(it -> it.equals(1L)); + } + @Data static class WithDbRef { diff --git a/src/main/asciidoc/reference/client-session-transactions.adoc b/src/main/asciidoc/reference/client-session-transactions.adoc index a271468cd..8e5c2169c 100644 --- a/src/main/asciidoc/reference/client-session-transactions.adoc +++ b/src/main/asciidoc/reference/client-session-transactions.adoc @@ -127,7 +127,6 @@ template.withSession(session) session.abortTransaction(); <4> } }, ClientSession::close) <5> - .subscribe(); ---- <1> Obtain a new `ClientSession`. <2> Start the transaction. @@ -155,16 +154,16 @@ TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); txTemplate.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus status) { <3> + @Override + protected void doInTransactionWithoutResult(TransactionStatus status) { <3> - Step step = // ...; - template.insert(step); + Step step = // ...; + template.insert(step); - process(step); + process(step); - template.update(Step.class).apply(Update.set("state", // ... - }; + template.update(Step.class).apply(Update.set("state", // ... + }; }); ---- <1> Enable transaction synchronization during Template API configuration. @@ -186,26 +185,26 @@ The `MongoTransactionManager` binds a `ClientSession` to the thread. `MongoTempl @Configuration static class Config extends AbstractMongoConfiguration { - @Bean - MongoTransactionManager transactionManager(MongoDbFactory dbFactory) { <1> - return new MongoTransactionManager(dbFactory); - } + @Bean + MongoTransactionManager transactionManager(MongoDbFactory dbFactory) { <1> + return new MongoTransactionManager(dbFactory); + } - // ... + // ... } @Component public class StateService { - @Transactional - void someBusinessFunction(Step step) { <2> + @Transactional + void someBusinessFunction(Step step) { <2> - template.insert(step); + template.insert(step); - process(step); + process(step); - template.update(Step.class).apply(Update.set("state", // ... - }; + template.update(Step.class).apply(Update.set("state", // ... + }; }); ---- @@ -230,18 +229,18 @@ Using the plain MongoDB reactive driver API a `delete` within a transactional fl [source,java] ---- Mono result = Mono - .from(client.startSession()) <1> + .from(client.startSession()) <1> - .flatMap(session -> { - session.startTransaction(); <2> + .flatMap(session -> { + session.startTransaction(); <2> - return Mono.from(collection.deleteMany(session, ...)) <3> + return Mono.from(collection.deleteMany(session, ...)) <3> - .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) <4> + .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) <4> - .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) <5> + .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) <5> - .doFinally(signal -> session.close()); <6> + .doFinally(signal -> session.close()); <6> }); ---- <1> First we obviously need to initiate the session. @@ -263,9 +262,9 @@ accordingly. This allows you to express the above flow simply as the following: ==== [source,java] ---- -Mono result = template.inTransaction() <1> +Mono result = template.inTransaction() <1> - .execute(action -> action.remove(query(where("id").is("step-1")), Step.class)); <2> + .execute(action -> action.remove(query(where("id").is("step-1")), Step.class)); <2> ---- <1> Initiate the transaction. <2> Operate within the `ClientSession`. Each `execute(…)` unit of work callback initiates a new transaction in the scope of the same `ClientSession`. @@ -280,20 +279,77 @@ reactive flow of `execute(…)` that are not propagated to outside of the callba ==== [source,java] ---- -template.inTransaction() <1> +template.inTransaction() <1> - .execute(action -> action.find(query(where("state").is("active")), Step.class) - .flatMap(step -> action.update(Step.class) - .matching(query(where("id").is(step.id))) - .apply(update("state", "paused")) - .all())) <2> + .execute(action -> action.find(query(where("state").is("active")), Step.class) + .flatMap(step -> action.update(Step.class) + .matching(query(where("id").is(step.id))) + .apply(update("state", "paused")) + .all())) <2> - .flatMap(updated -> { - // Exception could happen here <3> - }); + .flatMap(updated -> { + // Exception could happen here <3> + }); ---- <1> Initiate the managed transaction. <2> Operate within the `ClientSession`. The transaction is committed after this is done or rolled back if an error occurs here. <3> An error outside the transaction flow has no affect on the previous transactional execution. ==== + +== Special behavior inside transactions + +Inside transactions MongoDB server has a slightly different behavior. + +*Connection Settings* + +The MongoDB drivers offer a dedicated replica set name configuration option turing the driver into an auto detection +mode. This option helps identifying replica set master nodes and command routing during a transaction. + +INFO: Make sure to add `replicaSet` to the MongoDB Uri. Please refer to https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options[connection string options] for further details. + +*Collection Operations* + +MongoDB does *not* support collection operations, such as collection creation, within a transaction. This also +affects the on the fly collection creation that happens on first usage. Therefore make sure to have all required +structures in place. + +*Count* + +MongoDB `count` operates upon collection statistics which may not reflect the actual situation within a transaction. +The server responds with _error 50851_ when issuing a `count` command inside of a multi-document transaction. +Once `MongoTemplate` detects an active transaction, all exposed `count()` methods are converted and delegated to the +aggregation framework using `$match` and `$count` operators, preserving `Query` settings, such as `collation`. + +==== +The following snippet of `count` inside the session bound closure + +[source,javascript] +---- +session.startTransaction(); + +template.withSession(session) + .execute(action -> { + action.count(query(where("state").is("active")), Step.class) + ... +---- + +runs: + +[source,javascript] +---- +db.collection.aggregate( + [ + { $match: { state: "active" } }, + { $group: { _id: null, count: { $sum: 1 } } } + ] +) +---- + +instead of: + +[source,javascript] +---- +db.collection.find( { state: "active" } ).count() +---- +==== \ No newline at end of file