connectionHandlers = new ConcurrentHashMap<>();
+
/**
* Create a StompBrokerRelayMessageHandler instance with the given message channels
@@ -179,46 +176,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public int getRelayPort() {
return this.relayPort;
}
-
- /**
- * Set the interval, in milliseconds, at which the "system" connection will, in the
- * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
- * of zero will prevent heartbeats from being sent to the broker.
- * The default value is 10000.
- *
See class-level documentation for more information on the "system" connection.
- */
- public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
- this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
- }
-
- /**
- * Return the interval, in milliseconds, at which the "system" connection will
- * send heartbeats to the STOMP broker.
- */
- public long getSystemHeartbeatSendInterval() {
- return this.systemHeartbeatSendInterval;
- }
-
- /**
- * Set the maximum interval, in milliseconds, at which the "system" connection
- * expects, in the absence of any other data, to receive a heartbeat from the STOMP
- * broker. A value of zero will configure the connection to expect not to receive
- * heartbeats from the broker.
- *
The default value is 10000.
- *
See class-level documentation for more information on the "system" connection.
- */
- public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
- this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
- }
-
- /**
- * Return the interval, in milliseconds, at which the "system" connection expects
- * to receive heartbeats from the STOMP broker.
- */
- public long getSystemHeartbeatReceiveInterval() {
- return this.systemHeartbeatReceiveInterval;
- }
-
/**
* Set the login to use when creating connections to the STOMP broker on
* behalf of connected clients.
@@ -294,6 +251,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.systemPasscode;
}
+
+ /**
+ * Set the interval, in milliseconds, at which the "system" connection will, in the
+ * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
+ * of zero will prevent heartbeats from being sent to the broker.
+ *
The default value is 10000.
+ *
See class-level documentation for more information on the "system" connection.
+ */
+ public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
+ this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
+ }
+
+ /**
+ * Return the interval, in milliseconds, at which the "system" connection will
+ * send heartbeats to the STOMP broker.
+ */
+ public long getSystemHeartbeatSendInterval() {
+ return this.systemHeartbeatSendInterval;
+ }
+
+ /**
+ * Set the maximum interval, in milliseconds, at which the "system" connection
+ * expects, in the absence of any other data, to receive a heartbeat from the STOMP
+ * broker. A value of zero will configure the connection to expect not to receive
+ * heartbeats from the broker.
+ *
The default value is 10000.
+ *
See class-level documentation for more information on the "system" connection.
+ */
+ public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
+ this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
+ }
+
+ /**
+ * Return the interval, in milliseconds, at which the "system" connection expects
+ * to receive heartbeats from the STOMP broker.
+ */
+ public long getSystemHeartbeatReceiveInterval() {
+ return this.systemHeartbeatReceiveInterval;
+ }
+
/**
* Configure one more destinations to subscribe to on the shared "system"
* connection along with MessageHandler's to handle received messages.
@@ -336,28 +333,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Configure a TCP client for managing TCP connections to the STOMP broker.
- * By default {@link ReactorNettyTcpClient} is used.
+ *
By default {@link ReactorNettyTcpClient} is used.
*/
public void setTcpClient(TcpOperations tcpClient) {
this.tcpClient = tcpClient;
}
/**
- * Get the configured TCP client. Never {@code null} unless not configured
+ * Get the configured TCP client (never {@code null} unless not configured
* invoked and this method is invoked before the handler is started and
- * hence a default implementation initialized.
+ * hence a default implementation initialized).
*/
public TcpOperations getTcpClient() {
return this.tcpClient;
}
- /**
- * Return the current count of TCP connection to the broker.
- */
- public int getConnectionCount() {
- return this.connectionHandlers.size();
- }
-
/**
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all
* messages created through the {@code StompBrokerRelayMessageHandler} that
@@ -382,6 +372,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.stats.toString();
}
+ /**
+ * Return the current count of TCP connection to the broker.
+ */
+ public int getConnectionCount() {
+ return this.connectionHandlers.size();
+ }
+
@Override
protected void startInternal() {
@@ -872,6 +869,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
+
private class SystemStompConnectionHandler extends StompConnectionHandler {
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
@@ -971,10 +969,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
+
private static class VoidCallable implements Callable {
@Override
- public Void call() throws Exception {
+ public Void call() {
return null;
}
}
@@ -1001,10 +1000,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
public String toString() {
- return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
+ return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
(isBrokerAvailable() ? " (available)" : " (not available)") +
", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
- this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
+ this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")");
}
}
diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java
index a5e78b6ae01..753c48a8319 100644
--- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java
+++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2016 the original author or authors.
+ * Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -91,7 +91,6 @@ public abstract class AbstractWebSocketSession implements NativeWebSocketSess
@Override
public final void sendMessage(WebSocketMessage> message) throws IOException {
-
checkNativeSessionInitialized();
if (logger.isTraceEnabled()) {
diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java
index 0c79969b5cb..227fe6f8aac 100644
--- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java
+++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2016 the original author or authors.
+ * Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,7 +34,6 @@ import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
-import org.springframework.web.socket.sockjs.frame.SockJsFrameType;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
/**
@@ -44,20 +43,19 @@ import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
* Sub-classes implement actual send as well as disconnect logic.
*
* @author Rossen Stoyanchev
+ * @author Juergen Hoeller
* @since 4.1
*/
public abstract class AbstractClientSockJsSession implements WebSocketSession {
protected final Log logger = LogFactory.getLog(getClass());
-
private final TransportRequest request;
private final WebSocketHandler webSocketHandler;
private final SettableListenableFuture connectFuture;
-
private final Map attributes = new ConcurrentHashMap<>();
private volatile State state = State.NEW;
@@ -127,25 +125,31 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
@Override
public boolean isOpen() {
- return State.OPEN.equals(this.state);
+ return (this.state == State.OPEN);
}
public boolean isDisconnected() {
- return (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state));
+ return (this.state == State.CLOSING || this.state == State.CLOSED);
}
@Override
public final void sendMessage(WebSocketMessage> message) throws IOException {
- Assert.state(State.OPEN.equals(this.state), this + " is not open, current state=" + this.state);
- Assert.isInstanceOf(TextMessage.class, message, this + " supports text messages only.");
+ if (!(message instanceof TextMessage)) {
+ throw new IllegalArgumentException(this + " supports text messages only.");
+ }
+ if (this.state != State.OPEN) {
+ throw new IllegalStateException(this + " is not open: current state " + this.state);
+ }
+
String payload = ((TextMessage) message).getPayload();
- payload = getMessageCodec().encode(new String[] { payload });
- payload = payload.substring(1); // the client-side doesn't need message framing (letter "a")
- message = new TextMessage(payload);
+ payload = getMessageCodec().encode(payload);
+ payload = payload.substring(1); // the client-side doesn't need message framing (letter "a")
+
+ TextMessage messageToSend = new TextMessage(payload);
if (logger.isTraceEnabled()) {
- logger.trace("Sending message " + message + " in " + this);
+ logger.trace("Sending message " + messageToSend + " in " + this);
}
- sendInternal((TextMessage) message);
+ sendInternal(messageToSend);
}
protected abstract void sendInternal(TextMessage textMessage) throws IOException;
@@ -173,10 +177,13 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
logger.warn("Ignoring close since connect() was never invoked");
return;
}
- if (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state)) {
- logger.debug("Ignoring close (already closing or closed), current state=" + this.state);
+ if (isDisconnected()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ignoring close (already closing or closed): current state " + this.state);
+ }
return;
}
+
this.state = State.CLOSING;
this.closeStatus = status;
try {
@@ -193,23 +200,20 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
public void handleFrame(String payload) {
SockJsFrame frame = new SockJsFrame(payload);
- if (SockJsFrameType.OPEN.equals(frame.getType())) {
- handleOpenFrame();
- }
- else if (SockJsFrameType.MESSAGE.equals(frame.getType())) {
- handleMessageFrame(frame);
- }
- else if (SockJsFrameType.CLOSE.equals(frame.getType())) {
- handleCloseFrame(frame);
- }
- else if (SockJsFrameType.HEARTBEAT.equals(frame.getType())) {
- if (logger.isTraceEnabled()) {
- logger.trace("Received heartbeat in " + this);
- }
- }
- else {
- // should never happen
- throw new IllegalStateException("Unknown SockJS frame type " + frame + " in " + this);
+ switch (frame.getType()) {
+ case OPEN:
+ handleOpenFrame();
+ break;
+ case HEARTBEAT:
+ if (logger.isTraceEnabled()) {
+ logger.trace("Received heartbeat in " + this);
+ }
+ break;
+ case MESSAGE:
+ handleMessageFrame(frame);
+ break;
+ case CLOSE:
+ handleCloseFrame(frame);
}
}
@@ -217,7 +221,7 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
if (logger.isDebugEnabled()) {
logger.debug("Processing SockJS open frame in " + this);
}
- if (State.NEW.equals(state)) {
+ if (this.state == State.NEW) {
this.state = State.OPEN;
try {
this.webSocketHandler.afterConnectionEstablished(this);
@@ -225,16 +229,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
}
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
- Class> type = this.webSocketHandler.getClass();
- logger.error(type + ".afterConnectionEstablished threw exception in " + this, ex);
+ logger.error("WebSocketHandler.afterConnectionEstablished threw exception in " + this, ex);
}
}
}
else {
if (logger.isDebugEnabled()) {
- logger.debug("Open frame received in " + getId() + " but we're not" +
- "connecting (current state=" + this.state + "). The server might " +
- "have been restarted and lost track of the session.");
+ logger.debug("Open frame received in " + getId() + " but we're not connecting (current state " +
+ this.state + "). The server might have been restarted and lost track of the session.");
}
closeInternal(new CloseStatus(1006, "Server lost session"));
}
@@ -243,10 +245,11 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
private void handleMessageFrame(SockJsFrame frame) {
if (!isOpen()) {
if (logger.isErrorEnabled()) {
- logger.error("Ignoring received message due to state=" + this.state + " in " + this);
+ logger.error("Ignoring received message due to state " + this.state + " in " + this);
}
return;
}
+
String[] messages;
try {
messages = getMessageCodec().decode(frame.getFrameData());
@@ -258,18 +261,18 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
closeInternal(CloseStatus.BAD_DATA);
return;
}
+
if (logger.isTraceEnabled()) {
logger.trace("Processing SockJS message frame " + frame.getContent() + " in " + this);
}
for (String message : messages) {
- try {
- if (isOpen()) {
+ if (isOpen()) {
+ try {
this.webSocketHandler.handleMessage(this, new TextMessage(message));
}
- }
- catch (Throwable ex) {
- Class> type = this.webSocketHandler.getClass();
- logger.error(type + ".handleMessage threw an exception on " + frame + " in " + this, ex);
+ catch (Throwable ex) {
+ logger.error("WebSocketHandler.handleMessage threw an exception on " + frame + " in " + this, ex);
+ }
}
}
}
@@ -300,18 +303,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
}
this.webSocketHandler.handleTransportError(this, error);
}
- catch (Exception ex) {
- Class> type = this.webSocketHandler.getClass();
- if (logger.isErrorEnabled()) {
- logger.error(type + ".handleTransportError threw an exception", ex);
- }
+ catch (Throwable ex) {
+ logger.error("WebSocketHandler.handleTransportError threw an exception", ex);
}
}
public void afterTransportClosed(CloseStatus closeStatus) {
this.closeStatus = (this.closeStatus != null ? this.closeStatus : closeStatus);
Assert.state(this.closeStatus != null, "CloseStatus not available");
-
if (logger.isDebugEnabled()) {
logger.debug("Transport closed with " + this.closeStatus + " in " + this);
}
@@ -320,11 +319,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
try {
this.webSocketHandler.afterConnectionClosed(this, this.closeStatus);
}
- catch (Exception ex) {
- if (logger.isErrorEnabled()) {
- Class> type = this.webSocketHandler.getClass();
- logger.error(type + ".afterConnectionClosed threw an exception", ex);
- }
+ catch (Throwable ex) {
+ logger.error("WebSocketHandler.afterConnectionClosed threw an exception", ex);
}
}