|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
/* |
|
|
|
* Copyright 2002-2015 the original author or authors. |
|
|
|
* Copyright 2002-2016 the original author or authors. |
|
|
|
* |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -296,8 +296,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Throwable ex) { |
|
|
|
catch (Throwable ex) { |
|
|
|
logger.error("Failed to send client message to application via MessageChannel" + |
|
|
|
if (logger.isErrorEnabled()) { |
|
|
|
" in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); |
|
|
|
logger.error("Failed to send client message to application via MessageChannel" + |
|
|
|
|
|
|
|
" in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); |
|
|
|
|
|
|
|
} |
|
|
|
handleError(session, ex, message); |
|
|
|
handleError(session, ex, message); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -316,7 +318,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
|
|
|
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
|
|
|
Assert.notNull(accessor, "Expected STOMP headers"); |
|
|
|
Assert.state(accessor != null, "Expected STOMP headers"); |
|
|
|
sendToClient(session, accessor, message.getPayload()); |
|
|
|
sendToClient(session, accessor, message.getPayload()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -365,7 +367,9 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
this.eventPublisher.publishEvent(event); |
|
|
|
this.eventPublisher.publishEvent(event); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Throwable ex) { |
|
|
|
catch (Throwable ex) { |
|
|
|
logger.error("Error publishing " + event, ex); |
|
|
|
if (logger.isErrorEnabled()) { |
|
|
|
|
|
|
|
logger.error("Error publishing " + event, ex); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -376,27 +380,29 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
public void handleMessageToClient(WebSocketSession session, Message<?> message) { |
|
|
|
public void handleMessageToClient(WebSocketSession session, Message<?> message) { |
|
|
|
if (!(message.getPayload() instanceof byte[])) { |
|
|
|
if (!(message.getPayload() instanceof byte[])) { |
|
|
|
logger.error("Expected byte[] payload. Ignoring " + message + "."); |
|
|
|
if (logger.isErrorEnabled()) { |
|
|
|
|
|
|
|
logger.error("Expected byte[] payload. Ignoring " + message + "."); |
|
|
|
|
|
|
|
} |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
StompHeaderAccessor stompAccessor = getStompHeaderAccessor(message); |
|
|
|
StompHeaderAccessor accessor = getStompHeaderAccessor(message); |
|
|
|
StompCommand command = stompAccessor.getCommand(); |
|
|
|
StompCommand command = accessor.getCommand(); |
|
|
|
|
|
|
|
|
|
|
|
if (StompCommand.MESSAGE.equals(command)) { |
|
|
|
if (StompCommand.MESSAGE.equals(command)) { |
|
|
|
if (stompAccessor.getSubscriptionId() == null) { |
|
|
|
if (accessor.getSubscriptionId() == null && logger.isWarnEnabled()) { |
|
|
|
logger.warn("No STOMP \"subscription\" header in " + message); |
|
|
|
logger.warn("No STOMP \"subscription\" header in " + message); |
|
|
|
} |
|
|
|
} |
|
|
|
String origDestination = stompAccessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION); |
|
|
|
String origDestination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION); |
|
|
|
if (origDestination != null) { |
|
|
|
if (origDestination != null) { |
|
|
|
stompAccessor = toMutableAccessor(stompAccessor, message); |
|
|
|
accessor = toMutableAccessor(accessor, message); |
|
|
|
stompAccessor.removeNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION); |
|
|
|
accessor.removeNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION); |
|
|
|
stompAccessor.setDestination(origDestination); |
|
|
|
accessor.setDestination(origDestination); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
else if (StompCommand.CONNECTED.equals(command)) { |
|
|
|
else if (StompCommand.CONNECTED.equals(command)) { |
|
|
|
this.stats.incrementConnectedCount(); |
|
|
|
this.stats.incrementConnectedCount(); |
|
|
|
stompAccessor = afterStompSessionConnected(message, stompAccessor, session); |
|
|
|
accessor = afterStompSessionConnected(message, accessor, session); |
|
|
|
if (this.eventPublisher != null && StompCommand.CONNECTED.equals(command)) { |
|
|
|
if (this.eventPublisher != null && StompCommand.CONNECTED.equals(command)) { |
|
|
|
try { |
|
|
|
try { |
|
|
|
SimpAttributes simpAttributes = new SimpAttributes(session.getId(), session.getAttributes()); |
|
|
|
SimpAttributes simpAttributes = new SimpAttributes(session.getId(), session.getAttributes()); |
|
|
|
@ -411,25 +417,21 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
byte[] payload = (byte[]) message.getPayload(); |
|
|
|
byte[] payload = (byte[]) message.getPayload(); |
|
|
|
|
|
|
|
|
|
|
|
if (StompCommand.ERROR.equals(command) && getErrorHandler() != null) { |
|
|
|
if (StompCommand.ERROR.equals(command) && getErrorHandler() != null) { |
|
|
|
Message<byte[]> errorMessage = getErrorHandler().handleErrorMessageToClient((Message<byte[]>) message); |
|
|
|
Message<byte[]> errorMessage = getErrorHandler().handleErrorMessageToClient((Message<byte[]>) message); |
|
|
|
stompAccessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class); |
|
|
|
accessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class); |
|
|
|
Assert.notNull(stompAccessor, "Expected STOMP headers"); |
|
|
|
Assert.state(accessor != null, "Expected STOMP headers"); |
|
|
|
payload = errorMessage.getPayload(); |
|
|
|
payload = errorMessage.getPayload(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
sendToClient(session, accessor, payload); |
|
|
|
sendToClient(session, stompAccessor, payload); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void sendToClient(WebSocketSession session, StompHeaderAccessor stompAccessor, byte[] payload) { |
|
|
|
private void sendToClient(WebSocketSession session, StompHeaderAccessor stompAccessor, byte[] payload) { |
|
|
|
StompCommand command = stompAccessor.getCommand(); |
|
|
|
StompCommand command = stompAccessor.getCommand(); |
|
|
|
try { |
|
|
|
try { |
|
|
|
byte[] bytes = this.stompEncoder.encode(stompAccessor.getMessageHeaders(), payload); |
|
|
|
byte[] bytes = this.stompEncoder.encode(stompAccessor.getMessageHeaders(), payload); |
|
|
|
|
|
|
|
|
|
|
|
boolean useBinary = (payload.length > 0 && !(session instanceof SockJsSession) && |
|
|
|
boolean useBinary = (payload.length > 0 && !(session instanceof SockJsSession) && |
|
|
|
MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(stompAccessor.getContentType())); |
|
|
|
MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(stompAccessor.getContentType())); |
|
|
|
|
|
|
|
|
|
|
|
if (useBinary) { |
|
|
|
if (useBinary) { |
|
|
|
session.sendMessage(new BinaryMessage(bytes)); |
|
|
|
session.sendMessage(new BinaryMessage(bytes)); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -443,7 +445,9 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Throwable ex) { |
|
|
|
catch (Throwable ex) { |
|
|
|
// Could be part of normal workflow (e.g. browser tab closed)
|
|
|
|
// Could be part of normal workflow (e.g. browser tab closed)
|
|
|
|
logger.debug("Failed to send WebSocket message to client in session " + session.getId(), ex); |
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
|
|
|
logger.debug("Failed to send WebSocket message to client in session " + session.getId(), ex); |
|
|
|
|
|
|
|
} |
|
|
|
command = StompCommand.ERROR; |
|
|
|
command = StompCommand.ERROR; |
|
|
|
} |
|
|
|
} |
|
|
|
finally { |
|
|
|
finally { |
|
|
|
@ -500,9 +504,13 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
private StompHeaderAccessor convertConnectAcktoStompConnected(StompHeaderAccessor connectAckHeaders) { |
|
|
|
private StompHeaderAccessor convertConnectAcktoStompConnected(StompHeaderAccessor connectAckHeaders) { |
|
|
|
String name = StompHeaderAccessor.CONNECT_MESSAGE_HEADER; |
|
|
|
String name = StompHeaderAccessor.CONNECT_MESSAGE_HEADER; |
|
|
|
Message<?> message = (Message<?>) connectAckHeaders.getHeader(name); |
|
|
|
Message<?> message = (Message<?>) connectAckHeaders.getHeader(name); |
|
|
|
Assert.notNull(message, "Original STOMP CONNECT not found in " + connectAckHeaders); |
|
|
|
if (message == null) { |
|
|
|
|
|
|
|
throw new IllegalStateException("Original STOMP CONNECT not found in " + connectAckHeaders); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
StompHeaderAccessor connectHeaders = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
|
|
|
StompHeaderAccessor connectHeaders = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
|
|
|
StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); |
|
|
|
StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); |
|
|
|
|
|
|
|
|
|
|
|
Set<String> acceptVersions = connectHeaders.getAcceptVersion(); |
|
|
|
Set<String> acceptVersions = connectHeaders.getAcceptVersion(); |
|
|
|
if (acceptVersions.contains("1.2")) { |
|
|
|
if (acceptVersions.contains("1.2")) { |
|
|
|
connectedHeaders.setVersion("1.2"); |
|
|
|
connectedHeaders.setVersion("1.2"); |
|
|
|
@ -513,6 +521,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
else if (!acceptVersions.isEmpty()) { |
|
|
|
else if (!acceptVersions.isEmpty()) { |
|
|
|
throw new IllegalArgumentException("Unsupported STOMP version '" + acceptVersions + "'"); |
|
|
|
throw new IllegalArgumentException("Unsupported STOMP version '" + acceptVersions + "'"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
long[] heartbeat = (long[]) connectAckHeaders.getHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER); |
|
|
|
long[] heartbeat = (long[]) connectAckHeaders.getHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER); |
|
|
|
if (heartbeat != null) { |
|
|
|
if (heartbeat != null) { |
|
|
|
connectedHeaders.setHeartbeat(heartbeat[0], heartbeat[1]); |
|
|
|
connectedHeaders.setHeartbeat(heartbeat[0], heartbeat[1]); |
|
|
|
@ -520,6 +529,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE |
|
|
|
else { |
|
|
|
else { |
|
|
|
connectedHeaders.setHeartbeat(0, 0); |
|
|
|
connectedHeaders.setHeartbeat(0, 0); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return connectedHeaders; |
|
|
|
return connectedHeaders; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|