Browse Source

Revise DefaultMessageListenerContainer connection recovery

Includes listener validation for consistent back-off behavior.

Closes gh-36143
pull/36170/head
Juergen Hoeller 2 weeks ago
parent
commit
48bedd0af1
  1. 26
      spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java
  2. 73
      spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java
  3. 16
      spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java

26
spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java

@ -22,6 +22,7 @@ import java.util.List; @@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
@ -403,15 +404,34 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess @@ -403,15 +404,34 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
/**
* Refresh the shared Connection that this container holds.
* <p>Called on startup and also after an infrastructure exception
* that occurred during invoker setup and/or execution.
* <p>Called on startup.
* @throws JMSException if thrown by JMS API methods
*/
protected final void refreshSharedConnection() throws JMSException {
refreshSharedConnection(con -> {});
}
/**
* Refresh the shared Connection that this container holds.
* <p>Called after an infrastructure exception that occurred during
* invoker setup and/or execution.
* @param connectionValidator a callback to test the refreshed connection
* @throws JMSException if thrown by JMS API methods
* @since 7.0.4
*/
void refreshSharedConnection(Consumer<Connection> connectionValidator) throws JMSException {
this.sharedConnectionLock.lock();
try {
releaseSharedConnection();
this.sharedConnection = createSharedConnection();
Connection con = createSharedConnection();
try {
connectionValidator.accept(con);
}
catch (RuntimeException | Error ex) {
JmsUtils.closeConnection(con);
throw ex;
}
this.sharedConnection = con;
if (this.sharedConnectionStarted) {
this.sharedConnection.start();
}

73
spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java

@ -1118,8 +1118,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1118,8 +1118,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
protected void recoverAfterListenerSetupFailure() {
this.recovering = true;
try {
refreshConnectionUntilSuccessful();
refreshDestination();
refreshConnectionUntilSuccessful();
}
finally {
this.recovering = false;
@ -1144,11 +1144,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1144,11 +1144,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
while (isRunning()) {
try {
if (sharedConnectionEnabled()) {
refreshSharedConnection();
refreshSharedConnection(this::validateRefreshedConnection);
}
else {
Connection con = createConnection();
JmsUtils.closeConnection(con);
try {
validateRefreshedConnection(con);
}
finally {
JmsUtils.closeConnection(con);
}
}
logger.debug("Successfully refreshed JMS Connection");
break;
@ -1197,6 +1202,32 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1197,6 +1202,32 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
}
/**
* Validate the given connection after a refresh, typically initiated after
* listener setup failure.
* <p>The default implementation creates a test {@code Session} and a test
* {@code MessageConsumer} which also validates {@code Destination} access.
* @param con the JMS Connection to validate
* @throws ConnectionValidationException in case of any setup failure
* @since 7.0.4
* @see #refreshConnectionUntilSuccessful()
*/
private void validateRefreshedConnection(Connection con) throws ConnectionValidationException {
Session session = null;
MessageConsumer consumer = null;
try {
session = createSession(con);
consumer = createListenerConsumer(session);
}
catch (JMSException ex) {
throw new ConnectionValidationException("Failed to create listener for specified destination", ex);
}
finally {
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeSession(session);
}
}
/**
* Apply the next back-off time using the specified {@link BackOffExecution}.
* <p>Return {@code true} if the back-off period has been applied and a new
@ -1259,8 +1290,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1259,8 +1290,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private @Nullable Object lastRecoveryMarker;
private boolean lastMessageSucceeded;
private int idleTaskExecutionCount = 0;
private volatile boolean idle = true;
@ -1303,12 +1332,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1303,12 +1332,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
catch (Throwable ex) {
clearResources();
if (!this.lastMessageSucceeded) {
// We failed more than once in a row or on startup -
// wait before first recovery attempt.
waitBeforeRecoveryAttempt();
}
this.lastMessageSucceeded = false;
boolean alreadyRecovered = false;
recoveryLock.lock();
try {
@ -1420,9 +1443,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1420,9 +1443,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
this.currentReceiveThread = Thread.currentThread();
try {
initResourcesIfNecessary();
boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
this.lastMessageSucceeded = true;
return messageReceived;
return receiveAndExecute(this, this.session, this.consumer);
}
finally {
this.currentReceiveThread = null;
@ -1512,17 +1533,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1512,17 +1533,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
this.session = null;
}
/**
* Apply the back-off time once. In a regular scenario, the back-off is only applied if we
* failed to recover with the broker. This additional wait period avoids a burst retry
* scenario when the broker is actually up but something else is failing (i.e. listener
* specific).
*/
private void waitBeforeRecoveryAttempt() {
BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
applyBackOffTime(execution);
}
@Override
public boolean isLongLived() {
return (maxMessagesPerTask < 0);
@ -1537,4 +1547,17 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1537,4 +1547,17 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
}
/**
* Internal exception class that indicates a validation failure for a Connection.
* @since 7.0.4
*/
@SuppressWarnings("serial")
private static class ConnectionValidationException extends RuntimeException {
public ConnectionValidationException(String msg, JMSException cause) {
super(msg, cause);
}
}
}

16
spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java

@ -28,6 +28,8 @@ import jakarta.jms.Connection; @@ -28,6 +28,8 @@ import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -53,6 +55,9 @@ import static org.mockito.Mockito.verify; @@ -53,6 +55,9 @@ import static org.mockito.Mockito.verify;
*/
class DefaultMessageListenerContainerTests {
private static final Destination DESTINATION = new Destination() {};
@Test
void applyBackOff() {
BackOff backOff = mock();
@ -223,12 +228,10 @@ class DefaultMessageListenerContainerTests { @@ -223,12 +228,10 @@ class DefaultMessageListenerContainerTests {
}
private static DefaultMessageListenerContainer createContainer(ConnectionFactory connectionFactory) {
Destination destination = new Destination() {};
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
container.setDestination(destination);
container.setDestination(DESTINATION);
return container;
}
@ -257,7 +260,12 @@ class DefaultMessageListenerContainerTests { @@ -257,7 +260,12 @@ class DefaultMessageListenerContainerTests {
throw new JMSException("Test exception (attempt " + currentAttempts + ")");
}
else {
return mock(Connection.class);
Connection con = mock(Connection.class);
Session session = mock(Session.class);
MessageConsumer consumer = mock(MessageConsumer.class);
given(con.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session);
given(session.createConsumer(DESTINATION)).willReturn(consumer);
return con;
}
}
});

Loading…
Cancel
Save