Browse Source

Add reactive transaction support SPI

This commit adds SPI interfaces to support reactive transactions
through spring-tx with optional dependencies to Project Reactor and
supportive implementations for TransactionalOperator and
AbstractReactiveTransactionManager.
pull/22870/head
Mark Paluch 7 years ago committed by Juergen Hoeller
parent
commit
beea83b9d2
  1. 8
      spring-tx/spring-tx.gradle
  2. 107
      spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java
  3. 78
      spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java
  4. 1370
      spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java
  5. 105
      spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java
  6. 138
      spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java
  7. 137
      spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java
  8. 215
      spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java
  9. 56
      spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java
  10. 152
      spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java
  11. 446
      spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java
  12. 202
      spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java
  13. 140
      spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java
  14. 73
      spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextHolder.java
  15. 124
      spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java
  16. 114
      spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java
  17. 11
      spring-tx/src/main/java/org/springframework/transaction/reactive/package-info.java
  18. 99
      spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java
  19. 228
      spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java
  20. 88
      spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java

8
spring-tx/spring-tx.gradle

@ -1,5 +1,11 @@ @@ -1,5 +1,11 @@
description = "Spring Transaction"
dependencyManagement {
imports {
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
}
}
dependencies {
compile(project(":spring-beans"))
compile(project(":spring-core"))
@ -10,8 +16,10 @@ dependencies { @@ -10,8 +16,10 @@ dependencies {
optional("javax.resource:javax.resource-api:1.7.1")
optional("javax.transaction:javax.transaction-api:1.3")
optional("com.ibm.websphere:uow:6.0.2.17")
optional("io.projectreactor:reactor-core")
optional("io.vavr:vavr:0.10.0")
testCompile("org.aspectj:aspectjweaver:${aspectjVersion}")
testCompile("org.codehaus.groovy:groovy:${groovyVersion}")
testCompile("org.eclipse.persistence:javax.persistence:2.2.0")
testCompile("io.projectreactor:reactor-test")
}

107
spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java

@ -0,0 +1,107 @@ @@ -0,0 +1,107 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction;
import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
/**
* This is the central interface in Spring's reactive transaction infrastructure.
* Applications can use this directly, but it is not primarily meant as API:
* Typically, applications will work with either transactional operators or
* declarative transaction demarcation through AOP.
*
* @author Mark Paluch
* @since 5.2
* @see org.springframework.transaction.interceptor.TransactionProxyFactoryBean
*/
public interface ReactiveTransactionManager {
/**
* Emit a currently active transaction or create a new one, according to
* the specified propagation behavior.
* <p>Note that parameters like isolation level or timeout will only be applied
* to new transactions, and thus be ignored when participating in active ones.
* <p>Furthermore, not all transaction definition settings will be supported
* by every transaction manager: A proper transaction manager implementation
* should throw an exception when unsupported settings are encountered.
* <p>An exception to the above rule is the read-only flag, which should be
* ignored if no explicit read-only mode is supported. Essentially, the
* read-only flag is just a hint for potential optimization.
* @param definition the TransactionDefinition instance (can be empty for defaults),
* describing propagation behavior, isolation level, timeout etc.
* @return transaction status object representing the new or current transaction
* @throws TransactionException in case of lookup, creation, or system errors
* @throws IllegalTransactionStateException if the given transaction definition
* cannot be executed (for example, if a currently active transaction is in
* conflict with the specified propagation behavior)
* @see TransactionDefinition#getPropagationBehavior
* @see TransactionDefinition#getIsolationLevel
* @see TransactionDefinition#getTimeout
* @see TransactionDefinition#isReadOnly
*/
Mono<ReactiveTransactionStatus> getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
/**
* Commit the given transaction, with regard to its status. If the transaction
* has been marked rollback-only programmatically, perform a rollback.
* <p>If the transaction wasn't a new one, omit the commit for proper
* participation in the surrounding transaction. If a previous transaction
* has been suspended to be able to create a new one, resume the previous
* transaction after committing the new one.
* <p>Note that when the commit call completes, no matter if normally or
* throwing an exception, the transaction must be fully completed and
* cleaned up. No rollback call should be expected in such a case.
* <p>If this method throws an exception other than a TransactionException,
* then some before-commit error caused the commit attempt to fail. For
* example, an O/R Mapping tool might have tried to flush changes to the
* database right before commit, with the resulting DataAccessException
* causing the transaction to fail. The original exception will be
* propagated to the caller of this commit method in such a case.
* @param status object returned by the {@code getTransaction} method
* @throws UnexpectedRollbackException in case of an unexpected rollback
* that the transaction coordinator initiated
* @throws HeuristicCompletionException in case of a transaction failure
* caused by a heuristic decision on the side of the transaction coordinator
* @throws TransactionSystemException in case of commit or system errors
* (typically caused by fundamental resource failures)
* @throws IllegalTransactionStateException if the given transaction
* is already completed (that is, committed or rolled back)
* @see ReactiveTransactionStatus#setRollbackOnly
*/
Mono<Void> commit(ReactiveTransactionStatus status) throws TransactionException;
/**
* Perform a rollback of the given transaction.
* <p>If the transaction wasn't a new one, just set it rollback-only for proper
* participation in the surrounding transaction. If a previous transaction
* has been suspended to be able to create a new one, resume the previous
* transaction after rolling back the new one.
* <p><b>Do not call rollback on a transaction if commit threw an exception.</b>
* The transaction will already have been completed and cleaned up when commit
* returns, even in case of a commit exception. Consequently, a rollback call
* after commit failure will lead to an IllegalTransactionStateException.
* @param status object returned by the {@code getTransaction} method
* @throws TransactionSystemException in case of rollback or system errors
* (typically caused by fundamental resource failures)
* @throws IllegalTransactionStateException if the given transaction
* is already completed (that is, committed or rolled back)
*/
Mono<Void> rollback(ReactiveTransactionStatus status) throws TransactionException;
}

78
spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java

@ -0,0 +1,78 @@ @@ -0,0 +1,78 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction;
import reactor.core.publisher.Mono;
/**
* Representation of the status of a transaction exposing a reactive
* interface.
*
* <p>Transactional code can use this to retrieve status information,
* and to programmatically request a rollback (instead of throwing
* an exception that causes an implicit rollback).
*
* @author Mark Paluch
* @since 5.2
* @see #setRollbackOnly()
* @see ReactiveTransactionManager#getTransaction
*/
public interface ReactiveTransactionStatus {
/**
* Return whether the present transaction is new; otherwise participating
* in an existing transaction, or potentially not running in an actual
* transaction in the first place.
*/
boolean isNewTransaction();
/**
* Set the transaction rollback-only. This instructs the transaction manager
* that the only possible outcome of the transaction may be a rollback, as
* alternative to throwing an exception which would in turn trigger a rollback.
* <p>This is mainly intended for transactions managed by
* {@link org.springframework.transaction.reactive.support.TransactionalOperator} or
* {@link org.springframework.transaction.interceptor.ReactiveTransactionInterceptor},
* where the actual commit/rollback decision is made by the container.
* @see org.springframework.transaction.reactive.support.ReactiveTransactionCallback#doInTransaction
* @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn
*/
void setRollbackOnly();
/**
* Return whether the transaction has been marked as rollback-only
* (either by the application or by the transaction infrastructure).
*/
boolean isRollbackOnly();
/**
* Flush the underlying session to the datastore, if applicable.
* <p>This is effectively just a hint and may be a no-op if the underlying
* transaction manager does not have a flush concept. A flush signal may
* get applied to the primary resource or to transaction synchronizations,
* depending on the underlying resource.
*/
Mono<Void> flush();
/**
* Return whether this transaction is completed, that is,
* whether it has already been committed or rolled back.
* @see ReactiveTransactionManager#commit
* @see ReactiveTransactionManager#rollback
*/
boolean isCompleted();
}

1370
spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java

File diff suppressed because it is too large Load Diff

105
spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java

@ -0,0 +1,105 @@ @@ -0,0 +1,105 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import reactor.core.publisher.Mono;
import org.springframework.transaction.ReactiveTransactionStatus;
/**
* Abstract base implementation of the {@link ReactiveTransactionStatus} interface.
*
* <p>Pre-implements the handling of local rollback-only and completed flags.
*
* <p>Does not assume any specific internal transaction handling, such as an
* underlying transaction object, and no transaction synchronization mechanism.
*
* @author Mark Paluch
* @since 5.2
* @see #setRollbackOnly()
* @see #isRollbackOnly()
* @see #setCompleted()
* @see #isCompleted()
* @see DefaultReactiveTransactionStatus
*/
public abstract class AbstractReactiveTransactionStatus implements ReactiveTransactionStatus {
private boolean rollbackOnly = false;
private boolean completed = false;
//---------------------------------------------------------------------
// Handling of current transaction state
//---------------------------------------------------------------------
@Override
public void setRollbackOnly() {
this.rollbackOnly = true;
}
/**
* Determine the rollback-only flag via checking both the local rollback-only flag
* of this TransactionStatus and the global rollback-only flag of the underlying
* transaction, if any.
* @see #isLocalRollbackOnly()
* @see #isGlobalRollbackOnly()
*/
@Override
public boolean isRollbackOnly() {
return (isLocalRollbackOnly() || isGlobalRollbackOnly());
}
/**
* Determine the rollback-only flag via checking this ReactiveTransactionStatus.
* <p>Will only return "true" if the application called {@code setRollbackOnly}
* on this TransactionStatus object.
*/
public boolean isLocalRollbackOnly() {
return this.rollbackOnly;
}
/**
* Template method for determining the global rollback-only flag of the
* underlying transaction, if any.
* <p>This implementation always returns {@code false}.
*/
public boolean isGlobalRollbackOnly() {
return false;
}
/**
* This implementations is empty, considering flush as a no-op.
*/
@Override
public Mono<Void> flush() {
return Mono.empty();
}
/**
* Mark this transaction as completed, that is, committed or rolled back.
*/
public void setCompleted() {
this.completed = true;
}
@Override
public boolean isCompleted() {
return this.completed;
}
}

138
spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java

@ -0,0 +1,138 @@ @@ -0,0 +1,138 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import org.springframework.lang.Nullable;
import org.springframework.transaction.ReactiveTransactionStatus;
import org.springframework.util.Assert;
/**
* Default implementation of the {@link ReactiveTransactionStatus}
* interface, used by {@link AbstractReactiveTransactionManager}. Based on the concept
* of an underlying "transaction object".
*
* <p>Holds all status information that {@link AbstractReactiveTransactionManager}
* needs internally, including a generic transaction object determined by the
* concrete transaction manager implementation.
*
* <p><b>NOTE:</b> This is <i>not</i> intended for use with other ReactiveTransactionManager
* implementations, in particular not for mock transaction managers in testing environments.
*
* @author Mark Paluch
* @since 5.2
* @see AbstractReactiveTransactionManager
* @see #getTransaction
*/
public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactionStatus {
@Nullable
private final Object transaction;
private final boolean newTransaction;
private final boolean newSynchronization;
private final boolean readOnly;
private final boolean debug;
@Nullable
private final Object suspendedResources;
/**
* Create a new {@code DefaultReactiveTransactionStatus} instance.
* @param transaction underlying transaction object that can hold state
* for the internal transaction implementation
* @param newTransaction if the transaction is new, otherwise participating
* in an existing transaction
* @param newSynchronization if a new transaction synchronization has been
* opened for the given transaction
* @param readOnly whether the transaction is marked as read-only
* @param debug should debug logging be enabled for the handling of this transaction?
* Caching it in here can prevent repeated calls to ask the logging system whether
* debug logging should be enabled.
* @param suspendedResources a holder for resources that have been suspended
* for this transaction, if any
*/
public DefaultReactiveTransactionStatus(
@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
this.transaction = transaction;
this.newTransaction = newTransaction;
this.newSynchronization = newSynchronization;
this.readOnly = readOnly;
this.debug = debug;
this.suspendedResources = suspendedResources;
}
/**
* Return the underlying transaction object.
* @throws IllegalStateException if no transaction is active
*/
public Object getTransaction() {
Assert.state(this.transaction != null, "No transaction active");
return this.transaction;
}
/**
* Return whether there is an actual transaction active.
*/
public boolean hasTransaction() {
return (this.transaction != null);
}
@Override
public boolean isNewTransaction() {
return (hasTransaction() && this.newTransaction);
}
/**
* Return if a new transaction synchronization has been opened
* for this transaction.
*/
public boolean isNewSynchronization() {
return this.newSynchronization;
}
/**
* Return if this transaction is defined as read-only transaction.
*/
public boolean isReadOnly() {
return this.readOnly;
}
/**
* Return whether the progress of this transaction is debugged. This is used by
* {@link AbstractReactiveTransactionManager} as an optimization, to prevent repeated
* calls to {@code logger.isDebugEnabled()}. Not really intended for client code.
*/
public boolean isDebug() {
return this.debug;
}
/**
* Return the holder for resources that have been suspended for this transaction,
* if any.
*/
@Nullable
public Object getSuspendedResources() {
return this.suspendedResources;
}
}

137
spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java

@ -0,0 +1,137 @@ @@ -0,0 +1,137 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.ReactiveTransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.Assert;
/**
* Operator class that simplifies programmatic transaction demarcation and
* transaction exception handling.
*
* @author Mark Paluch
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
*/
@SuppressWarnings("serial")
class DefaultTransactionalOperator extends DefaultTransactionDefinition
implements TransactionalOperator {
private final Log logger = LogFactory.getLog(getClass());
private final ReactiveTransactionManager transactionManager;
/**
* Construct a new DefaultTransactionalOperator using the given transaction manager.
* @param transactionManager the transaction management strategy to be used
*/
DefaultTransactionalOperator(ReactiveTransactionManager transactionManager) {
Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null");
this.transactionManager = transactionManager;
}
/**
* Construct a new TransactionTemplate using the given transaction manager,
* taking its default settings from the given transaction definition.
* @param transactionManager the transaction management strategy to be used
* @param transactionDefinition the transaction definition to copy the
* default settings from. Local properties can still be set to change values.
*/
DefaultTransactionalOperator(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
super(transactionDefinition);
Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null");
this.transactionManager = transactionManager;
}
/**
* Return the transaction management strategy to be used.
*/
public ReactiveTransactionManager getTransactionManager() {
return this.transactionManager;
}
@Override
public <T> Flux<T> execute(ReactiveTransactionCallback<T> action) throws TransactionException {
return TransactionContextManager.currentContext().flatMapMany(context -> {
Mono<ReactiveTransactionStatus> status = this.transactionManager.getTransaction(this);
return status.flatMapMany(it -> {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
Flux<Object> retVal = Flux.from(action.doInTransaction(it));
return retVal.onErrorResume(ex -> {
// Transactional code threw application exception -> rollback
return rollbackOnException(it, ex).then(Mono.error(ex));
}).materialize().flatMap(signal -> {
if (signal.isOnComplete()) {
return transactionManager.commit(it).materialize();
}
return Mono.just(signal);
}).<T>dematerialize();
});
})
.subscriberContext(TransactionContextManager.getOrCreateContext())
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}
/**
* Perform a rollback, handling rollback exceptions properly.
* @param status object representing the transaction
* @param ex the thrown application exception or error
* @throws TransactionException in case of a rollback error
*/
private Mono<Void> rollbackOnException(ReactiveTransactionStatus status, Throwable ex) throws TransactionException {
logger.debug("Initiating transaction rollback on application exception", ex);
return this.transactionManager.rollback(status).onErrorMap(ex2 -> {
logger.error("Application exception overridden by rollback exception", ex);
if (ex2 instanceof TransactionSystemException) {
((TransactionSystemException) ex2).initApplicationException(ex);
}
return ex2;
}
);
}
@Override
public boolean equals(Object other) {
return (this == other || (super.equals(other) && (!(other instanceof DefaultTransactionalOperator) ||
getTransactionManager() == ((DefaultTransactionalOperator) other).getTransactionManager())));
}
}

215
spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java

@ -0,0 +1,215 @@ @@ -0,0 +1,215 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import org.springframework.transaction.support.ResourceHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import reactor.core.publisher.Mono;
/**
* {@link ReactiveTransactionSynchronization} implementation that manages a
* {@link ResourceHolder} bound through {@link ReactiveTransactionSynchronizationManager}.
*
* @author Mark Paluch
* @since 5.2
* @param <H> the resource holder type
* @param <K> the resource key type
*/
public abstract class ReactiveResourceHolderSynchronization<H extends ResourceHolder, K>
implements ReactiveTransactionSynchronization {
private final H resourceHolder;
private final K resourceKey;
private final ReactiveTransactionSynchronizationManager synchronizationManager;
private volatile boolean holderActive = true;
/**
* Create a new ResourceHolderSynchronization for the given holder.
* @param resourceHolder the ResourceHolder to manage
* @param resourceKey the key to bind the ResourceHolder for
* @param synchronizationManager the synchronization manager bound to the current transaction
* @see TransactionSynchronizationManager#bindResource
*/
public ReactiveResourceHolderSynchronization(H resourceHolder, K resourceKey, ReactiveTransactionSynchronizationManager synchronizationManager) {
this.resourceHolder = resourceHolder;
this.resourceKey = resourceKey;
this.synchronizationManager = synchronizationManager;
}
@Override
public Mono<Void> suspend() {
if (this.holderActive) {
synchronizationManager.unbindResource(this.resourceKey);
}
return Mono.empty();
}
@Override
public Mono<Void> resume() {
if (this.holderActive) {
synchronizationManager.bindResource(this.resourceKey, this.resourceHolder);
}
return Mono.empty();
}
@Override
public Mono<Void> flush() {
return flushResource(this.resourceHolder);
}
@Override
public Mono<Void> beforeCommit(boolean readOnly) {
return Mono.empty();
}
@Override
public Mono<Void> beforeCompletion() {
if (shouldUnbindAtCompletion()) {
synchronizationManager.unbindResource(this.resourceKey);
this.holderActive = false;
if (shouldReleaseBeforeCompletion()) {
return releaseResource(this.resourceHolder, this.resourceKey);
}
}
return Mono.empty();
}
@Override
public Mono<Void> afterCommit() {
if (!shouldReleaseBeforeCompletion()) {
return processResourceAfterCommit(this.resourceHolder);
}
return Mono.empty();
}
@Override
public Mono<Void> afterCompletion(int status) {
return Mono.defer(() -> {
Mono<Void> sync = Mono.empty();
if (shouldUnbindAtCompletion()) {
boolean releaseNecessary = false;
if (this.holderActive) {
// The thread-bound resource holder might not be available anymore,
// since afterCompletion might get called from a different thread.
this.holderActive = false;
synchronizationManager.unbindResourceIfPossible(this.resourceKey);
this.resourceHolder.unbound();
releaseNecessary = true;
} else {
releaseNecessary = shouldReleaseAfterCompletion(this.resourceHolder);
}
if (releaseNecessary) {
sync = releaseResource(this.resourceHolder, this.resourceKey);
}
} else {
// Probably a pre-bound resource...
sync = cleanupResource(this.resourceHolder, this.resourceKey, (status == STATUS_COMMITTED));
}
;
return sync.doFinally(s -> this.resourceHolder.reset());
});
}
/**
* Return whether this holder should be unbound at completion
* (or should rather be left bound to the thread after the transaction).
* <p>The default implementation returns {@code true}.
*/
protected boolean shouldUnbindAtCompletion() {
return true;
}
/**
* Return whether this holder's resource should be released before
* transaction completion ({@code true}) or rather after
* transaction completion ({@code false}).
* <p>Note that resources will only be released when they are
* unbound from the thread ({@link #shouldUnbindAtCompletion()}).
* <p>The default implementation returns {@code true}.
*
* @see #releaseResource
*/
protected boolean shouldReleaseBeforeCompletion() {
return true;
}
/**
* Return whether this holder's resource should be released after
* transaction completion ({@code true}).
* <p>The default implementation returns {@code !shouldReleaseBeforeCompletion()},
* releasing after completion if no attempt was made before completion.
*
* @see #releaseResource
*/
protected boolean shouldReleaseAfterCompletion(H resourceHolder) {
return !shouldReleaseBeforeCompletion();
}
/**
* Flush callback for the given resource holder.
*
* @param resourceHolder the resource holder to flush
*/
protected Mono<Void> flushResource(H resourceHolder) {
return Mono.empty();
}
/**
* After-commit callback for the given resource holder.
* Only called when the resource hasn't been released yet
* ({@link #shouldReleaseBeforeCompletion()}).
*
* @param resourceHolder the resource holder to process
*/
protected Mono<Void> processResourceAfterCommit(H resourceHolder) {
return Mono.empty();
}
/**
* Release the given resource (after it has been unbound from the thread).
*
* @param resourceHolder the resource holder to process
* @param resourceKey the key that the ResourceHolder was bound for
*/
protected Mono<Void> releaseResource(H resourceHolder, K resourceKey) {
return Mono.empty();
}
/**
* Perform a cleanup on the given resource (which is left bound to the thread).
*
* @param resourceHolder the resource holder to process
* @param resourceKey the key that the ResourceHolder was bound for
* @param committed whether the transaction has committed ({@code true})
* or rolled back ({@code false})
*/
protected Mono<Void> cleanupResource(H resourceHolder, K resourceKey, boolean committed) {
return Mono.empty();
}
}

56
spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java

@ -0,0 +1,56 @@ @@ -0,0 +1,56 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import org.reactivestreams.Publisher;
import org.springframework.transaction.ReactiveTransactionStatus;
/**
* Callback interface for reactive transactional code. Used with {@link TransactionalOperator}'s
* {@code execute} method, often as anonymous class within a method implementation.
*
* <p>Typically used to assemble various calls to transaction-unaware data access
* services into a higher-level service method with transaction demarcation. As an
* alternative, consider the use of declarative transaction demarcation (e.g. through
* Spring's {@link org.springframework.transaction.annotation.Transactional} annotation).
*
* @author Mark Paluch
* @since 5.2
* @see TransactionalOperator
* @param <T> the result type
*/
@FunctionalInterface
public interface ReactiveTransactionCallback<T> {
/**
* Gets called by {@link TransactionalOperator#transactional} within a transactional context.
* Does not need to care about transactions itself, although it can retrieve and
* influence the status of the current transaction via the given status object,
* e.g. setting rollback-only.
* <p>Allows for returning a result object created within the transaction, i.e. a
* domain object or a collection of domain objects. A RuntimeException thrown by the
* callback is treated as application exception that enforces a rollback. Any such
* exception will be propagated to the caller of the template, unless there is a
* problem rolling back, in which case a TransactionException will be thrown.
* @param status associated transaction status
* @return a result publisher
* @see TransactionalOperator#transactional
*/
Publisher<T> doInTransaction(ReactiveTransactionStatus status);
}

152
spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java

@ -0,0 +1,152 @@ @@ -0,0 +1,152 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import reactor.core.publisher.Mono;
import org.springframework.transaction.ReactiveTransactionStatus;
/**
* Interface for reactive transaction synchronization callbacks.
* Supported by {@link AbstractReactiveTransactionManager}.
*
* <p>ReactiveTransactionSynchronization implementations can implement the
* {@link org.springframework.core.Ordered} interface to influence their execution order.
* A synchronization that does not implement the {@link org.springframework.core.Ordered}
* interface is appended to the end of the synchronization chain.
*
* <p>System synchronizations performed by Spring itself use specific order values,
* allowing for fine-grained interaction with their execution order (if necessary).
*
* @author Mark Paluch
* @since 5.2
* @see ReactiveTransactionSynchronizationManager
* @see AbstractReactiveTransactionManager
*/
public interface ReactiveTransactionSynchronization {
/** Completion status in case of proper commit. */
int STATUS_COMMITTED = 0;
/** Completion status in case of proper rollback. */
int STATUS_ROLLED_BACK = 1;
/** Completion status in case of heuristic mixed completion or system errors. */
int STATUS_UNKNOWN = 2;
/**
* Suspend this synchronization.
* Supposed to unbind resources from TransactionSynchronizationManager if managing any.
* @see ReactiveTransactionSynchronizationManager#unbindResource
*/
default Mono<Void> suspend() {
return Mono.empty();
}
/**
* Resume this synchronization.
* Supposed to rebind resources to TransactionSynchronizationManager if managing any.
* @see ReactiveTransactionSynchronizationManager#bindResource
*/
default Mono<Void> resume() {
return Mono.empty();
}
/**
* Flush the underlying session to the datastore, if applicable.
* @see ReactiveTransactionStatus#flush()
*/
default Mono<Void> flush() {
return Mono.empty();
}
/**
* Invoked before transaction commit (before "beforeCompletion").
* Can e.g. flush transactional O/R Mapping sessions to the database.
* <p>This callback does <i>not</i> mean that the transaction will actually be committed.
* A rollback decision can still occur after this method has been called. This callback
* is rather meant to perform work that's only relevant if a commit still has a chance
* to happen, such as flushing SQL statements to the database.
* <p>Note that exceptions will get propagated to the commit caller and cause a
* rollback of the transaction.
* @param readOnly whether the transaction is defined as read-only transaction
* @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>
* (note: do not throw TransactionException subclasses here!)
* @see #beforeCompletion
*/
default Mono<Void> beforeCommit(boolean readOnly) {
return Mono.empty();
}
/**
* Invoked before transaction commit/rollback.
* Can perform resource cleanup <i>before</i> transaction completion.
* <p>This method will be invoked after {@code beforeCommit}, even when
* {@code beforeCommit} threw an exception. This callback allows for
* closing resources before transaction completion, for any outcome.
* @throws RuntimeException in case of errors; will be <b>logged but not propagated</b>
* (note: do not throw TransactionException subclasses here!)
* @see #beforeCommit
* @see #afterCompletion
*/
default Mono<Void> beforeCompletion() {
return Mono.empty();
}
/**
* Invoked after transaction commit. Can perform further operations right
* <i>after</i> the main transaction has <i>successfully</i> committed.
* <p>Can e.g. commit further operations that are supposed to follow on a successful
* commit of the main transaction, like confirmation messages or emails.
* <p><b>NOTE:</b> The transaction will have been committed already, but the
* transactional resources might still be active and accessible. As a consequence,
* any data access code triggered at this point will still "participate" in the
* original transaction, allowing to perform some cleanup (with no commit following
* anymore!), unless it explicitly declares that it needs to run in a separate
* transaction. Hence: <b>Use {@code PROPAGATION_REQUIRES_NEW} for any
* transactional operation that is called from here.</b>
* @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>
* (note: do not throw TransactionException subclasses here!)
*/
default Mono<Void> afterCommit() {
return Mono.empty();
}
/**
* Invoked after transaction commit/rollback.
* Can perform resource cleanup <i>after</i> transaction completion.
* <p><b>NOTE:</b> The transaction will have been committed or rolled back already,
* but the transactional resources might still be active and accessible. As a
* consequence, any data access code triggered at this point will still "participate"
* in the original transaction, allowing to perform some cleanup (with no commit
* following anymore!), unless it explicitly declares that it needs to run in a
* separate transaction. Hence: <b>Use {@code PROPAGATION_REQUIRES_NEW}
* for any transactional operation that is called from here.</b>
* @param status completion status according to the {@code STATUS_*} constants
* @throws RuntimeException in case of errors; will be <b>logged but not propagated</b>
* (note: do not throw TransactionException subclasses here!)
* @see #STATUS_COMMITTED
* @see #STATUS_ROLLED_BACK
* @see #STATUS_UNKNOWN
* @see #beforeCompletion
*/
default Mono<Void> afterCompletion(int status) {
return Mono.empty();
}
}

446
spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java

@ -0,0 +1,446 @@ @@ -0,0 +1,446 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.lang.Nullable;
import org.springframework.transaction.NoTransactionException;
import org.springframework.transaction.support.ResourceHolder;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.util.Assert;
/**
* Central delegate that manages resources and transaction synchronizations per
* subscriber context.
* To be used by resource management code but not by typical application code.
*
* <p>Supports one resource per key without overwriting, that is, a resource needs
* to be removed before a new one can be set for the same key.
* Supports a list of transaction synchronizations if synchronization is active.
*
* <p>Resource management code should check for context-bound resources, e.g. database
* connections, via {@code getResource}. Such code is
* normally not supposed to bind resources to units of work, as this is the responsibility
* of transaction managers. A further option is to lazily bind on first use if
* transaction synchronization is active, for performing transactions that span
* an arbitrary number of resources.
*
* <p>Transaction synchronization must be activated and deactivated by a transaction
* manager via {@link #initSynchronization()} and {@link #clearSynchronization()}.
* This is automatically supported by {@link AbstractReactiveTransactionManager},
* and thus by all standard Spring transaction managers.
*
* <p>Resource management code should only register synchronizations when this
* manager is active, which can be checked via {@link #isSynchronizationActive};
* it should perform immediate resource cleanup else. If transaction synchronization
* isn't active, there is either no current transaction, or the transaction manager
* doesn't support transaction synchronization.
*
* <p>Synchronization is for example used to always return the same resources
* within a transaction, e.g. a database connection for
* any given Connectionfactory or DatabaseFactory.
*
* @author Mark Paluch
* @since 5.2
* @see #isSynchronizationActive
* @see #registerSynchronization
* @see TransactionSynchronization
* @see AbstractReactiveTransactionManager#setTransactionSynchronization
*/
public class ReactiveTransactionSynchronizationManager {
private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationManager.class);
private final TransactionContext transactionContext;
public ReactiveTransactionSynchronizationManager(TransactionContext transactionContext) {
this.transactionContext = transactionContext;
}
/**
* Return the ReactiveTransactionSynchronizationManager of the current transaction.
* Mainly intended for code that wants to bind resources or synchronizations.
* rollback-only but not throw an application exception.
* @throws NoTransactionException if the transaction info cannot be found,
* because the method was invoked outside a managed transaction.
*/
public static Mono<ReactiveTransactionSynchronizationManager> currentTransaction() {
return TransactionContextManager.currentContext().map(ReactiveTransactionSynchronizationManager::new);
}
/**
* Check if there is a resource for the given key bound to the current thread.
*
* @param key the key to check (usually the resource factory)
* @return if there is a value bound to the current thread
* @see ResourceTransactionManager#getResourceFactory()
*/
public boolean hasResource(Object key) {
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
return (value != null);
}
/**
* Retrieve a resource for the given key that is bound to the current thread.
*
* @param key the key to check (usually the resource factory)
* @return a value bound to the current thread (usually the active
* resource object), or {@code null} if none
* @see ResourceTransactionManager#getResourceFactory()
*/
@Nullable
public Object getResource(Object key) {
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to context [" +
transactionContext.getName() + "]");
}
return value;
}
/**
* Actually check the value of the resource that is bound for the given key.
*/
@Nullable
private Object doGetResource(Object actualKey) {
Map<Object, Object> map = transactionContext.getResources();
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
value = null;
}
return value;
}
/**
* Bind the given resource for the given key to the current context.
*
* @param key the key to bind the value to (usually the resource factory)
* @param value the value to bind (usually the active resource object)
* @throws IllegalStateException if there is already a value bound to the context
* @see ResourceTransactionManager#getResourceFactory()
*/
public void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = transactionContext.getResources();
Object oldValue = map.put(actualKey, value);
// Transparently suppress a ResourceHolder that was marked as void...
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to context [" + transactionContext.getName() + "]");
}
if (logger.isTraceEnabled()) {
logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to context [" +
transactionContext.getName() + "]");
}
}
/**
* Unbind a resource for the given key from the current context.
*
* @param key the key to unbind (usually the resource factory)
* @return the previously bound value (usually the active resource object)
* @throws IllegalStateException if there is no value bound to the context
* @see ResourceTransactionManager#getResourceFactory()
*/
public Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to context [" + transactionContext.getName() + "]");
}
return value;
}
/**
* Unbind a resource for the given key from the current context.
*
* @param key the key to unbind (usually the resource factory)
* @return the previously bound value, or {@code null} if none bound
*/
@Nullable
public Object unbindResourceIfPossible(Object key) {
Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
return doUnbindResource(actualKey);
}
/**
* Actually remove the value of the resource that is bound for the given key.
*/
@Nullable
private Object doUnbindResource(Object actualKey) {
Map<Object, Object> map = transactionContext.getResources();
Object value = map.remove(actualKey);
// Transparently suppress a ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
value = null;
}
if (value != null && logger.isTraceEnabled()) {
logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from context [" +
transactionContext.getName() + "]");
}
return value;
}
//-------------------------------------------------------------------------
// Management of transaction synchronizations
//-------------------------------------------------------------------------
/**
* Return if transaction synchronization is active for the current context.
* Can be called before register to avoid unnecessary instance creation.
*
* @see #registerSynchronization
*/
public boolean isSynchronizationActive() {
return (transactionContext.getSynchronizations() != null);
}
/**
* Activate transaction synchronization for the current context.
* Called by a transaction manager on transaction begin.
*
* @throws IllegalStateException if synchronization is already active
*/
public void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
transactionContext.setSynchronizations(new LinkedHashSet<>());
}
/**
* Register a new transaction synchronization for the current context.
* Typically called by resource management code.
* <p>Note that synchronizations can implement the
* {@link org.springframework.core.Ordered} interface.
* They will be executed in an order according to their order value (if any).
*
* @param synchronization the synchronization object to register
* @throws IllegalStateException if transaction synchronization is not active
* @see org.springframework.core.Ordered
*/
public void registerSynchronization(ReactiveTransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
throw new IllegalStateException("Transaction synchronization is not active");
}
transactionContext.getSynchronizations().add(synchronization);
}
/**
* Return an unmodifiable snapshot list of all registered synchronizations
* for the current context.
*
* @return unmodifiable List of TransactionSynchronization instances
* @throws IllegalStateException if synchronization is not active
* @see TransactionSynchronization
*/
public List<ReactiveTransactionSynchronization> getSynchronizations() throws IllegalStateException {
Set<ReactiveTransactionSynchronization> synchs = transactionContext.getSynchronizations();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
// Return unmodifiable snapshot, to avoid ConcurrentModificationExceptions
// while iterating and invoking synchronization callbacks that in turn
// might register further synchronizations.
if (synchs.isEmpty()) {
return Collections.emptyList();
} else {
// Sort lazily here, not in registerSynchronization.
List<ReactiveTransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
AnnotationAwareOrderComparator.sort(sortedSynchs);
return Collections.unmodifiableList(sortedSynchs);
}
}
/**
* Deactivate transaction synchronization for the current context.
* Called by the transaction manager on transaction cleanup.
*
* @throws IllegalStateException if synchronization is not active
*/
public void clearSynchronization() throws IllegalStateException {
if (!isSynchronizationActive()) {
throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
}
logger.trace("Clearing transaction synchronization");
transactionContext.setSynchronizations(null);
}
//-------------------------------------------------------------------------
// Exposure of transaction characteristics
//-------------------------------------------------------------------------
/**
* Expose the name of the current transaction, if any.
* Called by the transaction manager on transaction begin and on cleanup.
*
* @param name the name of the transaction, or {@code null} to reset it
* @see org.springframework.transaction.TransactionDefinition#getName()
*/
public void setCurrentTransactionName(@Nullable String name) {
transactionContext.setCurrentTransactionName(name);
}
/**
* Return the name of the current transaction, or {@code null} if none set.
* To be called by resource management code for optimizations per use case,
* for example to optimize fetch strategies for specific named transactions.
*
* @see org.springframework.transaction.TransactionDefinition#getName()
*/
@Nullable
public String getCurrentTransactionName() {
return transactionContext.getCurrentTransactionName();
}
/**
* Expose a read-only flag for the current transaction.
* Called by the transaction manager on transaction begin and on cleanup.
*
* @param readOnly {@code true} to mark the current transaction
* as read-only; {@code false} to reset such a read-only marker
* @see org.springframework.transaction.TransactionDefinition#isReadOnly()
*/
public void setCurrentTransactionReadOnly(boolean readOnly) {
transactionContext.setCurrentTransactionReadOnly(readOnly);
}
/**
* Return whether the current transaction is marked as read-only.
* To be called by resource management code when preparing a newly
* created resource (for example, a Hibernate Session).
* <p>Note that transaction synchronizations receive the read-only flag
* as argument for the {@code beforeCommit} callback, to be able
* to suppress change detection on commit. The present method is meant
* to be used for earlier read-only checks, for example to set the
* flush mode of a Hibernate Session to "FlushMode.NEVER" upfront.
*
* @see org.springframework.transaction.TransactionDefinition#isReadOnly()
* @see TransactionSynchronization#beforeCommit(boolean)
*/
public boolean isCurrentTransactionReadOnly() {
return transactionContext.isCurrentTransactionReadOnly();
}
/**
* Expose an isolation level for the current transaction.
* Called by the transaction manager on transaction begin and on cleanup.
*
* @param isolationLevel the isolation level to expose, according to the
* R2DBC Connection constants (equivalent to the corresponding Spring
* TransactionDefinition constants), or {@code null} to reset it
* @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_UNCOMMITTED
* @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_COMMITTED
* @see org.springframework.transaction.TransactionDefinition#ISOLATION_REPEATABLE_READ
* @see org.springframework.transaction.TransactionDefinition#ISOLATION_SERIALIZABLE
* @see org.springframework.transaction.TransactionDefinition#getIsolationLevel()
*/
public void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) {
transactionContext.setCurrentTransactionIsolationLevel(isolationLevel);
}
/**
* Return the isolation level for the current transaction, if any.
* To be called by resource management code when preparing a newly
* created resource (for example, a R2DBC Connection).
*
* @return the currently exposed isolation level, according to the
* R2DBC Connection constants (equivalent to the corresponding Spring
* TransactionDefinition constants), or {@code null} if none
* @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_UNCOMMITTED
* @see org.springframework.transaction.TransactionDefinition#ISOLATION_READ_COMMITTED
* @see org.springframework.transaction.TransactionDefinition#ISOLATION_REPEATABLE_READ
* @see org.springframework.transaction.TransactionDefinition#ISOLATION_SERIALIZABLE
* @see org.springframework.transaction.TransactionDefinition#getIsolationLevel()
*/
@Nullable
public Integer getCurrentTransactionIsolationLevel() {
return transactionContext.getCurrentTransactionIsolationLevel();
}
/**
* Expose whether there currently is an actual transaction active.
* Called by the transaction manager on transaction begin and on cleanup.
*
* @param active {@code true} to mark the current context as being associated
* with an actual transaction; {@code false} to reset that marker
*/
public void setActualTransactionActive(boolean active) {
transactionContext.setActualTransactionActive(active);
}
/**
* Return whether there currently is an actual transaction active.
* This indicates whether the current context is associated with an actual
* transaction rather than just with active transaction synchronization.
* <p>To be called by resource management code that wants to discriminate
* between active transaction synchronization (with or without backing
* resource transaction; also on PROPAGATION_SUPPORTS) and an actual
* transaction being active (with backing resource transaction;
* on PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, etc).
*
* @see #isSynchronizationActive()
*/
public boolean isActualTransactionActive() {
return transactionContext.isActualTransactionActive();
}
/**
* Clear the entire transaction synchronization state:
* registered synchronizations as well as the various transaction characteristics.
*
* @see #clearSynchronization()
* @see #setCurrentTransactionName
* @see #setCurrentTransactionReadOnly
* @see #setCurrentTransactionIsolationLevel
* @see #setActualTransactionActive
*/
public void clear() {
transactionContext.clear();
}
private Map<Object, Object> getResources() {
return transactionContext.getResources();
}
}

