diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java index fa38761851b..895fbfda92f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java @@ -96,8 +96,8 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId); if (info != null) { String destination = info.removeSubscription(subsId); - if (destination != null && info.getSubscriptions(destination) == null) { - this.destinationCache.updateAfterRemovedSubscription(destination, sessionId, subsId); + if (destination != null) { + this.destinationCache.updateAfterRemovedSubscription(sessionId, subsId); } } } @@ -187,54 +187,51 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { } } - public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) { + public void updateAfterRemovedSubscription(String sessionId, String subsId) { synchronized (this.updateCache) { Set destinationsToRemove = new HashSet(); for (Map.Entry> entry : this.updateCache.entrySet()) { - String cachedDestination = entry.getKey(); - if (getPathMatcher().match(destination, cachedDestination)) { - MultiValueMap subs = entry.getValue(); - List subsIds = subs.get(sessionId); - subsIds.remove(subsId); - if (subsIds.isEmpty()) { - subs.remove(sessionId); + String destination = entry.getKey(); + MultiValueMap sessionMap = entry.getValue(); + List subscriptions = sessionMap.get(sessionId); + if (subscriptions != null) { + subscriptions.remove(subsId); + if (subscriptions.isEmpty()) { + sessionMap.remove(sessionId); } - if (subs.isEmpty()) { - destinationsToRemove.add(cachedDestination); + if (sessionMap.isEmpty()) { + destinationsToRemove.add(destination); } else { - this.accessCache.put(cachedDestination, new LinkedMultiValueMap(subs)); + this.accessCache.put(destination, new LinkedMultiValueMap(sessionMap)); } } } - for (String destinationToRemove : destinationsToRemove) { - this.updateCache.remove(destinationToRemove); - this.accessCache.remove(destinationToRemove); + for (String destination : destinationsToRemove) { + this.updateCache.remove(destination); + this.accessCache.remove(destination); } } } public void updateAfterRemovedSession(SessionSubscriptionInfo info) { synchronized (this.updateCache) { - for (String destination : info.getDestinations()) { - Set destinationsToRemove = new HashSet(); - for (Map.Entry> entry : this.updateCache.entrySet()) { - String cachedDestination = entry.getKey(); - if (getPathMatcher().match(destination, cachedDestination)) { - MultiValueMap subs = entry.getValue(); - subs.remove(info.getSessionId()); - if (subs.isEmpty()) { - destinationsToRemove.add(cachedDestination); - } - else { - this.accessCache.put(cachedDestination,new LinkedMultiValueMap(subs)); - } + Set destinationsToRemove = new HashSet(); + for (Map.Entry> entry : this.updateCache.entrySet()) { + String destination = entry.getKey(); + MultiValueMap sessionMap = entry.getValue(); + if (sessionMap.remove(info.getSessionId()) != null) { + if (sessionMap.isEmpty()) { + destinationsToRemove.add(destination); + } + else { + this.accessCache.put(destination, new LinkedMultiValueMap(sessionMap)); } } - for (String d : destinationsToRemove) { - this.updateCache.remove(d); - this.accessCache.remove(d); - } + } + for (String destination : destinationsToRemove) { + this.updateCache.remove(destination); + this.accessCache.remove(destination); } } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java index ce7b7aaa971..21da6156f56 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java @@ -270,6 +270,27 @@ public class DefaultSubscriptionRegistryTests { assertEquals(subscriptionIds, sort(actual.get(sessIds.get(2)))); } + // SPR-11931 + + @Test + public void registerTwiceAndUnregisterSubscriptions() { + + this.registry.registerSubscription(subscribeMessage("sess01", "subs01", "/foo")); + this.registry.registerSubscription(subscribeMessage("sess01", "subs02", "/foo")); + MultiValueMap actual = this.registry.findSubscriptions(message("/foo")); + assertEquals("Expected 1 element", 1, actual.size()); + assertEquals(Arrays.asList("subs01", "subs02"), actual.get("sess01")); + + this.registry.unregisterSubscription(unsubscribeMessage("sess01", "subs01")); + actual = this.registry.findSubscriptions(message("/foo")); + assertEquals("Expected 1 element", 1, actual.size()); + assertEquals(Arrays.asList("subs02"), actual.get("sess01")); + + this.registry.unregisterSubscription(unsubscribeMessage("sess01", "subs02")); + actual = this.registry.findSubscriptions(message("/foo")); + assertEquals("Expected no element", 0, actual.size()); + } + @Test public void unregisterAllSubscriptions() {