|
|
|
@ -19,6 +19,7 @@ package org.springframework.messaging.simp.broker; |
|
|
|
import java.security.Principal; |
|
|
|
import java.security.Principal; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.Collection; |
|
|
|
import java.util.Collection; |
|
|
|
|
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ScheduledFuture; |
|
|
|
import java.util.concurrent.ScheduledFuture; |
|
|
|
@ -333,11 +334,11 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { |
|
|
|
logger.debug("Broadcasting to " + subscriptions.size() + " sessions."); |
|
|
|
logger.debug("Broadcasting to " + subscriptions.size() + " sessions."); |
|
|
|
} |
|
|
|
} |
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
for (String sessionId : subscriptions.keySet()) { |
|
|
|
for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) { |
|
|
|
for (String subscriptionId : subscriptions.get(sessionId)) { |
|
|
|
for (String subscriptionId : subscriptionEntry.getValue()) { |
|
|
|
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); |
|
|
|
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); |
|
|
|
initHeaders(headerAccessor); |
|
|
|
initHeaders(headerAccessor); |
|
|
|
headerAccessor.setSessionId(sessionId); |
|
|
|
headerAccessor.setSessionId(subscriptionEntry.getKey()); |
|
|
|
headerAccessor.setSubscriptionId(subscriptionId); |
|
|
|
headerAccessor.setSubscriptionId(subscriptionId); |
|
|
|
headerAccessor.copyHeadersIfAbsent(message.getHeaders()); |
|
|
|
headerAccessor.copyHeadersIfAbsent(message.getHeaders()); |
|
|
|
Object payload = message.getPayload(); |
|
|
|
Object payload = message.getPayload(); |
|
|
|
@ -349,7 +350,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { |
|
|
|
logger.error("Failed to send " + message, ex); |
|
|
|
logger.error("Failed to send " + message, ex); |
|
|
|
} |
|
|
|
} |
|
|
|
finally { |
|
|
|
finally { |
|
|
|
SessionInfo info = this.sessions.get(sessionId); |
|
|
|
SessionInfo info = this.sessions.get(subscriptionEntry.getKey()); |
|
|
|
if (info != null) { |
|
|
|
if (info != null) { |
|
|
|
info.setLastWriteTime(now); |
|
|
|
info.setLastWriteTime(now); |
|
|
|
} |
|
|
|
} |
|
|
|
|