Browse Source

Declarative way for setting MongoDB transaction options.

Closes #1628
Original pull request: #4552
pull/4666/head
Yan Kardziyaka 2 years ago committed by Mark Paluch
parent
commit
1f2cf88243
No known key found for this signature in database
GPG Key ID: 55BC6374BAA9D973
  1. 2
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionManager.java
  2. 98
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionUtils.java
  3. 2
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoTransactionManager.java
  4. 227
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/MongoTransactionUtilsUnitTests.java
  5. 159
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java
  6. 96
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionOptionsTestService.java
  7. 144
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java
  8. 101
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/TransactionOptionsTestService.java

2
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionManager.java

@ -134,7 +134,7 @@ public class MongoTransactionManager extends AbstractPlatformTransactionManager @@ -134,7 +134,7 @@ public class MongoTransactionManager extends AbstractPlatformTransactionManager
}
try {
mongoTransactionObject.startTransaction(options);
mongoTransactionObject.startTransaction(MongoTransactionUtils.extractOptions(definition, options));
} catch (MongoException ex) {
throw new TransactionSystemException(String.format("Could not start Mongo transaction for session %s.",
debugString(mongoTransactionObject.getSession())), ex);

98
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoTransactionUtils.java

@ -0,0 +1,98 @@ @@ -0,0 +1,98 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.lang.Nullable;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.interceptor.TransactionAttribute;
import com.mongodb.ReadConcern;
import com.mongodb.ReadConcernLevel;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
/**
* Helper class for translating @Transactional labels into Mongo-specific {@link TransactionOptions}.
*
* @author Yan Kardziyaka
*/
public final class MongoTransactionUtils {
private static final Log LOGGER = LogFactory.getLog(MongoTransactionUtils.class);
private static final String MAX_COMMIT_TIME = "mongo:maxCommitTime";
private static final String READ_CONCERN_OPTION = "mongo:readConcern";
private static final String READ_PREFERENCE_OPTION = "mongo:readPreference";
private static final String WRITE_CONCERN_OPTION = "mongo:writeConcern";
private MongoTransactionUtils() {}
@Nullable
public static TransactionOptions extractOptions(TransactionDefinition transactionDefinition,
@Nullable TransactionOptions fallbackOptions) {
if (transactionDefinition instanceof TransactionAttribute transactionAttribute) {
TransactionOptions.Builder builder = null;
for (String label : transactionAttribute.getLabels()) {
String[] tokens = label.split("=", 2);
builder = tokens.length == 2 ? enhanceWithProperty(builder, tokens[0], tokens[1]) : builder;
}
if (builder == null) {
return fallbackOptions;
}
TransactionOptions options = builder.build();
return fallbackOptions == null ? options : TransactionOptions.merge(options, fallbackOptions);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("%s cannot be casted to %s. Transaction labels won't be evaluated as options".formatted(
TransactionDefinition.class.getName(), TransactionAttribute.class.getName()));
}
return fallbackOptions;
}
}
@Nullable
private static TransactionOptions.Builder enhanceWithProperty(@Nullable TransactionOptions.Builder builder,
String key, String value) {
return switch (key) {
case MAX_COMMIT_TIME -> nullSafe(builder).maxCommitTime(Duration.parse(value).toMillis(), TimeUnit.MILLISECONDS);
case READ_CONCERN_OPTION -> nullSafe(builder).readConcern(new ReadConcern(ReadConcernLevel.fromString(value)));
case READ_PREFERENCE_OPTION -> nullSafe(builder).readPreference(ReadPreference.valueOf(value));
case WRITE_CONCERN_OPTION -> nullSafe(builder).writeConcern(getWriteConcern(value));
default -> builder;
};
}
private static TransactionOptions.Builder nullSafe(@Nullable TransactionOptions.Builder builder) {
return builder == null ? TransactionOptions.builder() : builder;
}
private static WriteConcern getWriteConcern(String writeConcernAsString) {
WriteConcern writeConcern = WriteConcern.valueOf(writeConcernAsString);
if (writeConcern == null) {
throw new IllegalArgumentException("'%s' is not a valid WriteConcern".formatted(writeConcernAsString));
}
return writeConcern;
}
}

