diff --git a/spring-tx/spring-tx.gradle b/spring-tx/spring-tx.gradle index d293999edc5..db581051d89 100644 --- a/spring-tx/spring-tx.gradle +++ b/spring-tx/spring-tx.gradle @@ -1,5 +1,11 @@ description = "Spring Transaction" +dependencyManagement { + imports { + mavenBom "io.projectreactor:reactor-bom:${reactorVersion}" + } +} + dependencies { compile(project(":spring-beans")) compile(project(":spring-core")) @@ -10,8 +16,10 @@ dependencies { optional("javax.resource:javax.resource-api:1.7.1") optional("javax.transaction:javax.transaction-api:1.3") optional("com.ibm.websphere:uow:6.0.2.17") + optional("io.projectreactor:reactor-core") optional("io.vavr:vavr:0.10.0") testCompile("org.aspectj:aspectjweaver:${aspectjVersion}") testCompile("org.codehaus.groovy:groovy:${groovyVersion}") testCompile("org.eclipse.persistence:javax.persistence:2.2.0") + testCompile("io.projectreactor:reactor-test") } diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java new file mode 100644 index 00000000000..adbaf38106e --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java @@ -0,0 +1,107 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction; + +import reactor.core.publisher.Mono; + +import org.springframework.lang.Nullable; + +/** + * This is the central interface in Spring's reactive transaction infrastructure. + * Applications can use this directly, but it is not primarily meant as API: + * Typically, applications will work with either transactional operators or + * declarative transaction demarcation through AOP. + * + * @author Mark Paluch + * @since 5.2 + * @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean + */ +public interface ReactiveTransactionManager { + + /** + * Emit a currently active transaction or create a new one, according to + * the specified propagation behavior. + *

Note that parameters like isolation level or timeout will only be applied + * to new transactions, and thus be ignored when participating in active ones. + *

Furthermore, not all transaction definition settings will be supported + * by every transaction manager: A proper transaction manager implementation + * should throw an exception when unsupported settings are encountered. + *

An exception to the above rule is the read-only flag, which should be + * ignored if no explicit read-only mode is supported. Essentially, the + * read-only flag is just a hint for potential optimization. + * @param definition the TransactionDefinition instance (can be empty for defaults), + * describing propagation behavior, isolation level, timeout etc. + * @return transaction status object representing the new or current transaction + * @throws TransactionException in case of lookup, creation, or system errors + * @throws IllegalTransactionStateException if the given transaction definition + * cannot be executed (for example, if a currently active transaction is in + * conflict with the specified propagation behavior) + * @see TransactionDefinition#getPropagationBehavior + * @see TransactionDefinition#getIsolationLevel + * @see TransactionDefinition#getTimeout + * @see TransactionDefinition#isReadOnly + */ + Mono getTransaction(@Nullable TransactionDefinition definition) throws TransactionException; + + /** + * Commit the given transaction, with regard to its status. If the transaction + * has been marked rollback-only programmatically, perform a rollback. + *

If the transaction wasn't a new one, omit the commit for proper + * participation in the surrounding transaction. If a previous transaction + * has been suspended to be able to create a new one, resume the previous + * transaction after committing the new one. + *

Note that when the commit call completes, no matter if normally or + * throwing an exception, the transaction must be fully completed and + * cleaned up. No rollback call should be expected in such a case. + *

If this method throws an exception other than a TransactionException, + * then some before-commit error caused the commit attempt to fail. For + * example, an O/R Mapping tool might have tried to flush changes to the + * database right before commit, with the resulting DataAccessException + * causing the transaction to fail. The original exception will be + * propagated to the caller of this commit method in such a case. + * @param status object returned by the {@code getTransaction} method + * @throws UnexpectedRollbackException in case of an unexpected rollback + * that the transaction coordinator initiated + * @throws HeuristicCompletionException in case of a transaction failure + * caused by a heuristic decision on the side of the transaction coordinator + * @throws TransactionSystemException in case of commit or system errors + * (typically caused by fundamental resource failures) + * @throws IllegalTransactionStateException if the given transaction + * is already completed (that is, committed or rolled back) + * @see ReactiveTransactionStatus#setRollbackOnly + */ + Mono commit(ReactiveTransactionStatus status) throws TransactionException; + + /** + * Perform a rollback of the given transaction. + *

If the transaction wasn't a new one, just set it rollback-only for proper + * participation in the surrounding transaction. If a previous transaction + * has been suspended to be able to create a new one, resume the previous + * transaction after rolling back the new one. + *

Do not call rollback on a transaction if commit threw an exception. + * The transaction will already have been completed and cleaned up when commit + * returns, even in case of a commit exception. Consequently, a rollback call + * after commit failure will lead to an IllegalTransactionStateException. + * @param status object returned by the {@code getTransaction} method + * @throws TransactionSystemException in case of rollback or system errors + * (typically caused by fundamental resource failures) + * @throws IllegalTransactionStateException if the given transaction + * is already completed (that is, committed or rolled back) + */ + Mono rollback(ReactiveTransactionStatus status) throws TransactionException; + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java new file mode 100644 index 00000000000..99c3f77394e --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java @@ -0,0 +1,78 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction; + +import reactor.core.publisher.Mono; + +/** + * Representation of the status of a transaction exposing a reactive + * interface. + * + *

Transactional code can use this to retrieve status information, + * and to programmatically request a rollback (instead of throwing + * an exception that causes an implicit rollback). + * + * @author Mark Paluch + * @since 5.2 + * @see #setRollbackOnly() + * @see ReactiveTransactionManager#getTransaction + */ +public interface ReactiveTransactionStatus { + + /** + * Return whether the present transaction is new; otherwise participating + * in an existing transaction, or potentially not running in an actual + * transaction in the first place. + */ + boolean isNewTransaction(); + + /** + * Set the transaction rollback-only. This instructs the transaction manager + * that the only possible outcome of the transaction may be a rollback, as + * alternative to throwing an exception which would in turn trigger a rollback. + *

This is mainly intended for transactions managed by + * {@link org.springframework.transaction.reactive.support.TransactionalOperator} or + * {@link org.springframework.transaction.interceptor.ReactiveTransactionInterceptor}, + * where the actual commit/rollback decision is made by the container. + * @see org.springframework.transaction.reactive.support.ReactiveTransactionCallback#doInTransaction + * @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn + */ + void setRollbackOnly(); + + /** + * Return whether the transaction has been marked as rollback-only + * (either by the application or by the transaction infrastructure). + */ + boolean isRollbackOnly(); + + /** + * Flush the underlying session to the datastore, if applicable. + *

This is effectively just a hint and may be a no-op if the underlying + * transaction manager does not have a flush concept. A flush signal may + * get applied to the primary resource or to transaction synchronizations, + * depending on the underlying resource. + */ + Mono flush(); + + /** + * Return whether this transaction is completed, that is, + * whether it has already been committed or rolled back. + * @see ReactiveTransactionManager#commit + * @see ReactiveTransactionManager#rollback + */ + boolean isCompleted(); +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java new file mode 100644 index 00000000000..526767192db --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java @@ -0,0 +1,1370 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.Constants; +import org.springframework.lang.Nullable; +import org.springframework.transaction.IllegalTransactionStateException; +import org.springframework.transaction.InvalidTimeoutException; +import org.springframework.transaction.NestedTransactionNotSupportedException; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionSuspensionNotSupportedException; +import org.springframework.transaction.UnexpectedRollbackException; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.transaction.support.DefaultTransactionDefinition; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.util.Assert; + + +/** + * Abstract base class that implements Spring's standard reactive transaction workflow, + * serving as basis for concrete platform transaction managers. + *

This base class provides the following workflow handling: + *

+ *

Subclasses have to implement specific template methods for specific + * states of a transaction, e.g.: begin, suspend, resume, commit, rollback. + * The most important of them are abstract and must be provided by a concrete + * implementation; for the rest, defaults are provided, so overriding is optional. + *

Transaction synchronization is a generic mechanism for registering callbacks + * that get invoked at transaction completion time. This is mainly used internally + * by the data access support classes for R2DBC, MongoDB, etc. The same mechanism can + * also be leveraged for custom synchronization needs in an application. + *

The state of this class is serializable, to allow for serializing the + * transaction strategy along with proxies that carry a transaction interceptor. + * It is up to subclasses if they wish to make their state to be serializable too. + * They should implement the {@code java.io.Serializable} marker interface in + * that case, and potentially a private {@code readObject()} method (according + * to Java serialization rules) if they need to restore any transient state. + * + * @author Mark Paluch + * @since 5.2 + * @see #setTransactionSynchronization + * @see ReactiveTransactionSynchronizationManager + */ +@SuppressWarnings({"serial", "WeakerAccess"}) +public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable { + + /** + * Always activate transaction synchronization, even for "empty" transactions + * that result from PROPAGATION_SUPPORTS with no existing backend transaction. + * + * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_SUPPORTS + * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_NOT_SUPPORTED + * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_NEVER + */ + public static final int SYNCHRONIZATION_ALWAYS = 0; + + /** + * Activate transaction synchronization only for actual transactions, + * that is, not for empty ones that result from PROPAGATION_SUPPORTS with + * no existing backend transaction. + * + * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_REQUIRED + * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_MANDATORY + * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_REQUIRES_NEW + */ + public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1; + + /** + * Never active transaction synchronization, not even for actual transactions. + */ + public static final int SYNCHRONIZATION_NEVER = 2; + + + /** + * Constants instance for AbstractReactiveTransactionManager. + */ + private static final Constants constants = new Constants(AbstractReactiveTransactionManager.class); + + + protected transient Log logger = LogFactory.getLog(getClass()); + + private int transactionSynchronization = SYNCHRONIZATION_ALWAYS; + + private Duration defaultTimeout = Duration.ofSeconds(TransactionDefinition.TIMEOUT_DEFAULT); + + private boolean nestedTransactionAllowed = false; + + private boolean validateExistingTransaction = false; + + private boolean globalRollbackOnParticipationFailure = true; + + private boolean failEarlyOnGlobalRollbackOnly = false; + + private boolean rollbackOnCommitFailure = false; + + + /** + * Set the transaction synchronization by the name of the corresponding constant + * in this class, e.g. "SYNCHRONIZATION_ALWAYS". + * @param constantName name of the constant + * @see #SYNCHRONIZATION_ALWAYS + */ + public final void setTransactionSynchronizationName(String constantName) { + setTransactionSynchronization(constants.asNumber(constantName).intValue()); + } + + /** + * Set when this transaction manager should activate the subscriber context-bound + * transaction synchronization support. Default is "always". + *

Note that transaction synchronization isn't supported for + * multiple concurrent transactions by different transaction managers. + * Only one transaction manager is allowed to activate it at any time. + * @see #SYNCHRONIZATION_ALWAYS + * @see #SYNCHRONIZATION_ON_ACTUAL_TRANSACTION + * @see #SYNCHRONIZATION_NEVER + * @see ReactiveTransactionSynchronizationManager + * @see ReactiveTransactionSynchronization + */ + public final void setTransactionSynchronization(int transactionSynchronization) { + this.transactionSynchronization = transactionSynchronization; + } + + /** + * Return if this transaction manager should activate the subscriber context-bound + * transaction synchronization support. + */ + public final int getTransactionSynchronization() { + return this.transactionSynchronization; + } + + /** + * Specify the default timeout that this transaction manager should apply + * if there is no timeout specified at the transaction level, in seconds. + *

Default is the underlying transaction infrastructure's default timeout, + * e.g. typically 30 seconds in case of a JTA provider, indicated by the + * {@code TransactionDefinition.TIMEOUT_DEFAULT} value. + * @see org.springframework.transaction.TransactionDefinition#TIMEOUT_DEFAULT + */ + public final void setDefaultTimeout(Duration defaultTimeout) { + Assert.notNull(defaultTimeout, "Default timeout must not be null"); + if (defaultTimeout.getSeconds() < TransactionDefinition.TIMEOUT_DEFAULT) { + throw new InvalidTimeoutException("Invalid default timeout", (int) defaultTimeout.getSeconds()); + } + this.defaultTimeout = defaultTimeout; + } + + /** + * Return the default timeout that this transaction manager should apply + * if there is no timeout specified at the transaction level, in seconds. + *

Returns {@code TransactionDefinition.TIMEOUT_DEFAULT} to indicate + * the underlying transaction infrastructure's default timeout. + */ + public final Duration getDefaultTimeout() { + return this.defaultTimeout; + } + + /** + * Set whether nested transactions are allowed. Default is "false". + *

Typically initialized with an appropriate default by the + * concrete transaction manager subclass. + */ + public final void setNestedTransactionAllowed(boolean nestedTransactionAllowed) { + this.nestedTransactionAllowed = nestedTransactionAllowed; + } + + /** + * Return whether nested transactions are allowed. + */ + public final boolean isNestedTransactionAllowed() { + return this.nestedTransactionAllowed; + } + + /** + * Set whether existing transactions should be validated before participating + * in them. + *

When participating in an existing transaction (e.g. with + * PROPAGATION_REQUIRED or PROPAGATION_SUPPORTS encountering an existing + * transaction), this outer transaction's characteristics will apply even + * to the inner transaction scope. Validation will detect incompatible + * isolation level and read-only settings on the inner transaction definition + * and reject participation accordingly through throwing a corresponding exception. + *

Default is "false", leniently ignoring inner transaction settings, + * simply overriding them with the outer transaction's characteristics. + * Switch this flag to "true" in order to enforce strict validation. + */ + public final void setValidateExistingTransaction(boolean validateExistingTransaction) { + this.validateExistingTransaction = validateExistingTransaction; + } + + /** + * Return whether existing transactions should be validated before participating + * in them. + */ + public final boolean isValidateExistingTransaction() { + return this.validateExistingTransaction; + } + + /** + * Set whether to globally mark an existing transaction as rollback-only + * after a participating transaction failed. + *

Default is "true": If a participating transaction (e.g. with + * PROPAGATION_REQUIRED or PROPAGATION_SUPPORTS encountering an existing + * transaction) fails, the transaction will be globally marked as rollback-only. + * The only possible outcome of such a transaction is a rollback: The + * transaction originator cannot make the transaction commit anymore. + *

Switch this to "false" to let the transaction originator make the rollback + * decision. If a participating transaction fails with an exception, the caller + * can still decide to continue with a different path within the transaction. + * However, note that this will only work as long as all participating resources + * are capable of continuing towards a transaction commit even after a data access + * failure: This is generally not the case for a Hibernate Session, for example; + * neither is it for a sequence of R2DBC insert/update/delete operations. + *

Note:This flag only applies to an explicit rollback attempt for a + * subtransaction, typically caused by an exception thrown by a data access operation + * (where TransactionInterceptor will trigger a {@code ReactiveTransactionManager.rollback()} + * call according to a rollback rule). If the flag is off, the caller can handle the exception + * and decide on a rollback, independent of the rollback rules of the subtransaction. + * This flag does, however, not apply to explicit {@code setRollbackOnly} + * calls on a {@code TransactionStatus}, which will always cause an eventual + * global rollback (as it might not throw an exception after the rollback-only call). + *

The recommended solution for handling failure of a subtransaction + * is a "nested transaction", where the global transaction can be rolled + * back to a savepoint taken at the beginning of the subtransaction. + * PROPAGATION_NESTED provides exactly those semantics; however, it will + * only work when nested transaction support is available. This is the case + * with DataSourceTransactionManager, but not with JtaTransactionManager. + * @see #setNestedTransactionAllowed + */ + public final void setGlobalRollbackOnParticipationFailure(boolean globalRollbackOnParticipationFailure) { + this.globalRollbackOnParticipationFailure = globalRollbackOnParticipationFailure; + } + + /** + * Return whether to globally mark an existing transaction as rollback-only + * after a participating transaction failed. + */ + public final boolean isGlobalRollbackOnParticipationFailure() { + return this.globalRollbackOnParticipationFailure; + } + + /** + * Set whether to fail early in case of the transaction being globally marked + * as rollback-only. + *

Default is "false", only causing an UnexpectedRollbackException at the + * outermost transaction boundary. Switch this flag on to cause an + * UnexpectedRollbackException as early as the global rollback-only marker + * has been first detected, even from within an inner transaction boundary. + * @see org.springframework.transaction.UnexpectedRollbackException + */ + public final void setFailEarlyOnGlobalRollbackOnly(boolean failEarlyOnGlobalRollbackOnly) { + this.failEarlyOnGlobalRollbackOnly = failEarlyOnGlobalRollbackOnly; + } + + /** + * Return whether to fail early in case of the transaction being globally marked + * as rollback-only. + */ + public final boolean isFailEarlyOnGlobalRollbackOnly() { + return this.failEarlyOnGlobalRollbackOnly; + } + + /** + * Set whether {@code doRollback} should be performed on failure of the + * {@code doCommit} call. Typically not necessary and thus to be avoided, + * as it can potentially override the commit exception with a subsequent + * rollback exception. + *

Default is "false". + * @see #doCommit + * @see #doRollback + */ + public final void setRollbackOnCommitFailure(boolean rollbackOnCommitFailure) { + this.rollbackOnCommitFailure = rollbackOnCommitFailure; + } + + /** + * Return whether {@code doRollback} should be performed on failure of the + * {@code doCommit} call. + */ + public final boolean isRollbackOnCommitFailure() { + return this.rollbackOnCommitFailure; + } + + //--------------------------------------------------------------------- + // Implementation of ReactiveTransactionManager + //--------------------------------------------------------------------- + + /** + * This implementation handles propagation behavior. Delegates to + * {@code doGetTransaction}, {@code isExistingTransaction} + * and {@code doBegin}. + * @see #doGetTransaction + * @see #isExistingTransaction + * @see #doBegin + */ + @Override + public final Mono getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { + + if (definition == null) { + // Use defaults if no transaction definition given. + definition = new DefaultTransactionDefinition(); + } + + TransactionDefinition definitionToUse = definition; + + return ReactiveTransactionSynchronizationManager.currentTransaction() + .flatMap(synchronizationManager -> { + + Object transaction = doGetTransaction(synchronizationManager); + + // Cache debug flag to avoid repeated checks. + boolean debugEnabled = logger.isDebugEnabled(); + + if (isExistingTransaction(transaction)) { + // Existing transaction found -> check propagation behavior to find out how to behave. + return handleExistingTransaction(synchronizationManager, definitionToUse, transaction, debugEnabled); + } + + // Check definition settings for new transaction. + if (definitionToUse.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { + return Mono.error(new InvalidTimeoutException("Invalid transaction timeout", definitionToUse.getTimeout())); + } + + // No existing transaction found -> check propagation behavior to find out how to proceed. + if (definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { + return Mono.error(new IllegalTransactionStateException( + "No existing transaction found for transaction marked with propagation 'mandatory'")); + } else if (definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || + definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || + definitionToUse.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { + + return TransactionContextManager.currentContext() + .map(ReactiveTransactionSynchronizationManager::new) + .flatMap(nestedSynchronizationManager -> { + + return suspend(nestedSynchronizationManager, null) + .map(Optional::of) + .defaultIfEmpty(Optional.empty()) + .flatMap(suspendedResources -> { + + if (debugEnabled) { + logger.debug("Creating new transaction with name [" + definitionToUse.getName() + "]: " + definitionToUse); + } + + return Mono.defer(() -> { + boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); + DefaultReactiveTransactionStatus status = newTransactionStatus( + nestedSynchronizationManager, definitionToUse, transaction, true, + newSynchronization, debugEnabled, suspendedResources.orElse(null)); + + return doBegin(nestedSynchronizationManager, transaction, definitionToUse) + .doOnSuccess(ignore -> prepareSynchronization(nestedSynchronizationManager, status, definitionToUse)) + .thenReturn(status); + }).onErrorResume(ErrorPredicates.RuntimeOrError, e -> { + return resume(nestedSynchronizationManager, null, suspendedResources.orElse(null)) + .then(Mono.error(e)); + }); + }); + }); + } else { + // Create "empty" transaction: no actual transaction, but potentially synchronization. + if (definitionToUse.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { + logger.warn("Custom isolation level specified but no actual transaction initiated; " + + "isolation level will effectively be ignored: " + definitionToUse); + } + boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); + return Mono.just(prepareTransactionStatus(synchronizationManager, definitionToUse, null, true, newSynchronization, debugEnabled, null)); + } + }); + } + + /** + * Create a TransactionStatus for an existing transaction. + */ + private Mono handleExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager, + TransactionDefinition definition, Object transaction, boolean debugEnabled) + throws TransactionException { + + if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { + return Mono.error(new IllegalTransactionStateException( + "Existing transaction found for transaction marked with propagation 'never'")); + } + + if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { + if (debugEnabled) { + logger.debug("Suspending current transaction"); + } + Mono suspend = suspend(synchronizationManager, transaction); + boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); + + return suspend.map(suspendedResources -> prepareTransactionStatus(synchronizationManager, + definition, null, false, newSynchronization, debugEnabled, suspendedResources)) // + .switchIfEmpty(Mono.fromSupplier(() -> prepareTransactionStatus(synchronizationManager, + definition, null, false, newSynchronization, debugEnabled, null))) + .cast(ReactiveTransactionStatus.class); + } + + if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { + if (debugEnabled) { + logger.debug("Suspending current transaction, creating new transaction with name [" + + definition.getName() + "]"); + } + Mono suspendedResources = suspend(synchronizationManager, transaction); + boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); + + return suspendedResources.flatMap(suspendedResourcesHolder -> { + + DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, + definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); + return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> { + prepareSynchronization(synchronizationManager, status, definition); + }).thenReturn(status). + + onErrorResume(ErrorPredicates.RuntimeOrError, beginEx -> { + return resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx).then(Mono.error(beginEx)); + }); + }); + } + + if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { + if (!isNestedTransactionAllowed()) { + return Mono.error(new NestedTransactionNotSupportedException( + "Transaction manager does not allow nested transactions by default - " + + "specify 'nestedTransactionAllowed' property with value 'true'")); + } + if (debugEnabled) { + logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); + } + + // Nested transaction through nested begin and commit/rollback calls. + boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); + DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, + definition, transaction, true, newSynchronization, debugEnabled, null); + + return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> { + prepareSynchronization(synchronizationManager, status, definition); + }).thenReturn(status); + } + + // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. + if (debugEnabled) { + logger.debug("Participating in existing transaction"); + } + if (isValidateExistingTransaction()) { + if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { + Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); + if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { + Constants isoConstants = new Constants(TransactionDefinition.class); + return Mono.error(new IllegalTransactionStateException("Participating transaction with definition [" + + definition + "] specifies isolation level which is incompatible with existing transaction: " + + (currentIsolationLevel != null ? + isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : + "(unknown)"))); + } + } + if (!definition.isReadOnly()) { + if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { + return Mono.error(new IllegalTransactionStateException("Participating transaction with definition [" + + definition + "] is not marked as read-only but existing transaction is")); + } + } + } + boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); + return Mono.just(prepareTransactionStatus(synchronizationManager, definition, transaction, false, newSynchronization, debugEnabled, null)); + } + + /** + * Create a new TransactionStatus for the given arguments, + * also initializing transaction synchronization as appropriate. + * + * @see #newTransactionStatus + * @see #prepareTransactionStatus + */ + protected final DefaultReactiveTransactionStatus prepareTransactionStatus( + ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, + boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { + + DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, + definition, transaction, newTransaction, newSynchronization, debug, suspendedResources); + prepareSynchronization(synchronizationManager, status, definition); + return status; + } + + /** + * Create a TransactionStatus instance for the given arguments. + */ + protected DefaultReactiveTransactionStatus newTransactionStatus( + ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, + boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { + + boolean actualNewSynchronization = newSynchronization && + !synchronizationManager.isSynchronizationActive(); + return new DefaultReactiveTransactionStatus( + transaction, newTransaction, actualNewSynchronization, + definition.isReadOnly(), debug, suspendedResources); + } + + /** + * Initialize transaction synchronization as appropriate. + */ + protected void prepareSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status, TransactionDefinition definition) { + if (status.isNewSynchronization()) { + synchronizationManager.setActualTransactionActive(status.hasTransaction()); + synchronizationManager.setCurrentTransactionIsolationLevel( + definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? + definition.getIsolationLevel() : null); + synchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); + synchronizationManager.setCurrentTransactionName(definition.getName()); + synchronizationManager.initSynchronization(); + } + } + + /** + * Determine the actual timeout to use for the given definition. + * Will fall back to this manager's default timeout if the + * transaction definition doesn't specify a non-default value. + * @param definition the transaction definition + * @return the actual timeout to use + * @see org.springframework.transaction.TransactionDefinition#getTimeout() + * @see #setDefaultTimeout + */ + protected Duration determineTimeout(TransactionDefinition definition) { + if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) { + return Duration.ofSeconds(definition.getTimeout()); + } + return this.defaultTimeout; + } + + + /** + * Suspend the given transaction. Suspends transaction synchronization first, + * then delegates to the {@code doSuspend} template method. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param transaction the current transaction object + * (or {@code null} to just suspend active synchronizations, if any) + * @return an object that holds suspended resources + * (or {@code null} if neither transaction nor synchronization active) + * @see #doSuspend + * @see #resume + */ + protected final Mono suspend(ReactiveTransactionSynchronizationManager synchronizationManager, @Nullable Object transaction) throws TransactionException { + if (synchronizationManager.isSynchronizationActive()) { + Mono> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager); + + return suspendedSynchronizations.flatMap(synchronizations -> { + + Mono> suspendedResources = transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty()); + + return suspendedResources.map(it -> { + + String name = synchronizationManager.getCurrentTransactionName(); + synchronizationManager.setCurrentTransactionName(null); + boolean readOnly = synchronizationManager.isCurrentTransactionReadOnly(); + synchronizationManager.setCurrentTransactionReadOnly(false); + Integer isolationLevel = synchronizationManager.getCurrentTransactionIsolationLevel(); + synchronizationManager.setCurrentTransactionIsolationLevel(null); + boolean wasActive = synchronizationManager.isActualTransactionActive(); + synchronizationManager.setActualTransactionActive(false); + return new SuspendedResourcesHolder( + it.orElse(null), synchronizations, name, readOnly, isolationLevel, wasActive); + }).onErrorResume(ErrorPredicates.RuntimeOrError, t -> doResumeSynchronization(synchronizationManager, synchronizations).cast(SuspendedResourcesHolder.class)); + }); + } else if (transaction != null) { + // Transaction active but no synchronization active. + Mono> suspendedResources = doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()); + return suspendedResources.map(it -> new SuspendedResourcesHolder(it.orElse(null))); + } else { + // Neither transaction nor synchronization active. + return Mono.empty(); + } + } + + /** + * Resume the given transaction. Delegates to the {@code doResume} + * template method first, then resuming transaction synchronization. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param transaction the current transaction object + * @param resourcesHolder the object that holds suspended resources, + * as returned by {@code suspend} (or {@code null} to just + * resume synchronizations, if any) + * @see #doResume + * @see #suspend + */ + protected final Mono resume(ReactiveTransactionSynchronizationManager synchronizationManager, @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) + throws TransactionException { + + if (resourcesHolder != null) { + Object suspendedResources = resourcesHolder.suspendedResources; + if (suspendedResources != null) { + return doResume(synchronizationManager, transaction, suspendedResources); + } + List suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; + if (suspendedSynchronizations != null) { + synchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); + synchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); + synchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); + synchronizationManager.setCurrentTransactionName(resourcesHolder.name); + return doResumeSynchronization(synchronizationManager, suspendedSynchronizations); + } + } + + return Mono.empty(); + } + + /** + * Resume outer transaction after inner transaction begin failed. + */ + private Mono resumeAfterBeginException(ReactiveTransactionSynchronizationManager synchronizationManager, + Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) { + + String exMessage = "Inner transaction begin exception overridden by outer transaction resume exception"; + return resume(synchronizationManager, transaction, suspendedResources).doOnError(ErrorPredicates.RuntimeOrError, t -> logger.error(exMessage, beginEx)); + } + + /** + * Suspend all current synchronizations and deactivate transaction + * synchronization for the current transaction context. + * + * @param synchronizationManager the synchronization manager bound to the current transaction + * @return the List of suspended ReactiveTransactionSynchronization objects + */ + private Mono> doSuspendSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager) { + List suspendedSynchronizations = + synchronizationManager.getSynchronizations(); + + return Flux.fromIterable(suspendedSynchronizations) + .concatMap(ReactiveTransactionSynchronization::suspend) + .then(Mono.defer(() -> { + synchronizationManager.clearSynchronization(); + return Mono.just(suspendedSynchronizations); + })); + } + + /** + * Reactivate transaction synchronization for the current transaction context + * and resume all given synchronizations. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param suspendedSynchronizations a List of ReactiveTransactionSynchronization objects + */ + private Mono doResumeSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, List suspendedSynchronizations) { + synchronizationManager.initSynchronization(); + + return Flux.fromIterable(suspendedSynchronizations) + .concatMap(synchronization -> { + return synchronization.resume() + .doOnSuccess(ignore -> synchronizationManager.registerSynchronization(synchronization)); + }).then(); + } + + + /** + * This implementation of commit handles participating in existing + * transactions and programmatic rollback requests. + * Delegates to {@code isRollbackOnly}, {@code doCommit} + * and {@code rollback}. + * @see ReactiveTransactionStatus#isRollbackOnly() + * @see #doCommit + * @see #rollback + */ + @Override + public final Mono commit(ReactiveTransactionStatus status) throws TransactionException { + if (status.isCompleted()) { + return Mono.error(new IllegalTransactionStateException( + "Transaction is already completed - do not call commit or rollback more than once per transaction")); + } + + return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { + + DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status; + if (defStatus.isLocalRollbackOnly()) { + if (defStatus.isDebug()) { + logger.debug("Transactional code has requested rollback"); + } + return processRollback(synchronizationManager, defStatus, false); + } + + if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { + if (defStatus.isDebug()) { + logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); + } + return processRollback(synchronizationManager, defStatus, true); + } + + return processCommit(synchronizationManager, defStatus); + }); + } + + /** + * Process an actual commit. + * Rollback-only flags have already been checked and applied. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status object representing the transaction + * @throws TransactionException in case of commit failure + */ + private Mono processCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) throws TransactionException { + + AtomicBoolean beforeCompletionInvoked = new AtomicBoolean(false); + AtomicBoolean unexpectedRollback = new AtomicBoolean(false); + + Mono commit = prepareForCommit(synchronizationManager, status) + .then(triggerBeforeCommit(synchronizationManager, status)) + .then(triggerBeforeCompletion(synchronizationManager, status)) + .then(Mono.defer(() -> { + + beforeCompletionInvoked.set(true); + + if (status.isNewTransaction()) { + if (status.isDebug()) { + logger.debug("Initiating transaction commit"); + } + unexpectedRollback.set(status.isGlobalRollbackOnly()); + return doCommit(synchronizationManager, status); + } else if (isFailEarlyOnGlobalRollbackOnly()) { + unexpectedRollback.set(status.isGlobalRollbackOnly()); + } + + return Mono.empty(); + })).then(Mono.defer(() -> { + + // Throw UnexpectedRollbackException if we have a global rollback-only + // marker but still didn't get a corresponding exception from commit. + if (unexpectedRollback.get()) { + return Mono.error(new UnexpectedRollbackException( + "Transaction silently rolled back because it has been marked as rollback-only")); + } + + return Mono.empty(); + }).onErrorResume(e -> { + + Mono propagateException = Mono.error(e); + if (ErrorPredicates.UnexpectedRollback.test(e)) { + return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException); + } + + if (ErrorPredicates.TransactionException.test(e)) { + + Mono mono; + // can only be caused by doCommit + if (isRollbackOnCommitFailure()) { + mono = doRollbackOnCommitException(synchronizationManager, status, e); + } else { + mono = triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN); + } + return mono.then(propagateException); + } + + if (ErrorPredicates.RuntimeOrError.test(e)) { + + Mono mono; + if (!beforeCompletionInvoked.get()) { + mono = triggerBeforeCompletion(synchronizationManager, status); + } else { + mono = Mono.empty(); + } + return mono.then(doRollbackOnCommitException(synchronizationManager, status, e)).then(propagateException); + } + + return propagateException; + })).then(Mono.defer((() -> { + + // Trigger afterCommit callbacks, with an exception thrown there + // propagated to callers but the transaction still considered as committed. + + return triggerAfterCommit(synchronizationManager, status).onErrorResume(e -> { + return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED).then(Mono.error(e)); + }).then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED)); + }))); + + return commit + .onErrorResume((e) -> { + return cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(e)); + }).then(cleanupAfterCompletion(synchronizationManager, status)); + } + + /** + * This implementation of rollback handles participating in existing + * transactions. Delegates to {@code doRollback} and + * {@code doSetRollbackOnly}. + * @see #doRollback + * @see #doSetRollbackOnly + */ + @Override + public final Mono rollback(ReactiveTransactionStatus status) throws TransactionException { + if (status.isCompleted()) { + return Mono.error(new IllegalTransactionStateException( + "Transaction is already completed - do not call commit or rollback more than once per transaction")); + } + + return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { + DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status; + return processRollback(synchronizationManager, defStatus, false); + }); + } + + /** + * Process an actual rollback. + * The completed flag has already been checked. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status object representing the transaction + * @throws TransactionException in case of rollback failure + */ + private Mono processRollback(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status, boolean unexpected) { + AtomicBoolean unexpectedRollback = new AtomicBoolean(unexpected); + + return triggerBeforeCompletion(synchronizationManager, status) + .then(Mono.defer(() -> { + + if (status.isNewTransaction()) { + if (status.isDebug()) { + logger.debug("Initiating transaction rollback"); + } + return doRollback(synchronizationManager, status); + } else { + + Mono beforeCompletion = Mono.empty(); + // Participating in larger transaction + if (status.hasTransaction()) { + if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { + if (status.isDebug()) { + logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); + } + beforeCompletion = doSetRollbackOnly(synchronizationManager, status); + } else { + if (status.isDebug()) { + logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); + } + } + } else { + logger.debug("Should roll back transaction but cannot - no transaction available"); + } + + return beforeCompletion.doOnSuccess(ignore -> { + + // Unexpected rollback only matters here if we're asked to fail early + if (!isFailEarlyOnGlobalRollbackOnly()) { + unexpectedRollback.set(false); + } + }); + } + })).onErrorResume(ErrorPredicates.RuntimeOrError, e -> triggerAfterCompletion(synchronizationManager, + status, ReactiveTransactionSynchronization.STATUS_UNKNOWN) + .then(Mono.error(e))) + .then(Mono.defer(() -> { + + Mono afterCompletion = triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK); + + // Raise UnexpectedRollbackException if we had a global rollback-only marker + if (unexpectedRollback.get()) { + return afterCompletion.then(Mono.error(new UnexpectedRollbackException( + "Transaction rolled back because it has been marked as rollback-only"))); + } + return afterCompletion; + })).onErrorResume((e) -> cleanupAfterCompletion(synchronizationManager, status) + .then(Mono.error(e))) + .then(cleanupAfterCompletion(synchronizationManager, status)); + } + + /** + * Invoke {@code doRollback}, handling rollback exceptions properly. + * + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status object representing the transaction + * @param ex the thrown application exception or error + * @throws TransactionException in case of rollback failure + * @see #doRollback + */ + private Mono doRollbackOnCommitException(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status, Throwable ex) throws TransactionException { + + return Mono.defer(() -> { + + if (status.isNewTransaction()) { + if (status.isDebug()) { + logger.debug("Initiating transaction rollback after commit exception", ex); + } + return doRollback(synchronizationManager, status); + } else if (status.hasTransaction() && isGlobalRollbackOnParticipationFailure()) { + if (status.isDebug()) { + logger.debug("Marking existing transaction as rollback-only after commit exception", ex); + } + return doSetRollbackOnly(synchronizationManager, status); + } + + return Mono.empty(); + }).onErrorResume(ErrorPredicates.RuntimeOrError, (rbex) -> { + + logger.error("Commit exception overridden by rollback exception", ex); + return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN) + .then(Mono.error(rbex)); + }).then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK)); + } + + + /** + * Trigger {@code beforeCommit} callbacks. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status object representing the transaction + */ + protected final Mono triggerBeforeCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + if (status.isNewSynchronization()) { + if (status.isDebug()) { + logger.trace("Triggering beforeCommit synchronization"); + } + return ReactiveTransactionSynchronizationUtils.triggerBeforeCommit(synchronizationManager.getSynchronizations(), status.isReadOnly()); + } + + return Mono.empty(); + } + + /** + * Trigger {@code beforeCompletion} callbacks. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status object representing the transaction + */ + protected final Mono triggerBeforeCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + if (status.isNewSynchronization()) { + if (status.isDebug()) { + logger.trace("Triggering beforeCompletion synchronization"); + } + return ReactiveTransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations()); + } + + return Mono.empty(); + } + + /** + * Trigger {@code afterCommit} callbacks. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status object representing the transaction + */ + private Mono triggerAfterCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + if (status.isNewSynchronization()) { + if (status.isDebug()) { + logger.trace("Triggering afterCommit synchronization"); + } + return ReactiveTransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations()); + } + + return Mono.empty(); + } + + /** + * Trigger {@code afterCompletion} callbacks. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status object representing the transaction + * @param completionStatus completion status according to ReactiveTransactionSynchronization constants + */ + private Mono triggerAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status, int completionStatus) { + if (status.isNewSynchronization()) { + List synchronizations = synchronizationManager.getSynchronizations(); + synchronizationManager.clearSynchronization(); + if (!status.hasTransaction() || status.isNewTransaction()) { + if (status.isDebug()) { + logger.trace("Triggering afterCompletion synchronization"); + } + // No transaction or new transaction for the current scope -> + // invoke the afterCompletion callbacks immediately + return invokeAfterCompletion(synchronizationManager, synchronizations, completionStatus); + } else if (!synchronizations.isEmpty()) { + // Existing transaction that we participate in, controlled outside + // of the scope of this Spring transaction manager -> try to register + // an afterCompletion callback with the existing (JTA) transaction. + return registerAfterCompletionWithExistingTransaction(synchronizationManager, status.getTransaction(), synchronizations); + } + } + + return Mono.empty(); + } + + /** + * Actually invoke the {@code afterCompletion} methods of the + * given Spring ReactiveTransactionSynchronization objects. + *

To be called by this abstract manager itself, or by special implementations + * of the {@code registerAfterCompletionWithExistingTransaction} callback. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @param completionStatus the completion status according to the + * constants in the ReactiveTransactionSynchronization interface + * @see #registerAfterCompletionWithExistingTransaction(ReactiveTransactionSynchronizationManager, Object, List) + * @see ReactiveTransactionSynchronization#STATUS_COMMITTED + * @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK + * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + */ + protected final Mono invokeAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, List synchronizations, int completionStatus) { + return ReactiveTransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus); + } + + /** + * Clean up after completion, clearing synchronization if necessary, + * and invoking doCleanupAfterCompletion. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status object representing the transaction + * @see #doCleanupAfterCompletion + */ + private Mono cleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + + return Mono.defer(() -> { + + status.setCompleted(); + if (status.isNewSynchronization()) { + synchronizationManager.clear(); + } + if (status.isNewTransaction()) { + doCleanupAfterCompletion(synchronizationManager, status.getTransaction()); + } + if (status.getSuspendedResources() != null) { + if (status.isDebug()) { + logger.debug("Resuming suspended transaction after completion of inner transaction"); + } + Object transaction = (status.hasTransaction() ? status.getTransaction() : null); + return resume(synchronizationManager, transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); + } + + return Mono.empty(); + }); + } + + //--------------------------------------------------------------------- + // Template methods to be implemented in subclasses + //--------------------------------------------------------------------- + + /** + * Return a transaction object for the current transaction state. + *

The returned object will usually be specific to the concrete transaction + * manager implementation, carrying corresponding transaction state in a + * modifiable fashion. This object will be passed into the other template + * methods (e.g. doBegin and doCommit), either directly or as part of a + * DefaultReactiveTransactionStatus instance. + *

The returned object should contain information about any existing + * transaction, that is, a transaction that has already started before the + * current {@code getTransaction} call on the transaction manager. + * Consequently, a {@code doGetTransaction} implementation will usually + * look for an existing transaction and store corresponding state in the + * returned transaction object. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @return the current transaction object + * @throws org.springframework.transaction.CannotCreateTransactionException if transaction support is not available + * @throws TransactionException in case of lookup or system errors + * @see #doBegin + * @see #doCommit + * @see #doRollback + * @see DefaultReactiveTransactionStatus#getTransaction + */ + protected abstract Object doGetTransaction(ReactiveTransactionSynchronizationManager synchronizationManager) throws TransactionException; + + /** + * Check if the given transaction object indicates an existing transaction + * (that is, a transaction which has already started). + *

The result will be evaluated according to the specified propagation + * behavior for the new transaction. An existing transaction might get + * suspended (in case of PROPAGATION_REQUIRES_NEW), or the new transaction + * might participate in the existing one (in case of PROPAGATION_REQUIRED). + *

The default implementation returns {@code false}, assuming that + * participating in existing transactions is generally not supported. + * Subclasses are of course encouraged to provide such support. + * @param transaction transaction object returned by doGetTransaction + * @return if there is an existing transaction + * @throws TransactionException in case of system errors + * @see #doGetTransaction + */ + protected boolean isExistingTransaction(Object transaction) throws TransactionException { + return false; + } + + /** + * Begin a new transaction with semantics according to the given transaction + * definition. Does not have to care about applying the propagation behavior, + * as this has already been handled by this abstract manager. + *

This method gets called when the transaction manager has decided to actually + * start a new transaction. Either there wasn't any transaction before, or the + * previous transaction has been suspended. + *

A special scenario is a nested transaction without savepoint: If + * {@code useSavepointForNestedTransaction()} returns "false", this method + * will be called to start a nested transaction when necessary. In such a context, + * there will be an active transaction: The implementation of this method has + * to detect this and start an appropriate nested transaction. + * @param synchronizationManager the synchronization manager bound to the new transaction + * @param transaction transaction object returned by {@code doGetTransaction} + * @param definition a TransactionDefinition instance, describing propagation + * behavior, isolation level, read-only flag, timeout, and transaction name + * @throws TransactionException in case of creation or system errors + */ + protected abstract Mono doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) + throws TransactionException; + + /** + * Suspend the resources of the current transaction. + * Transaction synchronization will already have been suspended. + *

The default implementation throws a TransactionSuspensionNotSupportedException, + * assuming that transaction suspension is generally not supported. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param transaction transaction object returned by {@code doGetTransaction} + * @return an object that holds suspended resources + * (will be kept unexamined for passing it into doResume) + * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException if suspending is not supported by the transaction manager implementation + * @throws TransactionException in case of system errors + * @see #doResume + */ + protected Mono doSuspend(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction) throws TransactionException { + throw new TransactionSuspensionNotSupportedException( + "Transaction manager [" + getClass().getName() + "] does not support transaction suspension"); + } + + /** + * Resume the resources of the current transaction. + * Transaction synchronization will be resumed afterwards. + *

The default implementation throws a TransactionSuspensionNotSupportedException, + * assuming that transaction suspension is generally not supported. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param transaction transaction object returned by {@code doGetTransaction} + * @param suspendedResources the object that holds suspended resources, + * as returned by doSuspend + * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException if resuming is not supported by the transaction manager implementation + * @throws TransactionException in case of system errors + * @see #doSuspend + */ + protected Mono doResume(ReactiveTransactionSynchronizationManager synchronizationManager, @Nullable Object transaction, Object suspendedResources) throws TransactionException { + throw new TransactionSuspensionNotSupportedException( + "Transaction manager [" + getClass().getName() + "] does not support transaction suspension"); + } + + /** + * Return whether to call {@code doCommit} on a transaction that has been + * marked as rollback-only in a global fashion. + *

Does not apply if an application locally sets the transaction to rollback-only + * via the TransactionStatus, but only to the transaction itself being marked as + * rollback-only by the transaction coordinator. + *

Default is "false": Local transaction strategies usually don't hold the rollback-only + * marker in the transaction itself, therefore they can't handle rollback-only transactions + * as part of transaction commit. Hence, AbstractReactiveTransactionManager will trigger + * a rollback in that case, throwing an UnexpectedRollbackException afterwards. + *

Override this to return "true" if the concrete transaction manager expects a + * {@code doCommit} call even for a rollback-only transaction, allowing for + * special handling there. This will, for example, be the case for JTA, where + * {@code UserTransaction.commit} will check the read-only flag itself and + * throw a corresponding RollbackException, which might include the specific reason + * (such as a transaction timeout). + *

If this method returns "true" but the {@code doCommit} implementation does not + * throw an exception, this transaction manager will throw an UnexpectedRollbackException + * itself. + * @see #doCommit + * @see DefaultReactiveTransactionStatus#isGlobalRollbackOnly() + * @see DefaultReactiveTransactionStatus#isLocalRollbackOnly() + * @see ReactiveTransactionStatus#setRollbackOnly() + * @see org.springframework.transaction.UnexpectedRollbackException + */ + protected boolean shouldCommitOnGlobalRollbackOnly() { + return false; + } + + /** + * Make preparations for commit, to be performed before the + * {@code beforeCommit} synchronization callbacks occur. + *

Note that exceptions will get propagated to the commit caller + * and cause a rollback of the transaction. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status the status representation of the transaction + * @throws RuntimeException in case of errors; will be propagated to the caller + * (note: do not throw TransactionException subclasses here!) + */ + protected Mono prepareForCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + return Mono.empty(); + } + + /** + * Perform an actual commit of the given transaction. + *

An implementation does not need to check the "new transaction" flag + * or the rollback-only flag; this will already have been handled before. + * Usually, a straight commit will be performed on the transaction object + * contained in the passed-in status. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status the status representation of the transaction + * @throws TransactionException in case of commit or system errors + * @see DefaultReactiveTransactionStatus#getTransaction + */ + protected abstract Mono doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) throws TransactionException; + + /** + * Perform an actual rollback of the given transaction. + *

An implementation does not need to check the "new transaction" flag; + * this will already have been handled before. Usually, a straight rollback + * will be performed on the transaction object contained in the passed-in status. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status the status representation of the transaction + * @throws TransactionException in case of system errors + * @see DefaultReactiveTransactionStatus#getTransaction + */ + protected abstract Mono doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) throws TransactionException; + + /** + * Set the given transaction rollback-only. Only called on rollback + * if the current transaction participates in an existing one. + *

The default implementation throws an IllegalTransactionStateException, + * assuming that participating in existing transactions is generally not + * supported. Subclasses are of course encouraged to provide such support. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param status the status representation of the transaction + * @throws TransactionException in case of system errors + */ + protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) throws TransactionException { + throw new IllegalTransactionStateException( + "Participating in existing transactions is not supported - when 'isExistingTransaction' " + + "returns true, appropriate 'doSetRollbackOnly' behavior must be provided"); + } + + /** + * Register the given list of transaction synchronizations with the existing transaction. + *

Invoked when the control of the Spring transaction manager and thus all Spring + * transaction synchronizations end, without the transaction being completed yet. This + * is for example the case when participating in an existing JTA or EJB CMT transaction. + *

The default implementation simply invokes the {@code afterCompletion} methods + * immediately, passing in "STATUS_UNKNOWN". This is the best we can do if there's no + * chance to determine the actual outcome of the outer transaction. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param transaction transaction object returned by {@code doGetTransaction} + * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @throws TransactionException in case of system errors + * @see #invokeAfterCompletion(ReactiveTransactionSynchronizationManager, List, int) + * @see ReactiveTransactionSynchronization#afterCompletion(int) + * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + */ + protected Mono registerAfterCompletionWithExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager, + Object transaction, List synchronizations) throws TransactionException { + + logger.debug("Cannot register Spring after-completion synchronization with existing transaction - " + + "processing Spring after-completion callbacks immediately, with outcome status 'unknown'"); + return invokeAfterCompletion(synchronizationManager, synchronizations, ReactiveTransactionSynchronization.STATUS_UNKNOWN); + } + + /** + * Cleanup resources after transaction completion. + *

Called after {@code doCommit} and {@code doRollback} execution, + * on any outcome. The default implementation does nothing. + *

Should not throw any exceptions but just issue warnings on errors. + * @param synchronizationManager the synchronization manager bound to the current transaction + * @param transaction transaction object returned by {@code doGetTransaction} + */ + protected Mono doCleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction) { + return Mono.empty(); + } + + //--------------------------------------------------------------------- + // Serialization support + //--------------------------------------------------------------------- + + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + // Rely on default serialization; just initialize state after deserialization. + ois.defaultReadObject(); + + // Initialize transient fields. + this.logger = LogFactory.getLog(getClass()); + } + + + /** + * Holder for suspended resources. + * Used internally by {@code suspend} and {@code resume}. + */ + protected static final class SuspendedResourcesHolder { + + @Nullable + private final Object suspendedResources; + + @Nullable + private List suspendedSynchronizations; + + @Nullable + private String name; + + private boolean readOnly; + + @Nullable + private Integer isolationLevel; + + private boolean wasActive; + + private SuspendedResourcesHolder(Object suspendedResources) { + this.suspendedResources = suspendedResources; + } + + private SuspendedResourcesHolder( + @Nullable Object suspendedResources, List suspendedSynchronizations, + @Nullable String name, boolean readOnly, @Nullable Integer isolationLevel, boolean wasActive) { + + this.suspendedResources = suspendedResources; + this.suspendedSynchronizations = suspendedSynchronizations; + this.name = name; + this.readOnly = readOnly; + this.isolationLevel = isolationLevel; + this.wasActive = wasActive; + } + } + + /** + * Predicates for exception types that transactional error handling applies to. + */ + enum ErrorPredicates implements Predicate { + + /** + * Predicate matching {@link RuntimeException} or {@link Error}. + */ + RuntimeOrError { + @Override + public boolean test(Throwable throwable) { + return throwable instanceof RuntimeException || throwable instanceof Error; + } + }, + + /** + * Predicate matching {@link TransactionException}. + */ + TransactionException { + @Override + public boolean test(Throwable throwable) { + return throwable instanceof TransactionException; + } + }, + + /** + * Predicate matching {@link UnexpectedRollbackException}. + */ + UnexpectedRollback { + @Override + public boolean test(Throwable throwable) { + return throwable instanceof UnexpectedRollbackException; + } + }; + + @Override + public abstract boolean test(Throwable throwable); + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java new file mode 100644 index 00000000000..f3466906cb8 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java @@ -0,0 +1,105 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import reactor.core.publisher.Mono; + +import org.springframework.transaction.ReactiveTransactionStatus; + +/** + * Abstract base implementation of the {@link ReactiveTransactionStatus} interface. + * + *

