|
|
|
@ -263,8 +263,9 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { |
|
|
|
if (sessionIdToSubscriptionIds == null) { |
|
|
|
if (sessionIdToSubscriptionIds == null) { |
|
|
|
sessionIdToSubscriptionIds = this.destinationCache.computeIfAbsent(destination, _destination -> { |
|
|
|
sessionIdToSubscriptionIds = this.destinationCache.computeIfAbsent(destination, _destination -> { |
|
|
|
LinkedMultiValueMap<String, String> matches = computeMatchingSubscriptions(destination); |
|
|
|
LinkedMultiValueMap<String, String> matches = computeMatchingSubscriptions(destination); |
|
|
|
this.cacheSize.incrementAndGet(); |
|
|
|
// Update queue first, so that cacheSize <= queue.size(
|
|
|
|
this.cacheEvictionPolicy.add(destination); |
|
|
|
this.cacheEvictionPolicy.add(destination); |
|
|
|
|
|
|
|
this.cacheSize.incrementAndGet(); |
|
|
|
return matches; |
|
|
|
return matches; |
|
|
|
}); |
|
|
|
}); |
|
|
|
ensureCacheLimit(); |
|
|
|
ensureCacheLimit(); |
|
|
|
@ -309,7 +310,9 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { |
|
|
|
if (size > cacheLimit) { |
|
|
|
if (size > cacheLimit) { |
|
|
|
do { |
|
|
|
do { |
|
|
|
if (this.cacheSize.compareAndSet(size, size - 1)) { |
|
|
|
if (this.cacheSize.compareAndSet(size, size - 1)) { |
|
|
|
this.destinationCache.remove(this.cacheEvictionPolicy.poll()); |
|
|
|
// Remove (vs poll): we expect an element
|
|
|
|
|
|
|
|
String head = this.cacheEvictionPolicy.remove(); |
|
|
|
|
|
|
|
this.destinationCache.remove(head); |
|
|
|
} |
|
|
|
} |
|
|
|
} while ((size = this.cacheSize.get()) > cacheLimit); |
|
|
|
} while ((size = this.cacheSize.get()) > cacheLimit); |
|
|
|
} |
|
|
|
} |
|
|
|
|