|
|
|
|
@ -57,13 +57,14 @@ import org.springframework.util.StringUtils;
@@ -57,13 +57,14 @@ import org.springframework.util.StringUtils;
|
|
|
|
|
* |
|
|
|
|
* <p>Subclasses are responsible for calling methods in this class in the correct order. |
|
|
|
|
* |
|
|
|
|
* <p>If no transaction name has been specified in the {@code TransactionAttribute}, |
|
|
|
|
* <p>If no transaction name has been specified in the {@link TransactionAttribute}, |
|
|
|
|
* the exposed name will be the {@code fully-qualified class name + "." + method name} |
|
|
|
|
* (by default). |
|
|
|
|
* |
|
|
|
|
* <p>Uses the <b>Strategy</b> design pattern. A {@code PlatformTransactionManager} |
|
|
|
|
* implementation will perform the actual transaction management, and a |
|
|
|
|
* {@code TransactionAttributeSource} is used for determining transaction definitions. |
|
|
|
|
* <p>Uses the <b>Strategy</b> design pattern. A {@link PlatformTransactionManager} or |
|
|
|
|
* {@link ReactiveTransactionManager} implementation will perform the actual transaction |
|
|
|
|
* management, and a {@link TransactionAttributeSource} (e.g. annotation-based) is used |
|
|
|
|
* for determining transaction definitions for a particular class or method. |
|
|
|
|
* |
|
|
|
|
* <p>A transaction aspect is serializable if its {@code PlatformTransactionManager} |
|
|
|
|
* and {@code TransactionAttributeSource} are serializable. |
|
|
|
|
@ -72,7 +73,10 @@ import org.springframework.util.StringUtils;
@@ -72,7 +73,10 @@ import org.springframework.util.StringUtils;
|
|
|
|
|
* @author Juergen Hoeller |
|
|
|
|
* @author Stéphane Nicoll |
|
|
|
|
* @author Sam Brannen |
|
|
|
|
* @author Mark Paluch |
|
|
|
|
* @since 1.1 |
|
|
|
|
* @see PlatformTransactionManager |
|
|
|
|
* @see ReactiveTransactionManager |
|
|
|
|
* @see #setTransactionManager |
|
|
|
|
* @see #setTransactionAttributes |
|
|
|
|
* @see #setTransactionAttributeSource |
|
|
|
|
@ -450,7 +454,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
@@ -450,7 +454,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|
|
|
|
else { |
|
|
|
|
PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager()); |
|
|
|
|
if (defaultTransactionManager == null) { |
|
|
|
|
defaultTransactionManager = asPlatformTransactionManager(this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY)); |
|
|
|
|
defaultTransactionManager = asPlatformTransactionManager( |
|
|
|
|
this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY)); |
|
|
|
|
if (defaultTransactionManager == null) { |
|
|
|
|
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class); |
|
|
|
|
this.transactionManagerCache.putIfAbsent( |
|
|
|
|
@ -817,7 +822,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
@@ -817,7 +822,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|
|
|
|
this.adapter = adapter; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings({ "unchecked", "rawtypes" }) |
|
|
|
|
public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, InvocationCallback invocation) { |
|
|
|
|
// If the transaction attribute is null, the method is non-transactional.
|
|
|
|
|
TransactionAttributeSource tas = getTransactionAttributeSource(); |
|
|
|
|
@ -827,62 +831,53 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
@@ -827,62 +831,53 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|
|
|
|
|
|
|
|
|
// Optimize for Mono
|
|
|
|
|
if (Mono.class.isAssignableFrom(method.getReturnType())) { |
|
|
|
|
return TransactionContextManager.currentContext().flatMap(context -> { |
|
|
|
|
// Standard transaction demarcation with getTransaction and commit/rollback calls.
|
|
|
|
|
Mono<ReactiveTransactionInfo> txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); |
|
|
|
|
return txInfo.flatMap(it -> { |
|
|
|
|
return TransactionContextManager.currentContext().flatMap(context -> |
|
|
|
|
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMap(it -> { |
|
|
|
|
try { |
|
|
|
|
// This is an around advice: Invoke the next interceptor in the chain.
|
|
|
|
|
// This will normally result in a target object being invoked.
|
|
|
|
|
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
|
|
|
|
|
// through usingWhen.
|
|
|
|
|
return Mono.<Object, ReactiveTransactionInfo>usingWhen(Mono.just(it), txInfo -> { |
|
|
|
|
try { |
|
|
|
|
return (Mono<?>) invocation.proceedWithInvocation(); |
|
|
|
|
} |
|
|
|
|
catch (Throwable throwable) { |
|
|
|
|
return Mono.error(throwable); |
|
|
|
|
} |
|
|
|
|
}, this::commitTransactionAfterReturning, txInfo -> Mono.empty()) |
|
|
|
|
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); |
|
|
|
|
} |
|
|
|
|
catch (Throwable ex) { |
|
|
|
|
// target invocation exception
|
|
|
|
|
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); |
|
|
|
|
} |
|
|
|
|
})).subscriberContext(TransactionContextManager.getOrCreateContext()) |
|
|
|
|
.subscriberContext(TransactionContextManager.getOrCreateContextHolder()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return TransactionContextManager.currentContext().flatMapMany(context -> |
|
|
|
|
createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMapMany(it -> { |
|
|
|
|
try { |
|
|
|
|
// This is an around advice: Invoke the next interceptor in the chain.
|
|
|
|
|
// This will normally result in a target object being invoked.
|
|
|
|
|
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
|
|
|
|
|
// through usingWhen.
|
|
|
|
|
return Mono.<Object, ReactiveTransactionInfo>usingWhen(Mono.just(it), s -> { |
|
|
|
|
return Flux.usingWhen(Mono.just(it), txInfo -> { |
|
|
|
|
try { |
|
|
|
|
return (Mono) invocation.proceedWithInvocation(); |
|
|
|
|
return this.adapter.toPublisher(invocation.proceedWithInvocation()); |
|
|
|
|
} |
|
|
|
|
catch (Throwable throwable) { |
|
|
|
|
return Mono.error(throwable); |
|
|
|
|
} |
|
|
|
|
}, this::commitTransactionAfterReturning, s -> Mono.empty()) |
|
|
|
|
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex) |
|
|
|
|
.then(Mono.error(ex))); |
|
|
|
|
}, this::commitTransactionAfterReturning, txInfo -> Mono.empty()) |
|
|
|
|
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); |
|
|
|
|
} |
|
|
|
|
catch (Throwable ex) { |
|
|
|
|
// target invocation exception
|
|
|
|
|
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
}).subscriberContext(TransactionContextManager.getOrCreateContext()) |
|
|
|
|
.subscriberContext(TransactionContextManager.getOrCreateContextHolder()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return TransactionContextManager.currentContext().flatMapMany(context -> { |
|
|
|
|
// Standard transaction demarcation with getTransaction and commit/rollback calls.
|
|
|
|
|
Mono<ReactiveTransactionInfo> txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); |
|
|
|
|
return txInfo.flatMapMany(it -> { |
|
|
|
|
try { |
|
|
|
|
// This is an around advice: Invoke the next interceptor in the chain.
|
|
|
|
|
// This will normally result in a target object being invoked.
|
|
|
|
|
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
|
|
|
|
|
// through usingWhen.
|
|
|
|
|
return Flux.usingWhen(Mono.just(it), s -> { |
|
|
|
|
try { |
|
|
|
|
return this.adapter.toPublisher( |
|
|
|
|
invocation.proceedWithInvocation()); |
|
|
|
|
} |
|
|
|
|
catch (Throwable throwable) { |
|
|
|
|
return Mono.error(throwable); |
|
|
|
|
} |
|
|
|
|
}, this::commitTransactionAfterReturning, s -> Mono.empty()) |
|
|
|
|
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex) |
|
|
|
|
.then(Mono.error(ex))); |
|
|
|
|
} |
|
|
|
|
catch (Throwable ex) { |
|
|
|
|
// target invocation exception
|
|
|
|
|
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
}).subscriberContext(TransactionContextManager.getOrCreateContext()) |
|
|
|
|
})).subscriberContext(TransactionContextManager.getOrCreateContext()) |
|
|
|
|
.subscriberContext(TransactionContextManager.getOrCreateContextHolder()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -903,7 +898,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
@@ -903,7 +898,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|
|
|
|
else { |
|
|
|
|
ReactiveTransactionManager defaultTransactionManager = asReactiveTransactionManager(getTransactionManager()); |
|
|
|
|
if (defaultTransactionManager == null) { |
|
|
|
|
defaultTransactionManager = asReactiveTransactionManager(transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY)); |
|
|
|
|
defaultTransactionManager = asReactiveTransactionManager( |
|
|
|
|
transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY)); |
|
|
|
|
if (defaultTransactionManager == null) { |
|
|
|
|
defaultTransactionManager = beanFactory.getBean(ReactiveTransactionManager.class); |
|
|
|
|
transactionManagerCache.putIfAbsent( |
|
|
|
|
@ -963,8 +959,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
@@ -963,8 +959,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return tx.map(it -> prepareTransactionInfo(tm, attrToUse, joinpointIdentification, it)) |
|
|
|
|
.switchIfEmpty(Mono.defer(() -> Mono.just(prepareTransactionInfo(tm, attrToUse, joinpointIdentification, null)))); |
|
|
|
|
return tx.map(it -> prepareTransactionInfo(tm, attrToUse, joinpointIdentification, it)).switchIfEmpty( |
|
|
|
|
Mono.defer(() -> Mono.just(prepareTransactionInfo(tm, attrToUse, joinpointIdentification, null)))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private ReactiveTransactionInfo prepareTransactionInfo(@Nullable ReactiveTransactionManager tm, |
|
|
|
|
|