202
spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java

@ -0,0 +1,202 @@ @@ -0,0 +1,202 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.aop.scope.ScopedObject;
import org.springframework.core.InfrastructureProxy;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* Utility methods for triggering specific {@link ReactiveTransactionSynchronization}
* callback methods on all currently registered synchronizations.
*
* @author Mark Paluch
* @since 5.2
* @see ReactiveTransactionSynchronization
* @see ReactiveTransactionSynchronizationManager#getSynchronizations()
*/
public abstract class ReactiveTransactionSynchronizationUtils {
private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationUtils.class);
private static final boolean aopAvailable = ClassUtils.isPresent(
"org.springframework.aop.scope.ScopedObject", ReactiveTransactionSynchronizationUtils.class.getClassLoader());
/**
* Unwrap the given resource handle if necessary; otherwise return
* the given handle as-is.
* @see InfrastructureProxy#getWrappedObject()
*/
static Object unwrapResourceIfNecessary(Object resource) {
Assert.notNull(resource, "Resource must not be null");
Object resourceRef = resource;
// unwrap infrastructure proxy
if (resourceRef instanceof InfrastructureProxy) {
resourceRef = ((InfrastructureProxy) resourceRef).getWrappedObject();
}
if (aopAvailable) {
// now unwrap scoped proxy
resourceRef = ScopedProxyUnwrapper.unwrapIfNecessary(resourceRef);
}
return resourceRef;
}
/**
* Trigger {@code flush} callbacks on all currently registered synchronizations.
* @throws RuntimeException if thrown by a {@code flush} callback
* @see ReactiveTransactionSynchronization#flush()
*/
public static Mono<Void> triggerFlush() {
return TransactionContextManager.currentContext().flatMapIterable(TransactionContext::getSynchronizations).concatMap(ReactiveTransactionSynchronization::flush).then();
}
/**
* Trigger {@code beforeCommit} callbacks on all currently registered synchronizations.
*
* @param readOnly whether the transaction is defined as read-only transaction
* @throws RuntimeException if thrown by a {@code beforeCommit} callback
* @see ReactiveTransactionSynchronization#beforeCommit(boolean)
*/
public static Mono<Void> triggerBeforeCommit(boolean readOnly) {
return TransactionContextManager.currentContext()
.map(TransactionContext::getSynchronizations)
.flatMap(it -> triggerBeforeCommit(it, readOnly)).then();
}
/**
* Actually invoke the {@code triggerBeforeCommit} methods of the
* given Spring ReactiveTransactionSynchronization objects.
*
* @param synchronizations a List of ReactiveTransactionSynchronization objects
* @see ReactiveTransactionSynchronization#beforeCommit(boolean)
*/
public static Mono<Void> triggerBeforeCommit(Collection<ReactiveTransactionSynchronization> synchronizations, boolean readOnly) {
return Flux.fromIterable(synchronizations).concatMap(it -> it.beforeCommit(readOnly))
.then();
}
/**
* Trigger {@code beforeCompletion} callbacks on all currently registered synchronizations.
* @see ReactiveTransactionSynchronization#beforeCompletion()
*/
public static Mono<Void> triggerBeforeCompletion() {
return TransactionContextManager.currentContext()
.map(TransactionContext::getSynchronizations)
.flatMap(ReactiveTransactionSynchronizationUtils::triggerBeforeCompletion);
}
/**
* Actually invoke the {@code beforeCompletion} methods of the
* given Spring ReactiveTransactionSynchronization objects.
* @param synchronizations a List of ReactiveTransactionSynchronization objects
* @see ReactiveTransactionSynchronization#beforeCompletion()
*/
public static Mono<Void> triggerBeforeCompletion(Collection<ReactiveTransactionSynchronization> synchronizations) {
return Flux.fromIterable(synchronizations)
.concatMap(ReactiveTransactionSynchronization::beforeCompletion).onErrorContinue((t, o) -> {
logger.error("TransactionSynchronization.beforeCompletion threw exception", t);
}).then();
}
/**
* Trigger {@code afterCommit} callbacks on all currently registered synchronizations.
* @throws RuntimeException if thrown by a {@code afterCommit} callback
* @see ReactiveTransactionSynchronizationManager#getSynchronizations()
* @see ReactiveTransactionSynchronization#afterCommit()
*/
public static Mono<Void> triggerAfterCommit() {
return TransactionContextManager.currentContext()
.flatMap(it -> invokeAfterCommit(it.getSynchronizations()));
}
/**
* Actually invoke the {@code afterCommit} methods of the
* given Spring ReactiveTransactionSynchronization objects.
* @param synchronizations a List of ReactiveTransactionSynchronization objects
* @see TransactionSynchronization#afterCommit()
*/
public static Mono<Void> invokeAfterCommit(Collection<ReactiveTransactionSynchronization> synchronizations) {
return Flux.fromIterable(synchronizations)
.concatMap(ReactiveTransactionSynchronization::afterCommit)
.then();
}
/**
* Trigger {@code afterCompletion} callbacks on all currently registered synchronizations.
* @param completionStatus the completion status according to the
* constants in the ReactiveTransactionSynchronization interface
* @see ReactiveTransactionSynchronizationManager#getSynchronizations()
* @see ReactiveTransactionSynchronization#afterCompletion(int)
* @see ReactiveTransactionSynchronization#STATUS_COMMITTED
* @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK
* @see ReactiveTransactionSynchronization#STATUS_UNKNOWN
*/
public static Mono<Void> triggerAfterCompletion(int completionStatus) {
return TransactionContextManager.currentContext()
.flatMap(it -> invokeAfterCompletion(it.getSynchronizations(), completionStatus));
}
/**
* Actually invoke the {@code afterCompletion} methods of the
* given Spring ReactiveTransactionSynchronization objects.
* @param synchronizations a List of ReactiveTransactionSynchronization objects
* @param completionStatus the completion status according to the
* constants in the ReactiveTransactionSynchronization interface
* @see ReactiveTransactionSynchronization#afterCompletion(int)
* @see ReactiveTransactionSynchronization#STATUS_COMMITTED
* @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK
* @see ReactiveTransactionSynchronization#STATUS_UNKNOWN
*/
public static Mono<Void> invokeAfterCompletion(Collection<ReactiveTransactionSynchronization> synchronizations,
int completionStatus) {
return Flux.fromIterable(synchronizations).concatMap(it -> it.afterCompletion(completionStatus))
.onErrorContinue((t, o) -> {
logger.error("TransactionSynchronization.afterCompletion threw exception", t);
}).then();
}
/**
* Inner class to avoid hard-coded dependency on AOP module.
*/
private static class ScopedProxyUnwrapper {
static Object unwrapIfNecessary(Object resource) {
if (resource instanceof ScopedObject) {
return ((ScopedObject) resource).getTargetObject();
}
else {
return resource;
}
}
}
}

