From 847e8a2b23d5626395d1ae868e54ef30286bb681 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Fri, 29 Sep 2023 14:57:17 +0200 Subject: [PATCH 1/3] Restore auto-commit mode if not done by driver Closes gh-31268 --- .../connection/R2dbcTransactionManager.java | 33 ++++++++++-- .../R2dbcTransactionManagerUnitTests.java | 54 +++++++++---------- 2 files changed, 53 insertions(+), 34 deletions(-) diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java index f3edbf16b9a..38ef488cae5 100644 --- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java +++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java @@ -209,7 +209,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager connectionMono = Mono.just(txObject.getConnectionHolder().getConnection()); } - return connectionMono.flatMap(con -> doBegin(definition, con) + return connectionMono.flatMap(con -> doBegin(con, txObject, definition) .then(prepareTransactionalConnection(con, definition)) .doOnSuccess(v -> { txObject.getConnectionHolder().setTransactionActive(true); @@ -233,7 +233,10 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager }).then(); } - private Mono doBegin(TransactionDefinition definition, Connection con) { + private Mono doBegin( + Connection con, ConnectionFactoryTransactionObject transaction, TransactionDefinition definition) { + + transaction.setMustRestoreAutoCommit(con.isAutoCommit()); io.r2dbc.spi.TransactionDefinition transactionDefinition = createTransactionDefinition(definition); if (logger.isDebugEnabled()) { logger.debug("Starting R2DBC transaction on Connection [" + con + "] using [" + transactionDefinition + "]"); @@ -354,12 +357,22 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager if (logger.isDebugEnabled()) { logger.debug("Releasing R2DBC Connection [" + con + "] after transaction"); } + Mono restoreMono = Mono.empty(); + if (txObject.isMustRestoreAutoCommit() && !con.isAutoCommit()) { + restoreMono = Mono.from(con.setAutoCommit(true)); + if (logger.isDebugEnabled()) { + restoreMono = restoreMono.doOnError(ex -> + logger.debug(String.format("Error ignored during auto-commit restore: %s", ex))); + } + restoreMono = restoreMono.onErrorComplete(); + } Mono releaseMono = ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()); if (logger.isDebugEnabled()) { - releaseMono = releaseMono.doOnError( - ex -> logger.debug(String.format("Error ignored during cleanup: %s", ex))); + releaseMono = releaseMono.doOnError(ex -> + logger.debug(String.format("Error ignored during connection release: %s", ex))); } - return releaseMono.onErrorComplete(); + releaseMono = releaseMono.onErrorComplete(); + return restoreMono.then(releaseMono); } } finally { @@ -482,6 +495,8 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager private boolean newConnectionHolder; + private boolean mustRestoreAutoCommit; + @Nullable private String savepointName; @@ -507,6 +522,14 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager return (this.connectionHolder != null); } + public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) { + this.mustRestoreAutoCommit = mustRestoreAutoCommit; + } + + public boolean isMustRestoreAutoCommit() { + return this.mustRestoreAutoCommit; + } + public boolean isTransactionActive() { return (this.connectionHolder != null && this.connectionHolder.isTransactionActive()); } diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index 05cc75cfc04..2dd2341cc71 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -81,11 +81,12 @@ class R2dbcTransactionManagerUnitTests { @Test void testSimpleTransaction() { - TestTransactionSynchronization sync = new TestTransactionSynchronization( - TransactionSynchronization.STATUS_COMMITTED); + when(connectionMock.isAutoCommit()).thenReturn(false); AtomicInteger commits = new AtomicInteger(); when(connectionMock.commitTransaction()).thenReturn( Mono.fromRunnable(commits::incrementAndGet)); + TestTransactionSynchronization sync = new TestTransactionSynchronization( + TransactionSynchronization.STATUS_COMMITTED); TransactionalOperator operator = TransactionalOperator.create(tm); @@ -98,6 +99,7 @@ class R2dbcTransactionManagerUnitTests { .verifyComplete(); assertThat(commits).hasValue(1); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).commitTransaction(); verify(connectionMock).close(); @@ -131,8 +133,10 @@ class R2dbcTransactionManagerUnitTests { } @Test - void appliesTransactionDefinition() { + void appliesTransactionDefinitionAndAutoCommit() { + when(connectionMock.isAutoCommit()).thenReturn(true, false); when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); + when(connectionMock.setAutoCommit(true)).thenReturn(Mono.empty()); DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); definition.setName("my-transaction"); @@ -152,6 +156,7 @@ class R2dbcTransactionManagerUnitTests { verify(connectionMock).beginTransaction(txCaptor.capture()); verify(connectionMock, never()).setTransactionIsolationLevel(any()); verify(connectionMock).commitTransaction(); + verify(connectionMock).setAutoCommit(true); verify(connectionMock).close(); io.r2dbc.spi.TransactionDefinition def = txCaptor.getValue(); @@ -162,29 +167,8 @@ class R2dbcTransactionManagerUnitTests { } @Test - void doesNotSetIsolationLevelIfMatch() { - when(connectionMock.getTransactionIsolationLevel()).thenReturn( - IsolationLevel.READ_COMMITTED); - when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); - - DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); - definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); - - TransactionalOperator operator = TransactionalOperator.create(tm, definition); - - ConnectionFactoryUtils.getConnection(connectionFactoryMock) - .as(operator::transactional) - .as(StepVerifier::create) - .expectNextCount(1) - .verifyComplete(); - - verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); - verify(connectionMock, never()).setTransactionIsolationLevel(any()); - verify(connectionMock).commitTransaction(); - } - - @Test - void doesNotSetAutoCommitDisabled() { + void doesNotSetAutoCommitIfRestoredByDriver() { + when(connectionMock.isAutoCommit()).thenReturn(true, true); when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); @@ -204,6 +188,7 @@ class R2dbcTransactionManagerUnitTests { @Test void appliesReadOnly() { + when(connectionMock.isAutoCommit()).thenReturn(false); when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty()); Statement statement = mock(); @@ -222,6 +207,7 @@ class R2dbcTransactionManagerUnitTests { .expectNextCount(1) .verifyComplete(); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).createStatement("SET TRANSACTION READ ONLY"); verify(connectionMock).commitTransaction(); @@ -231,7 +217,9 @@ class R2dbcTransactionManagerUnitTests { @Test void testCommitFails() { - when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail")))); + when(connectionMock.isAutoCommit()).thenReturn(false); + when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() -> + Mono.error(new R2dbcBadGrammarException("Commit should fail")))); when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty()); TransactionalOperator operator = TransactionalOperator.create(tm); @@ -242,6 +230,7 @@ class R2dbcTransactionManagerUnitTests { .as(StepVerifier::create) .verifyError(BadSqlGrammarException.class); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).createStatement("foo"); verify(connectionMock).commitTransaction(); @@ -252,6 +241,7 @@ class R2dbcTransactionManagerUnitTests { @Test void testRollback() { + when(connectionMock.isAutoCommit()).thenReturn(false); AtomicInteger commits = new AtomicInteger(); when(connectionMock.commitTransaction()).thenReturn( Mono.fromRunnable(commits::incrementAndGet)); @@ -269,6 +259,7 @@ class R2dbcTransactionManagerUnitTests { assertThat(commits).hasValue(0); assertThat(rollbacks).hasValue(1); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).rollbackTransaction(); verify(connectionMock).close(); @@ -278,7 +269,8 @@ class R2dbcTransactionManagerUnitTests { @Test @SuppressWarnings("unchecked") void testRollbackFails() { - when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail"))), Mono.empty()); + when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> + Mono.error(new R2dbcBadGrammarException("Commit should fail"))), Mono.empty()); TransactionalOperator operator = TransactionalOperator.create(tm); operator.execute(reactiveTransaction -> { @@ -287,6 +279,7 @@ class R2dbcTransactionManagerUnitTests { .doOnNext(connection -> connection.createStatement("foo")).then(); }).as(StepVerifier::create).verifyError(BadSqlGrammarException.class); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).createStatement("foo"); verify(connectionMock, never()).commitTransaction(); @@ -298,7 +291,8 @@ class R2dbcTransactionManagerUnitTests { @Test @SuppressWarnings("unchecked") void testConnectionReleasedWhenRollbackFails() { - when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Rollback should fail"))), Mono.empty()); + when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> + Mono.error(new R2dbcBadGrammarException("Rollback should fail"))), Mono.empty()); when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty()); TransactionalOperator operator = TransactionalOperator.create(tm); @@ -319,6 +313,7 @@ class R2dbcTransactionManagerUnitTests { @Test void testTransactionSetRollbackOnly() { + when(connectionMock.isAutoCommit()).thenReturn(false); when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty()); TestTransactionSynchronization sync = new TestTransactionSynchronization( TransactionSynchronization.STATUS_ROLLED_BACK); @@ -334,6 +329,7 @@ class R2dbcTransactionManagerUnitTests { }).then(); }).as(StepVerifier::create).verifyComplete(); + verify(connectionMock).isAutoCommit(); verify(connectionMock).beginTransaction(any(io.r2dbc.spi.TransactionDefinition.class)); verify(connectionMock).rollbackTransaction(); verify(connectionMock).close(); From 4cf5c7796d0b96d74f7fc3279358e229eb98244f Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Fri, 29 Sep 2023 14:57:40 +0200 Subject: [PATCH 2/3] Explicit note on local bean access within @PostConstruct method Closes gh-27876 --- .../core/beans/java/composing-configuration-classes.adoc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/framework-docs/modules/ROOT/pages/core/beans/java/composing-configuration-classes.adoc b/framework-docs/modules/ROOT/pages/core/beans/java/composing-configuration-classes.adoc index bb919b2af7e..638c8736b34 100644 --- a/framework-docs/modules/ROOT/pages/core/beans/java/composing-configuration-classes.adoc +++ b/framework-docs/modules/ROOT/pages/core/beans/java/composing-configuration-classes.adoc @@ -113,7 +113,8 @@ issue, because no compiler is involved, and you can declare When using `@Configuration` classes, the Java compiler places constraints on the configuration model, in that references to other beans must be valid Java syntax. -Fortunately, solving this problem is simple. As xref:core/beans/java/bean-annotation.adoc#beans-java-dependencies[we already discussed], +Fortunately, solving this problem is simple. As +xref:core/beans/java/bean-annotation.adoc#beans-java-dependencies[we already discussed], a `@Bean` method can have an arbitrary number of parameters that describe the bean dependencies. Consider the following more real-world scenario with several `@Configuration` classes, each depending on beans declared in the others: @@ -204,7 +205,6 @@ Kotlin:: ---- ====== - There is another way to achieve the same result. Remember that `@Configuration` classes are ultimately only another bean in the container: This means that they can take advantage of `@Autowired` and `@Value` injection and other features the same as any other bean. @@ -216,6 +216,11 @@ classes are processed quite early during the initialization of the context, and to be injected this way may lead to unexpected early initialization. Whenever possible, resort to parameter-based injection, as in the preceding example. +Avoid access to locally defined beans within a `@PostConstruct` method on the same configuration +class. This effectively leads to a circular reference since non-static `@Bean` methods semantically +require a fully initialized configuration class instance to be called on. With circular references +disallowed (e.g. in Spring Boot 2.6+), this may trigger a `BeanCurrentlyInCreationException`. + Also, be particularly careful with `BeanPostProcessor` and `BeanFactoryPostProcessor` definitions through `@Bean`. Those should usually be declared as `static @Bean` methods, not triggering the instantiation of their containing configuration class. Otherwise, `@Autowired` and `@Value` may not From 407113945d7c70a079f6096af63fbc3dbece00b7 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Fri, 29 Sep 2023 14:58:02 +0200 Subject: [PATCH 3/3] Polishing --- .../context/annotation/Configuration.java | 6 ++-- .../util/backoff/ExponentialBackOff.java | 31 +++++++++---------- .../orm/jpa/vendor/HibernateJpaDialect.java | 26 ++++++++-------- .../r2dbc/core/DefaultDatabaseClient.java | 5 ++- 4 files changed, 32 insertions(+), 36 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/context/annotation/Configuration.java b/spring-context/src/main/java/org/springframework/context/annotation/Configuration.java index 17633580bd8..5acfd2d2c16 100644 --- a/spring-context/src/main/java/org/springframework/context/annotation/Configuration.java +++ b/spring-context/src/main/java/org/springframework/context/annotation/Configuration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -104,8 +104,8 @@ import org.springframework.stereotype.Component; * * } * - *

{@code @Configuration} classes may not only be bootstrapped using - * component scanning, but may also themselves configure component scanning using + *

{@code @Configuration} classes may not only be bootstrapped using component + * scanning, but may also themselves configure component scanning using * the {@link ComponentScan @ComponentScan} annotation: * *

diff --git a/spring-core/src/main/java/org/springframework/util/backoff/ExponentialBackOff.java b/spring-core/src/main/java/org/springframework/util/backoff/ExponentialBackOff.java
index 0331699a5b6..dceea17b91a 100644
--- a/spring-core/src/main/java/org/springframework/util/backoff/ExponentialBackOff.java
+++ b/spring-core/src/main/java/org/springframework/util/backoff/ExponentialBackOff.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2002-2020 the original author or authors.
+ * Copyright 2002-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,12 +20,12 @@ import org.springframework.util.Assert;
 
 /**
  * Implementation of {@link BackOff} that increases the back off period for each
- * retry attempt. When the interval has reached the {@link #setMaxInterval(long)
+ * retry attempt. When the interval has reached the {@linkplain #setMaxInterval
  * max interval}, it is no longer increased. Stops retrying once the
- * {@link #setMaxElapsedTime(long) max elapsed time} has been reached.
+ * {@linkplain #setMaxElapsedTime max elapsed time} has been reached.
  *
- * 

Example: The default interval is {@value #DEFAULT_INITIAL_INTERVAL} ms, - * the default multiplier is {@value #DEFAULT_MULTIPLIER}, and the default max + *

Example: The default interval is {@value #DEFAULT_INITIAL_INTERVAL} ms; + * the default multiplier is {@value #DEFAULT_MULTIPLIER}; and the default max * interval is {@value #DEFAULT_MAX_INTERVAL}. For 10 attempts the sequence will be * as follows: * @@ -44,10 +44,9 @@ import org.springframework.util.Assert; * 10 30000 *

* - *

Note that the default max elapsed time is {@link Long#MAX_VALUE}. Use - * {@link #setMaxElapsedTime(long)} to limit the maximum length of time - * that an instance should accumulate before returning - * {@link BackOffExecution#STOP}. + *

Note that the default max elapsed time is {@link Long#MAX_VALUE}. + * Use {@link #setMaxElapsedTime} to limit the maximum length of time that an + * instance should accumulate before returning {@link BackOffExecution#STOP}. * * @author Stephane Nicoll * @since 4.1 @@ -107,7 +106,7 @@ public class ExponentialBackOff implements BackOff { /** - * The initial interval in milliseconds. + * Set the initial interval in milliseconds. */ public void setInitialInterval(long initialInterval) { this.initialInterval = initialInterval; @@ -121,7 +120,7 @@ public class ExponentialBackOff implements BackOff { } /** - * The value to multiply the current interval by for each retry attempt. + * Set the value to multiply the current interval by for each retry attempt. */ public void setMultiplier(double multiplier) { checkMultiplier(multiplier); @@ -136,21 +135,21 @@ public class ExponentialBackOff implements BackOff { } /** - * The maximum back off time. + * Set the maximum back off time in milliseconds. */ public void setMaxInterval(long maxInterval) { this.maxInterval = maxInterval; } /** - * Return the maximum back off time. + * Return the maximum back off time in milliseconds. */ public long getMaxInterval() { return this.maxInterval; } /** - * The maximum elapsed time in milliseconds after which a call to + * Set the maximum elapsed time in milliseconds after which a call to * {@link BackOffExecution#nextBackOff()} returns {@link BackOffExecution#STOP}. */ public void setMaxElapsedTime(long maxElapsedTime) { @@ -184,10 +183,9 @@ public class ExponentialBackOff implements BackOff { @Override public long nextBackOff() { - if (this.currentElapsedTime >= maxElapsedTime) { + if (this.currentElapsedTime >= getMaxElapsedTime()) { return STOP; } - long nextInterval = computeNextInterval(); this.currentElapsedTime += nextInterval; return nextInterval; @@ -214,7 +212,6 @@ public class ExponentialBackOff implements BackOff { return Math.min(i, maxInterval); } - @Override public String toString() { StringBuilder sb = new StringBuilder("ExponentialBackOff{"); diff --git a/spring-orm/src/main/java/org/springframework/orm/jpa/vendor/HibernateJpaDialect.java b/spring-orm/src/main/java/org/springframework/orm/jpa/vendor/HibernateJpaDialect.java index 76f7304e308..ae1f787509f 100644 --- a/spring-orm/src/main/java/org/springframework/orm/jpa/vendor/HibernateJpaDialect.java +++ b/spring-orm/src/main/java/org/springframework/orm/jpa/vendor/HibernateJpaDialect.java @@ -252,24 +252,24 @@ public class HibernateJpaDialect extends DefaultJpaDialect { if (ex instanceof JDBCConnectionException) { return new DataAccessResourceFailureException(ex.getMessage(), ex); } - if (ex instanceof SQLGrammarException hibJdbcEx) { - return new InvalidDataAccessResourceUsageException(ex.getMessage() + "; SQL [" + hibJdbcEx.getSQL() + "]", ex); + if (ex instanceof SQLGrammarException hibEx) { + return new InvalidDataAccessResourceUsageException(ex.getMessage() + "; SQL [" + hibEx.getSQL() + "]", ex); } - if (ex instanceof QueryTimeoutException hibJdbcEx) { - return new org.springframework.dao.QueryTimeoutException(ex.getMessage() + "; SQL [" + hibJdbcEx.getSQL() + "]", ex); + if (ex instanceof QueryTimeoutException hibEx) { + return new org.springframework.dao.QueryTimeoutException(ex.getMessage() + "; SQL [" + hibEx.getSQL() + "]", ex); } - if (ex instanceof LockAcquisitionException hibJdbcEx) { - return new CannotAcquireLockException(ex.getMessage() + "; SQL [" + hibJdbcEx.getSQL() + "]", ex); + if (ex instanceof LockAcquisitionException hibEx) { + return new CannotAcquireLockException(ex.getMessage() + "; SQL [" + hibEx.getSQL() + "]", ex); } - if (ex instanceof PessimisticLockException hibJdbcEx) { - return new PessimisticLockingFailureException(ex.getMessage() + "; SQL [" + hibJdbcEx.getSQL() + "]", ex); + if (ex instanceof PessimisticLockException hibEx) { + return new PessimisticLockingFailureException(ex.getMessage() + "; SQL [" + hibEx.getSQL() + "]", ex); } - if (ex instanceof ConstraintViolationException hibJdbcEx) { - return new DataIntegrityViolationException(ex.getMessage() + "; SQL [" + hibJdbcEx.getSQL() + - "]; constraint [" + hibJdbcEx.getConstraintName() + "]", ex); + if (ex instanceof ConstraintViolationException hibEx) { + return new DataIntegrityViolationException(ex.getMessage() + "; SQL [" + hibEx.getSQL() + + "]; constraint [" + hibEx.getConstraintName() + "]", ex); } - if (ex instanceof DataException hibJdbcEx) { - return new DataIntegrityViolationException(ex.getMessage() + "; SQL [" + hibJdbcEx.getSQL() + "]", ex); + if (ex instanceof DataException hibEx) { + return new DataIntegrityViolationException(ex.getMessage() + "; SQL [" + hibEx.getSQL() + "]", ex); } // end of JDBCException subclass handling diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultDatabaseClient.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultDatabaseClient.java index 7db53bcd280..dcfdbd33ee6 100644 --- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultDatabaseClient.java +++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultDatabaseClient.java @@ -166,9 +166,8 @@ final class DefaultDatabaseClient implements DatabaseClient { * @return a {@link Publisher} that completes successfully when the connection is closed */ private Publisher closeConnection(Connection connection) { - return ConnectionFactoryUtils.currentConnectionFactory( - obtainConnectionFactory()).then().onErrorResume(Exception.class, - e -> Mono.from(connection.close())); + return ConnectionFactoryUtils.currentConnectionFactory(obtainConnectionFactory()).then() + .onErrorResume(Exception.class, ex -> Mono.from(connection.close())); } /**