diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java index bac3ef6ca0b..006346d55f0 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java @@ -263,8 +263,9 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { if (sessionIdToSubscriptionIds == null) { sessionIdToSubscriptionIds = this.destinationCache.computeIfAbsent(destination, _destination -> { LinkedMultiValueMap matches = computeMatchingSubscriptions(destination); - this.cacheSize.incrementAndGet(); + // Update queue first, so that cacheSize <= queue.size( this.cacheEvictionPolicy.add(destination); + this.cacheSize.incrementAndGet(); return matches; }); ensureCacheLimit(); @@ -309,7 +310,9 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { if (size > cacheLimit) { do { if (this.cacheSize.compareAndSet(size, size - 1)) { - this.destinationCache.remove(this.cacheEvictionPolicy.poll()); + // Remove (vs poll): we expect an element + String head = this.cacheEvictionPolicy.remove(); + this.destinationCache.remove(head); } } while ((size = this.cacheSize.get()) > cacheLimit); }