Browse Source

Avoid rollback after a commit failure in TransactionalOperator

A failure to commit a reactive transaction will complete the
transaction and clean up resources. Executing a rollback at
that point is invalid, which causes an
IllegalTransactionStateException that masks the cause of the
commit failure.

This change restructures TransactionalOperatorImpl and
ReactiveTransactionSupport to avoid executing a rollback after
a failed commit. While there, the Mono transaction handling in
TransactionalOperator is simplified by moving it to a default
method on the interface.

Closes gh-27572
pull/30074/head
Enric Sala 4 years ago committed by Sébastien Deleuze
parent
commit
edf0ae77e5
  1. 4
      spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java
  2. 100
      spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java
  3. 7
      spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java
  4. 58
      spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java
  5. 6
      spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java
  6. 86
      spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java
  7. 6
      spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt

4
spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java

@ -264,7 +264,7 @@ class R2dbcTransactionManagerUnitTests { @@ -264,7 +264,7 @@ class R2dbcTransactionManagerUnitTests {
.doOnNext(connection -> connection.createStatement("foo")).then()
.as(operator::transactional)
.as(StepVerifier::create)
.verifyError(IllegalTransactionStateException.class);
.verifyError(BadSqlGrammarException.class);
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));
@ -317,7 +317,7 @@ class R2dbcTransactionManagerUnitTests { @@ -317,7 +317,7 @@ class R2dbcTransactionManagerUnitTests {
return ConnectionFactoryUtils.getConnection(connectionFactoryMock)
.doOnNext(connection -> connection.createStatement("foo")).then();
}).as(StepVerifier::create)
.verifyError(IllegalTransactionStateException.class);
.verifyError(BadSqlGrammarException.class);
verify(connectionMock).isAutoCommit();
verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class));

100
spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java

@ -84,6 +84,7 @@ import org.springframework.util.StringUtils; @@ -84,6 +84,7 @@ import org.springframework.util.StringUtils;
* @author Sam Brannen
* @author Mark Paluch
* @author Sebastien Deleuze
* @author Enric Sala
* @since 1.1
* @see PlatformTransactionManager
* @see ReactiveTransactionManager
@ -919,60 +920,41 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init @@ -919,60 +920,41 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
!COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()))) {
return TransactionContextManager.currentContext().flatMap(context ->
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMap(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Mono.<Object, ReactiveTransactionInfo>usingWhen(
Mono.just(it),
txInfo -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, err) -> Mono.empty(),
this::rollbackTransactionOnCancel)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
catch (Throwable ex) {
// target invocation exception
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
}
})).contextWrite(TransactionContextManager.getOrCreateContext())
Mono.<Object, ReactiveTransactionInfo>usingWhen(
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
tx -> {
try {
return (Mono<?>) invocation.proceedWithInvocation();
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
this::completeTransactionAfterThrowing,
this::rollbackTransactionOnCancel)
.onErrorMap(this::unwrapIfResourceCleanupFailure))
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}
// Any other reactive type, typically a Flux
return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context ->
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification).flatMapMany(it -> {
try {
// Need re-wrapping until we get hold of the exception through usingWhen.
return Flux
.usingWhen(
Mono.just(it),
txInfo -> {
try {
return this.adapter.toPublisher(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
(txInfo, ex) -> Mono.empty(),
this::rollbackTransactionOnCancel)
.onErrorResume(ex ->
completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)));
}
catch (Throwable ex) {
// target invocation exception
return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex));
}
})).contextWrite(TransactionContextManager.getOrCreateContext())
Flux.usingWhen(
createTransactionIfNecessary(rtm, txAttr, joinpointIdentification),
tx -> {
try {
return this.adapter.toPublisher(invocation.proceedWithInvocation());
}
catch (Throwable ex) {
return Mono.error(ex);
}
},
this::commitTransactionAfterReturning,
this::completeTransactionAfterThrowing,
this::rollbackTransactionOnCancel)
.onErrorMap(this::unwrapIfResourceCleanupFailure))
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder()));
}
@ -1053,6 +1035,9 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init @@ -1053,6 +1035,9 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
if (ex2 instanceof TransactionSystemException systemException) {
systemException.initApplicationException(ex);
}
else {
ex2.addSuppressed(ex);
}
return ex2;
}
);
@ -1065,6 +1050,9 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init @@ -1065,6 +1050,9 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
if (ex2 instanceof TransactionSystemException systemException) {
systemException.initApplicationException(ex);
}
else {
ex2.addSuppressed(ex);
}
return ex2;
}
);
@ -1072,6 +1060,20 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init @@ -1072,6 +1060,20 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
}
return Mono.empty();
}
/**
* Unwrap the cause of a throwable, if produced by a failure
* during the async resource cleanup in {@link Flux#usingWhen}.
* @param ex the throwable to try to unwrap
*/
private Throwable unwrapIfResourceCleanupFailure(Throwable ex) {
if (ex instanceof RuntimeException &&
ex.getCause() != null &&
ex.getMessage().startsWith("Async resource cleanup failed")) {
return ex.getCause();
}
return ex;
}
}

