|
|
|
@ -115,9 +115,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
* Since a STOMP message can be received in multiple WebSocket messages, |
|
|
|
* Since a STOMP message can be received in multiple WebSocket messages, |
|
|
|
* buffering may be required and therefore it is necessary to know the maximum |
|
|
|
* buffering may be required and therefore it is necessary to know the maximum |
|
|
|
* allowed message size. |
|
|
|
* allowed message size. |
|
|
|
* |
|
|
|
|
|
|
|
* <p>By default this property is set to 64K. |
|
|
|
* <p>By default this property is set to 64K. |
|
|
|
* |
|
|
|
|
|
|
|
* @since 4.0.3 |
|
|
|
* @since 4.0.3 |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public void setMessageSizeLimit(int messageSizeLimit) { |
|
|
|
public void setMessageSizeLimit(int messageSizeLimit) { |
|
|
|
@ -126,7 +124,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Get the configured message buffer size limit in bytes. |
|
|
|
* Get the configured message buffer size limit in bytes. |
|
|
|
* |
|
|
|
|
|
|
|
* @since 4.0.3 |
|
|
|
* @since 4.0.3 |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public int getMessageSizeLimit() { |
|
|
|
public int getMessageSizeLimit() { |
|
|
|
@ -142,7 +139,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* @return the configured UserSessionRegistry. |
|
|
|
* Return the configured UserSessionRegistry. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public UserSessionRegistry getUserSessionRegistry() { |
|
|
|
public UserSessionRegistry getUserSessionRegistry() { |
|
|
|
return this.userSessionRegistry; |
|
|
|
return this.userSessionRegistry; |
|
|
|
@ -152,7 +149,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all |
|
|
|
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all |
|
|
|
* messages created from decoded STOMP frames and other messages sent to the |
|
|
|
* messages created from decoded STOMP frames and other messages sent to the |
|
|
|
* client inbound channel. |
|
|
|
* client inbound channel. |
|
|
|
* |
|
|
|
|
|
|
|
* <p>By default this property is not set. |
|
|
|
* <p>By default this property is not set. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) { |
|
|
|
public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) { |
|
|
|
@ -161,7 +157,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* @return the configured header initializer. |
|
|
|
* Return the configured header initializer. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public MessageHeaderInitializer getHeaderInitializer() { |
|
|
|
public MessageHeaderInitializer getHeaderInitializer() { |
|
|
|
return this.headerInitializer; |
|
|
|
return this.headerInitializer; |
|
|
|
@ -274,7 +270,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
logger.error("Failed to send client message to application via MessageChannel" + |
|
|
|
logger.error("Failed to send client message to application via MessageChannel" + |
|
|
|
" in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); |
|
|
|
" in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); |
|
|
|
sendErrorMessage(session, ex); |
|
|
|
sendErrorMessage(session, ex); |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -300,13 +295,14 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
this.eventPublisher.publishEvent(event); |
|
|
|
this.eventPublisher.publishEvent(event); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Throwable ex) { |
|
|
|
catch (Throwable ex) { |
|
|
|
logger.error("Error publishing " + event + ".", ex); |
|
|
|
logger.error("Error publishing " + event, ex); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
protected void sendErrorMessage(WebSocketSession session, Throwable error) { |
|
|
|
protected void sendErrorMessage(WebSocketSession session, Throwable error) { |
|
|
|
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR); |
|
|
|
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR); |
|
|
|
headerAccessor.setMessage(error.getMessage()); |
|
|
|
headerAccessor.setMessage(error.getMessage()); |
|
|
|
|
|
|
|
|
|
|
|
byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD); |
|
|
|
byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD); |
|
|
|
try { |
|
|
|
try { |
|
|
|
session.sendMessage(new TextMessage(bytes)); |
|
|
|
session.sendMessage(new TextMessage(bytes)); |
|
|
|
@ -327,8 +323,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
logger.error("Expected byte[] payload. Ignoring " + message + "."); |
|
|
|
logger.error("Expected byte[] payload. Ignoring " + message + "."); |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
StompHeaderAccessor stompAccessor = getStompHeaderAccessor(message); |
|
|
|
StompHeaderAccessor stompAccessor = getStompHeaderAccessor(message); |
|
|
|
StompCommand command = stompAccessor.getCommand(); |
|
|
|
StompCommand command = stompAccessor.getCommand(); |
|
|
|
|
|
|
|
|
|
|
|
if (StompCommand.MESSAGE.equals(command)) { |
|
|
|
if (StompCommand.MESSAGE.equals(command)) { |
|
|
|
if (stompAccessor.getSubscriptionId() == null) { |
|
|
|
if (stompAccessor.getSubscriptionId() == null) { |
|
|
|
logger.warn("No STOMP \"subscription\" header in " + message); |
|
|
|
logger.warn("No STOMP \"subscription\" header in " + message); |
|
|
|
@ -374,7 +372,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Throwable ex) { |
|
|
|
catch (Throwable ex) { |
|
|
|
// Could be part of normal workflow (e.g. browser tab closed)
|
|
|
|
// Could be part of normal workflow (e.g. browser tab closed)
|
|
|
|
logger.debug("Failed to send WebSocket message to client in session " + session.getId() + ".", ex); |
|
|
|
logger.debug("Failed to send WebSocket message to client in session " + session.getId(), ex); |
|
|
|
command = StompCommand.ERROR; |
|
|
|
command = StompCommand.ERROR; |
|
|
|
} |
|
|
|
} |
|
|
|
finally { |
|
|
|
finally { |
|
|
|
@ -393,7 +391,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); |
|
|
|
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); |
|
|
|
if (accessor == null) { |
|
|
|
if (accessor == null) { |
|
|
|
// Shouldn't happen (only broker broadcasts directly to clients)
|
|
|
|
// Shouldn't happen (only broker broadcasts directly to clients)
|
|
|
|
throw new IllegalStateException("No header accessor in " + message + "."); |
|
|
|
throw new IllegalStateException("No header accessor in " + message); |
|
|
|
} |
|
|
|
} |
|
|
|
StompHeaderAccessor stompAccessor; |
|
|
|
StompHeaderAccessor stompAccessor; |
|
|
|
if (accessor instanceof StompHeaderAccessor) { |
|
|
|
if (accessor instanceof StompHeaderAccessor) { |
|
|
|
@ -415,7 +413,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
else { |
|
|
|
else { |
|
|
|
// Shouldn't happen (only broker broadcasts directly to clients)
|
|
|
|
// Shouldn't happen (only broker broadcasts directly to clients)
|
|
|
|
throw new IllegalStateException( |
|
|
|
throw new IllegalStateException( |
|
|
|
"Unexpected header accessor type: " + accessor.getClass() + " in " + message + "."); |
|
|
|
"Unexpected header accessor type: " + accessor.getClass() + " in " + message); |
|
|
|
} |
|
|
|
} |
|
|
|
return stompAccessor; |
|
|
|
return stompAccessor; |
|
|
|
} |
|
|
|
} |
|
|
|
@ -465,6 +463,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
this.userSessionRegistry.registerSessionId(userName, session.getId()); |
|
|
|
this.userSessionRegistry.registerSessionId(userName, session.getId()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
long[] heartbeat = accessor.getHeartbeat(); |
|
|
|
long[] heartbeat = accessor.getHeartbeat(); |
|
|
|
if (heartbeat[1] > 0) { |
|
|
|
if (heartbeat[1] > 0) { |
|
|
|
session = WebSocketSessionDecorator.unwrap(session); |
|
|
|
session = WebSocketSessionDecorator.unwrap(session); |
|
|
|
@ -472,6 +471,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
((SockJsSession) session).disableHeartbeat(); |
|
|
|
((SockJsSession) session).disableHeartbeat(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return accessor; |
|
|
|
return accessor; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -499,11 +499,13 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) { |
|
|
|
public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) { |
|
|
|
this.decoders.remove(session.getId()); |
|
|
|
this.decoders.remove(session.getId()); |
|
|
|
|
|
|
|
|
|
|
|
Principal principal = session.getPrincipal(); |
|
|
|
Principal principal = session.getPrincipal(); |
|
|
|
if (principal != null && this.userSessionRegistry != null) { |
|
|
|
if (principal != null && this.userSessionRegistry != null) { |
|
|
|
String userName = getSessionRegistryUserName(principal); |
|
|
|
String userName = getSessionRegistryUserName(principal); |
|
|
|
this.userSessionRegistry.unregisterSessionId(userName, session.getId()); |
|
|
|
this.userSessionRegistry.unregisterSessionId(userName, session.getId()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Message<byte[]> message = createDisconnectMessage(session); |
|
|
|
Message<byte[]> message = createDisconnectMessage(session); |
|
|
|
SimpAttributes simpAttributes = SimpAttributes.fromMessage(message); |
|
|
|
SimpAttributes simpAttributes = SimpAttributes.fromMessage(message); |
|
|
|
try { |
|
|
|
try { |
|
|
|
@ -535,7 +537,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
return "StompSubProtocolHandler" + getSupportedProtocols(); |
|
|
|
return "StompSubProtocolHandler" + getSupportedProtocols(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private class Stats { |
|
|
|
|
|
|
|
|
|
|
|
private static class Stats { |
|
|
|
|
|
|
|
|
|
|
|
private final AtomicInteger connect = new AtomicInteger(); |
|
|
|
private final AtomicInteger connect = new AtomicInteger(); |
|
|
|
|
|
|
|
|
|
|
|
@ -543,7 +546,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
|
|
|
|
|
|
|
|
private final AtomicInteger disconnect = new AtomicInteger(); |
|
|
|
private final AtomicInteger disconnect = new AtomicInteger(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void incrementConnectCount() { |
|
|
|
public void incrementConnectCount() { |
|
|
|
this.connect.incrementAndGet(); |
|
|
|
this.connect.incrementAndGet(); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -556,7 +558,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
this.disconnect.incrementAndGet(); |
|
|
|
this.disconnect.incrementAndGet(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public String toString() { |
|
|
|
public String toString() { |
|
|
|
return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + |
|
|
|
return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + |
|
|
|
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; |
|
|
|
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; |
|
|
|
|