|
|
|
|
@ -17,9 +17,14 @@ package org.springframework.data.r2dbc.connectionfactory;
@@ -17,9 +17,14 @@ package org.springframework.data.r2dbc.connectionfactory;
|
|
|
|
|
|
|
|
|
|
import io.r2dbc.spi.Connection; |
|
|
|
|
import io.r2dbc.spi.ConnectionFactory; |
|
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
|
import reactor.util.function.Tuple2; |
|
|
|
|
import reactor.util.function.Tuples; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log; |
|
|
|
|
import org.apache.commons.logging.LogFactory; |
|
|
|
|
import org.reactivestreams.Publisher; |
|
|
|
|
|
|
|
|
|
import org.springframework.core.Ordered; |
|
|
|
|
import org.springframework.dao.DataAccessResourceFailureException; |
|
|
|
|
import org.springframework.lang.Nullable; |
|
|
|
|
@ -27,9 +32,6 @@ import org.springframework.transaction.NoTransactionException;
@@ -27,9 +32,6 @@ import org.springframework.transaction.NoTransactionException;
|
|
|
|
|
import org.springframework.transaction.reactive.TransactionSynchronization; |
|
|
|
|
import org.springframework.transaction.reactive.TransactionSynchronizationManager; |
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
|
import reactor.util.function.Tuple2; |
|
|
|
|
import reactor.util.function.Tuples; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Helper class that provides static methods for obtaining R2DBC Connections from a |
|
|
|
|
@ -50,8 +52,7 @@ public abstract class ConnectionFactoryUtils {
@@ -50,8 +52,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
|
|
|
|
|
private static final Log logger = LogFactory.getLog(ConnectionFactoryUtils.class); |
|
|
|
|
|
|
|
|
|
private ConnectionFactoryUtils() { |
|
|
|
|
} |
|
|
|
|
private ConnectionFactoryUtils() {} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Obtain a {@link io.r2dbc.spi.Connection} from the given {@link io.r2dbc.spi.ConnectionFactory}. Translates |
|
|
|
|
@ -85,7 +86,7 @@ public abstract class ConnectionFactoryUtils {
@@ -85,7 +86,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
|
|
|
|
|
Assert.notNull(connectionFactory, "ConnectionFactory must not be null!"); |
|
|
|
|
|
|
|
|
|
return TransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { |
|
|
|
|
return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> { |
|
|
|
|
|
|
|
|
|
ConnectionHolder conHolder = (ConnectionHolder) synchronizationManager.getResource(connectionFactory); |
|
|
|
|
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { |
|
|
|
|
@ -180,8 +181,7 @@ public abstract class ConnectionFactoryUtils {
@@ -180,8 +181,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
return Tuples.of(it, connectionFactory); |
|
|
|
|
})); |
|
|
|
|
|
|
|
|
|
return Mono.justOrEmpty(resource) |
|
|
|
|
.flatMap(ConnectionFactoryUtils::createConnection) |
|
|
|
|
return Mono.justOrEmpty(resource).flatMap(ConnectionFactoryUtils::createConnection) |
|
|
|
|
.switchIfEmpty(attachNewConnection); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -234,7 +234,7 @@ public abstract class ConnectionFactoryUtils {
@@ -234,7 +234,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
public static Mono<Void> doReleaseConnection(@Nullable io.r2dbc.spi.Connection con, |
|
|
|
|
@Nullable ConnectionFactory connectionFactory) { |
|
|
|
|
|
|
|
|
|
return TransactionSynchronizationManager.currentTransaction().flatMap(it -> { |
|
|
|
|
return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> { |
|
|
|
|
|
|
|
|
|
ConnectionHolder conHolder = (ConnectionHolder) it.getResource(connectionFactory); |
|
|
|
|
if (conHolder != null && connectionEquals(conHolder, con)) { |
|
|
|
|
@ -339,7 +339,7 @@ public abstract class ConnectionFactoryUtils {
@@ -339,7 +339,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
*/ |
|
|
|
|
public static Mono<ConnectionFactory> currentConnectionFactory(ConnectionFactory connectionFactory) { |
|
|
|
|
|
|
|
|
|
return TransactionSynchronizationManager.currentTransaction() |
|
|
|
|
return TransactionSynchronizationManager.forCurrentTransaction() |
|
|
|
|
.filter(TransactionSynchronizationManager::isSynchronizationActive).filter(it -> { |
|
|
|
|
|
|
|
|
|
ConnectionHolder conHolder = (ConnectionHolder) it.getResource(connectionFactory); |
|
|
|
|
@ -420,13 +420,13 @@ public abstract class ConnectionFactoryUtils {
@@ -420,13 +420,13 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
|
|
|
|
|
TransactionResources currentSynchronization = synchronization.getCurrentTransaction(); |
|
|
|
|
return currentSynchronization.getResource(ConnectionFactory.class); |
|
|
|
|
}).switchIfEmpty(Mono.error(new DataAccessResourceFailureException( |
|
|
|
|
"Cannot extract ConnectionFactory from current TransactionContext!"))); |
|
|
|
|
}).switchIfEmpty(Mono.error( |
|
|
|
|
new DataAccessResourceFailureException("Cannot extract ConnectionFactory from current TransactionContext!"))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Create a {@link Connection} via the given {@link ConnectionFactory#create() factory} and return a {@link Tuple2} associating the |
|
|
|
|
* {@link Connection} with its creating {@link ConnectionFactory}. |
|
|
|
|
* Create a {@link Connection} via the given {@link ConnectionFactory#create() factory} and return a {@link Tuple2} |
|
|
|
|
* associating the {@link Connection} with its creating {@link ConnectionFactory}. |
|
|
|
|
* |
|
|
|
|
* @param factory must not be {@literal null}. |
|
|
|
|
* @return never {@literal null} |
|
|
|
|
@ -437,8 +437,7 @@ public abstract class ConnectionFactoryUtils {
@@ -437,8 +437,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
logger.debug("Fetching resumed R2DBC Connection from ConnectionFactory"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return Mono.from(factory.create()) |
|
|
|
|
.map(connection -> Tuples.of(connection, factory)); |
|
|
|
|
return Mono.from(factory.create()).map(connection -> Tuples.of(connection, factory)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
@ -469,7 +468,7 @@ public abstract class ConnectionFactoryUtils {
@@ -469,7 +468,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
public Mono<Void> suspend() { |
|
|
|
|
if (this.holderActive) { |
|
|
|
|
|
|
|
|
|
return TransactionSynchronizationManager.currentTransaction().flatMap(it -> { |
|
|
|
|
return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> { |
|
|
|
|
|
|
|
|
|
it.unbindResource(this.connectionFactory); |
|
|
|
|
if (this.connectionHolder.hasConnection() && !this.connectionHolder.isOpen()) { |
|
|
|
|
@ -490,7 +489,7 @@ public abstract class ConnectionFactoryUtils {
@@ -490,7 +489,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
@Override |
|
|
|
|
public Mono<Void> resume() { |
|
|
|
|
if (this.holderActive) { |
|
|
|
|
return TransactionSynchronizationManager.currentTransaction().doOnNext(it -> { |
|
|
|
|
return TransactionSynchronizationManager.forCurrentTransaction().doOnNext(it -> { |
|
|
|
|
it.bindResource(this.connectionFactory, this.connectionHolder); |
|
|
|
|
}).then(); |
|
|
|
|
} |
|
|
|
|
@ -506,7 +505,7 @@ public abstract class ConnectionFactoryUtils {
@@ -506,7 +505,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
// to avoid issues with strict transaction implementations that expect
|
|
|
|
|
// the close call before transaction completion.
|
|
|
|
|
if (!this.connectionHolder.isOpen()) { |
|
|
|
|
return TransactionSynchronizationManager.currentTransaction().flatMap(it -> { |
|
|
|
|
return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> { |
|
|
|
|
|
|
|
|
|
it.unbindResource(this.connectionFactory); |
|
|
|
|
this.holderActive = false; |
|
|
|
|
@ -529,7 +528,7 @@ public abstract class ConnectionFactoryUtils {
@@ -529,7 +528,7 @@ public abstract class ConnectionFactoryUtils {
|
|
|
|
|
if (this.holderActive) { |
|
|
|
|
// The thread-bound ConnectionHolder might not be available anymore,
|
|
|
|
|
// since afterCompletion might get called from a different thread.
|
|
|
|
|
return TransactionSynchronizationManager.currentTransaction().flatMap(it -> { |
|
|
|
|
return TransactionSynchronizationManager.forCurrentTransaction().flatMap(it -> { |
|
|
|
|
|
|
|
|
|
it.unbindResourceIfPossible(this.connectionFactory); |
|
|
|
|
this.holderActive = false; |
|
|
|
|
|