diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java index fbd918a4f80..7e4a0e49db4 100644 --- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java +++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java @@ -357,26 +357,41 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager Mono afterCleanup = Mono.empty(); if (txObject.isMustRestoreAutoCommit()) { - afterCleanup = afterCleanup.then(Mono.from(con.setAutoCommit(true))); + Mono restoreAutoCommitStep = safeCleanupStep( + "doCleanupAfterCompletion when restoring autocommit", Mono.from(con.setAutoCommit(true))); + afterCleanup = afterCleanup.then(restoreAutoCommitStep); } - return afterCleanup.then(Mono.defer(() -> { + Mono releaseConnectionStep = Mono.defer(() -> { try { if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing R2DBC Connection [" + con + "] after transaction"); } - return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()); + return safeCleanupStep("doCleanupAfterCompletion when releasing R2DBC Connection", + ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())); } } finally { txObject.getConnectionHolder().clear(); } return Mono.empty(); - })); + }); + return afterCleanup.then(releaseConnectionStep); }); } + private Mono safeCleanupStep(String stepDescription, Mono stepMono) { + if (!logger.isDebugEnabled()) { + return stepMono.onErrorComplete(); + } + else { + return stepMono.doOnError(e -> + logger.debug(String.format("Error ignored during %s: %s", stepDescription, e))) + .onErrorComplete(); + } + } + private Mono switchAutoCommitIfNecessary(Connection con, Object transaction) { ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction; Mono prepare = Mono.empty(); diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index 03994a311d8..7247afe9d2a 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -23,13 +23,16 @@ import io.r2dbc.spi.Connection; import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.R2dbcBadGrammarException; +import io.r2dbc.spi.R2dbcTimeoutException; import io.r2dbc.spi.Statement; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import org.springframework.r2dbc.BadSqlGrammarException; import org.springframework.transaction.CannotCreateTransactionException; import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.TransactionDefinition; @@ -326,6 +329,34 @@ class R2dbcTransactionManagerUnitTests { verifyNoMoreInteractions(connectionMock); } + @Test + @SuppressWarnings("unchecked") + void testConnectionReleasedWhenRollbackFails() { + when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Rollback should fail"))), Mono.empty()); + + TransactionalOperator operator = TransactionalOperator.create(tm); + + when(connectionMock.isAutoCommit()).thenReturn(true); + when(connectionMock.setAutoCommit(true)).thenReturn(Mono.defer(() -> Mono.error(new R2dbcTimeoutException("SET AUTOCOMMIT = 1 timed out")))); + when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty()); + when(connectionMock.setAutoCommit(false)).thenReturn(Mono.empty()); + + operator.execute(reactiveTransaction -> ConnectionFactoryUtils.getConnection(connectionFactoryMock) + .doOnNext(connection -> { + throw new IllegalStateException("Intentional error to trigger rollback"); + }).then()).as(StepVerifier::create) + .verifyErrorSatisfies(e -> Assertions.assertThat(e) + .isInstanceOf(BadSqlGrammarException.class) + .hasCause(new R2dbcBadGrammarException("Rollback should fail")) + ); + + verify(connectionMock).isAutoCommit(); + verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); + verify(connectionMock, never()).commitTransaction(); + verify(connectionMock).rollbackTransaction(); + verify(connectionMock).close(); + } + @Test void testTransactionSetRollbackOnly() { when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty());