Browse Source

DATAMONGO-2001 - Count within transaction should return only the total count of documents visible to the specific session.

We now delegate count operations within an active transaction to an aggregation.

Once `MongoTemplate` detects an active transaction, all exposed `count()` methods are converted and delegated to the
aggregation framework using `$match` and `$count` operators, preserving `Query` settings, such as `collation`.

The following snippet of `count` inside the session bound closure

session.startTransaction();
template.withSession(session)
    .execute(action -> {
        action.count(query(where("state").is("active")), Step.class)
        ...

runs:

db.collection.aggregate(
   [
      { $match: { state: "active" } },
      { $count: "totalEntityCount" }
   ]
)

instead of:

db.collection.find( { state: "active" } ).count()

Original pull request: #568.
pull/570/merge
Christoph Strobl 8 years ago committed by Mark Paluch
parent
commit
05f325687c
  1. 2
      .travis.yml
  2. 59
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
  3. 58
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  4. 22
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveClientSessionTests.java
  5. 61
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java
  6. 130
      src/main/asciidoc/reference/client-session-transactions.adoc

2
.travis.yml

@ -18,7 +18,7 @@ env: @@ -18,7 +18,7 @@ env:
matrix:
- PROFILE=ci
global:
- MONGO_VERSION=4.0.0-rc0
- MONGO_VERSION=4.0.0-rc3
addons:
apt:

59
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

@ -67,9 +67,11 @@ import org.springframework.data.mongodb.SessionSynchronization; @@ -67,9 +67,11 @@ import org.springframework.data.mongodb.SessionSynchronization;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.DefaultBulkOperations.BulkOperationContext;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.CountOperation;
import org.springframework.data.mongodb.core.aggregation.Fields;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
@ -105,6 +107,7 @@ import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions; @@ -105,6 +107,7 @@ import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
import org.springframework.data.mongodb.core.mapreduce.MapReduceResults;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
@ -2116,7 +2119,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @@ -2116,7 +2119,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
Assert.notNull(aggregation, "Aggregation pipeline must not be null!");
Assert.notNull(outputType, "Output type must not be null!");
AggregationOperationContext contextToUse = new AggregationUtil(queryMapper, mappingContext).prepareAggregationContext(aggregation, context);
AggregationOperationContext contextToUse = new AggregationUtil(queryMapper, mappingContext)
.prepareAggregationContext(aggregation, context);
return doAggregate(aggregation, collectionName, outputType, contextToUse);
}
@ -2847,7 +2851,6 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @@ -2847,7 +2851,6 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return fields;
}
/**
* Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original
* exception if the conversation failed. Thus allows safe re-throwing of the return value.
@ -3498,6 +3501,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @@ -3498,6 +3501,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
static class SessionBoundMongoTemplate extends MongoTemplate {
private final MongoTemplate delegate;
private final ClientSession session;
/**
* @param session must not be {@literal null}.
@ -3508,6 +3512,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @@ -3508,6 +3512,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
super(that.getMongoDbFactory().withSession(session), that);
this.delegate = that;
this.session = session;
}
/*
@ -3531,5 +3536,55 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @@ -3531,5 +3536,55 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
// native MongoDB objects that offer methods with ClientSession must not be proxied.
return delegate.getDb();
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoTemplate#count(org.springframework.data.mongodb.core.query.Query, java.lang.Class, java.lang.String)
*/
@Override
public long count(Query query, @Nullable Class<?> entityClass, String collection) {
if (!session.hasActiveTransaction()) {
return super.count(query, entityClass, collection);
}
List<AggregationOperation> pipeline = computeCountAggregationPipeline(query, entityClass);
Aggregation aggregation = entityClass != null ? Aggregation.newAggregation(entityClass, pipeline)
: Aggregation.newAggregation(pipeline);
aggregation.withOptions(AggregationOptions.builder().collation(query.getCollation().orElse(null)).build());
AggregationResults<Document> aggregationResults = aggregate(aggregation, collection, Document.class);
return ((List<Document>) aggregationResults.getRawResults().getOrDefault("results",
Collections.singletonList(new Document("totalEntityCount", 0)))).get(0).get("totalEntityCount", Number.class)
.longValue();
}
private List<AggregationOperation> computeCountAggregationPipeline(Query query, @Nullable Class<?> entityType) {
CountOperation count = Aggregation.count().as("totalEntityCount");
if (query.getQueryObject().isEmpty()) {
return Arrays.asList(count);
}
Document mappedQuery = delegate.queryMapper.getMappedObject(query.getQueryObject(),
delegate.getPersistentEntity(entityType));
CriteriaDefinition criteria = new CriteriaDefinition() {
@Override
public Document getCriteriaObject() {
return mappedQuery;
}
@Nullable
@Override
public String getKey() {
return null;
}
};
return Arrays.asList(Aggregation.match(criteria), count);
}
}
}

58
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -70,8 +70,10 @@ import org.springframework.data.mapping.model.ConvertingPropertyAccessor; @@ -70,8 +70,10 @@ import org.springframework.data.mapping.model.ConvertingPropertyAccessor;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.CountOperation;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
@ -94,6 +96,7 @@ import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent; @@ -94,6 +96,7 @@ import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
@ -2639,7 +2642,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -2639,7 +2642,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
}
/**
* Exception translation {@link Function} intended for {@link Flux#mapError(Function)}} usage.
* Exception translation {@link Function} intended for {@link Flux#onErrorMap(Function)} usage.
*
* @return the exception translation {@link Function}
*/
@ -3231,6 +3234,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -3231,6 +3234,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
static class ReactiveSessionBoundMongoTemplate extends ReactiveMongoTemplate {
private final ReactiveMongoTemplate delegate;
private final ClientSession session;
/**
* @param session must not be {@literal null}.
@ -3241,6 +3245,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -3241,6 +3245,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
super(that.mongoDatabaseFactory.withSession(session), that);
this.delegate = that;
this.session = session;
}
/*
@ -3264,6 +3269,57 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -3264,6 +3269,57 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
// native MongoDB objects that offer methods with ClientSession must not be proxied.
return delegate.getMongoDatabase();
}
/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#count(org.springframework.data.mongodb.core.query.Query, java.lang.Class, java.lang.String)
*/
@Override
public Mono<Long> count(@Nullable Query query, @Nullable Class<?> entityClass, String collectionName) {
if (!session.hasActiveTransaction()) {
return super.count(query, entityClass, collectionName);
}
List<AggregationOperation> pipeline = computeCountAggregationPipeline(query, entityClass);
Aggregation aggregation = entityClass != null ? Aggregation.newAggregation(entityClass, pipeline)
: Aggregation.newAggregation(pipeline);
aggregation.withOptions(AggregationOptions.builder().collation(query.getCollation().orElse(null)).build());
return aggregate(aggregation, collectionName, Document.class) //
.defaultIfEmpty(new Document("totalEntityCount", 0)) //
.next() //
.map(it -> it.get("totalEntityCount", Number.class).longValue());
}
private List<AggregationOperation> computeCountAggregationPipeline(@Nullable Query query,
@Nullable Class<?> entityType) {
CountOperation count = Aggregation.count().as("totalEntityCount");
if (query == null || query.getQueryObject().isEmpty()) {
return Arrays.asList(count);
}
Document mappedQuery = delegate.queryMapper.getMappedObject(query.getQueryObject(),
delegate.getPersistentEntity(entityType));
CriteriaDefinition criteria = new CriteriaDefinition() {
@Override
public Document getCriteriaObject() {
return mappedQuery;
}
@Nullable
@Override
public String getKey() {
return null;
}
};
return Arrays.asList(Aggregation.match(criteria), count);
}
}
@RequiredArgsConstructor

22
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveClientSessionTests.java

