@ -95,7 +95,12 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
* @see # doBegin
* @see # doBegin
* /
* /
@Override
@Override
public final Mono < ReactiveTransaction > getReactiveTransaction ( TransactionDefinition definition ) throws TransactionException {
public final Mono < ReactiveTransaction > getReactiveTransaction ( @Nullable TransactionDefinition definition )
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = ( definition ! = null ? definition : TransactionDefinition . withDefaults ( ) ) ;
return TransactionSynchronizationManager . currentTransaction ( )
return TransactionSynchronizationManager . currentTransaction ( )
. flatMap ( synchronizationManager - > {
. flatMap ( synchronizationManager - > {
@ -106,22 +111,22 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
if ( isExistingTransaction ( transaction ) ) {
if ( isExistingTransaction ( transaction ) ) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction ( synchronizationManager , definition , transaction , debugEnabled ) ;
return handleExistingTransaction ( synchronizationManager , def , transaction , debugEnabled ) ;
}
}
// Check definition settings for new transaction.
// Check definition settings for new transaction.
if ( definition . getTimeout ( ) < TransactionDefinition . TIMEOUT_DEFAULT ) {
if ( def . getTimeout ( ) < TransactionDefinition . TIMEOUT_DEFAULT ) {
return Mono . error ( new InvalidTimeoutException ( "Invalid transaction timeout" , definition . getTimeout ( ) ) ) ;
return Mono . error ( new InvalidTimeoutException ( "Invalid transaction timeout" , def . getTimeout ( ) ) ) ;
}
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
// No existing transaction found -> check propagation behavior to find out how to proceed.
if ( definition . getPropagationBehavior ( ) = = TransactionDefinition . PROPAGATION_MANDATORY ) {
if ( def . getPropagationBehavior ( ) = = TransactionDefinition . PROPAGATION_MANDATORY ) {
return Mono . error ( new IllegalTransactionStateException (
return Mono . error ( new IllegalTransactionStateException (
"No existing transaction found for transaction marked with propagation 'mandatory'" ) ) ;
"No existing transaction found for transaction marked with propagation 'mandatory'" ) ) ;
}
}
else if ( definition . getPropagationBehavior ( ) = = TransactionDefinition . PROPAGATION_REQUIRED | |
else if ( def . getPropagationBehavior ( ) = = TransactionDefinition . PROPAGATION_REQUIRED | |
definition . getPropagationBehavior ( ) = = TransactionDefinition . PROPAGATION_REQUIRES_NEW | |
def . getPropagationBehavior ( ) = = TransactionDefinition . PROPAGATION_REQUIRES_NEW | |
definition . getPropagationBehavior ( ) = = TransactionDefinition . PROPAGATION_NESTED ) {
def . getPropagationBehavior ( ) = = TransactionDefinition . PROPAGATION_NESTED ) {
return TransactionContextManager . currentContext ( )
return TransactionContextManager . currentContext ( )
. map ( TransactionSynchronizationManager : : new )
. map ( TransactionSynchronizationManager : : new )
@ -131,14 +136,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
. defaultIfEmpty ( Optional . empty ( ) )
. defaultIfEmpty ( Optional . empty ( ) )
. flatMap ( suspendedResources - > {
. flatMap ( suspendedResources - > {
if ( debugEnabled ) {
if ( debugEnabled ) {
logger . debug ( "Creating new transaction with name [" + definition . getName ( ) + "]: " + definition ) ;
logger . debug ( "Creating new transaction with name [" + def . getName ( ) + "]: " + def ) ;
}
}
return Mono . defer ( ( ) - > {
return Mono . defer ( ( ) - > {
GenericReactiveTransaction status = newReactiveTransaction (
GenericReactiveTransaction status = newReactiveTransaction (
nestedSynchronizationManager , definition , transaction , true ,
nestedSynchronizationManager , def , transaction , true ,
debugEnabled , suspendedResources . orElse ( null ) ) ;
debugEnabled , suspendedResources . orElse ( null ) ) ;
return doBegin ( nestedSynchronizationManager , transaction , definition )
return doBegin ( nestedSynchronizationManager , transaction , def )
. doOnSuccess ( ignore - > prepareSynchronization ( nestedSynchronizationManager , status , definition ) )
. doOnSuccess ( ignore - > prepareSynchronization ( nestedSynchronizationManager , status , def ) )
. thenReturn ( status ) ;
. thenReturn ( status ) ;
} ) . onErrorResume ( ErrorPredicates . RUNTIME_OR_ERROR ,
} ) . onErrorResume ( ErrorPredicates . RUNTIME_OR_ERROR ,
ex - > resume ( nestedSynchronizationManager , null , suspendedResources . orElse ( null ) )
ex - > resume ( nestedSynchronizationManager , null , suspendedResources . orElse ( null ) )
@ -147,11 +152,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
}
}
else {
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if ( definition . getIsolationLevel ( ) ! = TransactionDefinition . ISOLATION_DEFAULT & & logger . isWarnEnabled ( ) ) {
if ( def . getIsolationLevel ( ) ! = TransactionDefinition . ISOLATION_DEFAULT & & logger . isWarnEnabled ( ) ) {
logger . warn ( "Custom isolation level specified but no actual transaction initiated; " +
logger . warn ( "Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition ) ;
"isolation level will effectively be ignored: " + def ) ;
}
}
return Mono . just ( prepareReactiveTransaction ( synchronizationManager , definition , null , true , debugEnabled , null ) ) ;
return Mono . just ( prepareReactiveTransaction ( synchronizationManager , def , null , true , debugEnabled , null ) ) ;
}
}
} ) ;
} ) ;
}
}