|
|
|
@ -439,13 +439,16 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran |
|
|
|
return Mono.empty(); |
|
|
|
return Mono.empty(); |
|
|
|
})).then(Mono.empty().onErrorResume(ex -> { |
|
|
|
})).then(Mono.empty().onErrorResume(ex -> { |
|
|
|
Mono<Object> propagateException = Mono.error(ex); |
|
|
|
Mono<Object> propagateException = Mono.error(ex); |
|
|
|
|
|
|
|
// Store result in a local variable in order to appease the
|
|
|
|
|
|
|
|
// Eclipse compiler with regard to inferred generics.
|
|
|
|
|
|
|
|
Mono<Object> result = propagateException; |
|
|
|
if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) { |
|
|
|
if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) { |
|
|
|
return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException); |
|
|
|
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException); |
|
|
|
} |
|
|
|
} |
|
|
|
if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) { |
|
|
|
else if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) { |
|
|
|
return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN).then(propagateException); |
|
|
|
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN).then(propagateException); |
|
|
|
} |
|
|
|
} |
|
|
|
if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) { |
|
|
|
else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) { |
|
|
|
Mono<Void> mono; |
|
|
|
Mono<Void> mono; |
|
|
|
if (!beforeCompletionInvoked.get()) { |
|
|
|
if (!beforeCompletionInvoked.get()) { |
|
|
|
mono = triggerBeforeCompletion(synchronizationManager, status); |
|
|
|
mono = triggerBeforeCompletion(synchronizationManager, status); |
|
|
|
@ -453,10 +456,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran |
|
|
|
else { |
|
|
|
else { |
|
|
|
mono = Mono.empty(); |
|
|
|
mono = Mono.empty(); |
|
|
|
} |
|
|
|
} |
|
|
|
return mono.then(doRollbackOnCommitException(synchronizationManager, status, ex)).then(propagateException); |
|
|
|
result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex)).then(propagateException); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return propagateException; |
|
|
|
return result; |
|
|
|
})).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex -> |
|
|
|
})).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex -> |
|
|
|
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) |
|
|
|
triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) |
|
|
|
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED)))); |
|
|
|
.then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED)))); |
|
|
|
|