140
spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java

@ -0,0 +1,140 @@ @@ -0,0 +1,140 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;
/**
* Mutable transaction context that encapsulates transactional synchronizations and
* resources in the scope of a single transaction. Transaction context is typically
* held by an outer {@link TransactionContextHolder} or referenced directly within
* from the subscriber context.
*
* @author Mark Paluch
* @since 5.2
* @see TransactionContextManager
* @see reactor.util.context.Context
*/
public class TransactionContext {
private final UUID contextId = UUID.randomUUID();
private final Map<Object, Object> resources = new LinkedHashMap<>();
private @Nullable Set<ReactiveTransactionSynchronization> synchronizations;
private volatile @Nullable String currentTransactionName;
private volatile boolean currentTransactionReadOnly;
private volatile @Nullable Integer currentTransactionIsolationLevel;
private volatile boolean actualTransactionActive;
private final @Nullable TransactionContext parent;
TransactionContext() {
this(null);
}
TransactionContext(@Nullable TransactionContext parent) {
this.parent = parent;
}
public void clear() {
synchronizations = null;
currentTransactionName = null;
currentTransactionReadOnly = false;
currentTransactionIsolationLevel = null;
actualTransactionActive = false;
}
public String getName() {
if (StringUtils.hasText(currentTransactionName)) {
return contextId + ": " + currentTransactionName;
}
return contextId.toString();
}
public UUID getContextId() {
return contextId;
}
public Map<Object, Object> getResources() {
return resources;
}
@Nullable
public Set<ReactiveTransactionSynchronization> getSynchronizations() {
return synchronizations;
}
public void setSynchronizations(@org.springframework.lang.Nullable Set<ReactiveTransactionSynchronization> synchronizations) {
this.synchronizations = synchronizations;
}
@Nullable
public String getCurrentTransactionName() {
return currentTransactionName;
}
public void setCurrentTransactionName(@Nullable String currentTransactionName) {
this.currentTransactionName = currentTransactionName;
}
public boolean isCurrentTransactionReadOnly() {
return currentTransactionReadOnly;
}
public void setCurrentTransactionReadOnly(boolean currentTransactionReadOnly) {
this.currentTransactionReadOnly = currentTransactionReadOnly;
}
@Nullable
public Integer getCurrentTransactionIsolationLevel() {
return currentTransactionIsolationLevel;
}
public void setCurrentTransactionIsolationLevel(@Nullable Integer currentTransactionIsolationLevel) {
this.currentTransactionIsolationLevel = currentTransactionIsolationLevel;
}
public boolean isActualTransactionActive() {
return actualTransactionActive;
}
public void setActualTransactionActive(boolean actualTransactionActive) {
this.actualTransactionActive = actualTransactionActive;
}
@Nullable
public TransactionContext getParent() {
return parent;
}
}

