From cc986cd2e84e550288e763b255aae7703b8976b1 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Wed, 19 Mar 2025 10:59:46 +0100 Subject: [PATCH] Defer triggerAfterCompletion invocation in doRollbackOnCommitException Closes gh-34595 --- .../R2dbcTransactionManagerTests.java | 27 +++++++++++++++++++ .../AbstractReactiveTransactionManager.java | 20 ++++++-------- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerTests.java index ddb45f2f50e..b383b633b01 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerTests.java @@ -23,6 +23,7 @@ import io.r2dbc.spi.Connection; import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.R2dbcBadGrammarException; +import io.r2dbc.spi.R2dbcTransientResourceException; import io.r2dbc.spi.Statement; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,6 +33,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import org.springframework.dao.TransientDataAccessResourceException; import org.springframework.r2dbc.BadSqlGrammarException; import org.springframework.transaction.CannotCreateTransactionException; import org.springframework.transaction.IllegalTransactionStateException; @@ -315,6 +317,31 @@ class R2dbcTransactionManagerTests { verify(connectionMock).close(); } + @Test + void testCommitAndRollbackFails() { + when(connectionMock.isAutoCommit()).thenReturn(false); + when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() -> + Mono.error(new R2dbcBadGrammarException("Commit should fail")))); + when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> + Mono.error(new R2dbcTransientResourceException("Rollback should also fail")))); + + TransactionalOperator operator = TransactionalOperator.create(tm); + + ConnectionFactoryUtils.getConnection(connectionFactoryMock) + .doOnNext(connection -> connection.createStatement("foo")).then() + .as(operator::transactional) + .as(StepVerifier::create) + .verifyError(TransientDataAccessResourceException.class); + + verify(connectionMock).isAutoCommit(); + verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); + verify(connectionMock).createStatement("foo"); + verify(connectionMock).commitTransaction(); + verify(connectionMock).rollbackTransaction(); + verify(connectionMock).close(); + verifyNoMoreInteractions(connectionMock); + } + @Test void testTransactionSetRollbackOnly() { when(connectionMock.isAutoCommit()).thenReturn(false); diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java index 8a3a5056b6a..ce08817223b 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -494,21 +494,17 @@ public abstract class AbstractReactiveTransactionManager })); } else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) { - Mono mono; + Mono mono = Mono.empty(); if (!beforeCompletionInvoked.get()) { mono = triggerBeforeCompletion(synchronizationManager, status); } - else { - mono = Mono.empty(); - } result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex)) .then(propagateException); } - return result; }) .then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex -> - triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) + triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) .then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED)) .then(Mono.defer(() -> { if (status.isNewTransaction()) { @@ -518,8 +514,8 @@ public abstract class AbstractReactiveTransactionManager })))); return commit - .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status) - .then(Mono.error(ex))).then(cleanupAfterCompletion(synchronizationManager, status)); + .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex))) + .then(cleanupAfterCompletion(synchronizationManager, status)); } /** @@ -571,8 +567,8 @@ public abstract class AbstractReactiveTransactionManager } return beforeCompletion; } - })).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion( - synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN) + })).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> + triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN) .then(Mono.defer(() -> { if (status.isNewTransaction()) { this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, ex)); @@ -623,7 +619,7 @@ public abstract class AbstractReactiveTransactionManager return Mono.empty(); })) .then(Mono.error(rbex)); - }).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)) + }).then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK))) .then(Mono.defer(() -> { this.transactionExecutionListeners.forEach(listener -> listener.afterRollback(status, null)); return Mono.empty();