Browse Source

Fix suspend/resume in AbstractReactiveTransactionManager

We now correctly unwrap suspended resources instead capturing
the Mono emitting suspended resources.

We also properly continue resume by chaining resume Mono's
instead of terminating eagerly.
pull/22921/head
Mark Paluch 7 years ago committed by Juergen Hoeller
parent
commit
bb002af8af
  1. 10
      spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java

10
spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java

@ -192,7 +192,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran @@ -192,7 +192,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
Mono<SuspendedResourcesHolder> suspendedResources = suspend(synchronizationManager, transaction);
return suspendedResources.flatMap(suspendedResourcesHolder -> {
GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
definition, transaction, true, debugEnabled, suspendedResources);
definition, transaction, true, debugEnabled, suspendedResourcesHolder);
return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status)
.onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, beginEx ->
@ -322,10 +322,12 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran @@ -322,10 +322,12 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
Mono<Void> resume = Mono.empty();
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
return doResume(synchronizationManager, transaction, suspendedResources);
resume = doResume(synchronizationManager, transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
@ -333,11 +335,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran @@ -333,11 +335,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
synchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
synchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
synchronizationManager.setCurrentTransactionName(resourcesHolder.name);
return doResumeSynchronization(synchronizationManager, suspendedSynchronizations);
return resume.then(doResumeSynchronization(synchronizationManager, suspendedSynchronizations));
}
}
return Mono.empty();
return resume;
}
/**

Loading…
Cancel
Save