73
spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextHolder.java

@ -0,0 +1,73 @@ @@ -0,0 +1,73 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import java.util.Stack;
import org.springframework.transaction.NoTransactionException;
/**
* Mutable holder for reactive transaction {@link TransactionContext contexts}.
* This holder keeps references to individual {@link TransactionContext}s.
* @author Mark Paluch
* @since 5.2
* @see TransactionContext
*/
class TransactionContextHolder {
private final Stack<TransactionContext> transactionStack;
TransactionContextHolder(Stack<TransactionContext> transactionStack) {
this.transactionStack = transactionStack;
}
/**
* Return the current {@link TransactionContext}.
* @return the current {@link TransactionContext}.
* @throws NoTransactionException if no transaction is ongoing.
*/
TransactionContext currentContext() {
TransactionContext context = (transactionStack.isEmpty() ? null : transactionStack.peek());
if (context == null) {
throw new NoTransactionException("No transaction in context");
}
return context;
}
/**
* Create a new {@link TransactionContext}.
* @return the new {@link TransactionContext}.
*/
TransactionContext createContext() {
TransactionContext context = (transactionStack.isEmpty() ? null : transactionStack.peek());
return (context == null ? transactionStack.push(new TransactionContext()) :
transactionStack.push(new TransactionContext(context)));
}
/**
* Check whether the holder has a {@link TransactionContext}.
* @return {@literal true} if a {@link TransactionContext} is associated.
*/
boolean hasContext() {
return !transactionStack.isEmpty();
}
}