Pre-implements the handling of local rollback-only and completed flags. + * + *

Does not assume any specific internal transaction handling, such as an + * underlying transaction object, and no transaction synchronization mechanism. + * + * @author Mark Paluch + * @since 5.2 + * @see #setRollbackOnly() + * @see #isRollbackOnly() + * @see #setCompleted() + * @see #isCompleted() + * @see DefaultReactiveTransactionStatus + */ +public abstract class AbstractReactiveTransactionStatus implements ReactiveTransactionStatus { + + private boolean rollbackOnly = false; + + private boolean completed = false; + + + //--------------------------------------------------------------------- + // Handling of current transaction state + //--------------------------------------------------------------------- + + @Override + public void setRollbackOnly() { + this.rollbackOnly = true; + } + + /** + * Determine the rollback-only flag via checking both the local rollback-only flag + * of this TransactionStatus and the global rollback-only flag of the underlying + * transaction, if any. + * @see #isLocalRollbackOnly() + * @see #isGlobalRollbackOnly() + */ + @Override + public boolean isRollbackOnly() { + return (isLocalRollbackOnly() || isGlobalRollbackOnly()); + } + + /** + * Determine the rollback-only flag via checking this ReactiveTransactionStatus. + *

Will only return "true" if the application called {@code setRollbackOnly} + * on this TransactionStatus object. + */ + public boolean isLocalRollbackOnly() { + return this.rollbackOnly; + } + + /** + * Template method for determining the global rollback-only flag of the + * underlying transaction, if any. + *

This implementation always returns {@code false}. + */ + public boolean isGlobalRollbackOnly() { + return false; + } + + /** + * This implementations is empty, considering flush as a no-op. + */ + @Override + public Mono flush() { + return Mono.empty(); + } + + /** + * Mark this transaction as completed, that is, committed or rolled back. + */ + public void setCompleted() { + this.completed = true; + } + + @Override + public boolean isCompleted() { + return this.completed; + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java new file mode 100644 index 00000000000..978b33d0462 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java @@ -0,0 +1,138 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import org.springframework.lang.Nullable; +import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.util.Assert; + +/** + * Default implementation of the {@link ReactiveTransactionStatus} + * interface, used by {@link AbstractReactiveTransactionManager}. Based on the concept + * of an underlying "transaction object". + * + *

Holds all status information that {@link AbstractReactiveTransactionManager} + * needs internally, including a generic transaction object determined by the + * concrete transaction manager implementation. + * + *

NOTE: This is not intended for use with other ReactiveTransactionManager + * implementations, in particular not for mock transaction managers in testing environments. + * + * @author Mark Paluch + * @since 5.2 + * @see AbstractReactiveTransactionManager + * @see #getTransaction + */ +public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactionStatus { + + @Nullable + private final Object transaction; + + private final boolean newTransaction; + + private final boolean newSynchronization; + + private final boolean readOnly; + + private final boolean debug; + + @Nullable + private final Object suspendedResources; + + + /** + * Create a new {@code DefaultReactiveTransactionStatus} instance. + * @param transaction underlying transaction object that can hold state + * for the internal transaction implementation + * @param newTransaction if the transaction is new, otherwise participating + * in an existing transaction + * @param newSynchronization if a new transaction synchronization has been + * opened for the given transaction + * @param readOnly whether the transaction is marked as read-only + * @param debug should debug logging be enabled for the handling of this transaction? + * Caching it in here can prevent repeated calls to ask the logging system whether + * debug logging should be enabled. + * @param suspendedResources a holder for resources that have been suspended + * for this transaction, if any + */ + public DefaultReactiveTransactionStatus( + @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, + boolean readOnly, boolean debug, @Nullable Object suspendedResources) { + this.transaction = transaction; + this.newTransaction = newTransaction; + this.newSynchronization = newSynchronization; + this.readOnly = readOnly; + this.debug = debug; + this.suspendedResources = suspendedResources; + } + + + /** + * Return the underlying transaction object. + * @throws IllegalStateException if no transaction is active + */ + public Object getTransaction() { + Assert.state(this.transaction != null, "No transaction active"); + return this.transaction; + } + + /** + * Return whether there is an actual transaction active. + */ + public boolean hasTransaction() { + return (this.transaction != null); + } + + @Override + public boolean isNewTransaction() { + return (hasTransaction() && this.newTransaction); + } + + /** + * Return if a new transaction synchronization has been opened + * for this transaction. + */ + public boolean isNewSynchronization() { + return this.newSynchronization; + } + + /** + * Return if this transaction is defined as read-only transaction. + */ + public boolean isReadOnly() { + return this.readOnly; + } + + /** + * Return whether the progress of this transaction is debugged. This is used by + * {@link AbstractReactiveTransactionManager} as an optimization, to prevent repeated + * calls to {@code logger.isDebugEnabled()}. Not really intended for client code. + */ + public boolean isDebug() { + return this.debug; + } + + /** + * Return the holder for resources that have been suspended for this transaction, + * if any. + */ + @Nullable + public Object getSuspendedResources() { + return this.suspendedResources; + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java new file mode 100644 index 00000000000..77ce6ee90c5 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java @@ -0,0 +1,137 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionSystemException; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.transaction.support.DefaultTransactionDefinition; +import org.springframework.util.Assert; + +/** + * Operator class that simplifies programmatic transaction demarcation and + * transaction exception handling. + * + * @author Mark Paluch + * @since 5.2 + * @see #execute + * @see ReactiveTransactionManager + */ +@SuppressWarnings("serial") +class DefaultTransactionalOperator extends DefaultTransactionDefinition + implements TransactionalOperator { + + private final Log logger = LogFactory.getLog(getClass()); + + private final ReactiveTransactionManager transactionManager; + + /** + * Construct a new DefaultTransactionalOperator using the given transaction manager. + * @param transactionManager the transaction management strategy to be used + */ + DefaultTransactionalOperator(ReactiveTransactionManager transactionManager) { + Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null"); + this.transactionManager = transactionManager; + } + + /** + * Construct a new TransactionTemplate using the given transaction manager, + * taking its default settings from the given transaction definition. + * @param transactionManager the transaction management strategy to be used + * @param transactionDefinition the transaction definition to copy the + * default settings from. Local properties can still be set to change values. + */ + DefaultTransactionalOperator(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) { + super(transactionDefinition); + Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null"); + this.transactionManager = transactionManager; + } + + + /** + * Return the transaction management strategy to be used. + */ + public ReactiveTransactionManager getTransactionManager() { + return this.transactionManager; + } + + @Override + public Flux execute(ReactiveTransactionCallback action) throws TransactionException { + + return TransactionContextManager.currentContext().flatMapMany(context -> { + + Mono status = this.transactionManager.getTransaction(this); + + return status.flatMapMany(it -> { + + // This is an around advice: Invoke the next interceptor in the chain. + // This will normally result in a target object being invoked. + Flux retVal = Flux.from(action.doInTransaction(it)); + + return retVal.onErrorResume(ex -> { + // Transactional code threw application exception -> rollback + return rollbackOnException(it, ex).then(Mono.error(ex)); + }).materialize().flatMap(signal -> { + + if (signal.isOnComplete()) { + return transactionManager.commit(it).materialize(); + } + + return Mono.just(signal); + }).dematerialize(); + }); + }) + .subscriberContext(TransactionContextManager.getOrCreateContext()) + .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); + } + + /** + * Perform a rollback, handling rollback exceptions properly. + * @param status object representing the transaction + * @param ex the thrown application exception or error + * @throws TransactionException in case of a rollback error + */ + private Mono rollbackOnException(ReactiveTransactionStatus status, Throwable ex) throws TransactionException { + + logger.debug("Initiating transaction rollback on application exception", ex); + + return this.transactionManager.rollback(status).onErrorMap(ex2 -> { + + logger.error("Application exception overridden by rollback exception", ex); + + if (ex2 instanceof TransactionSystemException) { + ((TransactionSystemException) ex2).initApplicationException(ex); + } + return ex2; + } + ); + } + + @Override + public boolean equals(Object other) { + return (this == other || (super.equals(other) && (!(other instanceof DefaultTransactionalOperator) || + getTransactionManager() == ((DefaultTransactionalOperator) other).getTransactionManager()))); + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java new file mode 100644 index 00000000000..1f24ef43640 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java @@ -0,0 +1,215 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import org.springframework.transaction.support.ResourceHolder; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import reactor.core.publisher.Mono; + +/** + * {@link ReactiveTransactionSynchronization} implementation that manages a + * {@link ResourceHolder} bound through {@link ReactiveTransactionSynchronizationManager}. + * + * @author Mark Paluch + * @since 5.2 + * @param the resource holder type + * @param the resource key type + */ +public abstract class ReactiveResourceHolderSynchronization + implements ReactiveTransactionSynchronization { + + private final H resourceHolder; + + private final K resourceKey; + + private final ReactiveTransactionSynchronizationManager synchronizationManager; + + private volatile boolean holderActive = true; + + + /** + * Create a new ResourceHolderSynchronization for the given holder. + * @param resourceHolder the ResourceHolder to manage + * @param resourceKey the key to bind the ResourceHolder for + * @param synchronizationManager the synchronization manager bound to the current transaction + * @see TransactionSynchronizationManager#bindResource + */ + public ReactiveResourceHolderSynchronization(H resourceHolder, K resourceKey, ReactiveTransactionSynchronizationManager synchronizationManager) { + this.resourceHolder = resourceHolder; + this.resourceKey = resourceKey; + this.synchronizationManager = synchronizationManager; + } + + + @Override + public Mono suspend() { + if (this.holderActive) { + synchronizationManager.unbindResource(this.resourceKey); + } + return Mono.empty(); + } + + @Override + public Mono resume() { + if (this.holderActive) { + synchronizationManager.bindResource(this.resourceKey, this.resourceHolder); + } + return Mono.empty(); + } + + @Override + public Mono flush() { + return flushResource(this.resourceHolder); + } + + @Override + public Mono beforeCommit(boolean readOnly) { + return Mono.empty(); + } + + @Override + public Mono beforeCompletion() { + if (shouldUnbindAtCompletion()) { + synchronizationManager.unbindResource(this.resourceKey); + this.holderActive = false; + if (shouldReleaseBeforeCompletion()) { + return releaseResource(this.resourceHolder, this.resourceKey); + } + } + + return Mono.empty(); + } + + @Override + public Mono afterCommit() { + if (!shouldReleaseBeforeCompletion()) { + return processResourceAfterCommit(this.resourceHolder); + } + + return Mono.empty(); + } + + @Override + public Mono afterCompletion(int status) { + + return Mono.defer(() -> { + + Mono sync = Mono.empty(); + if (shouldUnbindAtCompletion()) { + boolean releaseNecessary = false; + if (this.holderActive) { + // The thread-bound resource holder might not be available anymore, + // since afterCompletion might get called from a different thread. + this.holderActive = false; + synchronizationManager.unbindResourceIfPossible(this.resourceKey); + this.resourceHolder.unbound(); + releaseNecessary = true; + } else { + releaseNecessary = shouldReleaseAfterCompletion(this.resourceHolder); + } + if (releaseNecessary) { + sync = releaseResource(this.resourceHolder, this.resourceKey); + } + } else { + // Probably a pre-bound resource... + sync = cleanupResource(this.resourceHolder, this.resourceKey, (status == STATUS_COMMITTED)); + } + ; + return sync.doFinally(s -> this.resourceHolder.reset()); + }); + } + + + /** + * Return whether this holder should be unbound at completion + * (or should rather be left bound to the thread after the transaction). + *

The default implementation returns {@code true}. + */ + protected boolean shouldUnbindAtCompletion() { + return true; + } + + /** + * Return whether this holder's resource should be released before + * transaction completion ({@code true}) or rather after + * transaction completion ({@code false}). + *

Note that resources will only be released when they are + * unbound from the thread ({@link #shouldUnbindAtCompletion()}). + *

The default implementation returns {@code true}. + * + * @see #releaseResource + */ + protected boolean shouldReleaseBeforeCompletion() { + return true; + } + + /** + * Return whether this holder's resource should be released after + * transaction completion ({@code true}). + *

The default implementation returns {@code !shouldReleaseBeforeCompletion()}, + * releasing after completion if no attempt was made before completion. + * + * @see #releaseResource + */ + protected boolean shouldReleaseAfterCompletion(H resourceHolder) { + return !shouldReleaseBeforeCompletion(); + } + + /** + * Flush callback for the given resource holder. + * + * @param resourceHolder the resource holder to flush + */ + protected Mono flushResource(H resourceHolder) { + return Mono.empty(); + } + + /** + * After-commit callback for the given resource holder. + * Only called when the resource hasn't been released yet + * ({@link #shouldReleaseBeforeCompletion()}). + * + * @param resourceHolder the resource holder to process + */ + protected Mono processResourceAfterCommit(H resourceHolder) { + return Mono.empty(); + } + + /** + * Release the given resource (after it has been unbound from the thread). + * + * @param resourceHolder the resource holder to process + * @param resourceKey the key that the ResourceHolder was bound for + */ + protected Mono releaseResource(H resourceHolder, K resourceKey) { + return Mono.empty(); + } + + /** + * Perform a cleanup on the given resource (which is left bound to the thread). + * + * @param resourceHolder the resource holder to process + * @param resourceKey the key that the ResourceHolder was bound for + * @param committed whether the transaction has committed ({@code true}) + * or rolled back ({@code false}) + */ + protected Mono cleanupResource(H resourceHolder, K resourceKey, boolean committed) { + return Mono.empty(); + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java new file mode 100644 index 00000000000..0409620d410 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java @@ -0,0 +1,56 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import org.reactivestreams.Publisher; + +import org.springframework.transaction.ReactiveTransactionStatus; + +/** + * Callback interface for reactive transactional code. Used with {@link TransactionalOperator}'s + * {@code execute} method, often as anonymous class within a method implementation. + * + *

Typically used to assemble various calls to transaction-unaware data access + * services into a higher-level service method with transaction demarcation. As an + * alternative, consider the use of declarative transaction demarcation (e.g. through + * Spring's {@link org.springframework.transaction.annotation.Transactional} annotation). + * + * @author Mark Paluch + * @since 5.2 + * @see TransactionalOperator + * @param the result type + */ +@FunctionalInterface +public interface ReactiveTransactionCallback { + + /** + * Gets called by {@link TransactionalOperator#transactional} within a transactional context. + * Does not need to care about transactions itself, although it can retrieve and + * influence the status of the current transaction via the given status object, + * e.g. setting rollback-only. + *

Allows for returning a result object created within the transaction, i.e. a + * domain object or a collection of domain objects. A RuntimeException thrown by the + * callback is treated as application exception that enforces a rollback. Any such + * exception will be propagated to the caller of the template, unless there is a + * problem rolling back, in which case a TransactionException will be thrown. + * @param status associated transaction status + * @return a result publisher + * @see TransactionalOperator#transactional + */ + Publisher doInTransaction(ReactiveTransactionStatus status); + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java new file mode 100644 index 00000000000..1ec9a58fb94 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java @@ -0,0 +1,152 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import reactor.core.publisher.Mono; + +import org.springframework.transaction.ReactiveTransactionStatus; + +/** + * Interface for reactive transaction synchronization callbacks. + * Supported by {@link AbstractReactiveTransactionManager}. + * + *

ReactiveTransactionSynchronization implementations can implement the + * {@link org.springframework.core.Ordered} interface to influence their execution order. + * A synchronization that does not implement the {@link org.springframework.core.Ordered} + * interface is appended to the end of the synchronization chain. + * + *

System synchronizations performed by Spring itself use specific order values, + * allowing for fine-grained interaction with their execution order (if necessary). + * + * @author Mark Paluch + * @since 5.2 + * @see ReactiveTransactionSynchronizationManager + * @see AbstractReactiveTransactionManager + */ +public interface ReactiveTransactionSynchronization { + + /** Completion status in case of proper commit. */ + int STATUS_COMMITTED = 0; + + /** Completion status in case of proper rollback. */ + int STATUS_ROLLED_BACK = 1; + + /** Completion status in case of heuristic mixed completion or system errors. */ + int STATUS_UNKNOWN = 2; + + + /** + * Suspend this synchronization. + * Supposed to unbind resources from TransactionSynchronizationManager if managing any. + * @see ReactiveTransactionSynchronizationManager#unbindResource + */ + default Mono suspend() { + return Mono.empty(); + } + + /** + * Resume this synchronization. + * Supposed to rebind resources to TransactionSynchronizationManager if managing any. + * @see ReactiveTransactionSynchronizationManager#bindResource + */ + default Mono resume() { + return Mono.empty(); + } + + /** + * Flush the underlying session to the datastore, if applicable. + * @see ReactiveTransactionStatus#flush() + */ + default Mono flush() { + return Mono.empty(); + } + + /** + * Invoked before transaction commit (before "beforeCompletion"). + * Can e.g. flush transactional O/R Mapping sessions to the database. + *

This callback does not mean that the transaction will actually be committed. + * A rollback decision can still occur after this method has been called. This callback + * is rather meant to perform work that's only relevant if a commit still has a chance + * to happen, such as flushing SQL statements to the database. + *

Note that exceptions will get propagated to the commit caller and cause a + * rollback of the transaction. + * @param readOnly whether the transaction is defined as read-only transaction + * @throws RuntimeException in case of errors; will be propagated to the caller + * (note: do not throw TransactionException subclasses here!) + * @see #beforeCompletion + */ + default Mono beforeCommit(boolean readOnly) { + return Mono.empty(); + } + + /** + * Invoked before transaction commit/rollback. + * Can perform resource cleanup before transaction completion. + *

This method will be invoked after {@code beforeCommit}, even when + * {@code beforeCommit} threw an exception. This callback allows for + * closing resources before transaction completion, for any outcome. + * @throws RuntimeException in case of errors; will be logged but not propagated + * (note: do not throw TransactionException subclasses here!) + * @see #beforeCommit + * @see #afterCompletion + */ + default Mono beforeCompletion() { + return Mono.empty(); + } + + /** + * Invoked after transaction commit. Can perform further operations right + * after the main transaction has successfully committed. + *

Can e.g. commit further operations that are supposed to follow on a successful + * commit of the main transaction, like confirmation messages or emails. + *

NOTE: The transaction will have been committed already, but the + * transactional resources might still be active and accessible. As a consequence, + * any data access code triggered at this point will still "participate" in the + * original transaction, allowing to perform some cleanup (with no commit following + * anymore!), unless it explicitly declares that it needs to run in a separate + * transaction. Hence: Use {@code PROPAGATION_REQUIRES_NEW} for any + * transactional operation that is called from here. + * @throws RuntimeException in case of errors; will be propagated to the caller + * (note: do not throw TransactionException subclasses here!) + */ + default Mono afterCommit() { + return Mono.empty(); + } + + /** + * Invoked after transaction commit/rollback. + * Can perform resource cleanup after transaction completion. + *

NOTE: The transaction will have been committed or rolled back already, + * but the transactional resources might still be active and accessible. As a + * consequence, any data access code triggered at this point will still "participate" + * in the original transaction, allowing to perform some cleanup (with no commit + * following anymore!), unless it explicitly declares that it needs to run in a + * separate transaction. Hence: Use {@code PROPAGATION_REQUIRES_NEW} + * for any transactional operation that is called from here. + * @param status completion status according to the {@code STATUS_*} constants + * @throws RuntimeException in case of errors; will be logged but not propagated + * (note: do not throw TransactionException subclasses here!) + * @see #STATUS_COMMITTED + * @see #STATUS_ROLLED_BACK + * @see #STATUS_UNKNOWN + * @see #beforeCompletion + */ + default Mono afterCompletion(int status) { + return Mono.empty(); + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java new file mode 100644 index 00000000000..30640062493 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java @@ -0,0 +1,446 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Mono; + +import org.springframework.core.annotation.AnnotationAwareOrderComparator; +import org.springframework.lang.Nullable; +import org.springframework.transaction.NoTransactionException; +import org.springframework.transaction.support.ResourceHolder; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.util.Assert; + +/** + * Central delegate that manages resources and transaction synchronizations per + * subscriber context. + * To be used by resource management code but not by typical application code. + * + *

Supports one resource per key without overwriting, that is, a resource needs + * to be removed before a new one can be set for the same key. + * Supports a list of transaction synchronizations if synchronization is active. + * + *

Resource management code should check for context-bound resources, e.g. database + * connections, via {@code getResource}. Such code is + * normally not supposed to bind resources to units of work, as this is the responsibility + * of transaction managers. A further option is to lazily bind on first use if + * transaction synchronization is active, for performing transactions that span + * an arbitrary number of resources. + * + *

Transaction synchronization must be activated and deactivated by a transaction + * manager via {@link #initSynchronization()} and {@link #clearSynchronization()}. + * This is automatically supported by {@link AbstractReactiveTransactionManager}, + * and thus by all standard Spring transaction managers. + * + *

Resource management code should only register synchronizations when this + * manager is active, which can be checked via {@link #isSynchronizationActive}; + * it should perform immediate resource cleanup else. If transaction synchronization + * isn't active, there is either no current transaction, or the transaction manager + * doesn't support transaction synchronization. + * + *

Synchronization is for example used to always return the same resources + * within a transaction, e.g. a database connection for + * any given Connectionfactory or DatabaseFactory. + * + * @author Mark Paluch + * @since 5.2 + * @see #isSynchronizationActive + * @see #registerSynchronization + * @see TransactionSynchronization + * @see AbstractReactiveTransactionManager#setTransactionSynchronization + */ +public class ReactiveTransactionSynchronizationManager { + + private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationManager.class); + + private final TransactionContext transactionContext; + + + public ReactiveTransactionSynchronizationManager(TransactionContext transactionContext) { + this.transactionContext = transactionContext; + } + + + /** + * Return the ReactiveTransactionSynchronizationManager of the current transaction. + * Mainly intended for code that wants to bind resources or synchronizations. + * rollback-only but not throw an application exception. + * @throws NoTransactionException if the transaction info cannot be found, + * because the method was invoked outside a managed transaction. + */ + public static Mono currentTransaction() { + return TransactionContextManager.currentContext().map(ReactiveTransactionSynchronizationManager::new); + } + + /** + * Check if there is a resource for the given key bound to the current thread. + * + * @param key the key to check (usually the resource factory) + * @return if there is a value bound to the current thread + * @see ResourceTransactionManager#getResourceFactory() + */ + public boolean hasResource(Object key) { + Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object value = doGetResource(actualKey); + return (value != null); + } + + /** + * Retrieve a resource for the given key that is bound to the current thread. + * + * @param key the key to check (usually the resource factory) + * @return a value bound to the current thread (usually the active + * resource object), or {@code null} if none + * @see ResourceTransactionManager#getResourceFactory() + */ + @Nullable + public Object getResource(Object key) { + Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object value = doGetResource(actualKey); + if (value != null && logger.isTraceEnabled()) { + logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to context [" + + transactionContext.getName() + "]"); + } + return value; + } + + /** + * Actually check the value of the resource that is bound for the given key. + */ + @Nullable + private Object doGetResource(Object actualKey) { + Map map = transactionContext.getResources(); + Object value = map.get(actualKey); + // Transparently remove ResourceHolder that was marked as void... + if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { + map.remove(actualKey); + value = null; + } + return value; + } + + /** + * Bind the given resource for the given key to the current context. + * + * @param key the key to bind the value to (usually the resource factory) + * @param value the value to bind (usually the active resource object) + * @throws IllegalStateException if there is already a value bound to the context + * @see ResourceTransactionManager#getResourceFactory() + */ + public void bindResource(Object key, Object value) throws IllegalStateException { + Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Assert.notNull(value, "Value must not be null"); + Map map = transactionContext.getResources(); + Object oldValue = map.put(actualKey, value); + // Transparently suppress a ResourceHolder that was marked as void... + if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { + oldValue = null; + } + if (oldValue != null) { + throw new IllegalStateException("Already value [" + oldValue + "] for key [" + + actualKey + "] bound to context [" + transactionContext.getName() + "]"); + } + if (logger.isTraceEnabled()) { + logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to context [" + + transactionContext.getName() + "]"); + } + } + + /** + * Unbind a resource for the given key from the current context. + * + * @param key the key to unbind (usually the resource factory) + * @return the previously bound value (usually the active resource object) + * @throws IllegalStateException if there is no value bound to the context + * @see ResourceTransactionManager#getResourceFactory() + */ + public Object unbindResource(Object key) throws IllegalStateException { + Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object value = doUnbindResource(actualKey); + if (value == null) { + throw new IllegalStateException( + "No value for key [" + actualKey + "] bound to context [" + transactionContext.getName() + "]"); + } + return value; + } + + /** + * Unbind a resource for the given key from the current context. + * + * @param key the key to unbind (usually the resource factory) + * @return the previously bound value, or {@code null} if none bound + */ + @Nullable + public Object unbindResourceIfPossible(Object key) { + Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + return doUnbindResource(actualKey); + } + + /** + * Actually remove the value of the resource that is bound for the given key. + */ + @Nullable + private Object doUnbindResource(Object actualKey) { + Map map = transactionContext.getResources(); + Object value = map.remove(actualKey); + // Transparently suppress a ResourceHolder that was marked as void... + if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { + value = null; + } + if (value != null && logger.isTraceEnabled()) { + logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from context [" + + transactionContext.getName() + "]"); + } + return value; + } + + //------------------------------------------------------------------------- + // Management of transaction synchronizations + //------------------------------------------------------------------------- + + /** + * Return if transaction synchronization is active for the current context. + * Can be called before register to avoid unnecessary instance creation. + * + * @see #registerSynchronization + */ + public boolean isSynchronizationActive() { + return (transactionContext.getSynchronizations() != null); + } + + /** + * Activate transaction synchronization for the current context. + * Called by a transaction manager on transaction begin. + * + * @throws IllegalStateException if synchronization is already active + */ + public void initSynchronization() throws IllegalStateException { + if (isSynchronizationActive()) { + throw new IllegalStateException("Cannot activate transaction synchronization - already active"); + } + logger.trace("Initializing transaction synchronization"); + transactionContext.setSynchronizations(new LinkedHashSet<>()); + } + + /** + * Register a new transaction synchronization for the current context. + * Typically called by resource management code. + *

Note that synchronizations can implement the + * {@link org.springframework.core.Ordered} interface. + * They will be executed in an order according to their order value (if any). + * + * @param synchronization the synchronization object to register + * @throws IllegalStateException if transaction synchronization is not active + * @see org.springframework.core.Ordered + */ + public void registerSynchronization(ReactiveTransactionSynchronization synchronization) + throws IllegalStateException { + + Assert.notNull(synchronization, "TransactionSynchronization must not be null"); + if (!isSynchronizationActive()) { + throw new IllegalStateException("Transaction synchronization is not active"); + } + transactionContext.getSynchronizations().add(synchronization); + } + + /** + * Return an unmodifiable snapshot list of all registered synchronizations + * for the current context. + * + * @return unmodifiable List of TransactionSynchronization instances + * @throws IllegalStateException if synchronization is not active + * @see TransactionSynchronization + */ + public List getSynchronizations() throws IllegalStateException { + Set synchs = transactionContext.getSynchronizations(); + if (synchs == null) { + throw new IllegalStateException("Transaction synchronization is not active"); + } + // Return unmodifiable snapshot, to avoid ConcurrentModificationExceptions + // while iterating and invoking synchronization callbacks that in turn + // might register further synchronizations. + if (synchs.isEmpty()) { + return Collections.emptyList(); + } else { + // Sort lazily here, not in registerSynchronization. + List sortedSynchs = new ArrayList<>(synchs); + AnnotationAwareOrderComparator.sort(sortedSynchs); + return Collections.unmodifiableList(sortedSynchs); + } + } + + /** + * Deactivate transaction synchronization for the current context. + * Called by the transaction manager on transaction cleanup. + * + * @throws IllegalStateException if synchronization is not active + */ + public void clearSynchronization() throws IllegalStateException { + if (!isSynchronizationActive()) { + throw new IllegalStateException("Cannot deactivate transaction synchronization - not active"); + } + logger.trace("Clearing transaction synchronization"); + transactionContext.setSynchronizations(null); + } + + //------------------------------------------------------------------------- + // Exposure of transaction characteristics + //------------------------------------------------------------------------- + + /** + * Expose the name of the current transaction, if any. + * Called by the transaction manager on transaction begin and on cleanup. + * + * @param name the name of the transaction, or {@code null} to reset it + * @see org.springframework.transaction.TransactionDefinition#getName() + */ + public void setCurrentTransactionName(@Nullable String name) { + transactionContext.setCurrentTransactionName(name); + } + + /** + * Return the name of the current transaction, or {@code null} if none set. + * To be called by resource management code for optimizations per use case, + * for example to optimize fetch strategies for specific named transactions. + * + * @see org.springframework.transaction.TransactionDefinition#getName() + */ + @Nullable + public String getCurrentTransactionName() { + return transactionContext.getCurrentTransactionName(); + } + + /** + * Expose a read-only flag for the current transaction. + * Called by the transaction manager on transaction begin and on cleanup. + * + * @param readOnly {@code true} to mark the current transaction + * as read-only; {@code false} to reset such a read-only marker + * @see org.springframework.transaction.TransactionDefinition#isReadOnly() + */ + public void setCurrentTransactionReadOnly(boolean readOnly) { + transactionContext.setCurrentTransactionReadOnly(readOnly); + } + + /** + * Return whether the current transaction is marked as read-only. + * To be called by resource management code when preparing a newly + * created resource (for example, a Hibernate Session). + *

Note that transaction synchronizations receive the read-only flag + * as argument for the {@code beforeCommit} callback, to be able + * to suppress change detection on commit. The present method is meant + * to be used for earlier read-only checks, for example to set the + * flush mode of a Hibernate Session to "FlushMode.NEVER" upfront. + * + * @see org.springframework.transaction.TransactionDefinition#isReadOnly() + * @see TransactionSynchronization#beforeCommit(boolean) + */ + public boolean isCurrentTransactionReadOnly() { + return transactionContext.isCurrentTransactionReadOnly(); + } + + /** + * Expose an isolation level for the current transaction. + * Called by the transaction manager on transaction begin and on cleanup. + * + * @param isolationLevel the isolation level to expose, according to the + * R2DBC Connection constants (equivalent to the corresponding Spring + * TransactionDefinition constants), or {@code null} to reset it + * @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_UNCOMMITTED + * @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_COMMITTED + * @see org.springframework.transaction.TransactionDefinition#ISOLATION_REPEATABLE_READ + * @see org.springframework.transaction.TransactionDefinition#ISOLATION_SERIALIZABLE + * @see org.springframework.transaction.TransactionDefinition#getIsolationLevel() + */ + public void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) { + transactionContext.setCurrentTransactionIsolationLevel(isolationLevel); + } + + /** + * Return the isolation level for the current transaction, if any. + * To be called by resource management code when preparing a newly + * created resource (for example, a R2DBC Connection). + * + * @return the currently exposed isolation level, according to the + * R2DBC Connection constants (equivalent to the corresponding Spring + * TransactionDefinition constants), or {@code null} if none + * @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_UNCOMMITTED + * @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_COMMITTED + * @see org.springframework.transaction.TransactionDefinition#ISOLATION_REPEATABLE_READ + * @see org.springframework.transaction.TransactionDefinition#ISOLATION_SERIALIZABLE + * @see org.springframework.transaction.TransactionDefinition#getIsolationLevel() + */ + @Nullable + public Integer getCurrentTransactionIsolationLevel() { + return transactionContext.getCurrentTransactionIsolationLevel(); + } + + /** + * Expose whether there currently is an actual transaction active. + * Called by the transaction manager on transaction begin and on cleanup. + * + * @param active {@code true} to mark the current context as being associated + * with an actual transaction; {@code false} to reset that marker + */ + public void setActualTransactionActive(boolean active) { + transactionContext.setActualTransactionActive(active); + } + + /** + * Return whether there currently is an actual transaction active. + * This indicates whether the current context is associated with an actual + * transaction rather than just with active transaction synchronization. + *

To be called by resource management code that wants to discriminate + * between active transaction synchronization (with or without backing + * resource transaction; also on PROPAGATION_SUPPORTS) and an actual + * transaction being active (with backing resource transaction; + * on PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, etc). + * + * @see #isSynchronizationActive() + */ + public boolean isActualTransactionActive() { + return transactionContext.isActualTransactionActive(); + } + + /** + * Clear the entire transaction synchronization state: + * registered synchronizations as well as the various transaction characteristics. + * + * @see #clearSynchronization() + * @see #setCurrentTransactionName + * @see #setCurrentTransactionReadOnly + * @see #setCurrentTransactionIsolationLevel + * @see #setActualTransactionActive + */ + public void clear() { + transactionContext.clear(); + } + + private Map getResources() { + return transactionContext.getResources(); + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java new file mode 100644 index 00000000000..677cb8122a1 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java @@ -0,0 +1,202 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.aop.scope.ScopedObject; +import org.springframework.core.InfrastructureProxy; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; + +/** + * Utility methods for triggering specific {@link ReactiveTransactionSynchronization} + * callback methods on all currently registered synchronizations. + * + * @author Mark Paluch + * @since 5.2 + * @see ReactiveTransactionSynchronization + * @see ReactiveTransactionSynchronizationManager#getSynchronizations() + */ +public abstract class ReactiveTransactionSynchronizationUtils { + + private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationUtils.class); + + private static final boolean aopAvailable = ClassUtils.isPresent( + "org.springframework.aop.scope.ScopedObject", ReactiveTransactionSynchronizationUtils.class.getClassLoader()); + + + /** + * Unwrap the given resource handle if necessary; otherwise return + * the given handle as-is. + * @see InfrastructureProxy#getWrappedObject() + */ + static Object unwrapResourceIfNecessary(Object resource) { + Assert.notNull(resource, "Resource must not be null"); + Object resourceRef = resource; + // unwrap infrastructure proxy + if (resourceRef instanceof InfrastructureProxy) { + resourceRef = ((InfrastructureProxy) resourceRef).getWrappedObject(); + } + if (aopAvailable) { + // now unwrap scoped proxy + resourceRef = ScopedProxyUnwrapper.unwrapIfNecessary(resourceRef); + } + return resourceRef; + } + + + /** + * Trigger {@code flush} callbacks on all currently registered synchronizations. + * @throws RuntimeException if thrown by a {@code flush} callback + * @see ReactiveTransactionSynchronization#flush() + */ + public static Mono triggerFlush() { + return TransactionContextManager.currentContext().flatMapIterable(TransactionContext::getSynchronizations).concatMap(ReactiveTransactionSynchronization::flush).then(); + } + + /** + * Trigger {@code beforeCommit} callbacks on all currently registered synchronizations. + * + * @param readOnly whether the transaction is defined as read-only transaction + * @throws RuntimeException if thrown by a {@code beforeCommit} callback + * @see ReactiveTransactionSynchronization#beforeCommit(boolean) + */ + public static Mono triggerBeforeCommit(boolean readOnly) { + return TransactionContextManager.currentContext() + .map(TransactionContext::getSynchronizations) + .flatMap(it -> triggerBeforeCommit(it, readOnly)).then(); + } + + /** + * Actually invoke the {@code triggerBeforeCommit} methods of the + * given Spring ReactiveTransactionSynchronization objects. + * + * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @see ReactiveTransactionSynchronization#beforeCommit(boolean) + */ + public static Mono triggerBeforeCommit(Collection synchronizations, boolean readOnly) { + return Flux.fromIterable(synchronizations).concatMap(it -> it.beforeCommit(readOnly)) + .then(); + } + + /** + * Trigger {@code beforeCompletion} callbacks on all currently registered synchronizations. + * @see ReactiveTransactionSynchronization#beforeCompletion() + */ + public static Mono triggerBeforeCompletion() { + + return TransactionContextManager.currentContext() + .map(TransactionContext::getSynchronizations) + .flatMap(ReactiveTransactionSynchronizationUtils::triggerBeforeCompletion); + } + + /** + * Actually invoke the {@code beforeCompletion} methods of the + * given Spring ReactiveTransactionSynchronization objects. + * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @see ReactiveTransactionSynchronization#beforeCompletion() + */ + public static Mono triggerBeforeCompletion(Collection synchronizations) { + + return Flux.fromIterable(synchronizations) + .concatMap(ReactiveTransactionSynchronization::beforeCompletion).onErrorContinue((t, o) -> { + logger.error("TransactionSynchronization.beforeCompletion threw exception", t); + }).then(); + } + + /** + * Trigger {@code afterCommit} callbacks on all currently registered synchronizations. + * @throws RuntimeException if thrown by a {@code afterCommit} callback + * @see ReactiveTransactionSynchronizationManager#getSynchronizations() + * @see ReactiveTransactionSynchronization#afterCommit() + */ + public static Mono triggerAfterCommit() { + return TransactionContextManager.currentContext() + .flatMap(it -> invokeAfterCommit(it.getSynchronizations())); + } + + /** + * Actually invoke the {@code afterCommit} methods of the + * given Spring ReactiveTransactionSynchronization objects. + * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @see TransactionSynchronization#afterCommit() + */ + public static Mono invokeAfterCommit(Collection synchronizations) { + return Flux.fromIterable(synchronizations) + .concatMap(ReactiveTransactionSynchronization::afterCommit) + .then(); + } + + /** + * Trigger {@code afterCompletion} callbacks on all currently registered synchronizations. + * @param completionStatus the completion status according to the + * constants in the ReactiveTransactionSynchronization interface + * @see ReactiveTransactionSynchronizationManager#getSynchronizations() + * @see ReactiveTransactionSynchronization#afterCompletion(int) + * @see ReactiveTransactionSynchronization#STATUS_COMMITTED + * @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK + * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + */ + public static Mono triggerAfterCompletion(int completionStatus) { + return TransactionContextManager.currentContext() + .flatMap(it -> invokeAfterCompletion(it.getSynchronizations(), completionStatus)); + } + + /** + * Actually invoke the {@code afterCompletion} methods of the + * given Spring ReactiveTransactionSynchronization objects. + * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @param completionStatus the completion status according to the + * constants in the ReactiveTransactionSynchronization interface + * @see ReactiveTransactionSynchronization#afterCompletion(int) + * @see ReactiveTransactionSynchronization#STATUS_COMMITTED + * @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK + * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + */ + public static Mono invokeAfterCompletion(Collection synchronizations, + int completionStatus) { + + return Flux.fromIterable(synchronizations).concatMap(it -> it.afterCompletion(completionStatus)) + .onErrorContinue((t, o) -> { + logger.error("TransactionSynchronization.afterCompletion threw exception", t); + }).then(); + } + + + /** + * Inner class to avoid hard-coded dependency on AOP module. + */ + private static class ScopedProxyUnwrapper { + + static Object unwrapIfNecessary(Object resource) { + if (resource instanceof ScopedObject) { + return ((ScopedObject) resource).getTargetObject(); + } + else { + return resource; + } + } + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java new file mode 100644 index 00000000000..b86f82a69c4 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java @@ -0,0 +1,140 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.springframework.lang.Nullable; +import org.springframework.util.StringUtils; + +/** + * Mutable transaction context that encapsulates transactional synchronizations and + * resources in the scope of a single transaction. Transaction context is typically + * held by an outer {@link TransactionContextHolder} or referenced directly within + * from the subscriber context. + * + * @author Mark Paluch + * @since 5.2 + * @see TransactionContextManager + * @see reactor.util.context.Context + */ +public class TransactionContext { + + private final UUID contextId = UUID.randomUUID(); + + private final Map resources = new LinkedHashMap<>(); + + private @Nullable Set synchronizations; + + private volatile @Nullable String currentTransactionName; + + private volatile boolean currentTransactionReadOnly; + + private volatile @Nullable Integer currentTransactionIsolationLevel; + + private volatile boolean actualTransactionActive; + + private final @Nullable TransactionContext parent; + + + TransactionContext() { + this(null); + } + + TransactionContext(@Nullable TransactionContext parent) { + this.parent = parent; + } + + + public void clear() { + + synchronizations = null; + currentTransactionName = null; + currentTransactionReadOnly = false; + currentTransactionIsolationLevel = null; + actualTransactionActive = false; + } + + public String getName() { + + if (StringUtils.hasText(currentTransactionName)) { + return contextId + ": " + currentTransactionName; + } + + return contextId.toString(); + } + + public UUID getContextId() { + return contextId; + } + + public Map getResources() { + return resources; + } + + @Nullable + public Set getSynchronizations() { + return synchronizations; + } + + public void setSynchronizations(@org.springframework.lang.Nullable Set synchronizations) { + this.synchronizations = synchronizations; + } + + @Nullable + public String getCurrentTransactionName() { + return currentTransactionName; + } + + public void setCurrentTransactionName(@Nullable String currentTransactionName) { + this.currentTransactionName = currentTransactionName; + } + + public boolean isCurrentTransactionReadOnly() { + return currentTransactionReadOnly; + } + + public void setCurrentTransactionReadOnly(boolean currentTransactionReadOnly) { + this.currentTransactionReadOnly = currentTransactionReadOnly; + } + + @Nullable + public Integer getCurrentTransactionIsolationLevel() { + return currentTransactionIsolationLevel; + } + + public void setCurrentTransactionIsolationLevel(@Nullable Integer currentTransactionIsolationLevel) { + this.currentTransactionIsolationLevel = currentTransactionIsolationLevel; + } + + public boolean isActualTransactionActive() { + return actualTransactionActive; + } + + public void setActualTransactionActive(boolean actualTransactionActive) { + this.actualTransactionActive = actualTransactionActive; + } + + @Nullable + public TransactionContext getParent() { + return parent; + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextHolder.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextHolder.java new file mode 100644 index 00000000000..5ae5f47fb04 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextHolder.java @@ -0,0 +1,73 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.transaction.reactive; + +import java.util.Stack; + +import org.springframework.transaction.NoTransactionException; + +/** + * Mutable holder for reactive transaction {@link TransactionContext contexts}. + * This holder keeps references to individual {@link TransactionContext}s. + * @author Mark Paluch + * @since 5.2 + * @see TransactionContext + */ +class TransactionContextHolder { + + private final Stack transactionStack; + + + TransactionContextHolder(Stack transactionStack) { + this.transactionStack = transactionStack; + } + + + /** + * Return the current {@link TransactionContext}. + * @return the current {@link TransactionContext}. + * @throws NoTransactionException if no transaction is ongoing. + */ + TransactionContext currentContext() { + TransactionContext context = (transactionStack.isEmpty() ? null : transactionStack.peek()); + + if (context == null) { + throw new NoTransactionException("No transaction in context"); + } + + return context; + } + + /** + * Create a new {@link TransactionContext}. + * @return the new {@link TransactionContext}. + */ + TransactionContext createContext() { + TransactionContext context = (transactionStack.isEmpty() ? null : transactionStack.peek()); + + return (context == null ? transactionStack.push(new TransactionContext()) : + transactionStack.push(new TransactionContext(context))); + } + + /** + * Check whether the holder has a {@link TransactionContext}. + * @return {@literal true} if a {@link TransactionContext} is associated. + */ + boolean hasContext() { + return !transactionStack.isEmpty(); + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java new file mode 100644 index 00000000000..5071ce30214 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java @@ -0,0 +1,124 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import java.util.Stack; +import java.util.function.Function; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.context.Context; + +import org.springframework.transaction.NoTransactionException; + +/** + * Delegate to register and obtain transactional contexts. + *

+ * Typically used by components that intercept or orchestrate transactional flows such as AOP interceptors or + * transactional operators. + * + * @author Mark Paluch + * @since 5.2 + * @see ReactiveTransactionSynchronization + */ +public abstract class TransactionContextManager { + + private TransactionContextManager() { + /* prevent instantiation */ + } + + + /** + * Obtain the current {@link TransactionContext} from the subscriber context or the + * transactional context holder. Context retrieval fails with NoTransactionException + * if no context or context holder is registered. + * @return the current {@link TransactionContext} + * @throws NoTransactionException if no TransactionContext was found in the subscriber context + * or no context found in a holder + */ + public static Mono currentContext() throws NoTransactionException { + + return Mono.subscriberContext().handle((ctx, sink) -> { + + if (ctx.hasKey(TransactionContext.class)) { + sink.next(ctx.get(TransactionContext.class)); + return; + } + + if (ctx.hasKey(TransactionContextHolder.class)) { + TransactionContextHolder holder = ctx.get(TransactionContextHolder.class); + if (holder.hasContext()) { + sink.next(holder.currentContext()); + return; + } + } + + sink.error(new NoTransactionException("No transaction in context")); + }); + } + + /** + * Create a {@link TransactionContext} and register it in the subscriber {@link Context}. + * @return functional context registration. + * @see Mono#subscriberContext(Function) + * @see Flux#subscriberContext(Function) + * @throws IllegalStateException if a transaction context is already associated. + */ + public static Function createTransactionContext() { + return context -> context.put(TransactionContext.class, new TransactionContext()); + } + + /** + * Return a {@link Function} to create or associate a new {@link TransactionContext}. + * Interaction with transactional resources through + * {@link ReactiveTransactionSynchronizationManager} requires a TransactionContext + * to be registered in the subscriber context. + * @return functional context registration. + */ + public static Function getOrCreateContext() { + return context -> { + + TransactionContextHolder holder = context.get(TransactionContextHolder.class); + + if (holder.hasContext()) { + context.put(TransactionContext.class, holder.currentContext()); + } + + return context.put(TransactionContext.class, holder.createContext()); + }; + } + + /** + * Return a {@link Function} to create or associate a new + * {@link TransactionContextHolder}. Creation and release of transactions + * within a reactive flow requires a mutable holder that follows a top to + * down execution scheme. Reactor's subscriber context follows a down to top + * approach regarding mutation visibility. + * @return functional context registration. + */ + public static Function getOrCreateContextHolder() { + + return context -> { + + if (!context.hasKey(TransactionContextHolder.class)) { + return context.put(TransactionContextHolder.class, new TransactionContextHolder(new Stack<>())); + } + return context; + }; + } + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java new file mode 100644 index 00000000000..5faa80c36c1 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java @@ -0,0 +1,114 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.ReactiveTransactionManager; + +/** + * Operator class that simplifies programmatic transaction demarcation and + * transaction exception handling. + * + *

The central method is {@link #transactional}, supporting transactional wrapping + * of functional sequences code that. This operator handles the transaction lifecycle + * and possible exceptions such that neither the ReactiveTransactionCallback + * implementation nor the calling code needs to explicitly handle transactions. + * + *

Typical usage: Allows for writing low-level data access objects that use + * resources such as database connections but are not transaction-aware themselves. + * Instead, they can implicitly participate in transactions handled by higher-level + * application services utilizing this class, making calls to the low-level + * services via an inner-class callback object. + * + *

Can be used within a service implementation via direct instantiation with + * a transaction manager reference, or get prepared in an application context + * and passed to services as bean reference. Note: The transaction manager should + * always be configured as bean in the application context: in the first case given + * to the service directly, in the second case given to the prepared template. + * + *

Supports setting the propagation behavior and the isolation level by name, + * for convenient configuration in context definitions. + * + * @author Mark Paluch + * @since 5.2 + * @see #execute + * @see ReactiveTransactionManager + */ +public interface TransactionalOperator { + + /** + * Create a new {@link TransactionalOperator} using {@link ReactiveTransactionManager}. + * @param transactionManager the transaction management strategy to be used + * @return the transactional operator + */ + static TransactionalOperator create(ReactiveTransactionManager transactionManager){ + return new DefaultTransactionalOperator(transactionManager); + } + + /** + * Create a new {@link TransactionalOperator} using {@link ReactiveTransactionManager} + * and {@link TransactionDefinition}. + * + * @param transactionManager the transaction management strategy to be used + * @param transactionDefinition the transaction definition to apply. + * @return the transactional operator + */ + static TransactionalOperator create(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition){ + return new DefaultTransactionalOperator(transactionManager, transactionDefinition); + } + + + /** + * Wrap the functional sequence specified by the given Flux within a transaction. + * @param flux the Flux that should be executed within the transaction + * @return a result publisher returned by the callback, or {@code null} if none + * @throws TransactionException in case of initialization, rollback, or system errors + * @throws RuntimeException if thrown by the TransactionCallback + */ + default Flux transactional(Flux flux) { + return execute(it -> flux); + } + + /** + * Wrap the functional sequence specified by the given Mono within a transaction. + * @param mono the Mono that should be executed within the transaction + * @return a result publisher returned by the callback + * @throws TransactionException in case of initialization, rollback, or system errors + * @throws RuntimeException if thrown by the TransactionCallback + */ + default Mono transactional(Mono mono) { + return execute(it -> mono).next(); + } + + /** + * Execute the action specified by the given callback object within a transaction. + *

Allows for returning a result object created within the transaction, that is, + * a domain object or a collection of domain objects. A RuntimeException thrown + * by the callback is treated as a fatal exception that enforces a rollback. + * Such an exception gets propagated to the caller of the template. + * @param action the callback object that specifies the transactional action + * @return a result object returned by the callback + * @throws TransactionException in case of initialization, rollback, or system errors + * @throws RuntimeException if thrown by the TransactionCallback + */ + Flux execute(ReactiveTransactionCallback action) throws TransactionException; + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/package-info.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/package-info.java new file mode 100644 index 00000000000..3b8526e87fa --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/package-info.java @@ -0,0 +1,11 @@ +/** + * Support classes for the org.springframework.transaction.reactive package. + * Provides an abstract base class for reactive transaction manager implementations, + * and a transactional operator plus callback for transaction demarcation. + */ +@NonNullApi +@NonNullFields +package org.springframework.transaction.reactive; + +import org.springframework.lang.NonNullApi; +import org.springframework.lang.NonNullFields; diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java new file mode 100644 index 00000000000..80db3e0afe1 --- /dev/null +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java @@ -0,0 +1,99 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import reactor.core.publisher.Mono; + +import org.springframework.transaction.CannotCreateTransactionException; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.TransactionDefinition; + +/** + * Test implementation of a {@link ReactiveTransactionManager}. + * + * @author Mark Paluch + */ +@SuppressWarnings("serial") +class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager { + + private static final Object TRANSACTION = "transaction"; + + private final boolean existingTransaction; + + private final boolean canCreateTransaction; + + protected boolean begin = false; + + protected boolean commit = false; + + protected boolean rollback = false; + + protected boolean rollbackOnly = false; + + + ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction) { + this.existingTransaction = existingTransaction; + this.canCreateTransaction = canCreateTransaction; + setTransactionSynchronization(SYNCHRONIZATION_NEVER); + } + + + @Override + protected Object doGetTransaction(ReactiveTransactionSynchronizationManager synchronizationManager) { + return TRANSACTION; + } + + @Override + protected boolean isExistingTransaction(Object transaction) { + return existingTransaction; + } + + @Override + protected Mono doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) { + if (!TRANSACTION.equals(transaction)) { + return Mono.error(new IllegalArgumentException("Not the same transaction object")); + } + if (!this.canCreateTransaction) { + return Mono.error(new CannotCreateTransactionException("Cannot create transaction")); + } + return Mono.fromRunnable(() -> this.begin = true); + } + + @Override + protected Mono doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + if (!TRANSACTION.equals(status.getTransaction())) { + return Mono.error(new IllegalArgumentException("Not the same transaction object")); + } + return Mono.fromRunnable(() -> this.commit = true); + } + + @Override + protected Mono doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + if (!TRANSACTION.equals(status.getTransaction())) { + return Mono.error(new IllegalArgumentException("Not the same transaction object")); + } + return Mono.fromRunnable(() -> this.rollback = true); + } + + @Override + protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + if (!TRANSACTION.equals(status.getTransaction())) { + return Mono.error(new IllegalArgumentException("Not the same transaction object")); + } + return Mono.fromRunnable(() -> this.rollbackOnly = true); + } +} diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java new file mode 100644 index 00000000000..258ec59160c --- /dev/null +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java @@ -0,0 +1,228 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import org.springframework.transaction.IllegalTransactionStateException; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.support.DefaultTransactionDefinition; + +import static org.junit.Assert.*; + +/** + * Unit tests for transactional support through {@link ReactiveTestTransactionManager}. + * + * @author Mark Paluch + */ +public class ReactiveTransactionSupportUnitTests { + + @Test + public void noExistingTransaction() { + ReactiveTransactionManager tm = new ReactiveTestTransactionManager(false, true); + + tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + .as(StepVerifier::create).consumeNextWith(actual -> { + assertFalse(actual.hasTransaction()); + }).verifyComplete(); + + tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) + .cast(DefaultReactiveTransactionStatus.class).subscriberContext(TransactionContextManager.createTransactionContext()) + .as(StepVerifier::create).consumeNextWith(actual -> { + assertTrue(actual.hasTransaction()); + assertTrue(actual.isNewTransaction()); + }).verifyComplete(); + + tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + .as(StepVerifier::create).expectError(IllegalTransactionStateException.class).verify(); + } + + @Test + public void existingTransaction() { + ReactiveTransactionManager tm = new ReactiveTestTransactionManager(true, true); + + tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + .as(StepVerifier::create).consumeNextWith(actual -> { + assertNotNull(actual.getTransaction()); + assertFalse(actual.isNewTransaction()); + }).verifyComplete(); + + tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + .as(StepVerifier::create).consumeNextWith(actual -> { + assertNotNull(actual.getTransaction()); + assertFalse(actual.isNewTransaction()); + }).verifyComplete(); + + tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + .as(StepVerifier::create).consumeNextWith(actual -> { + assertNotNull(actual.getTransaction()); + assertFalse(actual.isNewTransaction()); + }).verifyComplete(); + } + + @Test + public void commitWithoutExistingTransaction() { + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); + tm.getTransaction(null).flatMap(tm::commit).subscriberContext(TransactionContextManager.createTransactionContext()) + .as(StepVerifier::create).verifyComplete(); + + assertHasBegan(tm); + assertHasCommitted(tm); + assertHasNoRollback(tm); + assertHasNotSetRollbackOnly(tm); + } + + @Test + public void rollbackWithoutExistingTransaction() { + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); + tm.getTransaction(null).flatMap(tm::rollback) + .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) + .verifyComplete(); + + assertHasBegan(tm); + assertHasNotCommitted(tm); + assertHasRolledBack(tm); + assertHasNotSetRollbackOnly(tm); + } + + @Test + public void rollbackOnlyWithoutExistingTransaction() { + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); + tm.getTransaction(null).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit) + .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) + .verifyComplete(); + + assertHasBegan(tm); + assertHasNotCommitted(tm); + assertHasRolledBack(tm); + assertHasNotSetRollbackOnly(tm); + } + + @Test + public void commitWithExistingTransaction() { + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); + tm.getTransaction(null).flatMap(tm::commit).subscriberContext(TransactionContextManager.createTransactionContext()) + .as(StepVerifier::create).verifyComplete(); + + assertHasNotBegan(tm); + assertHasNotCommitted(tm); + assertHasNoRollback(tm); + assertHasNotSetRollbackOnly(tm); + } + + @Test + public void rollbackWithExistingTransaction() { + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); + tm.getTransaction(null).flatMap(tm::rollback) + .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) + .verifyComplete(); + + assertHasNotBegan(tm); + assertHasNotCommitted(tm); + assertHasNoRollback(tm); + assertHasSetRollbackOnly(tm); + } + + @Test + public void rollbackOnlyWithExistingTransaction() { + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); + tm.getTransaction(null).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit) + .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) + .verifyComplete(); + + assertHasNotBegan(tm); + assertHasNotCommitted(tm); + assertHasNoRollback(tm); + assertHasSetRollbackOnly(tm); + } + + @Test + public void transactionTemplate() { + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); + TransactionalOperator operator = TransactionalOperator.create(tm); + + Flux.just("Walter").as(operator::transactional) + .as(StepVerifier::create) + .expectNextCount(1) + .verifyComplete(); + + assertHasBegan(tm); + assertHasCommitted(tm); + assertHasNoRollback(tm); + assertHasNotSetRollbackOnly(tm); + } + + @Test + public void transactionTemplateWithException() { + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); + TransactionalOperator operator = TransactionalOperator.create(tm); + RuntimeException ex = new RuntimeException("Some application exception"); + + Mono.error(ex).as(operator::transactional) + .as(StepVerifier::create) + .expectError(RuntimeException.class) + .verify(); + + assertHasBegan(tm); + assertHasNotCommitted(tm); + assertHasRolledBack(tm); + assertHasNotSetRollbackOnly(tm); + } + + private void assertHasBegan(ReactiveTestTransactionManager actual) { + assertTrue("Expected but was was not invoked", actual.begin); + } + + private void assertHasNotBegan(ReactiveTestTransactionManager actual) { + assertFalse("Expected to not call but was was called", actual.begin); + } + + private void assertHasCommitted(ReactiveTestTransactionManager actual) { + assertTrue("Expected but was was not invoked", actual.commit); + } + + private void assertHasNotCommitted(ReactiveTestTransactionManager actual) { + assertFalse("Expected to not call but was was called", actual.commit); + } + + private void assertHasRolledBack(ReactiveTestTransactionManager actual) { + assertTrue("Expected but was was not invoked", actual.rollback); + } + + private void assertHasNoRollback(ReactiveTestTransactionManager actual) { +assertFalse("Expected to not call but was was called", actual.rollback); + } + + private void assertHasSetRollbackOnly(ReactiveTestTransactionManager actual) { + assertTrue("Expected but was was not invoked", actual.rollbackOnly); + } + + private void assertHasNotSetRollbackOnly(ReactiveTestTransactionManager actual) { + assertFalse("Expected to not call but was was called", actual.rollbackOnly); + } + +} diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java new file mode 100644 index 00000000000..f52d893c5da --- /dev/null +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java @@ -0,0 +1,88 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.junit.Assert.*; + +/** + * Tests for {@link TransactionalOperator}. + * + * @author Mark Paluch + */ +public class TransactionalOperatorTests { + + ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); + + @Test + public void commitWithMono() { + + TransactionalOperator operator = TransactionalOperator.create(tm); + + Mono.just(true).as(operator::transactional) + .as(StepVerifier::create) + .expectNext(true) + .verifyComplete(); + + assertTrue(tm.commit); + assertFalse(tm.rollback); + } + + @Test + public void rollbackWithMono() { + + TransactionalOperator operator = TransactionalOperator.create(tm); + + Mono.error(new IllegalStateException()).as(operator::transactional) + .as(StepVerifier::create) + .verifyError(IllegalStateException.class); + + assertFalse(tm.commit); + assertTrue(tm.rollback); + } + + @Test + public void commitWithFlux() { + + TransactionalOperator operator = TransactionalOperator.create(tm); + + Flux.just(true).as(operator::transactional) + .as(StepVerifier::create) + .expectNext(true) + .verifyComplete(); + + assertTrue(tm.commit); + assertFalse(tm.rollback); + } + + @Test + public void rollbackWithFlux() { + + TransactionalOperator operator = TransactionalOperator.create(tm); + + Flux.error(new IllegalStateException()).as(operator::transactional) + .as(StepVerifier::create) + .verifyError(IllegalStateException.class); + + assertFalse(tm.commit); + assertTrue(tm.rollback); + } +}