From 48bedd0af14b90d0e86d4a19a95e4d74a2512f44 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Fri, 16 Jan 2026 22:38:01 +0100 Subject: [PATCH] Revise DefaultMessageListenerContainer connection recovery Includes listener validation for consistent back-off behavior. Closes gh-36143 --- .../AbstractJmsListeningContainer.java | 26 ++++++- .../DefaultMessageListenerContainer.java | 73 ++++++++++++------- .../DefaultMessageListenerContainerTests.java | 16 +++- 3 files changed, 83 insertions(+), 32 deletions(-) diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java index 9bd759629d1..131f62e9a6b 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import jakarta.jms.Connection; import jakarta.jms.JMSException; @@ -403,15 +404,34 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess /** * Refresh the shared Connection that this container holds. - *

Called on startup and also after an infrastructure exception - * that occurred during invoker setup and/or execution. + *

Called on startup. * @throws JMSException if thrown by JMS API methods */ protected final void refreshSharedConnection() throws JMSException { + refreshSharedConnection(con -> {}); + } + + /** + * Refresh the shared Connection that this container holds. + *

Called after an infrastructure exception that occurred during + * invoker setup and/or execution. + * @param connectionValidator a callback to test the refreshed connection + * @throws JMSException if thrown by JMS API methods + * @since 7.0.4 + */ + void refreshSharedConnection(Consumer connectionValidator) throws JMSException { this.sharedConnectionLock.lock(); try { releaseSharedConnection(); - this.sharedConnection = createSharedConnection(); + Connection con = createSharedConnection(); + try { + connectionValidator.accept(con); + } + catch (RuntimeException | Error ex) { + JmsUtils.closeConnection(con); + throw ex; + } + this.sharedConnection = con; if (this.sharedConnectionStarted) { this.sharedConnection.start(); } diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java index d61292428cf..5b5d47bd1bc 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java @@ -1118,8 +1118,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe protected void recoverAfterListenerSetupFailure() { this.recovering = true; try { - refreshConnectionUntilSuccessful(); refreshDestination(); + refreshConnectionUntilSuccessful(); } finally { this.recovering = false; @@ -1144,11 +1144,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe while (isRunning()) { try { if (sharedConnectionEnabled()) { - refreshSharedConnection(); + refreshSharedConnection(this::validateRefreshedConnection); } else { Connection con = createConnection(); - JmsUtils.closeConnection(con); + try { + validateRefreshedConnection(con); + } + finally { + JmsUtils.closeConnection(con); + } } logger.debug("Successfully refreshed JMS Connection"); break; @@ -1197,6 +1202,32 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } } + /** + * Validate the given connection after a refresh, typically initiated after + * listener setup failure. + *

The default implementation creates a test {@code Session} and a test + * {@code MessageConsumer} which also validates {@code Destination} access. + * @param con the JMS Connection to validate + * @throws ConnectionValidationException in case of any setup failure + * @since 7.0.4 + * @see #refreshConnectionUntilSuccessful() + */ + private void validateRefreshedConnection(Connection con) throws ConnectionValidationException { + Session session = null; + MessageConsumer consumer = null; + try { + session = createSession(con); + consumer = createListenerConsumer(session); + } + catch (JMSException ex) { + throw new ConnectionValidationException("Failed to create listener for specified destination", ex); + } + finally { + JmsUtils.closeMessageConsumer(consumer); + JmsUtils.closeSession(session); + } + } + /** * Apply the next back-off time using the specified {@link BackOffExecution}. *

Return {@code true} if the back-off period has been applied and a new @@ -1259,8 +1290,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe private @Nullable Object lastRecoveryMarker; - private boolean lastMessageSucceeded; - private int idleTaskExecutionCount = 0; private volatile boolean idle = true; @@ -1303,12 +1332,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } catch (Throwable ex) { clearResources(); - if (!this.lastMessageSucceeded) { - // We failed more than once in a row or on startup - - // wait before first recovery attempt. - waitBeforeRecoveryAttempt(); - } - this.lastMessageSucceeded = false; boolean alreadyRecovered = false; recoveryLock.lock(); try { @@ -1420,9 +1443,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe this.currentReceiveThread = Thread.currentThread(); try { initResourcesIfNecessary(); - boolean messageReceived = receiveAndExecute(this, this.session, this.consumer); - this.lastMessageSucceeded = true; - return messageReceived; + return receiveAndExecute(this, this.session, this.consumer); } finally { this.currentReceiveThread = null; @@ -1512,17 +1533,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe this.session = null; } - /** - * Apply the back-off time once. In a regular scenario, the back-off is only applied if we - * failed to recover with the broker. This additional wait period avoids a burst retry - * scenario when the broker is actually up but something else is failing (i.e. listener - * specific). - */ - private void waitBeforeRecoveryAttempt() { - BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start(); - applyBackOffTime(execution); - } - @Override public boolean isLongLived() { return (maxMessagesPerTask < 0); @@ -1537,4 +1547,17 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } } + + /** + * Internal exception class that indicates a validation failure for a Connection. + * @since 7.0.4 + */ + @SuppressWarnings("serial") + private static class ConnectionValidationException extends RuntimeException { + + public ConnectionValidationException(String msg, JMSException cause) { + super(msg, cause); + } + } + } diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java index 822bba66397..d5d2d71b581 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java @@ -28,6 +28,8 @@ import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; import jakarta.jms.Destination; import jakarta.jms.JMSException; +import jakarta.jms.MessageConsumer; +import jakarta.jms.Session; import org.junit.jupiter.api.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -53,6 +55,9 @@ import static org.mockito.Mockito.verify; */ class DefaultMessageListenerContainerTests { + private static final Destination DESTINATION = new Destination() {}; + + @Test void applyBackOff() { BackOff backOff = mock(); @@ -223,12 +228,10 @@ class DefaultMessageListenerContainerTests { } private static DefaultMessageListenerContainer createContainer(ConnectionFactory connectionFactory) { - Destination destination = new Destination() {}; - DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE); - container.setDestination(destination); + container.setDestination(DESTINATION); return container; } @@ -257,7 +260,12 @@ class DefaultMessageListenerContainerTests { throw new JMSException("Test exception (attempt " + currentAttempts + ")"); } else { - return mock(Connection.class); + Connection con = mock(Connection.class); + Session session = mock(Session.class); + MessageConsumer consumer = mock(MessageConsumer.class); + given(con.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session); + given(session.createConsumer(DESTINATION)).willReturn(consumer); + return con; } } });