7
spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -45,6 +45,7 @@ import org.springframework.transaction.TransactionException; @@ -45,6 +45,7 @@ import org.springframework.transaction.TransactionException;
*
* @author Mark Paluch
* @author Juergen Hoeller
* @author Enric Sala
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
@ -69,7 +70,9 @@ public interface TransactionalOperator { @@ -69,7 +70,9 @@ public interface TransactionalOperator {
* @throws TransactionException in case of initialization, rollback, or system errors
* @throws RuntimeException if thrown by the TransactionCallback
*/
<T> Mono<T> transactional(Mono<T> mono);
default <T> Mono<T> transactional(Mono<T> mono) {
return execute(it -> mono).singleOrEmpty();
}
/**
* Execute the action specified by the given callback object within a transaction.

58
spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java

@ -35,6 +35,7 @@ import org.springframework.util.Assert; @@ -35,6 +35,7 @@ import org.springframework.util.Assert;
*
* @author Mark Paluch
* @author Juergen Hoeller
* @author Enric Sala
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
@ -70,40 +71,16 @@ final class TransactionalOperatorImpl implements TransactionalOperator { @@ -70,40 +71,16 @@ final class TransactionalOperatorImpl implements TransactionalOperator {
return this.transactionManager;
}
@Override
public <T> Mono<T> transactional(Mono<T> mono) {
return TransactionContextManager.currentContext().flatMap(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback)
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
})
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}
@Override
public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException {
return TransactionContextManager.currentContext().flatMapMany(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMapMany(it -> Flux
.usingWhen(
Mono.just(it),
action::doInTransaction,
this.transactionManager::commit,
(tx, ex) -> Mono.empty(),
this.transactionManager::rollback)
.onErrorResume(ex ->
rollbackOnException(it, ex).then(Mono.error(ex))));
})
return TransactionContextManager.currentContext().flatMapMany(context ->
Flux.usingWhen(
this.transactionManager.getReactiveTransaction(this.transactionDefinition),
action::doInTransaction,
this.transactionManager::commit,
this::rollbackOnException,
this.transactionManager::rollback)
.onErrorMap(this::unwrapIfResourceCleanupFailure))
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
}
@ -121,11 +98,28 @@ final class TransactionalOperatorImpl implements TransactionalOperator { @@ -121,11 +98,28 @@ final class TransactionalOperatorImpl implements TransactionalOperator {
if (ex2 instanceof TransactionSystemException tse) {
tse.initApplicationException(ex);
}
else {
ex2.addSuppressed(ex);
}
return ex2;
}
);
}
/**
* Unwrap the cause of a throwable, if produced by a failure
* during the async resource cleanup in {@link Flux#usingWhen}.
* @param ex the throwable to try to unwrap
*/
private Throwable unwrapIfResourceCleanupFailure(Throwable ex) {
if (ex instanceof RuntimeException &&
ex.getCause() != null &&
ex.getMessage().startsWith("Async resource cleanup failed")) {
return ex.getCause();
}
return ex;
}
@Override
public boolean equals(@Nullable Object other) {

6
spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java

@ -335,11 +335,7 @@ public abstract class AbstractReactiveTransactionAspectTests { @@ -335,11 +335,7 @@ public abstract class AbstractReactiveTransactionAspectTests {
Mono.from(itb.setName(name))
.as(StepVerifier::create)
.consumeErrorWith(throwable -> {
assertThat(throwable.getClass()).isEqualTo(RuntimeException.class);
assertThat(throwable.getCause()).isEqualTo(ex);
})
.verify();
.verifyErrorSatisfies(actual -> assertThat(actual).isEqualTo(ex));
// Should have invoked target and changed name

86
spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,21 +16,29 @@ @@ -16,21 +16,29 @@
package org.springframework.transaction.reactive;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;
import org.springframework.transaction.ReactiveTransaction;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link TransactionalOperator}.
*
* @author Mark Paluch
* @author Enric Sala
*/
public class TransactionalOperatorTests {
@ -99,6 +107,43 @@ public class TransactionalOperatorTests { @@ -99,6 +107,43 @@ public class TransactionalOperatorTests {
assertThat(tm.rollback).isTrue();
}
@Test
public void commitFailureWithMono() {
ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class);
given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class)));
PublisherProbe<Void> commit = PublisherProbe.of(Mono.error(IOException::new));
given(tm.commit(any())).willReturn(commit.mono());
PublisherProbe<Void> rollback = PublisherProbe.empty();
given(tm.rollback(any())).willReturn(rollback.mono());
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Mono.just(true).as(operator::transactional)
.as(StepVerifier::create)
.verifyError(IOException.class);
assertThat(commit.subscribeCount()).isEqualTo(1);
rollback.assertWasNotSubscribed();
}
@Test
public void rollbackFailureWithMono() {
ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class);
given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class)));
PublisherProbe<Void> commit = PublisherProbe.empty();
given(tm.commit(any())).willReturn(commit.mono());
PublisherProbe<Void> rollback = PublisherProbe.of(Mono.error(IOException::new));
given(tm.rollback(any())).willReturn(rollback.mono());
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
IllegalStateException actionFailure = new IllegalStateException();
Mono.error(actionFailure).as(operator::transactional)
.as(StepVerifier::create)
.verifyErrorSatisfies(ex -> assertThat(ex)
.isInstanceOf(IOException.class)
.hasSuppressedException(actionFailure));
commit.assertWasNotSubscribed();
assertThat(rollback.subscribeCount()).isEqualTo(1);
}
@Test
public void commitWithFlux() {
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
@ -120,4 +165,43 @@ public class TransactionalOperatorTests { @@ -120,4 +165,43 @@ public class TransactionalOperatorTests {
assertThat(tm.rollback).isTrue();
}
@Test
public void commitFailureWithFlux() {
ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class);
given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class)));
PublisherProbe<Void> commit = PublisherProbe.of(Mono.error(IOException::new));
given(tm.commit(any())).willReturn(commit.mono());
PublisherProbe<Void> rollback = PublisherProbe.empty();
given(tm.rollback(any())).willReturn(rollback.mono());
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Flux.just(1, 2, 3, 4).as(operator::transactional)
.as(StepVerifier::create)
.expectNextCount(4)
.verifyError(IOException.class);
assertThat(commit.subscribeCount()).isEqualTo(1);
rollback.assertWasNotSubscribed();
}
@Test
public void rollbackFailureWithFlux() {
ReactiveTransactionManager tm = mock(ReactiveTransactionManager.class);
given(tm.getReactiveTransaction(any())).willReturn(Mono.just(mock(ReactiveTransaction.class)));
PublisherProbe<Void> commit = PublisherProbe.empty();
given(tm.commit(any())).willReturn(commit.mono());
PublisherProbe<Void> rollback = PublisherProbe.of(Mono.error(IOException::new));
given(tm.rollback(any())).willReturn(rollback.mono());
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
IllegalStateException actionFailure = new IllegalStateException();
Flux.just(1, 2, 3).concatWith(Flux.error(actionFailure)).as(operator::transactional)
.as(StepVerifier::create)
.expectNextCount(3)
.verifyErrorSatisfies(ex -> assertThat(ex)
.isInstanceOf(IOException.class)
.hasSuppressedException(actionFailure));
commit.assertWasNotSubscribed();
assertThat(rollback.subscribeCount()).isEqualTo(1);
}
}

6
spring-tx/src/test/kotlin/org/springframework/transaction/interceptor/AbstractCoroutinesTransactionAspectTests.kt

@ -290,9 +290,9 @@ abstract class AbstractCoroutinesTransactionAspectTests { @@ -290,9 +290,9 @@ abstract class AbstractCoroutinesTransactionAspectTests {
try {
itb.setName(name)
}
catch (ex: Exception) {
assertThat(ex).isInstanceOf(RuntimeException::class.java)
assertThat(ex.cause).hasMessage(ex.message).isInstanceOf(ex::class.java)
catch (actual: Exception) {
assertThat(actual).isInstanceOf(ex.javaClass)
assertThat(actual).hasMessage(ex.message)
}
// Should have invoked target and changed name
assertThat(itb.getName()).isEqualTo(name)

Loading…
Cancel
Save