diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java index 058a508f4cd..8ed0f74fe25 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,9 @@ package org.springframework.jms.listener; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.jms.Connection; import jakarta.jms.JMSException; @@ -77,7 +80,7 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess private boolean sharedConnectionStarted = false; - protected final Object sharedConnectionMonitor = new Object(); + protected final Lock sharedConnectionLock = new ReentrantLock(); private boolean active = false; @@ -85,7 +88,9 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess private final List pausedTasks = new ArrayList<>(); - protected final Object lifecycleMonitor = new Object(); + protected final Lock lifecycleLock = new ReentrantLock(); + + protected final Condition lifecycleCondition = this.lifecycleLock.newCondition(); /** @@ -199,9 +204,13 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess */ public void initialize() throws JmsException { try { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.active = true; - this.lifecycleMonitor.notifyAll(); + this.lifecycleCondition.signalAll(); + } + finally { + this.lifecycleLock.unlock(); } doInitialize(); } @@ -218,13 +227,18 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess */ public void shutdown() throws JmsException { logger.debug("Shutting down JMS listener container"); + boolean wasRunning; - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { wasRunning = this.running; this.running = false; this.active = false; this.pausedTasks.clear(); - this.lifecycleMonitor.notifyAll(); + this.lifecycleCondition.signalAll(); + } + finally { + this.lifecycleLock.unlock(); } // Stop shared Connection early, if necessary. @@ -256,9 +270,13 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess * that is, whether it has been set up but not shut down yet. */ public final boolean isActive() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.active; } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -288,11 +306,15 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess } // Reschedule paused tasks, if any. - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.running = true; - this.lifecycleMonitor.notifyAll(); + this.lifecycleCondition.signalAll(); resumePausedTasks(); } + finally { + this.lifecycleLock.unlock(); + } // Start the shared Connection, if any. if (sharedConnectionEnabled()) { @@ -321,9 +343,13 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess * @see #stopSharedConnection */ protected void doStop() throws JMSException { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.running = false; - this.lifecycleMonitor.notifyAll(); + this.lifecycleCondition.signalAll(); + } + finally { + this.lifecycleLock.unlock(); } if (sharedConnectionEnabled()) { @@ -370,12 +396,16 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess * @throws JMSException if thrown by JMS API methods */ protected void establishSharedConnection() throws JMSException { - synchronized (this.sharedConnectionMonitor) { + this.sharedConnectionLock.lock(); + try { if (this.sharedConnection == null) { this.sharedConnection = createSharedConnection(); logger.debug("Established shared JMS Connection"); } } + finally { + this.sharedConnectionLock.unlock(); + } } /** @@ -385,13 +415,17 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess * @throws JMSException if thrown by JMS API methods */ protected final void refreshSharedConnection() throws JMSException { - synchronized (this.sharedConnectionMonitor) { + this.sharedConnectionLock.lock(); + try { releaseSharedConnection(); this.sharedConnection = createSharedConnection(); if (this.sharedConnectionStarted) { this.sharedConnection.start(); } } + finally { + this.sharedConnectionLock.unlock(); + } } /** @@ -435,7 +469,8 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess * @see jakarta.jms.Connection#start() */ protected void startSharedConnection() throws JMSException { - synchronized (this.sharedConnectionMonitor) { + this.sharedConnectionLock.lock(); + try { this.sharedConnectionStarted = true; if (this.sharedConnection != null) { try { @@ -446,6 +481,9 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess } } } + finally { + this.sharedConnectionLock.unlock(); + } } /** @@ -454,7 +492,8 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess * @see jakarta.jms.Connection#start() */ protected void stopSharedConnection() throws JMSException { - synchronized (this.sharedConnectionMonitor) { + this.sharedConnectionLock.lock(); + try { this.sharedConnectionStarted = false; if (this.sharedConnection != null) { try { @@ -465,6 +504,9 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess } } } + finally { + this.sharedConnectionLock.unlock(); + } } /** @@ -473,11 +515,15 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess * @see ConnectionFactoryUtils#releaseConnection */ protected final void releaseSharedConnection() { - synchronized (this.sharedConnectionMonitor) { + this.sharedConnectionLock.lock(); + try { ConnectionFactoryUtils.releaseConnection( this.sharedConnection, getConnectionFactory(), this.sharedConnectionStarted); this.sharedConnection = null; } + finally { + this.sharedConnectionLock.unlock(); + } } /** @@ -493,13 +539,17 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess throw new IllegalStateException( "This listener container does not maintain a shared Connection"); } - synchronized (this.sharedConnectionMonitor) { + this.sharedConnectionLock.lock(); + try { if (this.sharedConnection == null) { throw new SharedConnectionNotInitializedException( "This listener container's shared Connection has not been initialized yet"); } return this.sharedConnection; } + finally { + this.sharedConnectionLock.unlock(); + } } @@ -543,7 +593,8 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess * Tasks for which rescheduling failed simply remain in paused mode. */ protected void resumePausedTasks() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { if (!this.pausedTasks.isEmpty()) { for (Iterator it = this.pausedTasks.iterator(); it.hasNext();) { Object task = it.next(); @@ -561,15 +612,22 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess } } } + finally { + this.lifecycleLock.unlock(); + } } /** * Determine the number of currently paused tasks, if any. */ public int getPausedTaskCount() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.pausedTasks.size(); } + finally { + this.lifecycleLock.unlock(); + } } /** diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java index 31de9c0f58d..819d5eb26e2 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.jms.Connection; import jakarta.jms.JMSException; @@ -190,6 +193,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @Nullable private Executor taskExecutor; + private boolean virtualThreads = false; + private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE); private int cacheLevel = CACHE_AUTO; @@ -221,7 +226,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe private Object currentRecoveryMarker = new Object(); - private final Object recoveryMonitor = new Object(); + private final Lock recoveryLock = new ReentrantLock(); /** @@ -241,6 +246,25 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe this.taskExecutor = taskExecutor; } + /** + * Specify whether the default {@link SimpleAsyncTaskExecutor} should be + * configured to use virtual threads instead of platform threads, for + * efficient blocking behavior in listener threads on Java 21 or higher. + * This is off by default, setting up one platform thread per consumer. + *

Only applicable if the internal default executor is in use rather than + * an externally provided {@link #setTaskExecutor TaskExecutor} instance. + * The thread name prefix for virtual threads will be derived from the + * listener container's bean name, just like with default platform threads. + *

Alternatively, pass in a virtual threads based executor through + * {@link #setTaskExecutor} (with externally defined thread naming). + * @since 6.2 + * @see #setTaskExecutor + * @see SimpleAsyncTaskExecutor#setVirtualThreads + */ + public void setVirtualThreads(boolean virtualThreads) { + this.virtualThreads = virtualThreads; + } + /** * Specify the {@link BackOff} instance to use to compute the interval * between recovery attempts. If the {@link BackOffExecution} implementation @@ -364,12 +388,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe */ public void setConcurrentConsumers(int concurrentConsumers) { Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)"); - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.concurrentConsumers = concurrentConsumers; if (this.maxConcurrentConsumers < concurrentConsumers) { this.maxConcurrentConsumers = concurrentConsumers; } } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -380,9 +408,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe * @see #getActiveConsumerCount() */ public final int getConcurrentConsumers() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.concurrentConsumers; } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -404,9 +436,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe */ public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)"); - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.maxConcurrentConsumers = Math.max(maxConcurrentConsumers, this.concurrentConsumers); } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -417,9 +453,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe * @see #getActiveConsumerCount() */ public final int getMaxConcurrentConsumers() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.maxConcurrentConsumers; } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -446,18 +486,26 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe */ public void setMaxMessagesPerTask(int maxMessagesPerTask) { Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0"); - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.maxMessagesPerTask = maxMessagesPerTask; } + finally { + this.lifecycleLock.unlock(); + } } /** * Return the maximum number of messages to process in one task. */ public final int getMaxMessagesPerTask() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.maxMessagesPerTask; } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -472,18 +520,26 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe */ public void setIdleConsumerLimit(int idleConsumerLimit) { Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher"); - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.idleConsumerLimit = idleConsumerLimit; } + finally { + this.lifecycleLock.unlock(); + } } /** * Return the limit for the number of idle consumers. */ public final int getIdleConsumerLimit() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.idleConsumerLimit; } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -515,18 +571,26 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe */ public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher"); - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.idleTaskExecutionLimit = idleTaskExecutionLimit; } + finally { + this.lifecycleLock.unlock(); + } } /** * Return the limit for idle executions of a consumer task. */ public final int getIdleTaskExecutionLimit() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.idleTaskExecutionLimit; } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -556,9 +620,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe */ public void setIdleReceivesPerTaskLimit(int idleReceivesPerTaskLimit) { Assert.isTrue(idleReceivesPerTaskLimit != 0, "'idleReceivesPerTaskLimit' must not be 0)"); - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.idleReceivesPerTaskLimit = idleReceivesPerTaskLimit; } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -567,9 +635,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe * @since 5.3.5 */ public int getIdleReceivesPerTaskLimit() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.idleReceivesPerTaskLimit; } + finally { + this.lifecycleLock.unlock(); + } } @@ -585,7 +657,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } // Prepare taskExecutor and maxMessagesPerTask. - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { if (this.taskExecutor == null) { this.taskExecutor = createDefaultTaskExecutor(); } @@ -598,6 +671,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe this.maxMessagesPerTask = 10; } } + finally { + this.lifecycleLock.unlock(); + } // Proceed with actual listener initialization. super.initialize(); @@ -612,11 +688,15 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe */ @Override protected void doInitialize() throws JMSException { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { for (int i = 0; i < this.concurrentConsumers; i++) { scheduleNewInvoker(); } } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -625,44 +705,46 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @Override protected void doShutdown() throws JMSException { logger.debug("Waiting for shutdown of message listener invokers"); + this.lifecycleLock.lock(); try { - synchronized (this.lifecycleMonitor) { - long receiveTimeout = getReceiveTimeout(); - long waitStartTime = System.currentTimeMillis(); - int waitCount = 0; - while (this.activeInvokerCount > 0) { - if (waitCount > 0 && !isAcceptMessagesWhileStopping() && - System.currentTimeMillis() - waitStartTime >= receiveTimeout) { - // Unexpectedly some invokers are still active after the receive timeout period - // -> interrupt remaining receive attempts since we'd reject the messages anyway - for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { - scheduledInvoker.interruptIfNecessary(); - } - } - if (logger.isDebugEnabled()) { - logger.debug("Still waiting for shutdown of " + this.activeInvokerCount + - " message listener invokers (iteration " + waitCount + ")"); - } - // Wait for AsyncMessageListenerInvokers to deactivate themselves... - if (receiveTimeout > 0) { - this.lifecycleMonitor.wait(receiveTimeout); - } - else { - this.lifecycleMonitor.wait(); + long receiveTimeout = getReceiveTimeout(); + long waitStartTime = System.currentTimeMillis(); + int waitCount = 0; + while (this.activeInvokerCount > 0) { + if (waitCount > 0 && !isAcceptMessagesWhileStopping() && + System.currentTimeMillis() - waitStartTime >= receiveTimeout) { + // Unexpectedly some invokers are still active after the receive timeout period + // -> interrupt remaining receive attempts since we'd reject the messages anyway + for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { + scheduledInvoker.interruptIfNecessary(); } - waitCount++; } - // Clear remaining scheduled invokers, possibly left over as paused tasks - for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { - scheduledInvoker.clearResources(); + if (logger.isDebugEnabled()) { + logger.debug("Still waiting for shutdown of " + this.activeInvokerCount + + " message listener invokers (iteration " + waitCount + ")"); + } + // Wait for AsyncMessageListenerInvokers to deactivate themselves... + if (receiveTimeout > 0) { + this.lifecycleCondition.await(receiveTimeout, TimeUnit.MILLISECONDS); + } + else { + this.lifecycleCondition.await(); } - this.scheduledInvokers.clear(); + waitCount++; + } + // Clear remaining scheduled invokers, possibly left over as paused tasks + for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { + scheduledInvoker.clearResources(); } + this.scheduledInvokers.clear(); } catch (InterruptedException ex) { // Re-interrupt current thread, to allow other threads to react. Thread.currentThread().interrupt(); } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -670,9 +752,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe */ @Override public void start() throws JmsException { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { this.stopCallback = null; } + finally { + this.lifecycleLock.unlock(); + } super.start(); } @@ -691,7 +777,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe */ @Override public void stop(Runnable callback) throws JmsException { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { if (!isRunning() || this.stopCallback != null) { // Not started, already stopped, or previous stop attempt in progress // -> return immediately, no stop process to control anymore. @@ -700,6 +787,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } this.stopCallback = callback; } + finally { + this.lifecycleLock.unlock(); + } stop(); } @@ -713,9 +803,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe * @see #getActiveConsumerCount() */ public final int getScheduledConsumerCount() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.scheduledInvokers.size(); } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -728,9 +822,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe * @see #getActiveConsumerCount() */ public final int getActiveConsumerCount() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return this.activeInvokerCount; } + finally { + this.lifecycleLock.unlock(); + } } /** @@ -749,9 +847,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe * only {@link #CACHE_CONSUMER} will lead to a fixed registration. */ public boolean isRegisteredWithDestination() { - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { return (this.registeredWithDestination > 0); } + finally { + this.lifecycleLock.unlock(); + } } @@ -760,11 +862,15 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe *

The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} * with the specified bean name (or the class name, if no bean name specified) as thread name prefix. * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String) + * @see #setVirtualThreads */ protected TaskExecutor createDefaultTaskExecutor() { String beanName = getBeanName(); String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX); - return new SimpleAsyncTaskExecutor(threadNamePrefix); + + SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(threadNamePrefix); + executor.setVirtualThreads(this.virtualThreads); + return executor; } /** @@ -831,7 +937,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe protected void scheduleNewInvokerIfAppropriate() { if (isRunning()) { resumePausedTasks(); - synchronized (this.lifecycleMonitor) { + this.lifecycleLock.lock(); + try { if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && getIdleInvokerCount() < this.idleConsumerLimit) { scheduleNewInvoker(); @@ -840,6 +947,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } } } + finally { + this.lifecycleLock.unlock(); + } } } @@ -1072,10 +1182,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe return false; } else { + this.lifecycleLock.lock(); try { - synchronized (this.lifecycleMonitor) { - this.lifecycleMonitor.wait(interval); - } + this.lifecycleCondition.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException interEx) { // Re-interrupt current thread, to allow other threads to react. @@ -1084,6 +1193,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe this.interrupted = true; } } + finally { + this.lifecycleLock.unlock(); + } return true; } } @@ -1129,9 +1241,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @Override public void run() { - synchronized (lifecycleMonitor) { + lifecycleLock.lock(); + try { activeInvokerCount++; - lifecycleMonitor.notifyAll(); + lifecycleCondition.signalAll(); + } + finally { + lifecycleLock.unlock(); } boolean messageReceived = false; try { @@ -1161,7 +1277,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } this.lastMessageSucceeded = false; boolean alreadyRecovered = false; - synchronized (recoveryMonitor) { + recoveryLock.lock(); + try { if (this.lastRecoveryMarker == currentRecoveryMarker) { handleListenerSetupFailure(ex, false); recoverAfterListenerSetupFailure(); @@ -1171,14 +1288,21 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe alreadyRecovered = true; } } + finally { + recoveryLock.unlock(); + } if (alreadyRecovered) { handleListenerSetupFailure(ex, true); } } finally { - synchronized (lifecycleMonitor) { + lifecycleLock.lock(); + try { decreaseActiveInvokerCount(); - lifecycleMonitor.notifyAll(); + lifecycleCondition.signalAll(); + } + finally { + lifecycleLock.unlock(); } if (!messageReceived) { this.idleTaskExecutionCount++; @@ -1186,14 +1310,15 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe else { this.idleTaskExecutionCount = 0; } - synchronized (lifecycleMonitor) { + lifecycleLock.lock(); + try { if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { // We're shutting down completely. scheduledInvokers.remove(this); if (logger.isDebugEnabled()) { logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size()); } - lifecycleMonitor.notifyAll(); + lifecycleCondition.signalAll(); clearResources(); } else if (isRunning()) { @@ -1209,6 +1334,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } } } + finally { + lifecycleLock.unlock(); + } } } @@ -1216,7 +1344,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe boolean messageReceived = false; boolean active = true; while (active) { - synchronized (lifecycleMonitor) { + lifecycleLock.lock(); + try { boolean interrupted = false; boolean wasWaiting = false; while ((active = isActive()) && !isRunning()) { @@ -1229,7 +1358,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } wasWaiting = true; try { - lifecycleMonitor.wait(); + lifecycleCondition.await(); } catch (InterruptedException ex) { // Re-interrupt current thread, to allow other threads to react. @@ -1244,6 +1373,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe active = false; } } + finally { + lifecycleLock.unlock(); + } if (active) { messageReceived = (invokeListener() || messageReceived); } @@ -1289,17 +1421,25 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) { this.consumer = createListenerConsumer(this.session); - synchronized (lifecycleMonitor) { + lifecycleLock.lock(); + try { registeredWithDestination++; } + finally { + lifecycleLock.unlock(); + } } } } private void updateRecoveryMarker() { - synchronized (recoveryMonitor) { + recoveryLock.lock(); + try { this.lastRecoveryMarker = currentRecoveryMarker; } + finally { + recoveryLock.unlock(); + } } private void interruptIfNecessary() { @@ -1311,19 +1451,27 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe private void clearResources() { if (sharedConnectionEnabled()) { - synchronized (sharedConnectionMonitor) { + sharedConnectionLock.lock(); + try { JmsUtils.closeMessageConsumer(this.consumer); JmsUtils.closeSession(this.session); } + finally { + sharedConnectionLock.unlock(); + } } else { JmsUtils.closeMessageConsumer(this.consumer); JmsUtils.closeSession(this.session); } if (this.consumer != null) { - synchronized (lifecycleMonitor) { + lifecycleLock.lock(); + try { registeredWithDestination--; } + finally { + lifecycleLock.unlock(); + } } this.consumer = null; this.session = null; diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java index 9530b236586..0fe0a0efcbd 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ package org.springframework.jms.listener; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; @@ -80,7 +82,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta @Nullable private Set consumers; - private final Object consumersMonitor = new Object(); + private final Lock consumersLock = new ReentrantLock(); /** @@ -261,10 +263,14 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta logger.debug("Trying to recover from JMS Connection exception: " + ex); } try { - synchronized (this.consumersMonitor) { + this.consumersLock.lock(); + try { this.sessions = null; this.consumers = null; } + finally { + this.consumersLock.unlock(); + } refreshSharedConnection(); initializeConsumers(); logger.debug("Successfully refreshed JMS Connection"); @@ -282,7 +288,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta */ protected void initializeConsumers() throws JMSException { // Register Sessions and MessageConsumers. - synchronized (this.consumersMonitor) { + this.consumersLock.lock(); + try { if (this.consumers == null) { this.sessions = new HashSet<>(this.concurrentConsumers); this.consumers = new HashSet<>(this.concurrentConsumers); @@ -295,6 +302,9 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta } } } + finally { + this.consumersLock.unlock(); + } } /** @@ -355,7 +365,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta */ @Override protected void doShutdown() throws JMSException { - synchronized (this.consumersMonitor) { + this.consumersLock.lock(); + try { if (this.consumers != null) { logger.debug("Closing JMS MessageConsumers"); for (MessageConsumer consumer : this.consumers) { @@ -369,6 +380,9 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta } } } + finally { + this.consumersLock.unlock(); + } } }