124
spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java

@ -0,0 +1,124 @@ @@ -0,0 +1,124 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import java.util.Stack;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import org.springframework.transaction.NoTransactionException;
/**
* Delegate to register and obtain transactional contexts.
* <p/>
* Typically used by components that intercept or orchestrate transactional flows such as AOP interceptors or
* transactional operators.
*
* @author Mark Paluch
* @since 5.2
* @see ReactiveTransactionSynchronization
*/
public abstract class TransactionContextManager {
private TransactionContextManager() {
/* prevent instantiation */
}
/**
* Obtain the current {@link TransactionContext} from the subscriber context or the
* transactional context holder. Context retrieval fails with NoTransactionException
* if no context or context holder is registered.
* @return the current {@link TransactionContext}
* @throws NoTransactionException if no TransactionContext was found in the subscriber context
* or no context found in a holder
*/
public static Mono<TransactionContext> currentContext() throws NoTransactionException {
return Mono.subscriberContext().handle((ctx, sink) -> {
if (ctx.hasKey(TransactionContext.class)) {
sink.next(ctx.get(TransactionContext.class));
return;
}
if (ctx.hasKey(TransactionContextHolder.class)) {
TransactionContextHolder holder = ctx.get(TransactionContextHolder.class);
if (holder.hasContext()) {
sink.next(holder.currentContext());
return;
}
}
sink.error(new NoTransactionException("No transaction in context"));
});
}
/**
* Create a {@link TransactionContext} and register it in the subscriber {@link Context}.
* @return functional context registration.
* @see Mono#subscriberContext(Function)
* @see Flux#subscriberContext(Function)
* @throws IllegalStateException if a transaction context is already associated.
*/
public static Function<Context, Context> createTransactionContext() {
return context -> context.put(TransactionContext.class, new TransactionContext());
}
/**
* Return a {@link Function} to create or associate a new {@link TransactionContext}.
* Interaction with transactional resources through
* {@link ReactiveTransactionSynchronizationManager} requires a TransactionContext
* to be registered in the subscriber context.
* @return functional context registration.
*/
public static Function<Context, Context> getOrCreateContext() {
return context -> {
TransactionContextHolder holder = context.get(TransactionContextHolder.class);
if (holder.hasContext()) {
context.put(TransactionContext.class, holder.currentContext());
}
return context.put(TransactionContext.class, holder.createContext());
};
}
/**
* Return a {@link Function} to create or associate a new
* {@link TransactionContextHolder}. Creation and release of transactions
* within a reactive flow requires a mutable holder that follows a top to
* down execution scheme. Reactor's subscriber context follows a down to top
* approach regarding mutation visibility.
* @return functional context registration.
*/
public static Function<Context, Context> getOrCreateContextHolder() {
return context -> {
if (!context.hasKey(TransactionContextHolder.class)) {
return context.put(TransactionContextHolder.class, new TransactionContextHolder(new Stack<>()));
}
return context;
};
}
}

