From 85faeef8168baa62beb471be8add5ec539de22f5 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Mon, 2 May 2016 12:57:30 +0200 Subject: [PATCH] DefaultMessageListenerContainer immediately invokes stop callback when not running Issue: SPR-14233 (cherry picked from commit e45d33f) --- .../DefaultMessageListenerContainer.java | 6 ++ .../DefaultMessageListenerContainerTests.java | 91 ++++++++++++++----- 2 files changed, 74 insertions(+), 23 deletions(-) diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java index 06209132b36..5fe49eb4520 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java @@ -614,6 +614,12 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @Override public void stop(Runnable callback) throws JmsException { synchronized (this.lifecycleMonitor) { + if (!isRunning() || this.stopCallback != null) { + // Not started, already stopped, or previous stop attempt in progress + // -> return immediately, no stop process to control anymore. + callback.run(); + return; + } this.stopCallback = callback; } stop(); diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java index e9a8e8d8448..aa24fcd04f8 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/DefaultMessageListenerContainerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.jms.listener; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -32,73 +34,90 @@ import static org.junit.Assert.*; import static org.mockito.BDDMockito.*; /** - * * @author Stephane Nicoll */ public class DefaultMessageListenerContainerTests { @Test public void applyBackOff() { - BackOff mock = mock(BackOff.class); + BackOff backOff = mock(BackOff.class); BackOffExecution execution = mock(BackOffExecution.class); given(execution.nextBackOff()).willReturn(BackOffExecution.STOP); - given(mock.start()).willReturn(execution); + given(backOff.start()).willReturn(execution); - DefaultMessageListenerContainer container = createContainer(mock, createFailingContainerFactory()); + DefaultMessageListenerContainer container = createContainer(createFailingContainerFactory()); + container.setBackOff(backOff); container.start(); assertEquals(true, container.isRunning()); container.refreshConnectionUntilSuccessful(); assertEquals(false, container.isRunning()); - verify(mock).start(); + verify(backOff).start(); verify(execution).nextBackOff(); } @Test public void applyBackOffRetry() { - BackOff mock = mock(BackOff.class); + BackOff backOff = mock(BackOff.class); BackOffExecution execution = mock(BackOffExecution.class); given(execution.nextBackOff()).willReturn(50L, BackOffExecution.STOP); - given(mock.start()).willReturn(execution); + given(backOff.start()).willReturn(execution); - DefaultMessageListenerContainer container = createContainer(mock, createFailingContainerFactory()); + DefaultMessageListenerContainer container = createContainer(createFailingContainerFactory()); + container.setBackOff(backOff); container.start(); container.refreshConnectionUntilSuccessful(); assertEquals(false, container.isRunning()); - verify(mock).start(); + verify(backOff).start(); verify(execution, times(2)).nextBackOff(); } @Test public void recoverResetBackOff() { - BackOff mock = mock(BackOff.class); + BackOff backOff = mock(BackOff.class); BackOffExecution execution = mock(BackOffExecution.class); given(execution.nextBackOff()).willReturn(50L, 50L, 50L); // 3 attempts max - given(mock.start()).willReturn(execution); + given(backOff.start()).willReturn(execution); - DefaultMessageListenerContainer container = createContainer(mock, createRecoverableContainerFactory(1)); + DefaultMessageListenerContainer container = createContainer(createRecoverableContainerFactory(1)); + container.setBackOff(backOff); container.start(); container.refreshConnectionUntilSuccessful(); assertEquals(true, container.isRunning()); - verify(mock).start(); + verify(backOff).start(); verify(execution, times(1)).nextBackOff(); // only on attempt as the second one lead to a recovery } - private DefaultMessageListenerContainer createContainer(BackOff backOff, ConnectionFactory connectionFactory) { + @Test + public void runnableIsInvokedEvenIfContainerIsNotRunning() throws InterruptedException { + DefaultMessageListenerContainer container = createRunningContainer(); + container.stop(); + + // container is stopped but should nevertheless invoke the runnable argument + TestRunnable runnable2 = new TestRunnable(); + container.stop(runnable2); + runnable2.waitForCompletion(); + } - Destination destination = new Destination() {}; + private DefaultMessageListenerContainer createRunningContainer() { + DefaultMessageListenerContainer container = createContainer(createSuccessfulConnectionFactory()); + container.afterPropertiesSet(); + container.start(); + return container; + } + + private DefaultMessageListenerContainer createContainer(ConnectionFactory connectionFactory) { + Destination destination = new Destination() {}; DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE); container.setDestination(destination); - container.setBackOff(backOff); return container; - } private ConnectionFactory createFailingContainerFactory() { @@ -112,8 +131,8 @@ public class DefaultMessageListenerContainerTests { }); return connectionFactory; } - catch (JMSException e) { - throw new IllegalStateException(); // never happen + catch (JMSException ex) { + throw new IllegalStateException(ex); // never happen } } @@ -122,7 +141,6 @@ public class DefaultMessageListenerContainerTests { ConnectionFactory connectionFactory = mock(ConnectionFactory.class); given(connectionFactory.createConnection()).will(new Answer() { int currentAttempts = 0; - @Override public Object answer(InvocationOnMock invocation) throws Throwable { currentAttempts++; @@ -136,8 +154,35 @@ public class DefaultMessageListenerContainerTests { }); return connectionFactory; } - catch (JMSException e) { - throw new IllegalStateException(); // never happen + catch (JMSException ex) { + throw new IllegalStateException(ex); // never happen + } + } + + private ConnectionFactory createSuccessfulConnectionFactory() { + try { + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + given(connectionFactory.createConnection()).willReturn(mock(Connection.class)); + return connectionFactory; + } + catch (JMSException ex) { + throw new IllegalStateException(ex); // never happen + } + } + + + private static class TestRunnable implements Runnable { + + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + @Override + public void run() { + this.countDownLatch.countDown(); + } + + public void waitForCompletion() throws InterruptedException { + this.countDownLatch.await(2, TimeUnit.SECONDS); + assertEquals("callback was not invoked", 0, this.countDownLatch.getCount()); } }