diff --git a/src/main/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtils.java b/src/main/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtils.java index a0199f94e..49f01c925 100644 --- a/src/main/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtils.java +++ b/src/main/java/org/springframework/data/r2dbc/connectionfactory/ConnectionFactoryUtils.java @@ -17,9 +17,14 @@ package org.springframework.data.r2dbc.connectionfactory; import io.r2dbc.spi.Connection; import io.r2dbc.spi.ConnectionFactory; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; + import org.springframework.core.Ordered; import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.lang.Nullable; @@ -27,9 +32,6 @@ import org.springframework.transaction.NoTransactionException; import org.springframework.transaction.reactive.TransactionSynchronization; import org.springframework.transaction.reactive.TransactionSynchronizationManager; import org.springframework.util.Assert; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; /** * Helper class that provides static methods for obtaining R2DBC Connections from a @@ -50,8 +52,7 @@ public abstract class ConnectionFactoryUtils { private static final Log logger = LogFactory.getLog(ConnectionFactoryUtils.class); - private ConnectionFactoryUtils() { - } + private ConnectionFactoryUtils() {} /** * Obtain a {@link io.r2dbc.spi.Connection} from the given {@link io.r2dbc.spi.ConnectionFactory}. Translates @@ -85,7 +86,7 @@ public abstract class ConnectionFactoryUtils { Assert.notNull(connectionFactory, "ConnectionFactory must not be null!"); - return TransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { + return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> { ConnectionHolder conHolder = (ConnectionHolder) synchronizationManager.getResource(connectionFactory); if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { @@ -180,8 +181,7 @@ public abstract class ConnectionFactoryUtils { return Tuples.of(it, connectionFactory); })); - return Mono.justOrEmpty(resource) - .flatMap(ConnectionFactoryUtils::createConnection) + return Mono.justOrEmpty(resource).flatMap(ConnectionFactoryUtils::createConnection) .switchIfEmpty(attachNewConnection); } @@ -212,7 +212,7 @@ public abstract class ConnectionFactoryUtils { * * @param con the {@link io.r2dbc.spi.Connection} to close if necessary. * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from (may be - * {@literal null}). + * {@literal null}). * @see #getConnection */ public static Mono releaseConnection(@Nullable io.r2dbc.spi.Connection con, @@ -228,13 +228,13 @@ public abstract class ConnectionFactoryUtils { * * @param con the {@link io.r2dbc.spi.Connection} to close if necessary. * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from (may be - * {@literal null}). + * {@literal null}). * @see #doGetConnection */ public static Mono doReleaseConnection(@Nullable io.r2dbc.spi.Connection con, @Nullable ConnectionFactory connectionFactory) { - return TransactionSynchronizationManager.currentTransaction().flatMap(it -> { + return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> { ConnectionHolder conHolder = (ConnectionHolder) it.getResource(connectionFactory); if (conHolder != null && connectionEquals(conHolder, con)) { @@ -302,7 +302,7 @@ public abstract class ConnectionFactoryUtils { * {@link reactor.util.context.Context}. * * @throws NoTransactionException if no active {@link ReactiveTransactionSynchronization} is associated with the - * current subscription. + * current subscription. * @see Mono#subscriberContext() * @see ReactiveTransactionSynchronization */ @@ -319,7 +319,7 @@ public abstract class ConnectionFactoryUtils { * {@link reactor.util.context.Context}. * * @throws NoTransactionException if no active {@link ReactiveTransactionSynchronization} is associated with the - * current subscription. + * current subscription. * @see Mono#subscriberContext() * @see ReactiveTransactionSynchronization */ @@ -339,7 +339,7 @@ public abstract class ConnectionFactoryUtils { */ public static Mono currentConnectionFactory(ConnectionFactory connectionFactory) { - return TransactionSynchronizationManager.currentTransaction() + return TransactionSynchronizationManager.forCurrentTransaction() .filter(TransactionSynchronizationManager::isSynchronizationActive).filter(it -> { ConnectionHolder conHolder = (ConnectionHolder) it.getResource(connectionFactory); @@ -357,7 +357,7 @@ public abstract class ConnectionFactoryUtils { * * @param conHolder the {@link ConnectionHolder} for the held Connection (potentially a proxy) * @param passedInCon the {@link Connection} passed-in by the user (potentially a target {@link Connection} without - * proxy) + * proxy) * @return whether the given Connections are equal * @see #getTargetConnection */ @@ -420,13 +420,13 @@ public abstract class ConnectionFactoryUtils { TransactionResources currentSynchronization = synchronization.getCurrentTransaction(); return currentSynchronization.getResource(ConnectionFactory.class); - }).switchIfEmpty(Mono.error(new DataAccessResourceFailureException( - "Cannot extract ConnectionFactory from current TransactionContext!"))); + }).switchIfEmpty(Mono.error( + new DataAccessResourceFailureException("Cannot extract ConnectionFactory from current TransactionContext!"))); } /** - * Create a {@link Connection} via the given {@link ConnectionFactory#create() factory} and return a {@link Tuple2} associating the - * {@link Connection} with its creating {@link ConnectionFactory}. + * Create a {@link Connection} via the given {@link ConnectionFactory#create() factory} and return a {@link Tuple2} + * associating the {@link Connection} with its creating {@link ConnectionFactory}. * * @param factory must not be {@literal null}. * @return never {@literal null} @@ -437,8 +437,7 @@ public abstract class ConnectionFactoryUtils { logger.debug("Fetching resumed R2DBC Connection from ConnectionFactory"); } - return Mono.from(factory.create()) - .map(connection -> Tuples.of(connection, factory)); + return Mono.from(factory.create()).map(connection -> Tuples.of(connection, factory)); } /** @@ -469,7 +468,7 @@ public abstract class ConnectionFactoryUtils { public Mono suspend() { if (this.holderActive) { - return TransactionSynchronizationManager.currentTransaction().flatMap(it -> { + return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> { it.unbindResource(this.connectionFactory); if (this.connectionHolder.hasConnection() && !this.connectionHolder.isOpen()) { @@ -490,7 +489,7 @@ public abstract class ConnectionFactoryUtils { @Override public Mono resume() { if (this.holderActive) { - return TransactionSynchronizationManager.currentTransaction().doOnNext(it -> { + return TransactionSynchronizationManager.forCurrentTransaction().doOnNext(it -> { it.bindResource(this.connectionFactory, this.connectionHolder); }).then(); } @@ -506,7 +505,7 @@ public abstract class ConnectionFactoryUtils { // to avoid issues with strict transaction implementations that expect // the close call before transaction completion. if (!this.connectionHolder.isOpen()) { - return TransactionSynchronizationManager.currentTransaction().flatMap(it -> { + return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> { it.unbindResource(this.connectionFactory); this.holderActive = false; @@ -529,7 +528,7 @@ public abstract class ConnectionFactoryUtils { if (this.holderActive) { // The thread-bound ConnectionHolder might not be available anymore, // since afterCompletion might get called from a different thread. - return TransactionSynchronizationManager.currentTransaction().flatMap(it -> { + return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> { it.unbindResourceIfPossible(this.connectionFactory); this.holderActive = false; diff --git a/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java b/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java index afb28682e..aa5bb7995 100644 --- a/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java +++ b/src/test/java/org/springframework/data/r2dbc/connectionfactory/R2dbcTransactionManagerUnitTests.java @@ -73,7 +73,7 @@ public class R2dbcTransactionManagerUnitTests { ConnectionFactoryUtils.getConnection(connectionFactoryMock).map(Tuple2::getT1).flatMap(it -> { - return TransactionSynchronizationManager.currentTransaction() + return TransactionSynchronizationManager.forCurrentTransaction() .doOnNext(synchronizationManager -> synchronizationManager.registerSynchronization(sync)); }) // @@ -209,7 +209,7 @@ public class R2dbcTransactionManagerUnitTests { tx.setRollbackOnly(); assertThat(tx.isNewTransaction()).isTrue(); - return TransactionSynchronizationManager.currentTransaction().doOnNext(it -> { + return TransactionSynchronizationManager.forCurrentTransaction().doOnNext(it -> { assertThat(it.hasResource(connectionFactoryMock)).isTrue(); it.registerSynchronization(sync);