2
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoTransactionManager.java

@ -146,7 +146,7 @@ public class ReactiveMongoTransactionManager extends AbstractReactiveTransaction @@ -146,7 +146,7 @@ public class ReactiveMongoTransactionManager extends AbstractReactiveTransaction
}).doOnNext(resourceHolder -> {
mongoTransactionObject.startTransaction(options);
mongoTransactionObject.startTransaction(MongoTransactionUtils.extractOptions(definition, options));
if (logger.isDebugEnabled()) {
logger.debug(String.format("Started transaction for session %s.", debugString(resourceHolder.getSession())));

227
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/MongoTransactionUtilsUnitTests.java

@ -0,0 +1,227 @@ @@ -0,0 +1,227 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb;
import static java.util.UUID.*;
import static org.assertj.core.api.Assertions.*;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
/**
* @author Yan Kardziyaka
*/
class MongoTransactionUtilsUnitTests {
@Test // GH-1628
public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidMaxCommitTime() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:maxCommitTime=-PT5S"));
assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) //
.isInstanceOf(IllegalArgumentException.class);
}
@Test // GH-1628
public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidReadConcern() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:readConcern=invalidValue"));
assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) //
.isInstanceOf(IllegalArgumentException.class);
}
@Test // GH-1628
public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidReadPreference() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:readPreference=invalidValue"));
assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) //
.isInstanceOf(IllegalArgumentException.class);
}
@Test // GH-1628
public void shouldThrowIllegalArgumentExceptionIfLabelsContainInvalidWriteConcern() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:writeConcern=invalidValue"));
assertThatThrownBy(() -> MongoTransactionUtils.extractOptions(attribute, fallbackOptions)) //
.isInstanceOf(IllegalArgumentException.class);
}
@Test // GH-1628
public void shouldReturnFallbackOptionsIfNotTransactionAttribute() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
TransactionOptions result = MongoTransactionUtils.extractOptions(definition, fallbackOptions);
assertThat(result).isSameAs(fallbackOptions);
}
@Test // GH-1628
public void shouldReturnFallbackOptionsIfNoLabelsProvided() {
TransactionOptions fallbackOptions = getTransactionOptions();
TransactionAttribute attribute = new DefaultTransactionAttribute();
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
assertThat(result).isSameAs(fallbackOptions);
}
@Test // GH-1628
public void shouldReturnFallbackOptionsIfLabelsDoesNotContainValidOptions() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
Set<String> labels = Set.of("mongo:readConcern", "writeConcern", "readPreference=SECONDARY",
"mongo:maxCommitTime PT5M", randomUUID().toString());
attribute.setLabels(labels);
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
assertThat(result).isSameAs(fallbackOptions);
}
@Test // GH-1628
public void shouldReturnMergedOptionsIfLabelsContainMaxCommitTime() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:maxCommitTime=PT5S"));
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
assertThat(result).isNotSameAs(fallbackOptions) //
.returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) //
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) //
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) //
.returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern));
}
@Test // GH-1628
public void shouldReturnMergedOptionsIfLabelsContainReadConcern() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:readConcern=majority"));
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
assertThat(result).isNotSameAs(fallbackOptions) //
.returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES))) //
.returns(ReadConcern.MAJORITY, from(TransactionOptions::getReadConcern)) //
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) //
.returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern));
}
@Test // GH-1628
public void shouldReturnMergedOptionsIfLabelsContainReadPreference() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:readPreference=primaryPreferred"));
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
assertThat(result).isNotSameAs(fallbackOptions) //
.returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES))) //
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) //
.returns(ReadPreference.primaryPreferred(), from(TransactionOptions::getReadPreference)) //
.returns(WriteConcern.UNACKNOWLEDGED, from(TransactionOptions::getWriteConcern));
}
@Test // GH-1628
public void shouldReturnMergedOptionsIfLabelsContainWriteConcern() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setLabels(Set.of("mongo:writeConcern=w3"));
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
assertThat(result).isNotSameAs(fallbackOptions) //
.returns(1L, from(options -> options.getMaxCommitTime(TimeUnit.MINUTES))) //
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) //
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) //
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
}
@Test // GH-1628
public void shouldReturnNewOptionsIfLabelsContainAllOptions() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
Set<String> labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:readConcern=majority",
"mongo:readPreference=primaryPreferred", "mongo:writeConcern=w3");
attribute.setLabels(labels);
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
assertThat(result).isNotSameAs(fallbackOptions) //
.returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) //
.returns(ReadConcern.MAJORITY, from(TransactionOptions::getReadConcern)) //
.returns(ReadPreference.primaryPreferred(), from(TransactionOptions::getReadPreference)) //
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
}
@Test // GH-1628
public void shouldReturnMergedOptionsIfLabelsContainOptionsMixedWithOrdinaryStrings() {
TransactionOptions fallbackOptions = getTransactionOptions();
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
Set<String> labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:nonExistentOption=value", "label",
"mongo:writeConcern=w3");
attribute.setLabels(labels);
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, fallbackOptions);
assertThat(result).isNotSameAs(fallbackOptions) //
.returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) //
.returns(ReadConcern.AVAILABLE, from(TransactionOptions::getReadConcern)) //
.returns(ReadPreference.secondaryPreferred(), from(TransactionOptions::getReadPreference)) //
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
}
@Test // GH-1628
public void shouldReturnNewOptionsIFallbackIsNull() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
Set<String> labels = Set.of("mongo:maxCommitTime=PT5S", "mongo:writeConcern=w3");
attribute.setLabels(labels);
TransactionOptions result = MongoTransactionUtils.extractOptions(attribute, null);
assertThat(result).returns(5L, from(options -> options.getMaxCommitTime(TimeUnit.SECONDS))) //
.returns(null, from(TransactionOptions::getReadConcern)) //
.returns(null, from(TransactionOptions::getReadPreference)) //
.returns(WriteConcern.W3, from(TransactionOptions::getWriteConcern));
}
private TransactionOptions getTransactionOptions() {
return TransactionOptions.builder() //
.maxCommitTime(1L, TimeUnit.MINUTES) //
.readConcern(ReadConcern.AVAILABLE) //
.readPreference(ReadPreference.secondaryPreferred()) //
.writeConcern(WriteConcern.UNACKNOWLEDGED).build();
}
}