114
spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java

@ -0,0 +1,114 @@ @@ -0,0 +1,114 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.ReactiveTransactionManager;
/**
* Operator class that simplifies programmatic transaction demarcation and
* transaction exception handling.
*
* <p>The central method is {@link #transactional}, supporting transactional wrapping
* of functional sequences code that. This operator handles the transaction lifecycle
* and possible exceptions such that neither the ReactiveTransactionCallback
* implementation nor the calling code needs to explicitly handle transactions.
*
* <p>Typical usage: Allows for writing low-level data access objects that use
* resources such as database connections but are not transaction-aware themselves.
* Instead, they can implicitly participate in transactions handled by higher-level
* application services utilizing this class, making calls to the low-level
* services via an inner-class callback object.
*
* <p>Can be used within a service implementation via direct instantiation with
* a transaction manager reference, or get prepared in an application context
* and passed to services as bean reference. Note: The transaction manager should
* always be configured as bean in the application context: in the first case given
* to the service directly, in the second case given to the prepared template.
*
* <p>Supports setting the propagation behavior and the isolation level by name,
* for convenient configuration in context definitions.
*
* @author Mark Paluch
* @since 5.2
* @see #execute
* @see ReactiveTransactionManager
*/
public interface TransactionalOperator {
/**
* Create a new {@link TransactionalOperator} using {@link ReactiveTransactionManager}.
* @param transactionManager the transaction management strategy to be used
* @return the transactional operator
*/
static TransactionalOperator create(ReactiveTransactionManager transactionManager){
return new DefaultTransactionalOperator(transactionManager);
}
/**
* Create a new {@link TransactionalOperator} using {@link ReactiveTransactionManager}
* and {@link TransactionDefinition}.
*
* @param transactionManager the transaction management strategy to be used
* @param transactionDefinition the transaction definition to apply.
* @return the transactional operator
*/
static TransactionalOperator create(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition){
return new DefaultTransactionalOperator(transactionManager, transactionDefinition);
}
/**
* Wrap the functional sequence specified by the given Flux within a transaction.
* @param flux the Flux that should be executed within the transaction
* @return a result publisher returned by the callback, or {@code null} if none
* @throws TransactionException in case of initialization, rollback, or system errors
* @throws RuntimeException if thrown by the TransactionCallback
*/
default <T> Flux<T> transactional(Flux<T> flux) {
return execute(it -> flux);
}
/**
* Wrap the functional sequence specified by the given Mono within a transaction.
* @param mono the Mono that should be executed within the transaction
* @return a result publisher returned by the callback
* @throws TransactionException in case of initialization, rollback, or system errors
* @throws RuntimeException if thrown by the TransactionCallback
*/
default <T> Mono<T> transactional(Mono<T> mono) {
return execute(it -> mono).next();
}
/**
* Execute the action specified by the given callback object within a transaction.
* <p>Allows for returning a result object created within the transaction, that is,
* a domain object or a collection of domain objects. A RuntimeException thrown
* by the callback is treated as a fatal exception that enforces a rollback.
* Such an exception gets propagated to the caller of the template.
* @param action the callback object that specifies the transactional action
* @return a result object returned by the callback
* @throws TransactionException in case of initialization, rollback, or system errors
* @throws RuntimeException if thrown by the TransactionCallback
*/
<T> Flux<T> execute(ReactiveTransactionCallback<T> action) throws TransactionException;
}