@ -100,8 +100,7 @@ public class ReactiveClientSessionTests { @@ -100,8 +100,7 @@ public class ReactiveClientSessionTests {
assertThat(session.getOperationTime()).isNull();
template.withSession(() -> session)
.execute(action -> action.findOne(new Query(), Document.class, COLLECTION_NAME)) //
template.withSession(() -> session).execute(action -> action.findOne(new Query(), Document.class, COLLECTION_NAME)) //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
@ -138,6 +137,25 @@ public class ReactiveClientSessionTests { @@ -138,6 +137,25 @@ public class ReactiveClientSessionTests {
.verifyComplete();
}
@Test // DATAMONGO-2001
public void countShouldOnlyReturnCorrectly() {
ClientSession session = Mono
.from(client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())).block();
template.withSession(() -> session).execute(action -> {
session.startTransaction();
return action.insert(new Document("_id", "id-2").append("value", "in transaction"), COLLECTION_NAME) //
.then(action.count(new Query(), Document.class, COLLECTION_NAME)) //
.flatMap(it -> Mono.from(session.commitTransaction()).then(Mono.just(it)));
}).as(StepVerifier::create) //
.expectNext(2L) //
.verifyComplete();
}
static class CountingSessionSupplier implements Supplier<ClientSession> {
AtomicInteger invocationCount = new AtomicInteger(0);

61
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java

@ -16,7 +16,8 @@ @@ -16,7 +16,8 @@
package org.springframework.data.mongodb.core;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import lombok.Data;
@ -26,6 +27,9 @@ import java.lang.reflect.Proxy; @@ -26,6 +27,9 @@ import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.aopalliance.aop.Advice;
import org.bson.Document;
@ -55,6 +59,7 @@ import org.springframework.data.mongodb.core.convert.MongoCustomConversions; @@ -55,6 +59,7 @@ import org.springframework.data.mongodb.core.convert.MongoCustomConversions;
import org.springframework.data.mongodb.core.mapping.DBRef;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.test.util.MongoTestUtils;
import org.springframework.data.mongodb.test.util.MongoVersionRule;
import org.springframework.data.mongodb.test.util.ReplicaSet;
import org.springframework.data.util.Version;
@ -78,6 +83,7 @@ public class SessionBoundMongoTemplateTests { @@ -78,6 +83,7 @@ public class SessionBoundMongoTemplateTests {
public @Rule ExpectedException exception = ExpectedException.none();
MongoClient client;
MongoTemplate template;
SessionBoundMongoTemplate sessionBoundTemplate;
ClientSession session;
@ -87,7 +93,7 @@ public class SessionBoundMongoTemplateTests { @@ -87,7 +93,7 @@ public class SessionBoundMongoTemplateTests {
@Before
public void setUp() {
MongoClient client = new MongoClient();
client = MongoTestUtils.replSetClient();
MongoDbFactory factory = new SimpleMongoDbFactory(client, "session-bound-mongo-template-tests") {
@ -139,7 +145,9 @@ public class SessionBoundMongoTemplateTests { @@ -139,7 +145,9 @@ public class SessionBoundMongoTemplateTests {
@After
public void tearDown() {
session.close();
client.close();
}
@Test // DATAMONGO-1880
@ -259,6 +267,55 @@ public class SessionBoundMongoTemplateTests { @@ -259,6 +267,55 @@ public class SessionBoundMongoTemplateTests {
assertThat(result.getPersonRef()).isEqualTo(person); // resolve the lazy loading proxy
}
@Test // DATAMONGO-2001
public void countShouldOnlyReturnCorrectly() throws InterruptedException {
if (!template.collectionExists(Person.class)) {
template.createCollection(Person.class);
} else {
template.remove(Person.class).all();
}
List<Object> resultList = new CopyOnWriteArrayList<>();
int nrThreads = 2;
CountDownLatch countDownLatch = new CountDownLatch(nrThreads);
for (int i = 0; i < nrThreads; i++) {
new Thread(() -> {
ClientSession session = client.startSession();
session.startTransaction();
try {
MongoTemplate sessionBound = template.withSession(session);
try {
sessionBound.save(new Person("Kylar Stern"));
} finally {
countDownLatch.countDown();
}
countDownLatch.await(1, TimeUnit.SECONDS);
resultList.add(Long.valueOf(sessionBound.query(Person.class).count()));
} catch (Exception e) {
resultList.add(e);
}
session.commitTransaction();
session.close();
}).start();
}
countDownLatch.await();
assertThat(template.query(Person.class).count()).isEqualTo(2L);
assertThat(resultList).allMatch(it -> it.equals(1L));
}
@Data
static class WithDbRef {

130
src/main/asciidoc/reference/client-session-transactions.adoc

@ -127,7 +127,6 @@ template.withSession(session) @@ -127,7 +127,6 @@ template.withSession(session)
session.abortTransaction(); <4>
}
}, ClientSession::close) <5>
.subscribe();
----
<1> Obtain a new `ClientSession`.
<2> Start the transaction.
@ -155,16 +154,16 @@ TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); @@ -155,16 +154,16 @@ TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { <3>
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { <3>
Step step = // ...;
template.insert(step);
Step step = // ...;
template.insert(step);
process(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
template.update(Step.class).apply(Update.set("state", // ...
};
});
----
<1> Enable transaction synchronization during Template API configuration.
@ -186,26 +185,26 @@ The `MongoTransactionManager` binds a `ClientSession` to the thread. `MongoTempl @@ -186,26 +185,26 @@ The `MongoTransactionManager` binds a `ClientSession` to the thread. `MongoTempl
@Configuration
static class Config extends AbstractMongoConfiguration {
@Bean
MongoTransactionManager transactionManager(MongoDbFactory dbFactory) { <1>
return new MongoTransactionManager(dbFactory);
}
@Bean
MongoTransactionManager transactionManager(MongoDbFactory dbFactory) { <1>
return new MongoTransactionManager(dbFactory);
}
// ...
// ...
}
@Component
public class StateService {
@Transactional
void someBusinessFunction(Step step) { <2>
@Transactional
void someBusinessFunction(Step step) { <2>
template.insert(step);
template.insert(step);
process(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
template.update(Step.class).apply(Update.set("state", // ...
};
});
----
@ -230,18 +229,18 @@ Using the plain MongoDB reactive driver API a `delete` within a transactional fl @@ -230,18 +229,18 @@ Using the plain MongoDB reactive driver API a `delete` within a transactional fl
[source,java]
----
Mono<DeleteResult> result = Mono
.from(client.startSession()) <1>
.from(client.startSession()) <1>
.flatMap(session -> {
session.startTransaction(); <2>
.flatMap(session -> {
session.startTransaction(); <2>
return Mono.from(collection.deleteMany(session, ...)) <3>
return Mono.from(collection.deleteMany(session, ...)) <3>
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) <4>
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) <4>
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) <5>
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) <5>
.doFinally(signal -> session.close()); <6>
.doFinally(signal -> session.close()); <6>
});
----
<1> First we obviously need to initiate the session.
@ -263,9 +262,9 @@ accordingly. This allows you to express the above flow simply as the following: @@ -263,9 +262,9 @@ accordingly. This allows you to express the above flow simply as the following:
====
[source,java]
----
Mono<DeleteResult> result = template.inTransaction() <1>
Mono<DeleteResult> result = template.inTransaction() <1>
.execute(action -> action.remove(query(where("id").is("step-1")), Step.class)); <2>
.execute(action -> action.remove(query(where("id").is("step-1")), Step.class)); <2>
----
<1> Initiate the transaction.
<2> Operate within the `ClientSession`. Each `execute(…)` unit of work callback initiates a new transaction in the scope of the same `ClientSession`.
@ -280,20 +279,77 @@ reactive flow of `execute(…)` that are not propagated to outside of the callba @@ -280,20 +279,77 @@ reactive flow of `execute(…)` that are not propagated to outside of the callba
====
[source,java]
----
template.inTransaction() <1>
template.inTransaction() <1>
.execute(action -> action.find(query(where("state").is("active")), Step.class)
.flatMap(step -> action.update(Step.class)
.matching(query(where("id").is(step.id)))
.apply(update("state", "paused"))
.all())) <2>
.execute(action -> action.find(query(where("state").is("active")), Step.class)
.flatMap(step -> action.update(Step.class)
.matching(query(where("id").is(step.id)))
.apply(update("state", "paused"))
.all())) <2>
.flatMap(updated -> {
// Exception could happen here <3>
});
.flatMap(updated -> {
// Exception could happen here <3>
});
----
<1> Initiate the managed transaction.
<2> Operate within the `ClientSession`. The transaction is committed after this is done or rolled back if an
error occurs here.
<3> An error outside the transaction flow has no affect on the previous transactional execution.
====
== Special behavior inside transactions
Inside transactions MongoDB server has a slightly different behavior.
*Connection Settings*
The MongoDB drivers offer a dedicated replica set name configuration option turing the driver into an auto detection
mode. This option helps identifying replica set master nodes and command routing during a transaction.
INFO: Make sure to add `replicaSet` to the MongoDB Uri. Please refer to https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options[connection string options] for further details.
*Collection Operations*
MongoDB does *not* support collection operations, such as collection creation, within a transaction. This also
affects the on the fly collection creation that happens on first usage. Therefore make sure to have all required
structures in place.
*Count*
MongoDB `count` operates upon collection statistics which may not reflect the actual situation within a transaction.
The server responds with _error 50851_ when issuing a `count` command inside of a multi-document transaction.
Once `MongoTemplate` detects an active transaction, all exposed `count()` methods are converted and delegated to the
aggregation framework using `$match` and `$count` operators, preserving `Query` settings, such as `collation`.
====
The following snippet of `count` inside the session bound closure
[source,javascript]
----
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
----
runs:
[source,javascript]
----
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $group: { _id: null, count: { $sum: 1 } } }
]
)
----
instead of:
[source,javascript]
----
db.collection.find( { state: "active" } ).count()
----
====
Loading…
Cancel
Save