From bb002af8af1a9b93517860ab396cf8e4bc55c087 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 7 May 2019 15:43:44 +0200 Subject: [PATCH] Fix suspend/resume in AbstractReactiveTransactionManager We now correctly unwrap suspended resources instead capturing the Mono emitting suspended resources. We also properly continue resume by chaining resume Mono's instead of terminating eagerly. --- .../reactive/AbstractReactiveTransactionManager.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java index fc556d310b8..52ab39b184b 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java @@ -192,7 +192,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran Mono suspendedResources = suspend(synchronizationManager, transaction); return suspendedResources.flatMap(suspendedResourcesHolder -> { GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, - definition, transaction, true, debugEnabled, suspendedResources); + definition, transaction, true, debugEnabled, suspendedResourcesHolder); return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status) .onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, beginEx -> @@ -322,10 +322,12 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException { + Mono resume = Mono.empty(); + if (resourcesHolder != null) { Object suspendedResources = resourcesHolder.suspendedResources; if (suspendedResources != null) { - return doResume(synchronizationManager, transaction, suspendedResources); + resume = doResume(synchronizationManager, transaction, suspendedResources); } List suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { @@ -333,11 +335,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran synchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); synchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); synchronizationManager.setCurrentTransactionName(resourcesHolder.name); - return doResumeSynchronization(synchronizationManager, suspendedSynchronizations); + return resume.then(doResumeSynchronization(synchronizationManager, suspendedSynchronizations)); } } - return Mono.empty(); + return resume; } /**