Browse Source

DefaultMessageListenerContainer immediately invokes stop callback when not running

Issue: SPR-14233
(cherry picked from commit e45d33f)
pull/1056/head
Juergen Hoeller 10 years ago
parent
commit
85faeef816
  1. 6
      spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java
  2. 91
      spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java

6
spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java

@ -614,6 +614,12 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -614,6 +614,12 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
@Override
public void stop(Runnable callback) throws JmsException {
synchronized (this.lifecycleMonitor) {
if (!isRunning() || this.stopCallback != null) {
// Not started, already stopped, or previous stop attempt in progress
// -> return immediately, no stop process to control anymore.
callback.run();
return;
}
this.stopCallback = callback;
}
stop();

91
spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
package org.springframework.jms.listener;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@ -32,73 +34,90 @@ import static org.junit.Assert.*; @@ -32,73 +34,90 @@ import static org.junit.Assert.*;
import static org.mockito.BDDMockito.*;
/**
*
* @author Stephane Nicoll
*/
public class DefaultMessageListenerContainerTests {
@Test
public void applyBackOff() {
BackOff mock = mock(BackOff.class);
BackOff backOff = mock(BackOff.class);
BackOffExecution execution = mock(BackOffExecution.class);
given(execution.nextBackOff()).willReturn(BackOffExecution.STOP);
given(mock.start()).willReturn(execution);
given(backOff.start()).willReturn(execution);
DefaultMessageListenerContainer container = createContainer(mock, createFailingContainerFactory());
DefaultMessageListenerContainer container = createContainer(createFailingContainerFactory());
container.setBackOff(backOff);
container.start();
assertEquals(true, container.isRunning());
container.refreshConnectionUntilSuccessful();
assertEquals(false, container.isRunning());
verify(mock).start();
verify(backOff).start();
verify(execution).nextBackOff();
}
@Test
public void applyBackOffRetry() {
BackOff mock = mock(BackOff.class);
BackOff backOff = mock(BackOff.class);
BackOffExecution execution = mock(BackOffExecution.class);
given(execution.nextBackOff()).willReturn(50L, BackOffExecution.STOP);
given(mock.start()).willReturn(execution);
given(backOff.start()).willReturn(execution);
DefaultMessageListenerContainer container = createContainer(mock, createFailingContainerFactory());
DefaultMessageListenerContainer container = createContainer(createFailingContainerFactory());
container.setBackOff(backOff);
container.start();
container.refreshConnectionUntilSuccessful();
assertEquals(false, container.isRunning());
verify(mock).start();
verify(backOff).start();
verify(execution, times(2)).nextBackOff();
}
@Test
public void recoverResetBackOff() {
BackOff mock = mock(BackOff.class);
BackOff backOff = mock(BackOff.class);
BackOffExecution execution = mock(BackOffExecution.class);
given(execution.nextBackOff()).willReturn(50L, 50L, 50L); // 3 attempts max
given(mock.start()).willReturn(execution);
given(backOff.start()).willReturn(execution);
DefaultMessageListenerContainer container = createContainer(mock, createRecoverableContainerFactory(1));
DefaultMessageListenerContainer container = createContainer(createRecoverableContainerFactory(1));
container.setBackOff(backOff);
container.start();
container.refreshConnectionUntilSuccessful();
assertEquals(true, container.isRunning());
verify(mock).start();
verify(backOff).start();
verify(execution, times(1)).nextBackOff(); // only on attempt as the second one lead to a recovery
}
private DefaultMessageListenerContainer createContainer(BackOff backOff, ConnectionFactory connectionFactory) {
@Test
public void runnableIsInvokedEvenIfContainerIsNotRunning() throws InterruptedException {
DefaultMessageListenerContainer container = createRunningContainer();
container.stop();
// container is stopped but should nevertheless invoke the runnable argument
TestRunnable runnable2 = new TestRunnable();
container.stop(runnable2);
runnable2.waitForCompletion();
}
Destination destination = new Destination() {};
private DefaultMessageListenerContainer createRunningContainer() {
DefaultMessageListenerContainer container = createContainer(createSuccessfulConnectionFactory());
container.afterPropertiesSet();
container.start();
return container;
}
private DefaultMessageListenerContainer createContainer(ConnectionFactory connectionFactory) {
Destination destination = new Destination() {};
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
container.setDestination(destination);
container.setBackOff(backOff);
return container;
}
private ConnectionFactory createFailingContainerFactory() {
@ -112,8 +131,8 @@ public class DefaultMessageListenerContainerTests { @@ -112,8 +131,8 @@ public class DefaultMessageListenerContainerTests {
});
return connectionFactory;
}
catch (JMSException e) {
throw new IllegalStateException(); // never happen
catch (JMSException ex) {
throw new IllegalStateException(ex); // never happen
}
}
@ -122,7 +141,6 @@ public class DefaultMessageListenerContainerTests { @@ -122,7 +141,6 @@ public class DefaultMessageListenerContainerTests {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
given(connectionFactory.createConnection()).will(new Answer<Object>() {
int currentAttempts = 0;
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
currentAttempts++;
@ -136,8 +154,35 @@ public class DefaultMessageListenerContainerTests { @@ -136,8 +154,35 @@ public class DefaultMessageListenerContainerTests {
});
return connectionFactory;
}
catch (JMSException e) {
throw new IllegalStateException(); // never happen
catch (JMSException ex) {
throw new IllegalStateException(ex); // never happen
}
}
private ConnectionFactory createSuccessfulConnectionFactory() {
try {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
given(connectionFactory.createConnection()).willReturn(mock(Connection.class));
return connectionFactory;
}
catch (JMSException ex) {
throw new IllegalStateException(ex); // never happen
}
}
private static class TestRunnable implements Runnable {
private final CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void run() {
this.countDownLatch.countDown();
}
public void waitForCompletion() throws InterruptedException {
this.countDownLatch.await(2, TimeUnit.SECONDS);
assertEquals("callback was not invoked", 0, this.countDownLatch.getCount());
}
}

Loading…
Cancel
Save