@ -977,36 +977,38 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
@@ -977,36 +977,38 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
handleListenerSetupFailure ( ex , true ) ;
}
}
synchronized ( lifecycleMonitor ) {
decreaseActiveInvokerCount ( ) ;
lifecycleMonitor . notifyAll ( ) ;
}
if ( ! messageReceived ) {
this . idleTaskExecutionCount + + ;
}
else {
this . idleTaskExecutionCount = 0 ;
}
synchronized ( lifecycleMonitor ) {
if ( ! shouldRescheduleInvoker ( this . idleTaskExecutionCount ) | | ! rescheduleTaskIfNecessary ( this ) ) {
// We're shutting down completely.
scheduledInvokers . remove ( this ) ;
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "Lowered scheduled invoker count: " + scheduledInvokers . size ( ) ) ;
}
finally {
synchronized ( lifecycleMonitor ) {
decreaseActiveInvokerCount ( ) ;
lifecycleMonitor . notifyAll ( ) ;
clearResources ( ) ;
}
else if ( isRunning ( ) ) {
int nonPausedConsumers = getScheduledConsumerCount ( ) - getPausedTaskCount ( ) ;
if ( nonPausedConsumers < 1 ) {
logger . error ( "All scheduled consumers have been paused, probably due to tasks having been rejected. " +
"Check your thread pool configuration! Manual recovery necessary through a start() call." ) ;
if ( ! messageReceived ) {
this . idleTaskExecutionCount + + ;
}
else {
this . idleTaskExecutionCount = 0 ;
}
synchronized ( lifecycleMonitor ) {
if ( ! shouldRescheduleInvoker ( this . idleTaskExecutionCount ) | | ! rescheduleTaskIfNecessary ( this ) ) {
// We're shutting down completely.
scheduledInvokers . remove ( this ) ;
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "Lowered scheduled invoker count: " + scheduledInvokers . size ( ) ) ;
}
lifecycleMonitor . notifyAll ( ) ;
clearResources ( ) ;
}
else if ( nonPausedConsumers < getConcurrentConsumers ( ) ) {
logger . warn ( "Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
"due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
"to be triggered by remaining consumers." ) ;
else if ( isRunning ( ) ) {
int nonPausedConsumers = getScheduledConsumerCount ( ) - getPausedTaskCount ( ) ;
if ( nonPausedConsumers < 1 ) {
logger . error ( "All scheduled consumers have been paused, probably due to tasks having been rejected. " +
"Check your thread pool configuration! Manual recovery necessary through a start() call." ) ;
}
else if ( nonPausedConsumers < getConcurrentConsumers ( ) ) {
logger . warn ( "Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
"due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
"to be triggered by remaining consumers." ) ;
}
}
}
}