diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
index 7aea0c270a8..fb6d57a4efa 100644
--- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
+++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
@@ -80,6 +80,7 @@ import org.springframework.util.Assert;
* transaction definitions for vendor-specific attributes.
*
* @author Mark Paluch
+ * @author Juergen Hoeller
* @since 5.3
* @see ConnectionFactoryUtils#getConnection(ConnectionFactory)
* @see ConnectionFactoryUtils#releaseConnection
@@ -149,7 +150,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
* transactional connection: "SET TRANSACTION READ ONLY" as understood by Oracle,
* MySQL and Postgres.
*
The exact treatment, including any SQL statement executed on the connection,
- * can be customized through through {@link #prepareTransactionalConnection}.
+ * can be customized through {@link #prepareTransactionalConnection}.
* @see #prepareTransactionalConnection
*/
public void setEnforceReadOnly(boolean enforceReadOnly) {
@@ -209,8 +210,9 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
connectionMono = Mono.just(txObject.getConnectionHolder().getConnection());
}
- return connectionMono.flatMap(con -> prepareTransactionalConnection(con, definition, transaction)
+ return connectionMono.flatMap(con -> switchAutoCommitIfNecessary(con, transaction)
.then(Mono.from(doBegin(definition, con)))
+ .then(prepareTransactionalConnection(con, definition))
.doOnSuccess(v -> {
txObject.getConnectionHolder().setTransactionActive(true);
Duration timeout = determineTimeout(definition);
@@ -375,6 +377,24 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
});
}
+ private Mono switchAutoCommitIfNecessary(Connection con, Object transaction) {
+ ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
+ Mono prepare = Mono.empty();
+
+ // Switch to manual commit if necessary. This is very expensive in some R2DBC drivers,
+ // so we don't want to do it unnecessarily (for example if we've explicitly
+ // configured the connection pool to set it already).
+ if (con.isAutoCommit()) {
+ txObject.setMustRestoreAutoCommit(true);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Switching R2DBC Connection [" + con + "] to manual commit");
+ }
+ prepare = prepare.then(Mono.from(con.setAutoCommit(false)));
+ }
+
+ return prepare;
+ }
+
/**
* Prepare the transactional {@link Connection} right after transaction begin.
* The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
@@ -385,33 +405,16 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
* override this method accordingly.
* @param con the transactional R2DBC Connection
* @param definition the current transaction definition
- * @param transaction the transaction object
+ * @since 5.3.22
* @see #setEnforceReadOnly
*/
- protected Mono prepareTransactionalConnection(
- Connection con, TransactionDefinition definition, Object transaction) {
-
- ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
-
+ protected Mono prepareTransactionalConnection(Connection con, TransactionDefinition definition) {
Mono prepare = Mono.empty();
-
if (isEnforceReadOnly() && definition.isReadOnly()) {
prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute())
.flatMapMany(Result::getRowsUpdated)
.then();
}
-
- // Switch to manual commit if necessary. This is very expensive in some R2DBC drivers,
- // so we don't want to do it unnecessarily (for example if we've explicitly
- // configured the connection pool to set it already).
- if (con.isAutoCommit()) {
- txObject.setMustRestoreAutoCommit(true);
- if (logger.isDebugEnabled()) {
- logger.debug("Switching R2DBC Connection [" + con + "] to manual commit");
- }
- prepare = prepare.then(Mono.from(con.setAutoCommit(false)));
- }
-
return prepare;
}
@@ -452,21 +455,20 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
* to R2DBC drivers when starting a transaction.
*/
private record ExtendedTransactionDefinition(@Nullable String transactionName,
- boolean readOnly,
- @Nullable IsolationLevel isolationLevel,
- Duration lockWaitTimeout) implements io.r2dbc.spi.TransactionDefinition {
+ boolean readOnly, @Nullable IsolationLevel isolationLevel, Duration lockWaitTimeout)
+ implements io.r2dbc.spi.TransactionDefinition {
private ExtendedTransactionDefinition(@Nullable String transactionName, boolean readOnly,
@Nullable IsolationLevel isolationLevel, Duration lockWaitTimeout) {
+
this.transactionName = transactionName;
this.readOnly = readOnly;
this.isolationLevel = isolationLevel;
this.lockWaitTimeout = lockWaitTimeout;
}
-
- @Override
@SuppressWarnings("unchecked")
+ @Override
public T getAttribute(Option option) {
return (T) doGetValue(option);
}