|
|
|
@ -735,14 +735,20 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe |
|
|
|
long receiveTimeout = getReceiveTimeout(); |
|
|
|
long receiveTimeout = getReceiveTimeout(); |
|
|
|
long waitStartTime = System.currentTimeMillis(); |
|
|
|
long waitStartTime = System.currentTimeMillis(); |
|
|
|
int waitCount = 0; |
|
|
|
int waitCount = 0; |
|
|
|
|
|
|
|
boolean interrupted = false; |
|
|
|
while (this.activeInvokerCount > 0) { |
|
|
|
while (this.activeInvokerCount > 0) { |
|
|
|
if (waitCount > 0 && !isAcceptMessagesWhileStopping() && |
|
|
|
if (waitCount > 0 && !isAcceptMessagesWhileStopping() && |
|
|
|
System.currentTimeMillis() - waitStartTime >= receiveTimeout) { |
|
|
|
System.currentTimeMillis() - waitStartTime >= receiveTimeout) { |
|
|
|
// Unexpectedly some invokers are still active after the receive timeout period
|
|
|
|
// Unexpectedly some invokers are still active after the receive timeout period
|
|
|
|
// -> interrupt remaining receive attempts since we'd reject the messages anyway
|
|
|
|
// -> interrupt remaining receive attempts since we'd reject the messages anyway
|
|
|
|
|
|
|
|
if (interrupted) { |
|
|
|
|
|
|
|
// Already interrupted -> not worth waiting any longer...
|
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { |
|
|
|
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { |
|
|
|
scheduledInvoker.interruptIfNecessary(); |
|
|
|
scheduledInvoker.interruptIfNecessary(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
interrupted = true; |
|
|
|
} |
|
|
|
} |
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
logger.debug("Still waiting for shutdown of " + this.activeInvokerCount + |
|
|
|
logger.debug("Still waiting for shutdown of " + this.activeInvokerCount + |
|
|
|
|