Browse Source

Improve SubscriptionRegistry subscription removal

This a backport of:
ce20abde51
083d415fc4

Issue: SPR-11931, SPR-11930
pull/579/head
Rossen Stoyanchev 12 years ago
parent
commit
5092414842
  1. 63
      spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java
  2. 21
      spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

63
spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

@ -96,8 +96,8 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @@ -96,8 +96,8 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
if (info != null) {
String destination = info.removeSubscription(subsId);
if (destination != null && info.getSubscriptions(destination) == null) {
this.destinationCache.updateAfterRemovedSubscription(destination, sessionId, subsId);
if (destination != null) {
this.destinationCache.updateAfterRemovedSubscription(sessionId, subsId);
}
}
}
@ -187,54 +187,51 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @@ -187,54 +187,51 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
}
public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) {
public void updateAfterRemovedSubscription(String sessionId, String subsId) {
synchronized (this.updateCache) {
Set<String> destinationsToRemove = new HashSet<String>();
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
String cachedDestination = entry.getKey();
if (getPathMatcher().match(destination, cachedDestination)) {
MultiValueMap<String, String> subs = entry.getValue();
List<String> subsIds = subs.get(sessionId);
subsIds.remove(subsId);
if (subsIds.isEmpty()) {
subs.remove(sessionId);
String destination = entry.getKey();
MultiValueMap<String, String> sessionMap = entry.getValue();
List<String> subscriptions = sessionMap.get(sessionId);
if (subscriptions != null) {
subscriptions.remove(subsId);
if (subscriptions.isEmpty()) {
sessionMap.remove(sessionId);
}
if (subs.isEmpty()) {
destinationsToRemove.add(cachedDestination);
if (sessionMap.isEmpty()) {
destinationsToRemove.add(destination);
}
else {
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
this.accessCache.put(destination, new LinkedMultiValueMap<String, String>(sessionMap));
}
}
}
for (String destinationToRemove : destinationsToRemove) {
this.updateCache.remove(destinationToRemove);
this.accessCache.remove(destinationToRemove);
for (String destination : destinationsToRemove) {
this.updateCache.remove(destination);
this.accessCache.remove(destination);
}
}
}
public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
synchronized (this.updateCache) {
for (String destination : info.getDestinations()) {
Set<String> destinationsToRemove = new HashSet<String>();
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
String cachedDestination = entry.getKey();
if (getPathMatcher().match(destination, cachedDestination)) {
MultiValueMap<String, String> subs = entry.getValue();
subs.remove(info.getSessionId());
if (subs.isEmpty()) {
destinationsToRemove.add(cachedDestination);
}
else {
this.accessCache.put(cachedDestination,new LinkedMultiValueMap<String, String>(subs));
}
Set<String> destinationsToRemove = new HashSet<String>();
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
String destination = entry.getKey();
MultiValueMap<String, String> sessionMap = entry.getValue();
if (sessionMap.remove(info.getSessionId()) != null) {
if (sessionMap.isEmpty()) {
destinationsToRemove.add(destination);
}
else {
this.accessCache.put(destination, new LinkedMultiValueMap<String, String>(sessionMap));
}
}
for (String d : destinationsToRemove) {
this.updateCache.remove(d);
this.accessCache.remove(d);
}
}
for (String destination : destinationsToRemove) {
this.updateCache.remove(destination);
this.accessCache.remove(destination);
}
}
}

21
spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

@ -270,6 +270,27 @@ public class DefaultSubscriptionRegistryTests { @@ -270,6 +270,27 @@ public class DefaultSubscriptionRegistryTests {
assertEquals(subscriptionIds, sort(actual.get(sessIds.get(2))));
}
// SPR-11931
@Test
public void registerTwiceAndUnregisterSubscriptions() {
this.registry.registerSubscription(subscribeMessage("sess01", "subs01", "/foo"));
this.registry.registerSubscription(subscribeMessage("sess01", "subs02", "/foo"));
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message("/foo"));
assertEquals("Expected 1 element", 1, actual.size());
assertEquals(Arrays.asList("subs01", "subs02"), actual.get("sess01"));
this.registry.unregisterSubscription(unsubscribeMessage("sess01", "subs01"));
actual = this.registry.findSubscriptions(message("/foo"));
assertEquals("Expected 1 element", 1, actual.size());
assertEquals(Arrays.asList("subs02"), actual.get("sess01"));
this.registry.unregisterSubscription(unsubscribeMessage("sess01", "subs02"));
actual = this.registry.findSubscriptions(message("/foo"));
assertEquals("Expected no element", 0, actual.size());
}
@Test
public void unregisterAllSubscriptions() {

Loading…
Cancel
Save