159
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java

@ -15,6 +15,8 @@ @@ -15,6 +15,8 @@
*/
package org.springframework.data.mongodb;
import static java.util.UUID.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@ -35,6 +37,7 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext @@ -35,6 +37,7 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.mapping.Document;
@ -44,6 +47,8 @@ import org.springframework.data.mongodb.test.util.EnableIfMongoServerVersion; @@ -44,6 +47,8 @@ import org.springframework.data.mongodb.test.util.EnableIfMongoServerVersion;
import org.springframework.data.mongodb.test.util.EnableIfReplicaSetAvailable;
import org.springframework.data.mongodb.test.util.MongoClientExtension;
import org.springframework.data.mongodb.test.util.MongoTestUtils;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
@ -55,6 +60,7 @@ import com.mongodb.reactivestreams.client.MongoClient; @@ -55,6 +60,7 @@ import com.mongodb.reactivestreams.client.MongoClient;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Yan Kardziyaka
*/
@ExtendWith(MongoClientExtension.class)
@EnableIfMongoServerVersion(isGreaterThanEqual = "4.0")
@ -69,6 +75,7 @@ public class ReactiveTransactionIntegrationTests { @@ -69,6 +75,7 @@ public class ReactiveTransactionIntegrationTests {
PersonService personService;
ReactiveMongoOperations operations;
ReactiveTransactionOptionsTestService<Person> transactionOptionsTestService;
@BeforeAll
public static void init() {
@ -85,6 +92,7 @@ public class ReactiveTransactionIntegrationTests { @@ -85,6 +92,7 @@ public class ReactiveTransactionIntegrationTests {
personService = context.getBean(PersonService.class);
operations = context.getBean(ReactiveMongoOperations.class);
transactionOptionsTestService = context.getBean(ReactiveTransactionOptionsTestService.class);
try (MongoClient client = MongoTestUtils.reactiveClient()) {
@ -220,7 +228,123 @@ public class ReactiveTransactionIntegrationTests { @@ -220,7 +228,123 @@ public class ReactiveTransactionIntegrationTests {
.verifyComplete();
}
@Test // GH-1628
public void shouldThrowTransactionSystemExceptionOnTransactionWithInvalidMaxCommitTime() {
Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString());
transactionOptionsTestService.saveWithInvalidMaxCommitTime(person) //
.as(StepVerifier::create) //
.verifyError(TransactionSystemException.class);
operations.count(new Query(), Person.class) //
.as(StepVerifier::create) //
.expectNext(0L) //
.verifyComplete();
}
@Test // GH-1628
public void shouldCommitOnTransactionWithinMaxCommitTime() {
Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString());
transactionOptionsTestService.saveWithinMaxCommitTime(person) //
.as(StepVerifier::create) //
.expectNext(person) //
.verifyComplete();
operations.count(new Query(), Person.class) //
.as(StepVerifier::create) //
.expectNext(1L) //
.verifyComplete();
}
@Test // GH-1628
public void shouldThrowInvalidDataAccessApiUsageExceptionOnTransactionWithAvailableReadConcern() {
transactionOptionsTestService.availableReadConcernFind(randomUUID().toString()) //
.as(StepVerifier::create) //
.verifyError(InvalidDataAccessApiUsageException.class);
}
@Test // GH-1628
public void shouldThrowTransactionSystemExceptionOnTransactionWithInvalidReadConcern() {
transactionOptionsTestService.invalidReadConcernFind(randomUUID().toString()) //
.as(StepVerifier::create) //
.verifyError(TransactionSystemException.class);
}
@Test // GH-1628
public void shouldNotThrowOnTransactionWithMajorityReadConcern() {
transactionOptionsTestService.majorityReadConcernFind(randomUUID().toString()) //
.as(StepVerifier::create) //
.expectNextCount(0L) //
.verifyComplete();
}
@Test // GH-1628
public void shouldThrowUncategorizedMongoDbExceptionOnTransactionWithPrimaryPreferredReadPreference() {
transactionOptionsTestService.findFromPrimaryPreferredReplica(randomUUID().toString()) //
.as(StepVerifier::create) //
.verifyError(UncategorizedMongoDbException.class);
}
@Test // GH-1628
public void shouldThrowTransactionSystemExceptionOnTransactionWithInvalidReadPreference() {
transactionOptionsTestService.findFromInvalidReplica(randomUUID().toString()) //
.as(StepVerifier::create) //
.verifyError(TransactionSystemException.class);
}
@Test // GH-1628
public void shouldNotThrowOnTransactionWithPrimaryReadPreference() {
transactionOptionsTestService.findFromPrimaryReplica(randomUUID().toString()) //
.as(StepVerifier::create) //
.expectNextCount(0L) //
.verifyComplete();
}
@Test // GH-1628
public void shouldThrowTransactionSystemExceptionOnTransactionWithUnacknowledgedWriteConcern() {
Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString());
transactionOptionsTestService.unacknowledgedWriteConcernSave(person) //
.as(StepVerifier::create) //
.verifyError(TransactionSystemException.class);
operations.count(new Query(), Person.class) //
.as(StepVerifier::create).expectNext(0L) //
.verifyComplete();
}
@Test // GH-1628
public void shouldThrowTransactionSystemExceptionOnTransactionWithInvalidWriteConcern() {
Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString());
transactionOptionsTestService.invalidWriteConcernSave(person) //
.as(StepVerifier::create) //
.verifyError(TransactionSystemException.class);
operations.count(new Query(), Person.class) //
.as(StepVerifier::create) //
.expectNext(0L) //
.verifyComplete();
}
@Test // GH-1628
public void shouldCommitOnTransactionWithAcknowledgedWriteConcern() {
Person person = new Person(ObjectId.get(), randomUUID().toString(), randomUUID().toString());
transactionOptionsTestService.acknowledgedWriteConcernSave(person) //
.as(StepVerifier::create) //
.expectNext(person) //
.verifyComplete();
operations.count(new Query(), Person.class) //
.as(StepVerifier::create) //
.expectNext(1L) //
.verifyComplete();
}
@Configuration
@EnableTransactionManagement
static class TestMongoConfig extends AbstractReactiveMongoConfiguration {
@Override
@ -234,10 +358,16 @@ public class ReactiveTransactionIntegrationTests { @@ -234,10 +358,16 @@ public class ReactiveTransactionIntegrationTests {
}
@Bean
public ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {
public ReactiveMongoTransactionManager txManager(ReactiveMongoDatabaseFactory factory) {
return new ReactiveMongoTransactionManager(factory);
}
@Bean
public ReactiveTransactionOptionsTestService<Person> transactionOptionsTestService(
ReactiveMongoOperations operations) {
return new ReactiveTransactionOptionsTestService<>(operations, Person.class);
}
@Override
protected Set<Class<?>> getInitialEntitySet() {
return Collections.singleton(Person.class);
@ -291,10 +421,10 @@ public class ReactiveTransactionIntegrationTests { @@ -291,10 +421,10 @@ public class ReactiveTransactionIntegrationTests {
new DefaultTransactionDefinition());
return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), //
operations.save(new EventLog(new ObjectId(), "afterConvert")), //
operations.save(new EventLog(new ObjectId(), "beforeInsert")), //
operations.save(person), //
operations.save(new EventLog(new ObjectId(), "afterInsert"))) //
operations.save(new EventLog(new ObjectId(), "afterConvert")), //
operations.save(new EventLog(new ObjectId(), "beforeInsert")), //
operations.save(person), //
operations.save(new EventLog(new ObjectId(), "afterInsert"))) //
.thenMany(operations.query(EventLog.class).all()) //
.as(transactionalOperator::transactional);
}
@ -305,15 +435,15 @@ public class ReactiveTransactionIntegrationTests { @@ -305,15 +435,15 @@ public class ReactiveTransactionIntegrationTests {
new DefaultTransactionDefinition());
return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), //
operations.save(new EventLog(new ObjectId(), "afterConvert")), //
operations.save(new EventLog(new ObjectId(), "beforeInsert")), //
operations.save(person), //
operations.save(new EventLog(new ObjectId(), "afterInsert"))) //
operations.save(new EventLog(new ObjectId(), "afterConvert")), //
operations.save(new EventLog(new ObjectId(), "beforeInsert")), //
operations.save(person), //
operations.save(new EventLog(new ObjectId(), "afterInsert"))) //
.<Void> flatMap(it -> Mono.error(new RuntimeException("poof"))) //
.as(transactionalOperator::transactional);
}
@Transactional
@Transactional(transactionManager = "txManager")
public Flux<Person> declarativeSavePerson(Person person) {
TransactionalOperator transactionalOperator = TransactionalOperator.create(manager,
@ -324,7 +454,7 @@ public class ReactiveTransactionIntegrationTests { @@ -324,7 +454,7 @@ public class ReactiveTransactionIntegrationTests {
});
}
@Transactional
@Transactional(transactionManager = "txManager")
public Flux<Person> declarativeSavePersonErrors(Person person) {
TransactionalOperator transactionalOperator = TransactionalOperator.create(manager,
@ -384,8 +514,8 @@ public class ReactiveTransactionIntegrationTests { @@ -384,8 +514,8 @@ public class ReactiveTransactionIntegrationTests {
return false;
}
Person person = (Person) o;
return Objects.equals(id, person.id) && Objects.equals(firstname, person.firstname)
&& Objects.equals(lastname, person.lastname);
return Objects.equals(id, person.id) && Objects.equals(firstname, person.firstname) && Objects.equals(lastname,
person.lastname);
}
@Override
@ -394,8 +524,7 @@ public class ReactiveTransactionIntegrationTests { @@ -394,8 +524,7 @@ public class ReactiveTransactionIntegrationTests {
}
public String toString() {
return "ReactiveTransactionIntegrationTests.Person(id=" + this.getId() + ", firstname=" + this.getFirstname()
+ ", lastname=" + this.getLastname() + ")";
return "ReactiveTransactionIntegrationTests.Person(id=" + this.getId() + ", firstname=" + this.getFirstname() + ", lastname=" + this.getLastname() + ")";
}
}

96
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionOptionsTestService.java

@ -0,0 +1,96 @@ @@ -0,0 +1,96 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb;
import reactor.core.publisher.Mono;
import java.util.function.Function;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.transaction.annotation.Transactional;
/**
* Helper class for integration tests of {@link Transactional#label()} MongoDb options in reactive context.
*
* @param <T> root document type
* @author Yan Kardziyaka
* @see org.springframework.data.mongodb.core.TransactionOptionsTestService
*/
public class ReactiveTransactionOptionsTestService<T> {
private final Function<Object, Mono<T>> findByIdFunction;
private final Function<T, Mono<T>> saveFunction;
public ReactiveTransactionOptionsTestService(ReactiveMongoOperations operations, Class<T> entityClass) {
this.findByIdFunction = id -> operations.findById(id, entityClass);
this.saveFunction = operations::save;
}
@Transactional(transactionManager = "txManager", label = { "mongo:maxCommitTime=-PT6H3M" })
public Mono<T> saveWithInvalidMaxCommitTime(T entity) {
return saveFunction.apply(entity);
}
@Transactional(transactionManager = "txManager", label = { "mongo:maxCommitTime=PT1M" })
public Mono<T> saveWithinMaxCommitTime(T entity) {
return saveFunction.apply(entity);
}
@Transactional(transactionManager = "txManager", label = { "mongo:readConcern=available" })
public Mono<T> availableReadConcernFind(Object id) {
return findByIdFunction.apply(id);
}
@Transactional(transactionManager = "txManager", label = { "mongo:readConcern=invalid" })
public Mono<T> invalidReadConcernFind(Object id) {
return findByIdFunction.apply(id);
}
@Transactional(transactionManager = "txManager", label = { "mongo:readConcern=majority" })
public Mono<T> majorityReadConcernFind(Object id) {
return findByIdFunction.apply(id);
}
@Transactional(transactionManager = "txManager", label = { "mongo:readPreference=primaryPreferred" })
public Mono<T> findFromPrimaryPreferredReplica(Object id) {
return findByIdFunction.apply(id);
}
@Transactional(transactionManager = "txManager", label = { "mongo:readPreference=invalid" })
public Mono<T> findFromInvalidReplica(Object id) {
return findByIdFunction.apply(id);
}
@Transactional(transactionManager = "txManager", label = { "mongo:readPreference=primary" })
public Mono<T> findFromPrimaryReplica(Object id) {
return findByIdFunction.apply(id);
}
@Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=unacknowledged" })
public Mono<T> unacknowledgedWriteConcernSave(T entity) {
return saveFunction.apply(entity);
}
@Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=invalid" })
public Mono<T> invalidWriteConcernSave(T entity) {
return saveFunction.apply(entity);
}
@Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=acknowledged" })
public Mono<T> acknowledgedWriteConcernSave(T entity) {
return saveFunction.apply(entity);
}
}

144
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java

@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
*/
package org.springframework.data.mongodb.core;
import static java.util.UUID.*;
import static org.assertj.core.api.Assertions.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;
@ -33,10 +34,12 @@ import org.junit.jupiter.api.extension.ExtendWith; @@ -33,10 +34,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.annotation.Id;
import org.springframework.data.domain.Persistable;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.data.mongodb.UncategorizedMongoDbException;
import org.springframework.data.mongodb.config.AbstractMongoClientConfiguration;
import org.springframework.data.mongodb.test.util.AfterTransactionAssertion;
import org.springframework.data.mongodb.test.util.EnableIfMongoServerVersion;
@ -48,6 +51,9 @@ import org.springframework.test.context.ContextConfiguration; @@ -48,6 +51,9 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.transaction.AfterTransaction;
import org.springframework.test.context.transaction.BeforeTransaction;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.mongodb.ReadPreference;
@ -57,6 +63,7 @@ import com.mongodb.client.model.Filters; @@ -57,6 +63,7 @@ import com.mongodb.client.model.Filters;
/**
* @author Christoph Strobl
* @author Yan Kardziyaka
* @currentRead Shadow's Edge - Brent Weeks
*/
@ExtendWith({ MongoClientExtension.class, SpringExtension.class })
@ -72,6 +79,7 @@ public class MongoTemplateTransactionTests { @@ -72,6 +79,7 @@ public class MongoTemplateTransactionTests {
static @ReplSetClient MongoClient mongoClient;
@Configuration
@EnableTransactionManagement
static class Config extends AbstractMongoClientConfiguration {
@Bean
@ -98,10 +106,19 @@ public class MongoTemplateTransactionTests { @@ -98,10 +106,19 @@ public class MongoTemplateTransactionTests {
protected Set<Class<?>> getInitialEntitySet() throws ClassNotFoundException {
return Collections.emptySet();
}
@Bean
public TransactionOptionsTestService<Assassin> transactionOptionsTestService(MongoOperations operations) {
return new TransactionOptionsTestService<>(operations, Assassin.class);
}
}
@Autowired MongoTemplate template;
@Autowired MongoClient client;
@Autowired
MongoTemplate template;
@Autowired
MongoClient client;
@Autowired
TransactionOptionsTestService<Assassin> transactionOptionsTestService;
List<AfterTransactionAssertion<? extends Persistable<?>>> assertionList;
@ -127,8 +144,8 @@ public class MongoTemplateTransactionTests { @@ -127,8 +144,8 @@ public class MongoTemplateTransactionTests {
boolean isPresent = collection.countDocuments(Filters.eq("_id", it.getId())) != 0;
assertThat(isPresent).isEqualTo(it.shouldBePresent())
.withFailMessage(String.format("After transaction entity %s should %s.", it.getPersistable(),
assertThat(isPresent).isEqualTo(it.shouldBePresent()).withFailMessage(
String.format("After transaction entity %s should %s.", it.getPersistable(),
it.shouldBePresent() ? "be present" : "NOT be present"));
});
}
@ -166,6 +183,122 @@ public class MongoTemplateTransactionTests { @@ -166,6 +183,122 @@ public class MongoTemplateTransactionTests {
assertAfterTransaction(durzo).isNotPresent();
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldThrowIllegalArgumentExceptionOnTransactionWithInvalidMaxCommitTime() {
Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString());
assertThatThrownBy(() -> transactionOptionsTestService.saveWithInvalidMaxCommitTime(assassin)) //
.isInstanceOf(IllegalArgumentException.class);
assertAfterTransaction(assassin).isNotPresent();
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldCommitOnTransactionWithinMaxCommitTime() {
Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString());
transactionOptionsTestService.saveWithinMaxCommitTime(assassin);
assertAfterTransaction(assassin).isPresent();
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldThrowInvalidDataAccessApiUsageExceptionOnTransactionWithAvailableReadConcern() {
assertThatThrownBy(() -> transactionOptionsTestService.availableReadConcernFind(randomUUID().toString())) //
.isInstanceOf(InvalidDataAccessApiUsageException.class);
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldThrowIllegalArgumentExceptionOnTransactionWithInvalidReadConcern() {
assertThatThrownBy(() -> transactionOptionsTestService.invalidReadConcernFind(randomUUID().toString())) //
.isInstanceOf(IllegalArgumentException.class);
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldNotThrowOnTransactionWithMajorityReadConcern() {
assertThatNoException() //
.isThrownBy(() -> transactionOptionsTestService.majorityReadConcernFind(randomUUID().toString()));
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldThrowUncategorizedMongoDbExceptionOnTransactionWithPrimaryPreferredReadPreference() {
assertThatThrownBy(() -> transactionOptionsTestService.findFromPrimaryPreferredReplica(randomUUID().toString())) //
.isInstanceOf(UncategorizedMongoDbException.class);
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldThrowIllegalArgumentExceptionOnTransactionWithInvalidReadPreference() {
assertThatThrownBy(() -> transactionOptionsTestService.findFromInvalidReplica(randomUUID().toString())) //
.isInstanceOf(IllegalArgumentException.class);
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldNotThrowOnTransactionWithPrimaryReadPreference() {
assertThatNoException() //
.isThrownBy(() -> transactionOptionsTestService.findFromPrimaryReplica(randomUUID().toString()));
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldThrowTransactionSystemExceptionOnTransactionWithUnacknowledgedWriteConcern() {
Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString());
assertThatThrownBy(() -> transactionOptionsTestService.unacknowledgedWriteConcernSave(assassin)) //
.isInstanceOf(TransactionSystemException.class);
assertAfterTransaction(assassin).isNotPresent();
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldThrowIllegalArgumentExceptionOnTransactionWithInvalidWriteConcern() {
Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString());
assertThatThrownBy(() -> transactionOptionsTestService.invalidWriteConcernSave(assassin)) //
.isInstanceOf(IllegalArgumentException.class);
assertAfterTransaction(assassin).isNotPresent();
}
@Rollback(false)
@Test // GH-1628
@Transactional(transactionManager = "txManager", propagation = Propagation.NEVER)
public void shouldCommitOnTransactionWithAcknowledgedWriteConcern() {
Assassin assassin = new Assassin(randomUUID().toString(), randomUUID().toString());
transactionOptionsTestService.acknowledgedWriteConcernSave(assassin);
assertAfterTransaction(assassin).isPresent();
}
// --- Just some helpers and tests entities
private AfterTransactionAssertion assertAfterTransaction(Assassin assassin) {
@ -178,7 +311,8 @@ public class MongoTemplateTransactionTests { @@ -178,7 +311,8 @@ public class MongoTemplateTransactionTests {
@org.springframework.data.mongodb.core.mapping.Document(COLLECTION_NAME)
static class Assassin implements Persistable<String> {
@Id String id;
@Id
String id;
String name;
public Assassin(String id, String name) {

101
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/TransactionOptionsTestService.java

@ -0,0 +1,101 @@ @@ -0,0 +1,101 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.springframework.lang.Nullable;
import org.springframework.transaction.annotation.Transactional;
/**
* Helper class for integration tests of {@link Transactional#label()} MongoDb options in non-reactive context.
*
* @param <T> root document type
* @author Yan Kardziyaka
* @see org.springframework.data.mongodb.ReactiveTransactionOptionsTestService
*/
public class TransactionOptionsTestService<T> {
private final Function<Object, T> findByIdFunction;
private final UnaryOperator<T> saveFunction;
public TransactionOptionsTestService(MongoOperations operations, Class<T> entityClass) {
this.findByIdFunction = id -> operations.findById(id, entityClass);
this.saveFunction = operations::save;
}
@Transactional(transactionManager = "txManager", label = { "mongo:maxCommitTime=-PT6H3M" })
public T saveWithInvalidMaxCommitTime(T entity) {
return saveFunction.apply(entity);
}
@Transactional(transactionManager = "txManager", label = { "mongo:maxCommitTime=PT1M" })
public T saveWithinMaxCommitTime(T entity) {
return saveFunction.apply(entity);
}
@Nullable
@Transactional(transactionManager = "txManager", label = { "mongo:readConcern=available" })
public T availableReadConcernFind(Object id) {
return findByIdFunction.apply(id);
}
@Nullable
@Transactional(transactionManager = "txManager", label = { "mongo:readConcern=invalid" })
public T invalidReadConcernFind(Object id) {
return findByIdFunction.apply(id);
}
@Nullable
@Transactional(transactionManager = "txManager", label = { "mongo:readConcern=majority" })
public T majorityReadConcernFind(Object id) {
return findByIdFunction.apply(id);
}
@Nullable
@Transactional(transactionManager = "txManager", label = { "mongo:readPreference=primaryPreferred" })
public T findFromPrimaryPreferredReplica(Object id) {
return findByIdFunction.apply(id);
}
@Nullable
@Transactional(transactionManager = "txManager", label = { "mongo:readPreference=invalid" })
public T findFromInvalidReplica(Object id) {
return findByIdFunction.apply(id);
}
@Nullable
@Transactional(transactionManager = "txManager", label = { "mongo:readPreference=primary" })
public T findFromPrimaryReplica(Object id) {
return findByIdFunction.apply(id);
}
@Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=unacknowledged" })
public T unacknowledgedWriteConcernSave(T entity) {
return saveFunction.apply(entity);
}
@Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=invalid" })
public T invalidWriteConcernSave(T entity) {
return saveFunction.apply(entity);
}
@Transactional(transactionManager = "txManager", label = { "mongo:writeConcern=acknowledged" })
public T acknowledgedWriteConcernSave(T entity) {
return saveFunction.apply(entity);
}
}
Loading…
Cancel
Save