diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java new file mode 100644 index 00000000000..11a33e681b5 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java @@ -0,0 +1,36 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction; + +/** + * Representation of an ongoing reactive transaction. + * This is currently a marker interface extending {@link TransactionExecution} + * but may acquire further methods in a future revision. + * + *

Transactional code can use this to retrieve status information, + * and to programmatically request a rollback (instead of throwing + * an exception that causes an implicit rollback). + * + * @author Mark Paluch + * @author Juergen Hoeller + * @since 5.2 + * @see #setRollbackOnly() + * @see ReactiveTransactionManager#getReactiveTransaction + */ +public interface ReactiveTransaction extends TransactionExecution { + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java index e30a3628314..553e4e329ff 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java @@ -32,7 +32,7 @@ import reactor.core.publisher.Mono; public interface ReactiveTransactionManager { /** - * Emit a currently active transaction or create a new one, according to + * Emit a currently active reactive transaction or create a new one, according to * the specified propagation behavior. *

Note that parameters like isolation level or timeout will only be applied * to new transactions, and thus be ignored when participating in active ones. @@ -54,7 +54,7 @@ public interface ReactiveTransactionManager { * @see TransactionDefinition#getTimeout * @see TransactionDefinition#isReadOnly */ - Mono getTransaction(TransactionDefinition definition) throws TransactionException; + Mono getReactiveTransaction(TransactionDefinition definition) throws TransactionException; /** * Commit the given transaction, with regard to its status. If the transaction @@ -72,7 +72,7 @@ public interface ReactiveTransactionManager { * database right before commit, with the resulting DataAccessException * causing the transaction to fail. The original exception will be * propagated to the caller of this commit method in such a case. - * @param status object returned by the {@code getTransaction} method + * @param transaction object returned by the {@code getTransaction} method * @throws UnexpectedRollbackException in case of an unexpected rollback * that the transaction coordinator initiated * @throws HeuristicCompletionException in case of a transaction failure @@ -81,9 +81,9 @@ public interface ReactiveTransactionManager { * (typically caused by fundamental resource failures) * @throws IllegalTransactionStateException if the given transaction * is already completed (that is, committed or rolled back) - * @see ReactiveTransactionStatus#setRollbackOnly + * @see ReactiveTransaction#setRollbackOnly */ - Mono commit(ReactiveTransactionStatus status) throws TransactionException; + Mono commit(ReactiveTransaction transaction) throws TransactionException; /** * Perform a rollback of the given transaction. @@ -95,12 +95,12 @@ public interface ReactiveTransactionManager { * The transaction will already have been completed and cleaned up when commit * returns, even in case of a commit exception. Consequently, a rollback call * after commit failure will lead to an IllegalTransactionStateException. - * @param status object returned by the {@code getTransaction} method + * @param transaction object returned by the {@code getTransaction} method * @throws TransactionSystemException in case of rollback or system errors * (typically caused by fundamental resource failures) * @throws IllegalTransactionStateException if the given transaction * is already completed (that is, committed or rolled back) */ - Mono rollback(ReactiveTransactionStatus status) throws TransactionException; + Mono rollback(ReactiveTransaction transaction) throws TransactionException; } diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/TransactionExecution.java similarity index 61% rename from spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java rename to spring-tx/src/main/java/org/springframework/transaction/TransactionExecution.java index 01d5f16ab06..d5d0706187e 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/TransactionExecution.java @@ -17,20 +17,14 @@ package org.springframework.transaction; /** - * Representation of the status of a transaction exposing a reactive - * interface. + * Common representation of the current state of a transaction. + * Serves as base interface for {@link TransactionStatus} as well as + * {@link ReactiveTransaction}. * - *

Transactional code can use this to retrieve status information, - * and to programmatically request a rollback (instead of throwing - * an exception that causes an implicit rollback). - * - * @author Mark Paluch * @author Juergen Hoeller * @since 5.2 - * @see #setRollbackOnly() - * @see ReactiveTransactionManager#getTransaction */ -public interface ReactiveTransactionStatus { +public interface TransactionExecution { /** * Return whether the present transaction is new; otherwise participating @@ -43,12 +37,6 @@ public interface ReactiveTransactionStatus { * Set the transaction rollback-only. This instructs the transaction manager * that the only possible outcome of the transaction may be a rollback, as * alternative to throwing an exception which would in turn trigger a rollback. - *

This is mainly intended for transactions managed by - * {@link org.springframework.transaction.reactive.TransactionalOperator} or - * {@link org.springframework.transaction.interceptor.TransactionInterceptor}, - * where the actual commit/rollback decision is made by the container. - * @see org.springframework.transaction.reactive.ReactiveTransactionCallback#doInTransaction - * @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn */ void setRollbackOnly(); @@ -61,8 +49,6 @@ public interface ReactiveTransactionStatus { /** * Return whether this transaction is completed, that is, * whether it has already been committed or rolled back. - * @see ReactiveTransactionManager#commit - * @see ReactiveTransactionManager#rollback */ boolean isCompleted(); diff --git a/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java index 557cd772044..5968c57fd7b 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,14 +36,7 @@ import java.io.Flushable; * @see org.springframework.transaction.support.TransactionCallback#doInTransaction * @see org.springframework.transaction.interceptor.TransactionInterceptor#currentTransactionStatus() */ -public interface TransactionStatus extends SavepointManager, Flushable { - - /** - * Return whether the present transaction is new; otherwise participating - * in an existing transaction, or potentially not running in an actual - * transaction in the first place. - */ - boolean isNewTransaction(); +public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable { /** * Return whether this transaction internally carries a savepoint, @@ -58,25 +51,6 @@ public interface TransactionStatus extends SavepointManager, Flushable { */ boolean hasSavepoint(); - /** - * Set the transaction rollback-only. This instructs the transaction manager - * that the only possible outcome of the transaction may be a rollback, as - * alternative to throwing an exception which would in turn trigger a rollback. - *

This is mainly intended for transactions managed by - * {@link org.springframework.transaction.support.TransactionTemplate} or - * {@link org.springframework.transaction.interceptor.TransactionInterceptor}, - * where the actual commit/rollback decision is made by the container. - * @see org.springframework.transaction.support.TransactionCallback#doInTransaction - * @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn - */ - void setRollbackOnly(); - - /** - * Return whether the transaction has been marked as rollback-only - * (either by the application or by the transaction infrastructure). - */ - boolean isRollbackOnly(); - /** * Flush the underlying session to the datastore, if applicable: * for example, all affected Hibernate/JPA sessions. @@ -88,12 +62,4 @@ public interface TransactionStatus extends SavepointManager, Flushable { @Override void flush(); - /** - * Return whether this transaction is completed, that is, - * whether it has already been committed or rolled back. - * @see PlatformTransactionManager#commit - * @see PlatformTransactionManager#rollback - */ - boolean isCompleted(); - } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java index 849c6bced5c..6652f44e675 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java @@ -32,8 +32,8 @@ import reactor.core.publisher.Mono; import org.springframework.lang.Nullable; import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.InvalidTimeoutException; +import org.springframework.transaction.ReactiveTransaction; import org.springframework.transaction.ReactiveTransactionManager; -import org.springframework.transaction.ReactiveTransactionStatus; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionSuspensionNotSupportedException; @@ -74,7 +74,7 @@ import org.springframework.transaction.UnexpectedRollbackException; * @author Mark Paluch * @author Juergen Hoeller * @since 5.2 - * @see ReactiveTransactionSynchronizationManager + * @see TransactionSynchronizationManager */ @SuppressWarnings("serial") public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable { @@ -95,8 +95,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doBegin */ @Override - public final Mono getTransaction(TransactionDefinition definition) throws TransactionException { - return ReactiveTransactionSynchronizationManager.currentTransaction() + public final Mono getReactiveTransaction(TransactionDefinition definition) throws TransactionException { + return TransactionSynchronizationManager.currentTransaction() .flatMap(synchronizationManager -> { Object transaction = doGetTransaction(synchronizationManager); @@ -124,7 +124,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { return TransactionContextManager.currentContext() - .map(ReactiveTransactionSynchronizationManager::new) + .map(TransactionSynchronizationManager::new) .flatMap(nestedSynchronizationManager -> suspend(nestedSynchronizationManager, null) .map(Optional::of) @@ -134,7 +134,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } return Mono.defer(() -> { - DefaultReactiveTransactionStatus status = newTransactionStatus( + GenericReactiveTransaction status = newReactiveTransaction( nestedSynchronizationManager, definition, transaction, true, debugEnabled, suspendedResources.orElse(null)); return doBegin(nestedSynchronizationManager, transaction, definition) @@ -151,15 +151,15 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } - return Mono.just(prepareTransactionStatus(synchronizationManager, definition, null, true, debugEnabled, null)); + return Mono.just(prepareReactiveTransaction(synchronizationManager, definition, null, true, debugEnabled, null)); } }); } /** - * Create a TransactionStatus for an existing transaction. + * Create a ReactiveTransaction for an existing transaction. */ - private Mono handleExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager, + private Mono handleExistingTransaction(TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { @@ -172,11 +172,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran logger.debug("Suspending current transaction"); } Mono suspend = suspend(synchronizationManager, transaction); - return suspend.map(suspendedResources -> prepareTransactionStatus(synchronizationManager, + return suspend.map(suspendedResources -> prepareReactiveTransaction(synchronizationManager, definition, null, false, debugEnabled, suspendedResources)) // - .switchIfEmpty(Mono.fromSupplier(() -> prepareTransactionStatus(synchronizationManager, + .switchIfEmpty(Mono.fromSupplier(() -> prepareReactiveTransaction(synchronizationManager, definition, null, false, debugEnabled, null))) - .cast(ReactiveTransactionStatus.class); + .cast(ReactiveTransaction.class); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { @@ -186,7 +186,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran } Mono suspendedResources = suspend(synchronizationManager, transaction); return suspendedResources.flatMap(suspendedResourcesHolder -> { - DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, + GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, definition, transaction, true, debugEnabled, suspendedResources); return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status) @@ -200,7 +200,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } // Nested transaction through nested begin and commit/rollback calls. - DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, + GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, definition, transaction, true, debugEnabled, null); return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status); @@ -210,33 +210,33 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran if (debugEnabled) { logger.debug("Participating in existing transaction"); } - return Mono.just(prepareTransactionStatus(synchronizationManager, definition, transaction, false, debugEnabled, null)); + return Mono.just(prepareReactiveTransaction(synchronizationManager, definition, transaction, false, debugEnabled, null)); } /** - * Create a new TransactionStatus for the given arguments, + * Create a new ReactiveTransaction for the given arguments, * also initializing transaction synchronization as appropriate. - * @see #newTransactionStatus - * @see #prepareTransactionStatus + * @see #newReactiveTransaction + * @see #prepareReactiveTransaction */ - private DefaultReactiveTransactionStatus prepareTransactionStatus( - ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, + private GenericReactiveTransaction prepareReactiveTransaction( + TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) { - DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, + GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, definition, transaction, newTransaction, debug, suspendedResources); prepareSynchronization(synchronizationManager, status, definition); return status; } /** - * Create a TransactionStatus instance for the given arguments. + * Create a ReactiveTransaction instance for the given arguments. */ - private DefaultReactiveTransactionStatus newTransactionStatus( - ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, + private GenericReactiveTransaction newReactiveTransaction( + TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) { - return new DefaultReactiveTransactionStatus(transaction, newTransaction, + return new GenericReactiveTransaction(transaction, newTransaction, !synchronizationManager.isSynchronizationActive(), definition.isReadOnly(), debug, suspendedResources); } @@ -244,8 +244,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran /** * Initialize transaction synchronization as appropriate. */ - private void prepareSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status, TransactionDefinition definition) { + private void prepareSynchronization(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status, TransactionDefinition definition) { if (status.isNewSynchronization()) { synchronizationManager.setActualTransactionActive(status.hasTransaction()); @@ -270,11 +270,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doSuspend * @see #resume */ - private Mono suspend(ReactiveTransactionSynchronizationManager synchronizationManager, + private Mono suspend(TransactionSynchronizationManager synchronizationManager, @Nullable Object transaction) throws TransactionException { if (synchronizationManager.isSynchronizationActive()) { - Mono> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager); + Mono> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager); return suspendedSynchronizations.flatMap(synchronizations -> { Mono> suspendedResources = (transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty())); return suspendedResources.map(it -> { @@ -313,7 +313,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doResume * @see #suspend */ - private Mono resume(ReactiveTransactionSynchronizationManager synchronizationManager, + private Mono resume(TransactionSynchronizationManager synchronizationManager, @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException { @@ -322,7 +322,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran if (suspendedResources != null) { return doResume(synchronizationManager, transaction, suspendedResources); } - List suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; + List suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { synchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); synchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); @@ -338,7 +338,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran /** * Resume outer transaction after inner transaction begin failed. */ - private Mono resumeAfterBeginException(ReactiveTransactionSynchronizationManager synchronizationManager, + private Mono resumeAfterBeginException(TransactionSynchronizationManager synchronizationManager, Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) { String exMessage = "Inner transaction begin exception overridden by outer transaction resume exception"; @@ -350,14 +350,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * Suspend all current synchronizations and deactivate transaction * synchronization for the current transaction context. * @param synchronizationManager the synchronization manager bound to the current transaction - * @return the List of suspended ReactiveTransactionSynchronization objects + * @return the List of suspended TransactionSynchronization objects */ - private Mono> doSuspendSynchronization( - ReactiveTransactionSynchronizationManager synchronizationManager) { + private Mono> doSuspendSynchronization( + TransactionSynchronizationManager synchronizationManager) { - List suspendedSynchronizations = synchronizationManager.getSynchronizations(); + List suspendedSynchronizations = synchronizationManager.getSynchronizations(); return Flux.fromIterable(suspendedSynchronizations) - .concatMap(ReactiveTransactionSynchronization::suspend) + .concatMap(TransactionSynchronization::suspend) .then(Mono.defer(() -> { synchronizationManager.clearSynchronization(); return Mono.just(suspendedSynchronizations); @@ -368,10 +368,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * Reactivate transaction synchronization for the current transaction context * and resume all given synchronizations. * @param synchronizationManager the synchronization manager bound to the current transaction - * @param suspendedSynchronizations a List of ReactiveTransactionSynchronization objects + * @param suspendedSynchronizations a List of TransactionSynchronization objects */ - private Mono doResumeSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, - List suspendedSynchronizations) { + private Mono doResumeSynchronization(TransactionSynchronizationManager synchronizationManager, + List suspendedSynchronizations) { synchronizationManager.initSynchronization(); return Flux.fromIterable(suspendedSynchronizations) @@ -385,26 +385,26 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * transactions and programmatic rollback requests. * Delegates to {@code isRollbackOnly}, {@code doCommit} * and {@code rollback}. - * @see ReactiveTransactionStatus#isRollbackOnly() + * @see ReactiveTransaction#isRollbackOnly() * @see #doCommit * @see #rollback */ @Override - public final Mono commit(ReactiveTransactionStatus status) throws TransactionException { - if (status.isCompleted()) { + public final Mono commit(ReactiveTransaction transaction) throws TransactionException { + if (transaction.isCompleted()) { return Mono.error(new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction")); } - return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { - DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status; - if (defStatus.isRollbackOnly()) { - if (defStatus.isDebug()) { + return TransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { + GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction; + if (reactiveTx.isRollbackOnly()) { + if (reactiveTx.isDebug()) { logger.debug("Transactional code has requested rollback"); } - return processRollback(synchronizationManager, defStatus); + return processRollback(synchronizationManager, reactiveTx); } - return processCommit(synchronizationManager, defStatus); + return processCommit(synchronizationManager, reactiveTx); }); } @@ -415,8 +415,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param status object representing the transaction * @throws TransactionException in case of commit failure */ - private Mono processCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) throws TransactionException { + private Mono processCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) throws TransactionException { AtomicBoolean beforeCompletionInvoked = new AtomicBoolean(false); @@ -435,10 +435,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran })).then(Mono.empty().onErrorResume(ex -> { Mono propagateException = Mono.error(ex); if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) { - return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException); + return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException); } if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) { - triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN).then(propagateException); + triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN).then(propagateException); } if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) { Mono mono; @@ -453,8 +453,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran return propagateException; })).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex -> - triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) - .then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED)))); + triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) + .then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED)))); return commit .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status) @@ -468,14 +468,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doSetRollbackOnly */ @Override - public final Mono rollback(ReactiveTransactionStatus status) throws TransactionException { - if (status.isCompleted()) { + public final Mono rollback(ReactiveTransaction transaction) throws TransactionException { + if (transaction.isCompleted()) { return Mono.error(new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction")); } - return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { - DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status; - return processRollback(synchronizationManager, defStatus); + return TransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { + GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction; + return processRollback(synchronizationManager, reactiveTx); }); } @@ -486,8 +486,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param status object representing the transaction * @throws TransactionException in case of rollback failure */ - private Mono processRollback(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono processRollback(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { return triggerBeforeCompletion(synchronizationManager, status).then(Mono.defer(() -> { if (status.isNewTransaction()) { @@ -511,9 +511,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran return beforeCompletion; } })).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion( - synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN) + synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN) .then(Mono.error(ex))) - .then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK))) + .then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK))) .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex))) .then(cleanupAfterCompletion(synchronizationManager, status)); } @@ -526,8 +526,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws TransactionException in case of rollback failure * @see #doRollback */ - private Mono doRollbackOnCommitException(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status, Throwable ex) throws TransactionException { + private Mono doRollbackOnCommitException(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status, Throwable ex) throws TransactionException { return Mono.defer(() -> { if (status.isNewTransaction()) { @@ -545,9 +545,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran return Mono.empty(); }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, rbex -> { logger.error("Commit exception overridden by rollback exception", ex); - return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN) + return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN) .then(Mono.error(rbex)); - }).then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK)); + }).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)); } @@ -556,14 +556,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction */ - private Mono triggerBeforeCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono triggerBeforeCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering beforeCommit synchronization"); } - return ReactiveTransactionSynchronizationUtils.triggerBeforeCommit(synchronizationManager.getSynchronizations(), status.isReadOnly()); + return TransactionSynchronizationUtils.triggerBeforeCommit(synchronizationManager.getSynchronizations(), status.isReadOnly()); } return Mono.empty(); @@ -574,14 +574,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction */ - private Mono triggerBeforeCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono triggerBeforeCompletion(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering beforeCompletion synchronization"); } - return ReactiveTransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations()); + return TransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations()); } return Mono.empty(); @@ -592,14 +592,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction */ - private Mono triggerAfterCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono triggerAfterCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering afterCommit synchronization"); } - return ReactiveTransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations()); + return TransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations()); } return Mono.empty(); @@ -609,13 +609,13 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * Trigger {@code afterCompletion} callbacks. * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction - * @param completionStatus completion status according to ReactiveTransactionSynchronization constants + * @param completionStatus completion status according to TransactionSynchronization constants */ - private Mono triggerAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status, int completionStatus) { + private Mono triggerAfterCompletion(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status, int completionStatus) { if (status.isNewSynchronization()) { - List synchronizations = synchronizationManager.getSynchronizations(); + List synchronizations = synchronizationManager.getSynchronizations(); synchronizationManager.clearSynchronization(); if (!status.hasTransaction() || status.isNewTransaction()) { if (status.isDebug()) { @@ -638,22 +638,22 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran /** * Actually invoke the {@code afterCompletion} methods of the - * given Spring ReactiveTransactionSynchronization objects. + * given TransactionSynchronization objects. *

To be called by this abstract manager itself, or by special implementations * of the {@code registerAfterCompletionWithExistingTransaction} callback. * @param synchronizationManager the synchronization manager bound to the current transaction - * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @param synchronizations a List of TransactionSynchronization objects * @param completionStatus the completion status according to the - * constants in the ReactiveTransactionSynchronization interface - * @see #registerAfterCompletionWithExistingTransaction(ReactiveTransactionSynchronizationManager, Object, List) - * @see ReactiveTransactionSynchronization#STATUS_COMMITTED - * @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK - * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + * constants in the TransactionSynchronization interface + * @see #registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager, Object, List) + * @see TransactionSynchronization#STATUS_COMMITTED + * @see TransactionSynchronization#STATUS_ROLLED_BACK + * @see TransactionSynchronization#STATUS_UNKNOWN */ - private Mono invokeAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, - List synchronizations, int completionStatus) { + private Mono invokeAfterCompletion(TransactionSynchronizationManager synchronizationManager, + List synchronizations, int completionStatus) { - return ReactiveTransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus); + return TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus); } /** @@ -663,8 +663,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param status object representing the transaction * @see #doCleanupAfterCompletion */ - private Mono cleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono cleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { return Mono.defer(() -> { status.setCompleted(); @@ -710,9 +710,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doBegin * @see #doCommit * @see #doRollback - * @see DefaultReactiveTransactionStatus#getTransaction + * @see GenericReactiveTransaction#getTransaction */ - protected abstract Object doGetTransaction(ReactiveTransactionSynchronizationManager synchronizationManager) throws TransactionException; + protected abstract Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) throws TransactionException; /** * Check if the given transaction object indicates an existing transaction @@ -752,7 +752,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws org.springframework.transaction.NestedTransactionNotSupportedException * if the underlying transaction does not support nesting (e.g. through savepoints) */ - protected abstract Mono doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, + protected abstract Mono doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) throws TransactionException; /** @@ -768,7 +768,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws TransactionException in case of system errors * @see #doResume */ - protected Mono doSuspend(ReactiveTransactionSynchronizationManager synchronizationManager, + protected Mono doSuspend(TransactionSynchronizationManager synchronizationManager, Object transaction) throws TransactionException { throw new TransactionSuspensionNotSupportedException( @@ -788,7 +788,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws TransactionException in case of system errors * @see #doSuspend */ - protected Mono doResume(ReactiveTransactionSynchronizationManager synchronizationManager, + protected Mono doResume(TransactionSynchronizationManager synchronizationManager, @Nullable Object transaction, Object suspendedResources) throws TransactionException { throw new TransactionSuspensionNotSupportedException( @@ -805,8 +805,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws RuntimeException in case of errors; will be propagated to the caller * (note: do not throw TransactionException subclasses here!) */ - protected Mono prepareForCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + protected Mono prepareForCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { return Mono.empty(); } @@ -820,10 +820,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status the status representation of the transaction * @throws TransactionException in case of commit or system errors - * @see DefaultReactiveTransactionStatus#getTransaction + * @see GenericReactiveTransaction#getTransaction */ - protected abstract Mono doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) throws TransactionException; + protected abstract Mono doCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) throws TransactionException; /** * Perform an actual rollback of the given transaction. @@ -833,10 +833,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status the status representation of the transaction * @throws TransactionException in case of system errors - * @see DefaultReactiveTransactionStatus#getTransaction + * @see GenericReactiveTransaction#getTransaction */ - protected abstract Mono doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) throws TransactionException; + protected abstract Mono doRollback(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) throws TransactionException; /** * Set the given transaction rollback-only. Only called on rollback @@ -848,8 +848,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param status the status representation of the transaction * @throws TransactionException in case of system errors */ - protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) throws TransactionException { + protected Mono doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) throws TransactionException { throw new IllegalTransactionStateException( "Participating in existing transactions is not supported - when 'isExistingTransaction' " + @@ -866,18 +866,18 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * chance to determine the actual outcome of the outer transaction. * @param synchronizationManager the synchronization manager bound to the current transaction * @param transaction transaction object returned by {@code doGetTransaction} - * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @param synchronizations a List of TransactionSynchronization objects * @throws TransactionException in case of system errors - * @see #invokeAfterCompletion(ReactiveTransactionSynchronizationManager, List, int) - * @see ReactiveTransactionSynchronization#afterCompletion(int) - * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + * @see #invokeAfterCompletion(TransactionSynchronizationManager, List, int) + * @see TransactionSynchronization#afterCompletion(int) + * @see TransactionSynchronization#STATUS_UNKNOWN */ - protected Mono registerAfterCompletionWithExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager, - Object transaction, List synchronizations) throws TransactionException { + protected Mono registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager synchronizationManager, + Object transaction, List synchronizations) throws TransactionException { logger.debug("Cannot register Spring after-completion synchronization with existing transaction - " + "processing Spring after-completion callbacks immediately, with outcome status 'unknown'"); - return invokeAfterCompletion(synchronizationManager, synchronizations, ReactiveTransactionSynchronization.STATUS_UNKNOWN); + return invokeAfterCompletion(synchronizationManager, synchronizations, TransactionSynchronization.STATUS_UNKNOWN); } /** @@ -888,7 +888,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param transaction transaction object returned by {@code doGetTransaction} */ - protected Mono doCleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, + protected Mono doCleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager, Object transaction) { return Mono.empty(); @@ -918,7 +918,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran private final Object suspendedResources; @Nullable - private List suspendedSynchronizations; + private List suspendedSynchronizations; @Nullable private String name; @@ -935,7 +935,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran } private SuspendedResourcesHolder( - @Nullable Object suspendedResources, List suspendedSynchronizations, + @Nullable Object suspendedResources, List suspendedSynchronizations, @Nullable String name, boolean readOnly, @Nullable Integer isolationLevel, boolean wasActive) { this.suspendedResources = suspendedResources; diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java deleted file mode 100644 index 6b7552004c1..00000000000 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2002-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.transaction.reactive; - -import org.springframework.transaction.ReactiveTransactionStatus; - -/** - * Abstract base implementation of the {@link ReactiveTransactionStatus} interface. - * - *

Pre-implements the handling of local rollback-only and completed flags. - * - *

Does not assume any specific internal transaction handling, such as an - * underlying transaction object, and no transaction synchronization mechanism. - * - * @author Mark Paluch - * @author Juergen Hoeller - * @since 5.2 - * @see #setRollbackOnly() - * @see #isRollbackOnly() - * @see #setCompleted() - * @see #isCompleted() - * @see DefaultReactiveTransactionStatus - */ -public abstract class AbstractReactiveTransactionStatus implements ReactiveTransactionStatus { - - private boolean rollbackOnly = false; - - private boolean completed = false; - - - //--------------------------------------------------------------------- - // Handling of current transaction state - //--------------------------------------------------------------------- - - @Override - public void setRollbackOnly() { - this.rollbackOnly = true; - } - - /** - * Determine the rollback-only flag via checking this ReactiveTransactionStatus. - *

Will only return "true" if the application called {@code setRollbackOnly} - * on this TransactionStatus object. - */ - @Override - public boolean isRollbackOnly() { - return this.rollbackOnly; - } - - /** - * Mark this transaction as completed, that is, committed or rolled back. - */ - public void setCompleted() { - this.completed = true; - } - - @Override - public boolean isCompleted() { - return this.completed; - } - -} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/GenericReactiveTransaction.java similarity index 82% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/GenericReactiveTransaction.java index 14dd58782ee..702392368a8 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/GenericReactiveTransaction.java @@ -17,11 +17,11 @@ package org.springframework.transaction.reactive; import org.springframework.lang.Nullable; -import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.transaction.ReactiveTransaction; import org.springframework.util.Assert; /** - * Default implementation of the {@link ReactiveTransactionStatus} interface, + * Default implementation of the {@link ReactiveTransaction} interface, * used by {@link AbstractReactiveTransactionManager}. Based on the concept * of an underlying "transaction object". * @@ -38,7 +38,7 @@ import org.springframework.util.Assert; * @see AbstractReactiveTransactionManager * @see #getTransaction */ -public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactionStatus { +public class GenericReactiveTransaction implements ReactiveTransaction { @Nullable private final Object transaction; @@ -54,6 +54,10 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio @Nullable private final Object suspendedResources; + private boolean rollbackOnly = false; + + private boolean completed = false; + /** * Create a new {@code DefaultReactiveTransactionStatus} instance. @@ -70,7 +74,7 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio * @param suspendedResources a holder for resources that have been suspended * for this transaction, if any */ - public DefaultReactiveTransactionStatus( + public GenericReactiveTransaction( @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean readOnly, boolean debug, @Nullable Object suspendedResources) { @@ -137,4 +141,31 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio return this.suspendedResources; } + @Override + public void setRollbackOnly() { + this.rollbackOnly = true; + } + + /** + * Determine the rollback-only flag via checking this ReactiveTransactionStatus. + *

Will only return "true" if the application called {@code setRollbackOnly} + * on this TransactionStatus object. + */ + @Override + public boolean isRollbackOnly() { + return this.rollbackOnly; + } + + /** + * Mark this transaction as completed, that is, committed or rolled back. + */ + public void setCompleted() { + this.completed = true; + } + + @Override + public boolean isCompleted() { + return this.completed; + } + } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceSynchronization.java similarity index 71% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceSynchronization.java index 188587626a3..e2c4992660b 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceSynchronization.java @@ -18,40 +18,38 @@ package org.springframework.transaction.reactive; import reactor.core.publisher.Mono; -import org.springframework.transaction.support.ResourceHolder; - /** - * {@link ReactiveTransactionSynchronization} implementation that manages a - * {@link ResourceHolder} bound through {@link ReactiveTransactionSynchronizationManager}. + * {@link TransactionSynchronization} implementation that manages a + * resource object bound through {@link TransactionSynchronizationManager}. * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 - * @param the resource holder type + * @param the resource holder type * @param the resource key type */ -public abstract class ReactiveResourceHolderSynchronization - implements ReactiveTransactionSynchronization { +public abstract class ReactiveResourceSynchronization implements TransactionSynchronization { - private final H resourceHolder; + private final O resourceObject; private final K resourceKey; - private final ReactiveTransactionSynchronizationManager synchronizationManager; + private final TransactionSynchronizationManager synchronizationManager; private volatile boolean holderActive = true; /** - * Create a new ResourceHolderSynchronization for the given holder. - * @param resourceHolder the ResourceHolder to manage - * @param resourceKey the key to bind the ResourceHolder for + * Create a new ReactiveResourceSynchronization for the given holder. + * @param resourceObject the resource object to manage + * @param resourceKey the key to bind the resource object for * @param synchronizationManager the synchronization manager bound to the current transaction - * @see ReactiveTransactionSynchronizationManager#bindResource + * @see TransactionSynchronizationManager#bindResource */ - public ReactiveResourceHolderSynchronization( - H resourceHolder, K resourceKey, ReactiveTransactionSynchronizationManager synchronizationManager) { + public ReactiveResourceSynchronization( + O resourceObject, K resourceKey, TransactionSynchronizationManager synchronizationManager) { - this.resourceHolder = resourceHolder; + this.resourceObject = resourceObject; this.resourceKey = resourceKey; this.synchronizationManager = synchronizationManager; } @@ -68,7 +66,7 @@ public abstract class ReactiveResourceHolderSynchronization resume() { if (this.holderActive) { - this.synchronizationManager.bindResource(this.resourceKey, this.resourceHolder); + this.synchronizationManager.bindResource(this.resourceKey, this.resourceObject); } return Mono.empty(); } @@ -84,7 +82,7 @@ public abstract class ReactiveResourceHolderSynchronization afterCommit() { if (!shouldReleaseBeforeCompletion()) { - return processResourceAfterCommit(this.resourceHolder); + return processResourceAfterCommit(this.resourceObject); } return Mono.empty(); } @@ -109,21 +107,20 @@ public abstract class ReactiveResourceHolderSynchronization this.resourceHolder.reset()); + return sync; }); } @@ -157,7 +154,7 @@ public abstract class ReactiveResourceHolderSynchronization processResourceAfterCommit(H resourceHolder) { + protected Mono processResourceAfterCommit(O resourceHolder) { return Mono.empty(); } /** * Release the given resource (after it has been unbound from the thread). * @param resourceHolder the resource holder to process - * @param resourceKey the key that the ResourceHolder was bound for + * @param resourceKey the key that the resource object was bound for */ - protected Mono releaseResource(H resourceHolder, K resourceKey) { + protected Mono releaseResource(O resourceHolder, K resourceKey) { return Mono.empty(); } /** * Perform a cleanup on the given resource (which is left bound to the thread). * @param resourceHolder the resource holder to process - * @param resourceKey the key that the ResourceHolder was bound for + * @param resourceKey the key that the resource object was bound for * @param committed whether the transaction has committed ({@code true}) * or rolled back ({@code false}) */ - protected Mono cleanupResource(H resourceHolder, K resourceKey, boolean committed) { + protected Mono cleanupResource(O resourceHolder, K resourceKey, boolean committed) { return Mono.empty(); } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionCallback.java similarity index 90% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionCallback.java index 4ee74803f11..9ba759f0d55 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionCallback.java @@ -18,7 +18,7 @@ package org.springframework.transaction.reactive; import org.reactivestreams.Publisher; -import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.transaction.ReactiveTransaction; /** * Callback interface for reactive transactional code. Used with {@link TransactionalOperator}'s @@ -30,12 +30,13 @@ import org.springframework.transaction.ReactiveTransactionStatus; * Spring's {@link org.springframework.transaction.annotation.Transactional} annotation). * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see TransactionalOperator * @param the result type */ @FunctionalInterface -public interface ReactiveTransactionCallback { +public interface TransactionCallback { /** * Gets called by {@link TransactionalOperator} within a transactional context. @@ -46,6 +47,6 @@ public interface ReactiveTransactionCallback { * @return a result publisher * @see TransactionalOperator#transactional */ - Publisher doInTransaction(ReactiveTransactionStatus status); + Publisher doInTransaction(ReactiveTransaction status); } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java index 760d65303dd..a13c8caee73 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java @@ -45,7 +45,7 @@ public class TransactionContext { private final Map resources = new LinkedHashMap<>(); @Nullable - private Set synchronizations; + private Set synchronizations; private volatile @Nullable String currentTransactionName; @@ -85,12 +85,12 @@ public class TransactionContext { return this.resources; } - public void setSynchronizations(@Nullable Set synchronizations) { + public void setSynchronizations(@Nullable Set synchronizations) { this.synchronizations = synchronizations; } @Nullable - public Set getSynchronizations() { + public Set getSynchronizations() { return this.synchronizations; } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java index f35e8830b14..53dd59f23db 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java @@ -33,7 +33,7 @@ import org.springframework.transaction.NoTransactionException; * * @author Mark Paluch * @since 5.2 - * @see ReactiveTransactionSynchronization + * @see TransactionSynchronization */ public abstract class TransactionContextManager { @@ -80,7 +80,7 @@ public abstract class TransactionContextManager { /** * Return a {@link Function} to create or associate a new {@link TransactionContext}. * Interaction with transactional resources through - * {@link ReactiveTransactionSynchronizationManager} requires a TransactionContext + * {@link TransactionSynchronizationManager} requires a TransactionContext * to be registered in the subscriber context. * @return functional context registration. */ diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronization.java similarity index 94% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronization.java index ed10f52530f..ac8309f4c8f 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronization.java @@ -22,7 +22,7 @@ import reactor.core.publisher.Mono; * Interface for reactive transaction synchronization callbacks. * Supported by {@link AbstractReactiveTransactionManager}. * - *

ReactiveTransactionSynchronization implementations can implement the + *

TransactionSynchronization implementations can implement the * {@link org.springframework.core.Ordered} interface to influence their execution order. * A synchronization that does not implement the {@link org.springframework.core.Ordered} * interface is appended to the end of the synchronization chain. @@ -31,11 +31,12 @@ import reactor.core.publisher.Mono; * allowing for fine-grained interaction with their execution order (if necessary). * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 - * @see ReactiveTransactionSynchronizationManager + * @see TransactionSynchronizationManager * @see AbstractReactiveTransactionManager */ -public interface ReactiveTransactionSynchronization { +public interface TransactionSynchronization { /** Completion status in case of proper commit. */ int STATUS_COMMITTED = 0; @@ -50,7 +51,7 @@ public interface ReactiveTransactionSynchronization { /** * Suspend this synchronization. * Supposed to unbind resources from TransactionSynchronizationManager if managing any. - * @see ReactiveTransactionSynchronizationManager#unbindResource + * @see TransactionSynchronizationManager#unbindResource */ default Mono suspend() { return Mono.empty(); @@ -59,7 +60,7 @@ public interface ReactiveTransactionSynchronization { /** * Resume this synchronization. * Supposed to rebind resources to TransactionSynchronizationManager if managing any. - * @see ReactiveTransactionSynchronizationManager#bindResource + * @see TransactionSynchronizationManager#bindResource */ default Mono resume() { return Mono.empty(); diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationManager.java similarity index 90% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationManager.java index d8448e93c9d..8db41c7dd23 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationManager.java @@ -67,29 +67,29 @@ import org.springframework.util.Assert; * @since 5.2 * @see #isSynchronizationActive * @see #registerSynchronization - * @see ReactiveTransactionSynchronization + * @see TransactionSynchronization */ -public class ReactiveTransactionSynchronizationManager { +public class TransactionSynchronizationManager { - private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationManager.class); + private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class); private final TransactionContext transactionContext; - public ReactiveTransactionSynchronizationManager(TransactionContext transactionContext) { + public TransactionSynchronizationManager(TransactionContext transactionContext) { this.transactionContext = transactionContext; } /** - * Return the ReactiveTransactionSynchronizationManager of the current transaction. + * Return the TransactionSynchronizationManager of the current transaction. * Mainly intended for code that wants to bind resources or synchronizations. * rollback-only but not throw an application exception. * @throws NoTransactionException if the transaction info cannot be found, * because the method was invoked outside a managed transaction. */ - public static Mono currentTransaction() { - return TransactionContextManager.currentContext().map(ReactiveTransactionSynchronizationManager::new); + public static Mono currentTransaction() { + return TransactionContextManager.currentContext().map(TransactionSynchronizationManager::new); } /** @@ -98,7 +98,7 @@ public class ReactiveTransactionSynchronizationManager { * @return if there is a value bound to the current thread */ public boolean hasResource(Object key) { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); return (value != null); } @@ -111,7 +111,7 @@ public class ReactiveTransactionSynchronizationManager { */ @Nullable public Object getResource(Object key) { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to context [" + @@ -137,7 +137,7 @@ public class ReactiveTransactionSynchronizationManager { * @throws IllegalStateException if there is already a value bound to the context */ public void bindResource(Object key, Object value) throws IllegalStateException { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map map = this.transactionContext.getResources(); Object oldValue = map.put(actualKey, value); @@ -158,7 +158,7 @@ public class ReactiveTransactionSynchronizationManager { * @throws IllegalStateException if there is no value bound to the context */ public Object unbindResource(Object key) throws IllegalStateException { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doUnbindResource(actualKey); if (value == null) { throw new IllegalStateException( @@ -174,7 +174,7 @@ public class ReactiveTransactionSynchronizationManager { */ @Nullable public Object unbindResourceIfPossible(Object key) { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); return doUnbindResource(actualKey); } @@ -229,11 +229,11 @@ public class ReactiveTransactionSynchronizationManager { * @throws IllegalStateException if transaction synchronization is not active * @see org.springframework.core.Ordered */ - public void registerSynchronization(ReactiveTransactionSynchronization synchronization) + public void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null"); - Set synchs = this.transactionContext.getSynchronizations(); + Set synchs = this.transactionContext.getSynchronizations(); if (synchs == null) { throw new IllegalStateException("Transaction synchronization is not active"); } @@ -245,10 +245,10 @@ public class ReactiveTransactionSynchronizationManager { * for the current context. * @return unmodifiable List of TransactionSynchronization instances * @throws IllegalStateException if synchronization is not active - * @see ReactiveTransactionSynchronization + * @see TransactionSynchronization */ - public List getSynchronizations() throws IllegalStateException { - Set synchs = this.transactionContext.getSynchronizations(); + public List getSynchronizations() throws IllegalStateException { + Set synchs = this.transactionContext.getSynchronizations(); if (synchs == null) { throw new IllegalStateException("Transaction synchronization is not active"); } @@ -260,7 +260,7 @@ public class ReactiveTransactionSynchronizationManager { } else { // Sort lazily here, not in registerSynchronization. - List sortedSynchs = new ArrayList<>(synchs); + List sortedSynchs = new ArrayList<>(synchs); AnnotationAwareOrderComparator.sort(sortedSynchs); return Collections.unmodifiableList(sortedSynchs); } @@ -325,7 +325,7 @@ public class ReactiveTransactionSynchronizationManager { * to suppress change detection on commit. The present method is meant * to be used for earlier read-only checks. * @see org.springframework.transaction.TransactionDefinition#isReadOnly() - * @see ReactiveTransactionSynchronization#beforeCommit(boolean) + * @see TransactionSynchronization#beforeCommit(boolean) */ public boolean isCurrentTransactionReadOnly() { return this.transactionContext.isCurrentTransactionReadOnly(); diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationUtils.java similarity index 59% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationUtils.java index 0913bdae360..85064218fb6 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationUtils.java @@ -29,21 +29,21 @@ import org.springframework.util.Assert; import org.springframework.util.ClassUtils; /** - * Utility methods for triggering specific {@link ReactiveTransactionSynchronization} + * Utility methods for triggering specific {@link TransactionSynchronization} * callback methods on all currently registered synchronizations. * * @author Mark Paluch * @author Juergen Hoeller * @since 5.2 - * @see ReactiveTransactionSynchronization - * @see ReactiveTransactionSynchronizationManager#getSynchronizations() + * @see TransactionSynchronization + * @see TransactionSynchronizationManager#getSynchronizations() */ -abstract class ReactiveTransactionSynchronizationUtils { +abstract class TransactionSynchronizationUtils { - private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationUtils.class); + private static final Log logger = LogFactory.getLog(TransactionSynchronizationUtils.class); private static final boolean aopAvailable = ClassUtils.isPresent( - "org.springframework.aop.scope.ScopedObject", ReactiveTransactionSynchronizationUtils.class.getClassLoader()); + "org.springframework.aop.scope.ScopedObject", TransactionSynchronizationUtils.class.getClassLoader()); /** @@ -68,51 +68,51 @@ abstract class ReactiveTransactionSynchronizationUtils { /** * Actually invoke the {@code triggerBeforeCommit} methods of the - * given Spring ReactiveTransactionSynchronization objects. - * @param synchronizations a List of ReactiveTransactionSynchronization objects - * @see ReactiveTransactionSynchronization#beforeCommit(boolean) + * given Spring TransactionSynchronization objects. + * @param synchronizations a List of TransactionSynchronization objects + * @see TransactionSynchronization#beforeCommit(boolean) */ - public static Mono triggerBeforeCommit(Collection synchronizations, boolean readOnly) { + public static Mono triggerBeforeCommit(Collection synchronizations, boolean readOnly) { return Flux.fromIterable(synchronizations).concatMap(it -> it.beforeCommit(readOnly)).then(); } /** * Actually invoke the {@code beforeCompletion} methods of the - * given Spring ReactiveTransactionSynchronization objects. - * @param synchronizations a List of ReactiveTransactionSynchronization objects - * @see ReactiveTransactionSynchronization#beforeCompletion() + * given Spring TransactionSynchronization objects. + * @param synchronizations a List of TransactionSynchronization objects + * @see TransactionSynchronization#beforeCompletion() */ - public static Mono triggerBeforeCompletion(Collection synchronizations) { + public static Mono triggerBeforeCompletion(Collection synchronizations) { return Flux.fromIterable(synchronizations) - .concatMap(ReactiveTransactionSynchronization::beforeCompletion).onErrorContinue((t, o) -> + .concatMap(TransactionSynchronization::beforeCompletion).onErrorContinue((t, o) -> logger.error("TransactionSynchronization.beforeCompletion threw exception", t)).then(); } /** * Actually invoke the {@code afterCommit} methods of the - * given Spring ReactiveTransactionSynchronization objects. - * @param synchronizations a List of ReactiveTransactionSynchronization objects - * @see ReactiveTransactionSynchronization#afterCommit() + * given Spring TransactionSynchronization objects. + * @param synchronizations a List of TransactionSynchronization objects + * @see TransactionSynchronization#afterCommit() */ - public static Mono invokeAfterCommit(Collection synchronizations) { + public static Mono invokeAfterCommit(Collection synchronizations) { return Flux.fromIterable(synchronizations) - .concatMap(ReactiveTransactionSynchronization::afterCommit) + .concatMap(TransactionSynchronization::afterCommit) .then(); } /** * Actually invoke the {@code afterCompletion} methods of the - * given Spring ReactiveTransactionSynchronization objects. - * @param synchronizations a List of ReactiveTransactionSynchronization objects + * given Spring TransactionSynchronization objects. + * @param synchronizations a List of TransactionSynchronization objects * @param completionStatus the completion status according to the - * constants in the ReactiveTransactionSynchronization interface - * @see ReactiveTransactionSynchronization#afterCompletion(int) - * @see ReactiveTransactionSynchronization#STATUS_COMMITTED - * @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK - * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + * constants in the TransactionSynchronization interface + * @see TransactionSynchronization#afterCompletion(int) + * @see TransactionSynchronization#STATUS_COMMITTED + * @see TransactionSynchronization#STATUS_ROLLED_BACK + * @see TransactionSynchronization#STATUS_UNKNOWN */ public static Mono invokeAfterCompletion( - Collection synchronizations, int completionStatus) { + Collection synchronizations, int completionStatus) { return Flux.fromIterable(synchronizations).concatMap(it -> it.afterCompletion(completionStatus)) .onErrorContinue((t, o) -> logger.error("TransactionSynchronization.afterCompletion threw exception", t)).then(); diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java index 34382e2c2db..557d3e90f0c 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java @@ -56,7 +56,7 @@ public interface TransactionalOperator { static TransactionalOperator create( ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition){ - return new DefaultTransactionalOperator(transactionManager, transactionDefinition); + return new TransactionalOperatorImpl(transactionManager, transactionDefinition); } @@ -93,6 +93,6 @@ public interface TransactionalOperator { * @throws TransactionException in case of initialization, rollback, or system errors * @throws RuntimeException if thrown by the TransactionCallback */ - Flux execute(ReactiveTransactionCallback action) throws TransactionException; + Flux execute(TransactionCallback action) throws TransactionException; } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java similarity index 82% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index c1285fdf94a..ea2ab1d7dbc 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -21,8 +21,8 @@ import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.transaction.ReactiveTransaction; import org.springframework.transaction.ReactiveTransactionManager; -import org.springframework.transaction.ReactiveTransactionStatus; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionSystemException; @@ -39,9 +39,9 @@ import org.springframework.util.Assert; * @see ReactiveTransactionManager */ @SuppressWarnings("serial") -final class DefaultTransactionalOperator implements TransactionalOperator { +final class TransactionalOperatorImpl implements TransactionalOperator { - private static final Log logger = LogFactory.getLog(DefaultTransactionalOperator.class); + private static final Log logger = LogFactory.getLog(TransactionalOperatorImpl.class); private final ReactiveTransactionManager transactionManager; @@ -55,7 +55,7 @@ final class DefaultTransactionalOperator implements TransactionalOperator { * @param transactionDefinition the transaction definition to copy the * default settings from. Local properties can still be set to change values. */ - DefaultTransactionalOperator(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) { + TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) { Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null"); Assert.notNull(transactionManager, "TransactionDefinition must not be null"); this.transactionManager = transactionManager; @@ -72,9 +72,9 @@ final class DefaultTransactionalOperator implements TransactionalOperator { @Override - public Flux execute(ReactiveTransactionCallback action) throws TransactionException { + public Flux execute(TransactionCallback action) throws TransactionException { return TransactionContextManager.currentContext().flatMapMany(context -> { - Mono status = this.transactionManager.getTransaction(this.transactionDefinition); + Mono status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); return status.flatMapMany(it -> { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. @@ -98,7 +98,7 @@ final class DefaultTransactionalOperator implements TransactionalOperator { * @param ex the thrown application exception or error * @throws TransactionException in case of a rollback error */ - private Mono rollbackOnException(ReactiveTransactionStatus status, Throwable ex) throws TransactionException { + private Mono rollbackOnException(ReactiveTransaction status, Throwable ex) throws TransactionException { logger.debug("Initiating transaction rollback on application exception", ex); return this.transactionManager.rollback(status).onErrorMap(ex2 -> { logger.error("Application exception overridden by rollback exception", ex); @@ -113,8 +113,8 @@ final class DefaultTransactionalOperator implements TransactionalOperator { @Override public boolean equals(Object other) { - return (this == other || (super.equals(other) && (!(other instanceof DefaultTransactionalOperator) || - getTransactionManager() == ((DefaultTransactionalOperator) other).getTransactionManager()))); + return (this == other || (super.equals(other) && (!(other instanceof TransactionalOperatorImpl) || + getTransactionManager() == ((TransactionalOperatorImpl) other).getTransactionManager()))); } @Override diff --git a/spring-tx/src/main/java/org/springframework/transaction/support/AbstractTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/support/AbstractTransactionStatus.java index f82c96a56c4..97d9cd33483 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/support/AbstractTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/support/AbstractTransactionStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,7 +55,7 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { //--------------------------------------------------------------------- - // Handling of current transaction state + // Implementation of TransactionExecution //--------------------------------------------------------------------- @Override @@ -93,13 +93,6 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { return false; } - /** - * This implementations is empty, considering flush as a no-op. - */ - @Override - public void flush() { - } - /** * Mark this transaction as completed, that is, committed or rolled back. */ @@ -117,6 +110,11 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { // Handling of current savepoint state //--------------------------------------------------------------------- + @Override + public boolean hasSavepoint() { + return (this.savepoint != null); + } + /** * Set a savepoint for this transaction. Useful for PROPAGATION_NESTED. * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_NESTED @@ -133,11 +131,6 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { return this.savepoint; } - @Override - public boolean hasSavepoint() { - return (this.savepoint != null); - } - /** * Create a savepoint and hold it for the transaction. * @throws org.springframework.transaction.NestedTransactionNotSupportedException @@ -223,4 +216,16 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { throw new NestedTransactionNotSupportedException("This transaction does not support savepoints"); } + + //--------------------------------------------------------------------- + // Flushing support + //--------------------------------------------------------------------- + + /** + * This implementations is empty, considering flush as a no-op. + */ + @Override + public void flush() { + } + } diff --git a/spring-tx/src/main/java/org/springframework/transaction/support/DefaultTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/support/DefaultTransactionStatus.java index 574789a2e49..5ee90ce3c39 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/support/DefaultTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/support/DefaultTransactionStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -165,18 +165,6 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus { ((SmartTransactionObject) this.transaction).isRollbackOnly()); } - /** - * Delegate the flushing to the transaction object, provided that the latter - * implements the {@link SmartTransactionObject} interface. - * @see SmartTransactionObject#flush() - */ - @Override - public void flush() { - if (this.transaction instanceof SmartTransactionObject) { - ((SmartTransactionObject) this.transaction).flush(); - } - } - /** * This implementation exposes the {@link SavepointManager} interface * of the underlying transaction object, if any. @@ -203,4 +191,16 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus { return (this.transaction instanceof SavepointManager); } + /** + * Delegate the flushing to the transaction object, provided that the latter + * implements the {@link SmartTransactionObject} interface. + * @see SmartTransactionObject#flush() + */ + @Override + public void flush() { + if (this.transaction instanceof SmartTransactionObject) { + ((SmartTransactionObject) this.transaction).flush(); + } + } + } diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java index b6b4e58b547..d1045e5e2d8 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java @@ -52,7 +52,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager @Override - protected Object doGetTransaction(ReactiveTransactionSynchronizationManager synchronizationManager) { + protected Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) { return TRANSACTION; } @@ -62,7 +62,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager } @Override - protected Mono doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) { + protected Mono doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) { if (!TRANSACTION.equals(transaction)) { return Mono.error(new IllegalArgumentException("Not the same transaction object")); } @@ -73,7 +73,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager } @Override - protected Mono doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + protected Mono doCommit(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) { if (!TRANSACTION.equals(status.getTransaction())) { return Mono.error(new IllegalArgumentException("Not the same transaction object")); } @@ -81,7 +81,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager } @Override - protected Mono doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + protected Mono doRollback(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) { if (!TRANSACTION.equals(status.getTransaction())) { return Mono.error(new IllegalArgumentException("Not the same transaction object")); } @@ -89,7 +89,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager } @Override - protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + protected Mono doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) { if (!TRANSACTION.equals(status.getTransaction())) { return Mono.error(new IllegalArgumentException("Not the same transaction object")); } diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java similarity index 82% rename from spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java rename to spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java index 5a3b2691610..d14d0a4aa81 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java @@ -23,7 +23,7 @@ import reactor.test.StepVerifier; import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.ReactiveTransactionManager; -import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.transaction.ReactiveTransaction; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.support.DefaultTransactionDefinition; @@ -34,27 +34,27 @@ import static org.junit.Assert.*; * * @author Mark Paluch */ -public class ReactiveTransactionSupportUnitTests { +public class ReactiveTransactionSupportTests { @Test public void noExistingTransaction() { ReactiveTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).consumeNextWith(actual -> { assertFalse(actual.hasTransaction()); }).verifyComplete(); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) - .cast(DefaultReactiveTransactionStatus.class).subscriberContext(TransactionContextManager.createTransactionContext()) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) + .cast(GenericReactiveTransaction.class).subscriberContext(TransactionContextManager.createTransactionContext()) .as(StepVerifier::create).consumeNextWith(actual -> { assertTrue(actual.hasTransaction()); assertTrue(actual.isNewTransaction()); }).verifyComplete(); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).expectError(IllegalTransactionStateException.class).verify(); } @@ -62,22 +62,22 @@ public class ReactiveTransactionSupportUnitTests { public void existingTransaction() { ReactiveTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).consumeNextWith(actual -> { assertNotNull(actual.getTransaction()); assertFalse(actual.isNewTransaction()); }).verifyComplete(); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).consumeNextWith(actual -> { assertNotNull(actual.getTransaction()); assertFalse(actual.isNewTransaction()); }).verifyComplete(); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).consumeNextWith(actual -> { assertNotNull(actual.getTransaction()); assertFalse(actual.isNewTransaction()); @@ -87,7 +87,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void commitWithoutExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()) .as(StepVerifier::create).verifyComplete(); @@ -100,7 +100,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void rollbackWithoutExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -113,7 +113,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void rollbackOnlyWithoutExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly) .flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -127,7 +127,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void commitWithExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()) .as(StepVerifier::create).verifyComplete(); @@ -140,7 +140,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void rollbackWithExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -153,7 +153,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void rollbackOnlyWithExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly).flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete();