Browse Source

Polishing

pull/1261/merge
Juergen Hoeller 9 years ago
parent
commit
f095aa20eb
  1. 159
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  2. 3
      spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java
  3. 108
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java

159
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -55,21 +55,21 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* connection to the broker is opened and used exclusively for all messages from the * connection to the broker is opened and used exclusively for all messages from the
* client that originated the CONNECT message. Messages from the same client are * client that originated the CONNECT message. Messages from the same client are
* identified through the session id message header. Reversely, when the STOMP broker * identified through the session id message header. Reversely, when the STOMP broker
* sends messages back on the TCP connection, those messages are enriched with the session * sends messages back on the TCP connection, those messages are enriched with the
* id of the client and sent back downstream through the {@link MessageChannel} provided * session id of the client and sent back downstream through the {@link MessageChannel}
* to the constructor. * provided to the constructor.
* *
* <p>This class also automatically opens a default "system" TCP connection to the message * <p>This class also automatically opens a default "system" TCP connection to the
* broker that is used for sending messages that originate from the server application (as * message broker that is used for sending messages that originate from the server
* opposed to from a client). Such messages are not associated with any client and * application (as opposed to from a client). Such messages are not associated with
* therefore do not have a session id header. The "system" connection is effectively * any client and therefore do not have a session id header. The "system" connection
* shared and cannot be used to receive messages. Several properties are provided to * is effectively shared and cannot be used to receive messages. Several properties
* configure the "system" connection including: * are provided to configure the "system" connection including:
* <ul> * <ul>
* <li>{@link #setSystemLogin(String)}</li> * <li>{@link #setSystemLogin}</li>
* <li>{@link #setSystemPasscode(String)}</li> * <li>{@link #setSystemPasscode}</li>
* <li>{@link #setSystemHeartbeatSendInterval(long)}</li> * <li>{@link #setSystemHeartbeatSendInterval}</li>
* <li>{@link #setSystemHeartbeatReceiveInterval(long)}</li> * <li>{@link #setSystemHeartbeatReceiveInterval}</li>
* </ul> * </ul>
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
@ -80,22 +80,20 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public static final String SYSTEM_SESSION_ID = "_system_"; public static final String SYSTEM_SESSION_ID = "_system_";
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable());
// STOMP recommends error of margin for receiving heartbeats // STOMP recommends error of margin for receiving heartbeats
private static final long HEARTBEAT_MULTIPLIER = 3; private static final long HEARTBEAT_MULTIPLIER = 3;
private static final Message<byte[]> HEARTBEAT_MESSAGE;
/** /**
* A heartbeat is setup once a CONNECTED frame is received which contains * A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings
* the heartbeat settings we need. If we don't receive CONNECTED within * we need. If we don't receive CONNECTED within a minute, the connection is closed proactively.
* a minute, the connection is closed proactively.
*/ */
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000; private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable());
private static final Message<byte[]> HEARTBEAT_MESSAGE;
static { static {
@ -121,19 +119,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private long systemHeartbeatReceiveInterval = 10000; private long systemHeartbeatReceiveInterval = 10000;
private String virtualHost;
private final Map<String, MessageHandler> systemSubscriptions = new HashMap<>(4); private final Map<String, MessageHandler> systemSubscriptions = new HashMap<>(4);
private String virtualHost;
private TcpOperations<byte[]> tcpClient; private TcpOperations<byte[]> tcpClient;
private MessageHeaderInitializer headerInitializer; private MessageHeaderInitializer headerInitializer;
private final Map<String, StompConnectionHandler> connectionHandlers =
new ConcurrentHashMap<>();
private final Stats stats = new Stats(); private final Stats stats = new Stats();
private final Map<String, StompConnectionHandler> connectionHandlers = new ConcurrentHashMap<>();
/** /**
* Create a StompBrokerRelayMessageHandler instance with the given message channels * Create a StompBrokerRelayMessageHandler instance with the given message channels
@ -179,46 +176,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public int getRelayPort() { public int getRelayPort() {
return this.relayPort; return this.relayPort;
} }
/**
* Set the interval, in milliseconds, at which the "system" connection will, in the
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value
* of zero will prevent heartbeats from being sent to the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection will
* send heartbeats to the STOMP broker.
*/
public long getSystemHeartbeatSendInterval() {
return this.systemHeartbeatSendInterval;
}
/**
* Set the maximum interval, in milliseconds, at which the "system" connection
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
* broker. A value of zero will configure the connection to expect not to receive
* heartbeats from the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection expects
* to receive heartbeats from the STOMP broker.
*/
public long getSystemHeartbeatReceiveInterval() {
return this.systemHeartbeatReceiveInterval;
}
/** /**
* Set the login to use when creating connections to the STOMP broker on * Set the login to use when creating connections to the STOMP broker on
* behalf of connected clients. * behalf of connected clients.
@ -294,6 +251,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.systemPasscode; return this.systemPasscode;
} }
/**
* Set the interval, in milliseconds, at which the "system" connection will, in the
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value
* of zero will prevent heartbeats from being sent to the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection will
* send heartbeats to the STOMP broker.
*/
public long getSystemHeartbeatSendInterval() {
return this.systemHeartbeatSendInterval;
}
/**
* Set the maximum interval, in milliseconds, at which the "system" connection
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
* broker. A value of zero will configure the connection to expect not to receive
* heartbeats from the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection expects
* to receive heartbeats from the STOMP broker.
*/
public long getSystemHeartbeatReceiveInterval() {
return this.systemHeartbeatReceiveInterval;
}
/** /**
* Configure one more destinations to subscribe to on the shared "system" * Configure one more destinations to subscribe to on the shared "system"
* connection along with MessageHandler's to handle received messages. * connection along with MessageHandler's to handle received messages.
@ -336,28 +333,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/** /**
* Configure a TCP client for managing TCP connections to the STOMP broker. * Configure a TCP client for managing TCP connections to the STOMP broker.
* By default {@link ReactorNettyTcpClient} is used. * <p>By default {@link ReactorNettyTcpClient} is used.
*/ */
public void setTcpClient(TcpOperations<byte[]> tcpClient) { public void setTcpClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient; this.tcpClient = tcpClient;
} }
/** /**
* Get the configured TCP client. Never {@code null} unless not configured * Get the configured TCP client (never {@code null} unless not configured
* invoked and this method is invoked before the handler is started and * invoked and this method is invoked before the handler is started and
* hence a default implementation initialized. * hence a default implementation initialized).
*/ */
public TcpOperations<byte[]> getTcpClient() { public TcpOperations<byte[]> getTcpClient() {
return this.tcpClient; return this.tcpClient;
} }
/**
* Return the current count of TCP connection to the broker.
*/
public int getConnectionCount() {
return this.connectionHandlers.size();
}
/** /**
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
* messages created through the {@code StompBrokerRelayMessageHandler} that * messages created through the {@code StompBrokerRelayMessageHandler} that
@ -382,6 +372,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.stats.toString(); return this.stats.toString();
} }
/**
* Return the current count of TCP connection to the broker.
*/
public int getConnectionCount() {
return this.connectionHandlers.size();
}
@Override @Override
protected void startInternal() { protected void startInternal() {
@ -872,6 +869,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
} }
private class SystemStompConnectionHandler extends StompConnectionHandler { private class SystemStompConnectionHandler extends StompConnectionHandler {
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
@ -971,10 +969,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
} }
private static class VoidCallable implements Callable<Void> { private static class VoidCallable implements Callable<Void> {
@Override @Override
public Void call() throws Exception { public Void call() {
return null; return null;
} }
} }
@ -1001,10 +1000,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
public String toString() { public String toString() {
return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
(isBrokerAvailable() ? " (available)" : " (not available)") + (isBrokerAvailable() ? " (available)" : " (not available)") +
", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")");
} }
} }

3
spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -91,7 +91,6 @@ public abstract class AbstractWebSocketSession<T> implements NativeWebSocketSess
@Override @Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException { public final void sendMessage(WebSocketMessage<?> message) throws IOException {
checkNativeSessionInitialized(); checkNativeSessionInitialized();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {

108
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -34,7 +34,6 @@ import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.sockjs.frame.SockJsFrame; import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsFrameType;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec; import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
/** /**
@ -44,20 +43,19 @@ import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
* Sub-classes implement actual send as well as disconnect logic. * Sub-classes implement actual send as well as disconnect logic.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 4.1 * @since 4.1
*/ */
public abstract class AbstractClientSockJsSession implements WebSocketSession { public abstract class AbstractClientSockJsSession implements WebSocketSession {
protected final Log logger = LogFactory.getLog(getClass()); protected final Log logger = LogFactory.getLog(getClass());
private final TransportRequest request; private final TransportRequest request;
private final WebSocketHandler webSocketHandler; private final WebSocketHandler webSocketHandler;
private final SettableListenableFuture<WebSocketSession> connectFuture; private final SettableListenableFuture<WebSocketSession> connectFuture;
private final Map<String, Object> attributes = new ConcurrentHashMap<>(); private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private volatile State state = State.NEW; private volatile State state = State.NEW;
@ -127,25 +125,31 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
@Override @Override
public boolean isOpen() { public boolean isOpen() {
return State.OPEN.equals(this.state); return (this.state == State.OPEN);
} }
public boolean isDisconnected() { public boolean isDisconnected() {
return (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state)); return (this.state == State.CLOSING || this.state == State.CLOSED);
} }
@Override @Override
public final void sendMessage(WebSocketMessage<?> message) throws IOException { public final void sendMessage(WebSocketMessage<?> message) throws IOException {
Assert.state(State.OPEN.equals(this.state), this + " is not open, current state=" + this.state); if (!(message instanceof TextMessage)) {
Assert.isInstanceOf(TextMessage.class, message, this + " supports text messages only."); throw new IllegalArgumentException(this + " supports text messages only.");
}
if (this.state != State.OPEN) {
throw new IllegalStateException(this + " is not open: current state " + this.state);
}
String payload = ((TextMessage) message).getPayload(); String payload = ((TextMessage) message).getPayload();
payload = getMessageCodec().encode(new String[] { payload }); payload = getMessageCodec().encode(payload);
payload = payload.substring(1); // the client-side doesn't need message framing (letter "a") payload = payload.substring(1); // the client-side doesn't need message framing (letter "a")
message = new TextMessage(payload);
TextMessage messageToSend = new TextMessage(payload);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Sending message " + message + " in " + this); logger.trace("Sending message " + messageToSend + " in " + this);
} }
sendInternal((TextMessage) message); sendInternal(messageToSend);
} }
protected abstract void sendInternal(TextMessage textMessage) throws IOException; protected abstract void sendInternal(TextMessage textMessage) throws IOException;
@ -173,10 +177,13 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
logger.warn("Ignoring close since connect() was never invoked"); logger.warn("Ignoring close since connect() was never invoked");
return; return;
} }
if (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state)) { if (isDisconnected()) {
logger.debug("Ignoring close (already closing or closed), current state=" + this.state); if (logger.isDebugEnabled()) {
logger.debug("Ignoring close (already closing or closed): current state " + this.state);
}
return; return;
} }
this.state = State.CLOSING; this.state = State.CLOSING;
this.closeStatus = status; this.closeStatus = status;
try { try {
@ -193,23 +200,20 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
public void handleFrame(String payload) { public void handleFrame(String payload) {
SockJsFrame frame = new SockJsFrame(payload); SockJsFrame frame = new SockJsFrame(payload);
if (SockJsFrameType.OPEN.equals(frame.getType())) { switch (frame.getType()) {
handleOpenFrame(); case OPEN:
} handleOpenFrame();
else if (SockJsFrameType.MESSAGE.equals(frame.getType())) { break;
handleMessageFrame(frame); case HEARTBEAT:
} if (logger.isTraceEnabled()) {
else if (SockJsFrameType.CLOSE.equals(frame.getType())) { logger.trace("Received heartbeat in " + this);
handleCloseFrame(frame); }
} break;
else if (SockJsFrameType.HEARTBEAT.equals(frame.getType())) { case MESSAGE:
if (logger.isTraceEnabled()) { handleMessageFrame(frame);
logger.trace("Received heartbeat in " + this); break;
} case CLOSE:
} handleCloseFrame(frame);
else {
// should never happen
throw new IllegalStateException("Unknown SockJS frame type " + frame + " in " + this);
} }
} }
@ -217,7 +221,7 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Processing SockJS open frame in " + this); logger.debug("Processing SockJS open frame in " + this);
} }
if (State.NEW.equals(state)) { if (this.state == State.NEW) {
this.state = State.OPEN; this.state = State.OPEN;
try { try {
this.webSocketHandler.afterConnectionEstablished(this); this.webSocketHandler.afterConnectionEstablished(this);
@ -225,16 +229,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
} }
catch (Throwable ex) { catch (Throwable ex) {
if (logger.isErrorEnabled()) { if (logger.isErrorEnabled()) {
Class<?> type = this.webSocketHandler.getClass(); logger.error("WebSocketHandler.afterConnectionEstablished threw exception in " + this, ex);
logger.error(type + ".afterConnectionEstablished threw exception in " + this, ex);
} }
} }
} }
else { else {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Open frame received in " + getId() + " but we're not" + logger.debug("Open frame received in " + getId() + " but we're not connecting (current state " +
"connecting (current state=" + this.state + "). The server might " + this.state + "). The server might have been restarted and lost track of the session.");
"have been restarted and lost track of the session.");
} }
closeInternal(new CloseStatus(1006, "Server lost session")); closeInternal(new CloseStatus(1006, "Server lost session"));
} }
@ -243,10 +245,11 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
private void handleMessageFrame(SockJsFrame frame) { private void handleMessageFrame(SockJsFrame frame) {
if (!isOpen()) { if (!isOpen()) {
if (logger.isErrorEnabled()) { if (logger.isErrorEnabled()) {
logger.error("Ignoring received message due to state=" + this.state + " in " + this); logger.error("Ignoring received message due to state " + this.state + " in " + this);
} }
return; return;
} }
String[] messages; String[] messages;
try { try {
messages = getMessageCodec().decode(frame.getFrameData()); messages = getMessageCodec().decode(frame.getFrameData());
@ -258,18 +261,18 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
closeInternal(CloseStatus.BAD_DATA); closeInternal(CloseStatus.BAD_DATA);
return; return;
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Processing SockJS message frame " + frame.getContent() + " in " + this); logger.trace("Processing SockJS message frame " + frame.getContent() + " in " + this);
} }
for (String message : messages) { for (String message : messages) {
try { if (isOpen()) {
if (isOpen()) { try {
this.webSocketHandler.handleMessage(this, new TextMessage(message)); this.webSocketHandler.handleMessage(this, new TextMessage(message));
} }
} catch (Throwable ex) {
catch (Throwable ex) { logger.error("WebSocketHandler.handleMessage threw an exception on " + frame + " in " + this, ex);
Class<?> type = this.webSocketHandler.getClass(); }
logger.error(type + ".handleMessage threw an exception on " + frame + " in " + this, ex);
} }
} }
} }
@ -300,18 +303,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
} }
this.webSocketHandler.handleTransportError(this, error); this.webSocketHandler.handleTransportError(this, error);
} }
catch (Exception ex) { catch (Throwable ex) {
Class<?> type = this.webSocketHandler.getClass(); logger.error("WebSocketHandler.handleTransportError threw an exception", ex);
if (logger.isErrorEnabled()) {
logger.error(type + ".handleTransportError threw an exception", ex);
}
} }
} }
public void afterTransportClosed(CloseStatus closeStatus) { public void afterTransportClosed(CloseStatus closeStatus) {
this.closeStatus = (this.closeStatus != null ? this.closeStatus : closeStatus); this.closeStatus = (this.closeStatus != null ? this.closeStatus : closeStatus);
Assert.state(this.closeStatus != null, "CloseStatus not available"); Assert.state(this.closeStatus != null, "CloseStatus not available");
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Transport closed with " + this.closeStatus + " in " + this); logger.debug("Transport closed with " + this.closeStatus + " in " + this);
} }
@ -320,11 +319,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
try { try {
this.webSocketHandler.afterConnectionClosed(this, this.closeStatus); this.webSocketHandler.afterConnectionClosed(this, this.closeStatus);
} }
catch (Exception ex) { catch (Throwable ex) {
if (logger.isErrorEnabled()) { logger.error("WebSocketHandler.afterConnectionClosed threw an exception", ex);
Class<?> type = this.webSocketHandler.getClass();
logger.error(type + ".afterConnectionClosed threw an exception", ex);
}
} }
} }

Loading…
Cancel
Save