diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java index 9bd759629d1..131f62e9a6b 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import jakarta.jms.Connection; import jakarta.jms.JMSException; @@ -403,15 +404,34 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess /** * Refresh the shared Connection that this container holds. - *
Called on startup and also after an infrastructure exception - * that occurred during invoker setup and/or execution. + *
Called on startup. * @throws JMSException if thrown by JMS API methods */ protected final void refreshSharedConnection() throws JMSException { + refreshSharedConnection(con -> {}); + } + + /** + * Refresh the shared Connection that this container holds. + *
Called after an infrastructure exception that occurred during
+ * invoker setup and/or execution.
+ * @param connectionValidator a callback to test the refreshed connection
+ * @throws JMSException if thrown by JMS API methods
+ * @since 7.0.4
+ */
+ void refreshSharedConnection(Consumer The default implementation creates a test {@code Session} and a test
+ * {@code MessageConsumer} which also validates {@code Destination} access.
+ * @param con the JMS Connection to validate
+ * @throws ConnectionValidationException in case of any setup failure
+ * @since 7.0.4
+ * @see #refreshConnectionUntilSuccessful()
+ */
+ private void validateRefreshedConnection(Connection con) throws ConnectionValidationException {
+ Session session = null;
+ MessageConsumer consumer = null;
+ try {
+ session = createSession(con);
+ consumer = createListenerConsumer(session);
+ }
+ catch (JMSException ex) {
+ throw new ConnectionValidationException("Failed to create listener for specified destination", ex);
+ }
+ finally {
+ JmsUtils.closeMessageConsumer(consumer);
+ JmsUtils.closeSession(session);
+ }
+ }
+
/**
* Apply the next back-off time using the specified {@link BackOffExecution}.
* Return {@code true} if the back-off period has been applied and a new
@@ -1259,8 +1290,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private @Nullable Object lastRecoveryMarker;
- private boolean lastMessageSucceeded;
-
private int idleTaskExecutionCount = 0;
private volatile boolean idle = true;
@@ -1303,12 +1332,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
catch (Throwable ex) {
clearResources();
- if (!this.lastMessageSucceeded) {
- // We failed more than once in a row or on startup -
- // wait before first recovery attempt.
- waitBeforeRecoveryAttempt();
- }
- this.lastMessageSucceeded = false;
boolean alreadyRecovered = false;
recoveryLock.lock();
try {
@@ -1420,9 +1443,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
this.currentReceiveThread = Thread.currentThread();
try {
initResourcesIfNecessary();
- boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
- this.lastMessageSucceeded = true;
- return messageReceived;
+ return receiveAndExecute(this, this.session, this.consumer);
}
finally {
this.currentReceiveThread = null;
@@ -1512,17 +1533,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
this.session = null;
}
- /**
- * Apply the back-off time once. In a regular scenario, the back-off is only applied if we
- * failed to recover with the broker. This additional wait period avoids a burst retry
- * scenario when the broker is actually up but something else is failing (i.e. listener
- * specific).
- */
- private void waitBeforeRecoveryAttempt() {
- BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
- applyBackOffTime(execution);
- }
-
@Override
public boolean isLongLived() {
return (maxMessagesPerTask < 0);
@@ -1537,4 +1547,17 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
}
+
+ /**
+ * Internal exception class that indicates a validation failure for a Connection.
+ * @since 7.0.4
+ */
+ @SuppressWarnings("serial")
+ private static class ConnectionValidationException extends RuntimeException {
+
+ public ConnectionValidationException(String msg, JMSException cause) {
+ super(msg, cause);
+ }
+ }
+
}
diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java
index 822bba66397..d5d2d71b581 100644
--- a/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java
+++ b/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java
@@ -28,6 +28,8 @@ import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.Session;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -53,6 +55,9 @@ import static org.mockito.Mockito.verify;
*/
class DefaultMessageListenerContainerTests {
+ private static final Destination DESTINATION = new Destination() {};
+
+
@Test
void applyBackOff() {
BackOff backOff = mock();
@@ -223,12 +228,10 @@ class DefaultMessageListenerContainerTests {
}
private static DefaultMessageListenerContainer createContainer(ConnectionFactory connectionFactory) {
- Destination destination = new Destination() {};
-
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
- container.setDestination(destination);
+ container.setDestination(DESTINATION);
return container;
}
@@ -257,7 +260,12 @@ class DefaultMessageListenerContainerTests {
throw new JMSException("Test exception (attempt " + currentAttempts + ")");
}
else {
- return mock(Connection.class);
+ Connection con = mock(Connection.class);
+ Session session = mock(Session.class);
+ MessageConsumer consumer = mock(MessageConsumer.class);
+ given(con.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session);
+ given(session.createConsumer(DESTINATION)).willReturn(consumer);
+ return con;
}
}
});