@ -80,6 +80,7 @@ import org.springframework.util.Assert;
@@ -80,6 +80,7 @@ import org.springframework.util.Assert;
* transaction definitions for vendor - specific attributes .
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5 . 3
* @see ConnectionFactoryUtils # getConnection ( ConnectionFactory )
* @see ConnectionFactoryUtils # releaseConnection
@ -149,7 +150,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -149,7 +150,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
* transactional connection : "SET TRANSACTION READ ONLY" as understood by Oracle ,
* MySQL and Postgres .
* < p > The exact treatment , including any SQL statement executed on the connection ,
* can be customized through through { @link # prepareTransactionalConnection } .
* can be customized through { @link # prepareTransactionalConnection } .
* @see # prepareTransactionalConnection
* /
public void setEnforceReadOnly ( boolean enforceReadOnly ) {
@ -209,8 +210,9 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -209,8 +210,9 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
connectionMono = Mono . just ( txObject . getConnectionHolder ( ) . getConnection ( ) ) ;
}
return connectionMono . flatMap ( con - > prepareTransactionalConnection ( con , definiti on, transaction )
return connectionMono . flatMap ( con - > switchAutoCommitIfNecessary ( c on, transaction )
. then ( Mono . from ( doBegin ( definition , con ) ) )
. then ( prepareTransactionalConnection ( con , definition ) )
. doOnSuccess ( v - > {
txObject . getConnectionHolder ( ) . setTransactionActive ( true ) ;
Duration timeout = determineTimeout ( definition ) ;
@ -375,6 +377,24 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -375,6 +377,24 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
} ) ;
}
private Mono < Void > switchAutoCommitIfNecessary ( Connection con , Object transaction ) {
ConnectionFactoryTransactionObject txObject = ( ConnectionFactoryTransactionObject ) transaction ;
Mono < Void > prepare = Mono . empty ( ) ;
// Switch to manual commit if necessary. This is very expensive in some R2DBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if ( con . isAutoCommit ( ) ) {
txObject . setMustRestoreAutoCommit ( true ) ;
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "Switching R2DBC Connection [" + con + "] to manual commit" ) ;
}
prepare = prepare . then ( Mono . from ( con . setAutoCommit ( false ) ) ) ;
}
return prepare ;
}
/ * *
* Prepare the transactional { @link Connection } right after transaction begin .
* < p > The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
@ -385,33 +405,16 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -385,33 +405,16 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
* override this method accordingly .
* @param con the transactional R2DBC Connection
* @param definition the current transaction definition
* @param transaction the transaction object
* @since 5 . 3 . 22
* @see # setEnforceReadOnly
* /
protected Mono < Void > prepareTransactionalConnection (
Connection con , TransactionDefinition definition , Object transaction ) {
ConnectionFactoryTransactionObject txObject = ( ConnectionFactoryTransactionObject ) transaction ;
protected Mono < Void > prepareTransactionalConnection ( Connection con , TransactionDefinition definition ) {
Mono < Void > prepare = Mono . empty ( ) ;
if ( isEnforceReadOnly ( ) & & definition . isReadOnly ( ) ) {
prepare = Mono . from ( con . createStatement ( "SET TRANSACTION READ ONLY" ) . execute ( ) )
. flatMapMany ( Result : : getRowsUpdated )
. then ( ) ;
}
// Switch to manual commit if necessary. This is very expensive in some R2DBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if ( con . isAutoCommit ( ) ) {
txObject . setMustRestoreAutoCommit ( true ) ;
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "Switching R2DBC Connection [" + con + "] to manual commit" ) ;
}
prepare = prepare . then ( Mono . from ( con . setAutoCommit ( false ) ) ) ;
}
return prepare ;
}
@ -452,21 +455,20 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -452,21 +455,20 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
* to R2DBC drivers when starting a transaction .
* /
private record ExtendedTransactionDefinition ( @Nullable String transactionName ,
boolean readOnly ,
@Nullable IsolationLevel isolationLevel ,
Duration lockWaitTimeout ) implements io . r2dbc . spi . TransactionDefinition {
boolean readOnly , @Nullable IsolationLevel isolationLevel , Duration lockWaitTimeout )
implements io . r2dbc . spi . TransactionDefinition {
private ExtendedTransactionDefinition ( @Nullable String transactionName , boolean readOnly ,
@Nullable IsolationLevel isolationLevel , Duration lockWaitTimeout ) {
this . transactionName = transactionName ;
this . readOnly = readOnly ;
this . isolationLevel = isolationLevel ;
this . lockWaitTimeout = lockWaitTimeout ;
}
@Override
@SuppressWarnings ( "unchecked" )
@Override
public < T > T getAttribute ( Option < T > option ) {
return ( T ) doGetValue ( option ) ;
}