|
|
|
@ -75,15 +75,13 @@ final class TransactionalOperatorImpl implements TransactionalOperator { |
|
|
|
public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException { |
|
|
|
public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException { |
|
|
|
return TransactionContextManager.currentContext().flatMapMany(context -> { |
|
|
|
return TransactionContextManager.currentContext().flatMapMany(context -> { |
|
|
|
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); |
|
|
|
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); |
|
|
|
return status.flatMapMany(it -> { |
|
|
|
// This is an around advice: Invoke the next interceptor in the chain.
|
|
|
|
// This is an around advice: Invoke the next interceptor in the chain.
|
|
|
|
// This will normally result in a target object being invoked.
|
|
|
|
// This will normally result in a target object being invoked.
|
|
|
|
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
|
|
|
|
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
|
|
|
|
// through usingWhen.
|
|
|
|
// through usingWhen.
|
|
|
|
return status.flatMapMany(it -> Flux.usingWhen(Mono.just(it), action::doInTransaction, |
|
|
|
return Flux.usingWhen(Mono.just(it), action::doInTransaction, |
|
|
|
this.transactionManager::commit, s -> Mono.empty()) |
|
|
|
this.transactionManager::commit, s -> Mono.empty()) |
|
|
|
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); |
|
|
|
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
}) |
|
|
|
}) |
|
|
|
.subscriberContext(TransactionContextManager.getOrCreateContext()) |
|
|
|
.subscriberContext(TransactionContextManager.getOrCreateContext()) |
|
|
|
.subscriberContext(TransactionContextManager.getOrCreateContextHolder()); |
|
|
|
.subscriberContext(TransactionContextManager.getOrCreateContextHolder()); |
|
|
|
|