|
|
|
|
@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
|
|
|
|
|
/* |
|
|
|
|
* Copyright 2002-2014 the original author or authors. |
|
|
|
|
* Copyright 2002-2016 the original author or authors. |
|
|
|
|
* |
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
|
@ -16,6 +16,8 @@
@@ -16,6 +16,8 @@
|
|
|
|
|
|
|
|
|
|
package org.springframework.jms.listener; |
|
|
|
|
|
|
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
import javax.jms.Connection; |
|
|
|
|
import javax.jms.ConnectionFactory; |
|
|
|
|
import javax.jms.Destination; |
|
|
|
|
@ -39,55 +41,76 @@ public class DefaultMessageListenerContainerTests {
@@ -39,55 +41,76 @@ public class DefaultMessageListenerContainerTests {
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void applyBackOff() { |
|
|
|
|
BackOff mock = mock(BackOff.class); |
|
|
|
|
BackOff backOff = mock(BackOff.class); |
|
|
|
|
BackOffExecution execution = mock(BackOffExecution.class); |
|
|
|
|
given(execution.nextBackOff()).willReturn(BackOffExecution.STOP); |
|
|
|
|
given(mock.start()).willReturn(execution); |
|
|
|
|
given(backOff.start()).willReturn(execution); |
|
|
|
|
|
|
|
|
|
DefaultMessageListenerContainer container = createContainer(mock, createFailingContainerFactory()); |
|
|
|
|
DefaultMessageListenerContainer container = createContainer(createFailingContainerFactory()); |
|
|
|
|
container.setBackOff(backOff); |
|
|
|
|
container.start(); |
|
|
|
|
assertEquals(true, container.isRunning()); |
|
|
|
|
|
|
|
|
|
container.refreshConnectionUntilSuccessful(); |
|
|
|
|
|
|
|
|
|
assertEquals(false, container.isRunning()); |
|
|
|
|
verify(mock).start(); |
|
|
|
|
verify(backOff).start(); |
|
|
|
|
verify(execution).nextBackOff(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void applyBackOffRetry() { |
|
|
|
|
BackOff mock = mock(BackOff.class); |
|
|
|
|
BackOff backOff = mock(BackOff.class); |
|
|
|
|
BackOffExecution execution = mock(BackOffExecution.class); |
|
|
|
|
given(execution.nextBackOff()).willReturn(50L, BackOffExecution.STOP); |
|
|
|
|
given(mock.start()).willReturn(execution); |
|
|
|
|
given(backOff.start()).willReturn(execution); |
|
|
|
|
|
|
|
|
|
DefaultMessageListenerContainer container = createContainer(mock, createFailingContainerFactory()); |
|
|
|
|
DefaultMessageListenerContainer container = createContainer(createFailingContainerFactory()); |
|
|
|
|
container.setBackOff(backOff); |
|
|
|
|
container.start(); |
|
|
|
|
container.refreshConnectionUntilSuccessful(); |
|
|
|
|
|
|
|
|
|
assertEquals(false, container.isRunning()); |
|
|
|
|
verify(mock).start(); |
|
|
|
|
verify(backOff).start(); |
|
|
|
|
verify(execution, times(2)).nextBackOff(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void recoverResetBackOff() { |
|
|
|
|
BackOff mock = mock(BackOff.class); |
|
|
|
|
BackOff backOff = mock(BackOff.class); |
|
|
|
|
BackOffExecution execution = mock(BackOffExecution.class); |
|
|
|
|
given(execution.nextBackOff()).willReturn(50L, 50L, 50L); // 3 attempts max
|
|
|
|
|
given(mock.start()).willReturn(execution); |
|
|
|
|
given(backOff.start()).willReturn(execution); |
|
|
|
|
|
|
|
|
|
DefaultMessageListenerContainer container = createContainer(mock, createRecoverableContainerFactory(1)); |
|
|
|
|
DefaultMessageListenerContainer container = createContainer(createRecoverableContainerFactory(1)); |
|
|
|
|
container.setBackOff(backOff); |
|
|
|
|
container.start(); |
|
|
|
|
container.refreshConnectionUntilSuccessful(); |
|
|
|
|
|
|
|
|
|
assertEquals(true, container.isRunning()); |
|
|
|
|
verify(mock).start(); |
|
|
|
|
verify(backOff).start(); |
|
|
|
|
verify(execution, times(1)).nextBackOff(); // only on attempt as the second one lead to a recovery
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private DefaultMessageListenerContainer createContainer(BackOff backOff, ConnectionFactory connectionFactory) { |
|
|
|
|
@Test |
|
|
|
|
public void runnableIsInvokedEvenIfContainerIsNotRunning() throws InterruptedException { |
|
|
|
|
DefaultMessageListenerContainer container = createRunningContainer(); |
|
|
|
|
container.stop(); |
|
|
|
|
|
|
|
|
|
// container is stopped but should nevertheless invoke the runnable argument
|
|
|
|
|
TestRunnable runnable2 = new TestRunnable(); |
|
|
|
|
container.stop(runnable2); |
|
|
|
|
runnable2.waitForCompletion(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private DefaultMessageListenerContainer createRunningContainer() { |
|
|
|
|
DefaultMessageListenerContainer container = createContainer(createSuccessfulConnectionFactory()); |
|
|
|
|
container.afterPropertiesSet(); |
|
|
|
|
container.start(); |
|
|
|
|
return container; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private DefaultMessageListenerContainer createContainer(ConnectionFactory connectionFactory) { |
|
|
|
|
|
|
|
|
|
Destination destination = new Destination() {}; |
|
|
|
|
|
|
|
|
|
@ -96,9 +119,7 @@ public class DefaultMessageListenerContainerTests {
@@ -96,9 +119,7 @@ public class DefaultMessageListenerContainerTests {
|
|
|
|
|
container.setConnectionFactory(connectionFactory); |
|
|
|
|
container.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE); |
|
|
|
|
container.setDestination(destination); |
|
|
|
|
container.setBackOff(backOff); |
|
|
|
|
return container; |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private ConnectionFactory createFailingContainerFactory() { |
|
|
|
|
@ -141,4 +162,30 @@ public class DefaultMessageListenerContainerTests {
@@ -141,4 +162,30 @@ public class DefaultMessageListenerContainerTests {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private ConnectionFactory createSuccessfulConnectionFactory() { |
|
|
|
|
try { |
|
|
|
|
ConnectionFactory connectionFactory = mock(ConnectionFactory.class); |
|
|
|
|
given(connectionFactory.createConnection()).willReturn(mock(Connection.class)); |
|
|
|
|
return connectionFactory; |
|
|
|
|
} |
|
|
|
|
catch (JMSException e) { |
|
|
|
|
throw new IllegalStateException(); // never happen
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private static class TestRunnable implements Runnable { |
|
|
|
|
private final CountDownLatch countDownLatch = new CountDownLatch(1); |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void run() { |
|
|
|
|
this.countDownLatch.countDown(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void waitForCompletion() throws InterruptedException { |
|
|
|
|
this.countDownLatch.await(2, TimeUnit.SECONDS); |
|
|
|
|
assertEquals("callback was not invoked", 0, this.countDownLatch.getCount()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|