diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionHolder.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionHolder.java
index 5ad913d0b81..dd1b9c36c71 100644
--- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionHolder.java
+++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionHolder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2022 the original author or authors.
+ * Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,11 +41,20 @@ import org.springframework.util.Assert;
*/
public class ConnectionHolder extends ResourceHolderSupport {
+ /**
+ * Prefix for savepoint names.
+ * @since 6.0.10
+ */
+ static final String SAVEPOINT_NAME_PREFIX = "SAVEPOINT_";
+
+
@Nullable
private Connection currentConnection;
private boolean transactionActive;
+ private int savepointCounter = 0;
+
/**
* Create a new ConnectionHolder for the given R2DBC {@link Connection},
@@ -112,6 +121,17 @@ public class ConnectionHolder extends ResourceHolderSupport {
return this.currentConnection;
}
+ /**
+ * Create a new savepoint for the current {@link Connection},
+ * using generated savepoint names that are unique for the Connection.
+ * @return the name of the new savepoint
+ * @since 6.0.10
+ */
+ String nextSavepoint() {
+ this.savepointCounter++;
+ return SAVEPOINT_NAME_PREFIX + this.savepointCounter;
+ }
+
/**
* Releases the current {@link Connection}.
*/
diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
index 1143aa1a7f7..9fe2e1140e6 100644
--- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
+++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
@@ -24,7 +24,6 @@ import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.Result;
-import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.InitializingBean;
@@ -38,46 +37,42 @@ import org.springframework.transaction.reactive.TransactionSynchronizationManage
import org.springframework.util.Assert;
/**
- * {@link org.springframework.transaction.ReactiveTransactionManager}
- * implementation for a single R2DBC {@link ConnectionFactory}. This class is
- * capable of working in any environment with any R2DBC driver, as long as the
- * setup uses a {@code ConnectionFactory} as its {@link Connection} factory
- * mechanism. Binds a R2DBC {@code Connection} from the specified
- * {@code ConnectionFactory} to the current subscriber context, potentially
- * allowing for one context-bound {@code Connection} per {@code ConnectionFactory}.
+ * {@link org.springframework.transaction.ReactiveTransactionManager} implementation
+ * for a single R2DBC {@link ConnectionFactory}. This class is capable of working
+ * in any environment with any R2DBC driver, as long as the setup uses a
+ * {@code ConnectionFactory} as its {@link Connection} factory mechanism.
+ * Binds a R2DBC {@code Connection} from the specified {@code ConnectionFactory}
+ * to the current subscriber context, potentially allowing for one context-bound
+ * {@code Connection} per {@code ConnectionFactory}.
*
- *
Note: The {@code ConnectionFactory} that this transaction manager
- * operates on needs to return independent {@code Connection}s.
- * The {@code Connection}s may come from a pool (the typical case), but the
- * {@code ConnectionFactory} must not return scoped {@code Connection}s
- * or the like. This transaction manager will associate {@code Connection}
- * with context-bound transactions itself, according to the specified propagation
- * behavior. It assumes that a separate, independent {@code Connection} can
- * be obtained even during an ongoing transaction.
+ *
Note: The {@code ConnectionFactory} that this transaction manager operates
+ * on needs to return independent {@code Connection}s. The {@code Connection}s
+ * typically come from a connection pool but the {@code ConnectionFactory} must not
+ * return specifically scoped or constrained {@code Connection}s. This transaction
+ * manager will associate {@code Connection} with context-bound transactions,
+ * according to the specified propagation behavior. It assumes that a separate,
+ * independent {@code Connection} can be obtained even during an ongoing transaction.
*
*
Application code is required to retrieve the R2DBC Connection via
* {@link ConnectionFactoryUtils#getConnection(ConnectionFactory)}
* instead of a standard R2DBC-style {@link ConnectionFactory#create()} call.
* Spring classes such as {@code DatabaseClient} use this strategy implicitly.
* If not used in combination with this transaction manager, the
- * {@link ConnectionFactoryUtils} lookup strategy behaves exactly like the
- * native {@code ConnectionFactory} lookup; it can thus be used in a portable fashion.
+ * {@link ConnectionFactoryUtils} lookup strategy behaves exactly like the native
+ * {@code ConnectionFactory} lookup; it can thus be used in a portable fashion.
*
- *
Alternatively, you can allow application code to work with the standard
- * R2DBC lookup pattern {@link ConnectionFactory#create()}, for example for code
- * that is not aware of Spring at all. In that case, define a
- * {@link TransactionAwareConnectionFactoryProxy} for your target {@code ConnectionFactory},
- * and pass that proxy {@code ConnectionFactory} to your DAOs, which will automatically
- * participate in Spring-managed transactions when accessing it.
+ *
Alternatively, you can allow application code to work with the lookup pattern
+ * {@link ConnectionFactory#create()}, for example for code not aware of Spring.
+ * In that case, define a {@link TransactionAwareConnectionFactoryProxy} for your
+ * target {@code ConnectionFactory}, and pass that proxy {@code ConnectionFactory}
+ * to your DAOs which will automatically participate in Spring-managed transactions
+ * when accessing it.
*
- *
This transaction manager triggers flush callbacks on registered transaction
- * synchronizations (if synchronization is generally active), assuming resources
- * operating on the underlying R2DBC {@code Connection}.
- *
- *
Spring's {@code TransactionDefinition} attributes are carried forward to R2DBC drivers
- * using extensible R2DBC {@link io.r2dbc.spi.TransactionDefinition}. Subclasses may
- * override {@link #createTransactionDefinition(TransactionDefinition)} to customize
- * transaction definitions for vendor-specific attributes.
+ *
Spring's {@code TransactionDefinition} attributes are carried forward to
+ * R2DBC drivers using extensible R2DBC {@link io.r2dbc.spi.TransactionDefinition}.
+ * Subclasses may override {@link #createTransactionDefinition(TransactionDefinition)}
+ * to customize transaction definitions for vendor-specific attributes. As of 6.0.10,
+ * this transaction manager supports nested transactions via R2DBC savepoints as well.
*
* @author Mark Paluch
* @author Juergen Hoeller
@@ -97,7 +92,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
/**
* Create a new {@code R2dbcTransactionManager} instance.
- * A ConnectionFactory has to be set to be able to use it.
+ * A {@code ConnectionFactory} has to be set to be able to use it.
* @see #setConnectionFactory
*/
public R2dbcTransactionManager() {}
@@ -114,12 +109,13 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
/**
- * Set the R2DBC {@link ConnectionFactory} that this instance should manage transactions for.
- *
This will typically be a locally defined {@code ConnectionFactory}, for example a connection pool.
- *
The {@code ConnectionFactory} passed in here needs to return independent {@link Connection}s.
- * The {@code Connection}s may come from a pool (the typical case), but the {@code ConnectionFactory}
- * must not return scoped {@code Connection}s or the like.
- * @see TransactionAwareConnectionFactoryProxy
+ * Set the R2DBC {@link ConnectionFactory} that this instance should manage transactions
+ * for. This will typically be a locally defined {@code ConnectionFactory}, for example
+ * an R2DBC connection pool.
+ *
The {@code ConnectionFactory} passed in here needs to return independent
+ * {@link Connection}s. The {@code Connection}s typically come from a connection
+ * pool but the {@code ConnectionFactory} must not return specifically scoped or
+ * constrained {@code Connection}s.
*/
public void setConnectionFactory(@Nullable ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
@@ -183,8 +179,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@Override
protected boolean isExistingTransaction(Object transaction) {
- ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
- return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
+ return ((ConnectionFactoryTransactionObject) transaction).isTransactionActive();
}
@Override
@@ -193,6 +188,11 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
+ if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED &&
+ txObject.isTransactionActive()) {
+ return txObject.createSavepoint();
+ }
+
return Mono.defer(() -> {
Mono connectionMono;
@@ -210,7 +210,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
connectionMono = Mono.just(txObject.getConnectionHolder().getConnection());
}
- return connectionMono.flatMap(con -> Mono.from(doBegin(definition, con))
+ return connectionMono.flatMap(con -> doBegin(definition, con)
.then(prepareTransactionalConnection(con, definition))
.doOnSuccess(v -> {
txObject.getConnectionHolder().setTransactionActive(true);
@@ -234,12 +234,12 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
}).then();
}
- private Publisher doBegin(TransactionDefinition definition, Connection con) {
+ private Mono doBegin(TransactionDefinition definition, Connection con) {
io.r2dbc.spi.TransactionDefinition transactionDefinition = createTransactionDefinition(definition);
if (logger.isDebugEnabled()) {
logger.debug("Starting R2DBC transaction on Connection [" + con + "] using [" + transactionDefinition + "]");
}
- return con.beginTransaction(transactionDefinition);
+ return Mono.from(con.beginTransaction(transactionDefinition));
}
/**
@@ -300,12 +300,11 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
GenericReactiveTransaction status) throws TransactionException {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) status.getTransaction();
- Connection connection = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
- logger.debug("Committing R2DBC transaction on Connection [" + connection + "]");
+ logger.debug("Committing R2DBC transaction on Connection [" +
+ txObject.getConnectionHolder().getConnection() + "]");
}
- return Mono.from(connection.commitTransaction())
- .onErrorMap(R2dbcException.class, ex -> translateException("R2DBC commit", ex));
+ return txObject.commit().onErrorMap(R2dbcException.class, ex -> translateException("R2DBC commit", ex));
}
@Override
@@ -313,12 +312,11 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
GenericReactiveTransaction status) throws TransactionException {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) status.getTransaction();
- Connection connection = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
- logger.debug("Rolling back R2DBC transaction on Connection [" + connection + "]");
+ logger.debug("Rolling back R2DBC transaction on Connection [" +
+ txObject.getConnectionHolder().getConnection() + "]");
}
- return Mono.from(connection.rollbackTransaction())
- .onErrorMap(R2dbcException.class, ex -> translateException("R2DBC rollback", ex));
+ return txObject.rollback().onErrorMap(R2dbcException.class, ex -> translateException("R2DBC rollback", ex));
}
@Override
@@ -496,6 +494,9 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
private boolean newConnectionHolder;
+ @Nullable
+ private String savepointName;
+
void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
setConnectionHolder(connectionHolder);
this.newConnectionHolder = newConnectionHolder;
@@ -505,10 +506,6 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
return this.newConnectionHolder;
}
- void setRollbackOnly() {
- getConnectionHolder().setRollbackOnly();
- }
-
public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder) {
this.connectionHolder = connectionHolder;
}
@@ -521,6 +518,34 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
public boolean hasConnectionHolder() {
return (this.connectionHolder != null);
}
+
+ public boolean isTransactionActive() {
+ return (this.connectionHolder != null && this.connectionHolder.isTransactionActive());
+ }
+
+ public Mono createSavepoint() {
+ ConnectionHolder holder = getConnectionHolder();
+ this.savepointName = holder.nextSavepoint();
+ return Mono.from(holder.getConnection().createSavepoint(this.savepointName));
+ }
+
+ public Mono commit() {
+ Connection connection = getConnectionHolder().getConnection();
+ return (this.savepointName != null ?
+ Mono.from(connection.releaseSavepoint(this.savepointName)) :
+ Mono.from(connection.commitTransaction()));
+ }
+
+ public Mono rollback() {
+ Connection connection = getConnectionHolder().getConnection();
+ return (this.savepointName != null ?
+ Mono.from(connection.rollbackTransactionToSavepoint(this.savepointName)) :
+ Mono.from(connection.rollbackTransaction()));
+ }
+
+ public void setRollbackOnly() {
+ getConnectionHolder().setRollbackOnly();
+ }
}
}
diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java
index b428d8ff7d3..89ccc9fe2d6 100644
--- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java
+++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java
@@ -231,7 +231,6 @@ class R2dbcTransactionManagerUnitTests {
@Test
void testCommitFails() {
when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail"))));
-
when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty());
TransactionalOperator operator = TransactionalOperator.create(tm);
@@ -252,7 +251,6 @@ class R2dbcTransactionManagerUnitTests {
@Test
void testRollback() {
-
AtomicInteger commits = new AtomicInteger();
when(connectionMock.commitTransaction()).thenReturn(
Mono.fromRunnable(commits::incrementAndGet));
@@ -284,11 +282,8 @@ class R2dbcTransactionManagerUnitTests {
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail"))), Mono.empty());
TransactionalOperator operator = TransactionalOperator.create(tm);
-
operator.execute(reactiveTransaction -> {
-
reactiveTransaction.setRollbackOnly();
-
return ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> connection.createStatement("foo")).then();
}).as(StepVerifier::create)
@@ -306,11 +301,9 @@ class R2dbcTransactionManagerUnitTests {
@SuppressWarnings("unchecked")
void testConnectionReleasedWhenRollbackFails() {
when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Rollback should fail"))), Mono.empty());
-
- TransactionalOperator operator = TransactionalOperator.create(tm);
-
when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty());
+ TransactionalOperator operator = TransactionalOperator.create(tm);
operator.execute(reactiveTransaction -> ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> {
throw new IllegalStateException("Intentional error to trigger rollback");
@@ -333,12 +326,9 @@ class R2dbcTransactionManagerUnitTests {
TransactionSynchronization.STATUS_ROLLED_BACK);
TransactionalOperator operator = TransactionalOperator.create(tm);
-
operator.execute(tx -> {
-
tx.setRollbackOnly();
assertThat(tx.isNewTransaction()).isTrue();
-
return TransactionSynchronizationManager.forCurrentTransaction().doOnNext(
synchronizationManager -> {
assertThat(synchronizationManager.hasResource(connectionFactoryMock)).isTrue();
@@ -364,15 +354,12 @@ class R2dbcTransactionManagerUnitTests {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
- TransactionalOperator operator = TransactionalOperator.create(tm, definition);
+ TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> {
-
assertThat(tx1.isNewTransaction()).isTrue();
-
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NEVER);
return operator.execute(tx2 -> {
-
fail("Should have thrown IllegalTransactionStateException");
return Mono.empty();
});
@@ -384,24 +371,121 @@ class R2dbcTransactionManagerUnitTests {
}
@Test
- void testPropagationSupportsAndRequiresNew() {
+ void testPropagationNestedWithExistingTransaction() {
+ when(connectionMock.createSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty());
+ when(connectionMock.releaseSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty());
+ when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
+
+ DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
+ definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+
+ TransactionalOperator operator = TransactionalOperator.create(tm, definition);
+ operator.execute(tx1 -> {
+ assertThat(tx1.isNewTransaction()).isTrue();
+ definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
+ return operator.execute(tx2 -> {
+ assertThat(tx2.isNewTransaction()).isTrue();
+ return Mono.empty();
+ });
+ }).as(StepVerifier::create)
+ .verifyComplete();
+
+ verify(connectionMock).createSavepoint("SAVEPOINT_1");
+ verify(connectionMock).releaseSavepoint("SAVEPOINT_1");
+ verify(connectionMock).commitTransaction();
+ verify(connectionMock).close();
+ }
+
+ @Test
+ void testPropagationNestedWithExistingTransactionAndRollback() {
+ when(connectionMock.createSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty());
+ when(connectionMock.rollbackTransactionToSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty());
+ when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
+
+ DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
+ definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+
+ TransactionalOperator operator = TransactionalOperator.create(tm, definition);
+ operator.execute(tx1 -> {
+ assertThat(tx1.isNewTransaction()).isTrue();
+ definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
+ return operator.execute(tx2 -> {
+ assertThat(tx2.isNewTransaction()).isTrue();
+ tx2.setRollbackOnly();
+ return Mono.empty();
+ });
+ }).as(StepVerifier::create)
+ .verifyComplete();
+
+ verify(connectionMock).createSavepoint("SAVEPOINT_1");
+ verify(connectionMock).rollbackTransactionToSavepoint("SAVEPOINT_1");
+ verify(connectionMock).commitTransaction();
+ verify(connectionMock).close();
+ }
+
+ @Test
+ void testPropagationSupportsAndNested() {
when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
+
TransactionalOperator operator = TransactionalOperator.create(tm, definition);
+ operator.execute(tx1 -> {
+ assertThat(tx1.isNewTransaction()).isFalse();
+ DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
+ innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
+ TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
+ return inner.execute(tx2 -> {
+ assertThat(tx2.isNewTransaction()).isTrue();
+ return Mono.empty();
+ });
+ }).as(StepVerifier::create)
+ .verifyComplete();
+
+ verify(connectionMock).commitTransaction();
+ verify(connectionMock).close();
+ }
+
+ @Test
+ void testPropagationSupportsAndNestedWithRollback() {
+ when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty());
+ DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
+ definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
+
+ TransactionalOperator operator = TransactionalOperator.create(tm, definition);
operator.execute(tx1 -> {
+ assertThat(tx1.isNewTransaction()).isFalse();
+ DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
+ innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED);
+ TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
+ return inner.execute(tx2 -> {
+ assertThat(tx2.isNewTransaction()).isTrue();
+ tx2.setRollbackOnly();
+ return Mono.empty();
+ });
+ }).as(StepVerifier::create)
+ .verifyComplete();
- assertThat(tx1.isNewTransaction()).isFalse();
+ verify(connectionMock).rollbackTransaction();
+ verify(connectionMock).close();
+ }
+
+ @Test
+ void testPropagationSupportsAndRequiresNew() {
+ when(connectionMock.commitTransaction()).thenReturn(Mono.empty());
+ DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
+ definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
+
+ TransactionalOperator operator = TransactionalOperator.create(tm, definition);
+ operator.execute(tx1 -> {
+ assertThat(tx1.isNewTransaction()).isFalse();
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
- innerDef.setPropagationBehavior(
- TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+ innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
-
return inner.execute(tx2 -> {
-
assertThat(tx2.isNewTransaction()).isTrue();
return Mono.empty();
});
@@ -412,6 +496,31 @@ class R2dbcTransactionManagerUnitTests {
verify(connectionMock).close();
}
+ @Test
+ void testPropagationSupportsAndRequiresNewWithRollback() {
+ when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty());
+
+ DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
+ definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
+
+ TransactionalOperator operator = TransactionalOperator.create(tm, definition);
+ operator.execute(tx1 -> {
+ assertThat(tx1.isNewTransaction()).isFalse();
+ DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition();
+ innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
+ TransactionalOperator inner = TransactionalOperator.create(tm, innerDef);
+ return inner.execute(tx2 -> {
+ assertThat(tx2.isNewTransaction()).isTrue();
+ tx2.setRollbackOnly();
+ return Mono.empty();
+ });
+ }).as(StepVerifier::create)
+ .verifyComplete();
+
+ verify(connectionMock).rollbackTransaction();
+ verify(connectionMock).close();
+ }
+
private static class TestTransactionSynchronization implements TransactionSynchronization {