|
|
|
@ -178,6 +178,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe |
|
|
|
|
|
|
|
|
|
|
|
private int activeInvokerCount = 0; |
|
|
|
private int activeInvokerCount = 0; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private int registeredWithDestination = 0; |
|
|
|
|
|
|
|
|
|
|
|
private Runnable stopCallback; |
|
|
|
private Runnable stopCallback; |
|
|
|
|
|
|
|
|
|
|
|
private Object currentRecoveryMarker = new Object(); |
|
|
|
private Object currentRecoveryMarker = new Object(); |
|
|
|
@ -577,6 +579,27 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Return whether at lease one consumer has entered a fixed registration with the |
|
|
|
|
|
|
|
* target destination. This is particularly interesting for the pub-sub case where |
|
|
|
|
|
|
|
* it might be important to have an actual consumer registered that is guaranteed |
|
|
|
|
|
|
|
* to not miss any messages that are just about to be published. |
|
|
|
|
|
|
|
* <p>This method may be polled after a {@link #start()} call, until asynchronous |
|
|
|
|
|
|
|
* registration of consumers has happened which is when the method will start returning |
|
|
|
|
|
|
|
* <code>true</code> - provided that the listener container actually ever establishes |
|
|
|
|
|
|
|
* a fixed registration. It will then keep returning <code>true</code> until shutdown, |
|
|
|
|
|
|
|
* since the container will hold on to at least one consumer registration thereafter. |
|
|
|
|
|
|
|
* <p>Note that a listener container is not bound to having a fixed registration in |
|
|
|
|
|
|
|
* the first place. It may also keep recreating consumers for every invoker execution. |
|
|
|
|
|
|
|
* This particularly depends on the {@link #setCacheLevel cache level} setting: |
|
|
|
|
|
|
|
* Only CACHE_CONSUMER will lead to a fixed registration. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public boolean isRegisteredWithDestination() { |
|
|
|
|
|
|
|
synchronized (this.lifecycleMonitor) { |
|
|
|
|
|
|
|
return (this.registeredWithDestination > 0); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified. |
|
|
|
* Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified. |
|
|
|
@ -1026,6 +1049,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe |
|
|
|
} |
|
|
|
} |
|
|
|
if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) { |
|
|
|
if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) { |
|
|
|
this.consumer = createListenerConsumer(this.session); |
|
|
|
this.consumer = createListenerConsumer(this.session); |
|
|
|
|
|
|
|
synchronized (lifecycleMonitor) { |
|
|
|
|
|
|
|
registeredWithDestination++; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -1047,6 +1073,11 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe |
|
|
|
JmsUtils.closeMessageConsumer(this.consumer); |
|
|
|
JmsUtils.closeMessageConsumer(this.consumer); |
|
|
|
JmsUtils.closeSession(this.session); |
|
|
|
JmsUtils.closeSession(this.session); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (this.consumer != null) { |
|
|
|
|
|
|
|
synchronized (lifecycleMonitor) { |
|
|
|
|
|
|
|
registeredWithDestination--; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
this.consumer = null; |
|
|
|
this.consumer = null; |
|
|
|
this.session = null; |
|
|
|
this.session = null; |
|
|
|
} |
|
|
|
} |
|
|
|
|