11
spring-tx/src/main/java/org/springframework/transaction/reactive/package-info.java

@ -0,0 +1,11 @@ @@ -0,0 +1,11 @@
/**
* Support classes for the org.springframework.transaction.reactive package.
* Provides an abstract base class for reactive transaction manager implementations,
* and a transactional operator plus callback for transaction demarcation.
*/
@NonNullApi
@NonNullFields
package org.springframework.transaction.reactive;
import org.springframework.lang.NonNullApi;
import org.springframework.lang.NonNullFields;

99
spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java

@ -0,0 +1,99 @@ @@ -0,0 +1,99 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import reactor.core.publisher.Mono;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
/**
* Test implementation of a {@link ReactiveTransactionManager}.
*
* @author Mark Paluch
*/
@SuppressWarnings("serial")
class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager {
private static final Object TRANSACTION = "transaction";
private final boolean existingTransaction;
private final boolean canCreateTransaction;
protected boolean begin = false;
protected boolean commit = false;
protected boolean rollback = false;
protected boolean rollbackOnly = false;
ReactiveTestTransactionManager(boolean existingTransaction, boolean canCreateTransaction) {
this.existingTransaction = existingTransaction;
this.canCreateTransaction = canCreateTransaction;
setTransactionSynchronization(SYNCHRONIZATION_NEVER);
}
@Override
protected Object doGetTransaction(ReactiveTransactionSynchronizationManager synchronizationManager) {
return TRANSACTION;
}
@Override
protected boolean isExistingTransaction(Object transaction) {
return existingTransaction;
}
@Override
protected Mono<Void> doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) {
if (!TRANSACTION.equals(transaction)) {
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
}
if (!this.canCreateTransaction) {
return Mono.error(new CannotCreateTransactionException("Cannot create transaction"));
}
return Mono.fromRunnable(() -> this.begin = true);
}
@Override
protected Mono<Void> doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) {
if (!TRANSACTION.equals(status.getTransaction())) {
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
}
return Mono.fromRunnable(() -> this.commit = true);
}
@Override
protected Mono<Void> doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) {
if (!TRANSACTION.equals(status.getTransaction())) {
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
}
return Mono.fromRunnable(() -> this.rollback = true);
}
@Override
protected Mono<Void> doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) {
if (!TRANSACTION.equals(status.getTransaction())) {
return Mono.error(new IllegalArgumentException("Not the same transaction object"));
}
return Mono.fromRunnable(() -> this.rollbackOnly = true);
}
}

228
spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java

@ -0,0 +1,228 @@ @@ -0,0 +1,228 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.ReactiveTransactionStatus;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import static org.junit.Assert.*;
/**
* Unit tests for transactional support through {@link ReactiveTestTransactionManager}.
*
* @author Mark Paluch
*/
public class ReactiveTransactionSupportUnitTests {
@Test
public void noExistingTransaction() {
ReactiveTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
.as(StepVerifier::create).consumeNextWith(actual -> {
assertFalse(actual.hasTransaction());
}).verifyComplete();
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
.cast(DefaultReactiveTransactionStatus.class).subscriberContext(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).consumeNextWith(actual -> {
assertTrue(actual.hasTransaction());
assertTrue(actual.isNewTransaction());
}).verifyComplete();
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
.as(StepVerifier::create).expectError(IllegalTransactionStateException.class).verify();
}
@Test
public void existingTransaction() {
ReactiveTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
.as(StepVerifier::create).consumeNextWith(actual -> {
assertNotNull(actual.getTransaction());
assertFalse(actual.isNewTransaction());
}).verifyComplete();
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
.as(StepVerifier::create).consumeNextWith(actual -> {
assertNotNull(actual.getTransaction());
assertFalse(actual.isNewTransaction());
}).verifyComplete();
tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class)
.as(StepVerifier::create).consumeNextWith(actual -> {
assertNotNull(actual.getTransaction());
assertFalse(actual.isNewTransaction());
}).verifyComplete();
}
@Test
public void commitWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getTransaction(null).flatMap(tm::commit).subscriberContext(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyComplete();
assertHasBegan(tm);
assertHasCommitted(tm);
assertHasNoRollback(tm);
assertHasNotSetRollbackOnly(tm);
}
@Test
public void rollbackWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getTransaction(null).flatMap(tm::rollback)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
assertHasBegan(tm);
assertHasNotCommitted(tm);
assertHasRolledBack(tm);
assertHasNotSetRollbackOnly(tm);
}
@Test
public void rollbackOnlyWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getTransaction(null).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
assertHasBegan(tm);
assertHasNotCommitted(tm);
assertHasRolledBack(tm);
assertHasNotSetRollbackOnly(tm);
}
@Test
public void commitWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getTransaction(null).flatMap(tm::commit).subscriberContext(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyComplete();
assertHasNotBegan(tm);
assertHasNotCommitted(tm);
assertHasNoRollback(tm);
assertHasNotSetRollbackOnly(tm);
}
@Test
public void rollbackWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getTransaction(null).flatMap(tm::rollback)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
assertHasNotBegan(tm);
assertHasNotCommitted(tm);
assertHasNoRollback(tm);
assertHasSetRollbackOnly(tm);
}
@Test
public void rollbackOnlyWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getTransaction(null).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete();
assertHasNotBegan(tm);
assertHasNotCommitted(tm);
assertHasNoRollback(tm);
assertHasSetRollbackOnly(tm);
}
@Test
public void transactionTemplate() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
TransactionalOperator operator = TransactionalOperator.create(tm);
Flux.just("Walter").as(operator::transactional)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
assertHasBegan(tm);
assertHasCommitted(tm);
assertHasNoRollback(tm);
assertHasNotSetRollbackOnly(tm);
}
@Test
public void transactionTemplateWithException() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
TransactionalOperator operator = TransactionalOperator.create(tm);
RuntimeException ex = new RuntimeException("Some application exception");
Mono.error(ex).as(operator::transactional)
.as(StepVerifier::create)
.expectError(RuntimeException.class)
.verify();
assertHasBegan(tm);
assertHasNotCommitted(tm);
assertHasRolledBack(tm);
assertHasNotSetRollbackOnly(tm);
}
private void assertHasBegan(ReactiveTestTransactionManager actual) {
assertTrue("Expected <ReactiveTransactionManager.begin()> but was <begin()> was not invoked", actual.begin);
}
private void assertHasNotBegan(ReactiveTestTransactionManager actual) {
assertFalse("Expected to not call <ReactiveTransactionManager.begin()> but was <begin()> was called", actual.begin);
}
private void assertHasCommitted(ReactiveTestTransactionManager actual) {
assertTrue("Expected <ReactiveTransactionManager.commit()> but was <commit()> was not invoked", actual.commit);
}
private void assertHasNotCommitted(ReactiveTestTransactionManager actual) {
assertFalse("Expected to not call <ReactiveTransactionManager.commit()> but was <commit()> was called", actual.commit);
}
private void assertHasRolledBack(ReactiveTestTransactionManager actual) {
assertTrue("Expected <ReactiveTransactionManager.rollback()> but was <rollback()> was not invoked", actual.rollback);
}
private void assertHasNoRollback(ReactiveTestTransactionManager actual) {
assertFalse("Expected to not call <ReactiveTransactionManager.rollback()> but was <rollback()> was called", actual.rollback);
}
private void assertHasSetRollbackOnly(ReactiveTestTransactionManager actual) {
assertTrue("Expected <ReactiveTransactionManager.setRollbackOnly()> but was <setRollbackOnly()> was not invoked", actual.rollbackOnly);
}
private void assertHasNotSetRollbackOnly(ReactiveTestTransactionManager actual) {
assertFalse("Expected to not call <ReactiveTransactionManager.setRollbackOnly()> but was <setRollbackOnly()> was called", actual.rollbackOnly);
}
}

88
spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java

@ -0,0 +1,88 @@ @@ -0,0 +1,88 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.junit.Assert.*;
/**
* Tests for {@link TransactionalOperator}.
*
* @author Mark Paluch
*/
public class TransactionalOperatorTests {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
@Test
public void commitWithMono() {
TransactionalOperator operator = TransactionalOperator.create(tm);
Mono.just(true).as(operator::transactional)
.as(StepVerifier::create)
.expectNext(true)
.verifyComplete();
assertTrue(tm.commit);
assertFalse(tm.rollback);
}
@Test
public void rollbackWithMono() {
TransactionalOperator operator = TransactionalOperator.create(tm);
Mono.error(new IllegalStateException()).as(operator::transactional)
.as(StepVerifier::create)
.verifyError(IllegalStateException.class);
assertFalse(tm.commit);
assertTrue(tm.rollback);
}
@Test
public void commitWithFlux() {
TransactionalOperator operator = TransactionalOperator.create(tm);
Flux.just(true).as(operator::transactional)
.as(StepVerifier::create)
.expectNext(true)
.verifyComplete();
assertTrue(tm.commit);
assertFalse(tm.rollback);
}
@Test
public void rollbackWithFlux() {
TransactionalOperator operator = TransactionalOperator.create(tm);
Flux.error(new IllegalStateException()).as(operator::transactional)
.as(StepVerifier::create)
.verifyError(IllegalStateException.class);
assertFalse(tm.commit);
assertTrue(tm.rollback);
}
}
Loading…
Cancel
Save