From f095aa20eb8da585f05c772a17518a7fe40b3ed1 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Mon, 23 Jan 2017 21:28:40 +0100 Subject: [PATCH] Polishing --- .../stomp/StompBrokerRelayMessageHandler.java | 159 +++++++++--------- .../adapter/AbstractWebSocketSession.java | 3 +- .../client/AbstractClientSockJsSession.java | 108 ++++++------ 3 files changed, 132 insertions(+), 138 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 9f0dd380763..36357a427ba 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -55,21 +55,21 @@ import org.springframework.util.concurrent.ListenableFutureTask; * connection to the broker is opened and used exclusively for all messages from the * client that originated the CONNECT message. Messages from the same client are * identified through the session id message header. Reversely, when the STOMP broker - * sends messages back on the TCP connection, those messages are enriched with the session - * id of the client and sent back downstream through the {@link MessageChannel} provided - * to the constructor. + * sends messages back on the TCP connection, those messages are enriched with the + * session id of the client and sent back downstream through the {@link MessageChannel} + * provided to the constructor. * - *

This class also automatically opens a default "system" TCP connection to the message - * broker that is used for sending messages that originate from the server application (as - * opposed to from a client). Such messages are not associated with any client and - * therefore do not have a session id header. The "system" connection is effectively - * shared and cannot be used to receive messages. Several properties are provided to - * configure the "system" connection including: + *

This class also automatically opens a default "system" TCP connection to the + * message broker that is used for sending messages that originate from the server + * application (as opposed to from a client). Such messages are not associated with + * any client and therefore do not have a session id header. The "system" connection + * is effectively shared and cannot be used to receive messages. Several properties + * are provided to configure the "system" connection including: *

* * @author Rossen Stoyanchev @@ -80,22 +80,20 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler public static final String SYSTEM_SESSION_ID = "_system_"; - private static final byte[] EMPTY_PAYLOAD = new byte[0]; - - private static final ListenableFutureTask EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable()); - // STOMP recommends error of margin for receiving heartbeats private static final long HEARTBEAT_MULTIPLIER = 3; - private static final Message HEARTBEAT_MESSAGE; - /** - * A heartbeat is setup once a CONNECTED frame is received which contains - * the heartbeat settings we need. If we don't receive CONNECTED within - * a minute, the connection is closed proactively. + * A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings + * we need. If we don't receive CONNECTED within a minute, the connection is closed proactively. */ private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000; + private static final byte[] EMPTY_PAYLOAD = new byte[0]; + + private static final ListenableFutureTask EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable()); + + private static final Message HEARTBEAT_MESSAGE; static { @@ -121,19 +119,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private long systemHeartbeatReceiveInterval = 10000; - private String virtualHost; - private final Map systemSubscriptions = new HashMap<>(4); + private String virtualHost; + private TcpOperations tcpClient; private MessageHeaderInitializer headerInitializer; - private final Map connectionHandlers = - new ConcurrentHashMap<>(); - private final Stats stats = new Stats(); + private final Map connectionHandlers = new ConcurrentHashMap<>(); + /** * Create a StompBrokerRelayMessageHandler instance with the given message channels @@ -179,46 +176,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler public int getRelayPort() { return this.relayPort; } - - /** - * Set the interval, in milliseconds, at which the "system" connection will, in the - * absence of any other data being sent, send a heartbeat to the STOMP broker. A value - * of zero will prevent heartbeats from being sent to the broker. - *

The default value is 10000. - *

See class-level documentation for more information on the "system" connection. - */ - public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { - this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; - } - - /** - * Return the interval, in milliseconds, at which the "system" connection will - * send heartbeats to the STOMP broker. - */ - public long getSystemHeartbeatSendInterval() { - return this.systemHeartbeatSendInterval; - } - - /** - * Set the maximum interval, in milliseconds, at which the "system" connection - * expects, in the absence of any other data, to receive a heartbeat from the STOMP - * broker. A value of zero will configure the connection to expect not to receive - * heartbeats from the broker. - *

The default value is 10000. - *

See class-level documentation for more information on the "system" connection. - */ - public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { - this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; - } - - /** - * Return the interval, in milliseconds, at which the "system" connection expects - * to receive heartbeats from the STOMP broker. - */ - public long getSystemHeartbeatReceiveInterval() { - return this.systemHeartbeatReceiveInterval; - } - /** * Set the login to use when creating connections to the STOMP broker on * behalf of connected clients. @@ -294,6 +251,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.systemPasscode; } + + /** + * Set the interval, in milliseconds, at which the "system" connection will, in the + * absence of any other data being sent, send a heartbeat to the STOMP broker. A value + * of zero will prevent heartbeats from being sent to the broker. + *

The default value is 10000. + *

See class-level documentation for more information on the "system" connection. + */ + public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { + this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; + } + + /** + * Return the interval, in milliseconds, at which the "system" connection will + * send heartbeats to the STOMP broker. + */ + public long getSystemHeartbeatSendInterval() { + return this.systemHeartbeatSendInterval; + } + + /** + * Set the maximum interval, in milliseconds, at which the "system" connection + * expects, in the absence of any other data, to receive a heartbeat from the STOMP + * broker. A value of zero will configure the connection to expect not to receive + * heartbeats from the broker. + *

The default value is 10000. + *

See class-level documentation for more information on the "system" connection. + */ + public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { + this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; + } + + /** + * Return the interval, in milliseconds, at which the "system" connection expects + * to receive heartbeats from the STOMP broker. + */ + public long getSystemHeartbeatReceiveInterval() { + return this.systemHeartbeatReceiveInterval; + } + /** * Configure one more destinations to subscribe to on the shared "system" * connection along with MessageHandler's to handle received messages. @@ -336,28 +333,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Configure a TCP client for managing TCP connections to the STOMP broker. - * By default {@link ReactorNettyTcpClient} is used. + *

By default {@link ReactorNettyTcpClient} is used. */ public void setTcpClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; } /** - * Get the configured TCP client. Never {@code null} unless not configured + * Get the configured TCP client (never {@code null} unless not configured * invoked and this method is invoked before the handler is started and - * hence a default implementation initialized. + * hence a default implementation initialized). */ public TcpOperations getTcpClient() { return this.tcpClient; } - /** - * Return the current count of TCP connection to the broker. - */ - public int getConnectionCount() { - return this.connectionHandlers.size(); - } - /** * Configure a {@link MessageHeaderInitializer} to apply to the headers of all * messages created through the {@code StompBrokerRelayMessageHandler} that @@ -382,6 +372,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.stats.toString(); } + /** + * Return the current count of TCP connection to the broker. + */ + public int getConnectionCount() { + return this.connectionHandlers.size(); + } + @Override protected void startInternal() { @@ -872,6 +869,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private class SystemStompConnectionHandler extends StompConnectionHandler { public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { @@ -971,10 +969,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private static class VoidCallable implements Callable { @Override - public Void call() throws Exception { + public Void call() { return null; } } @@ -1001,10 +1000,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } public String toString() { - return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + + return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + (isBrokerAvailable() ? " (available)" : " (not available)") + ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + - this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; + this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java index a5e78b6ae01..753c48a8319 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,7 +91,6 @@ public abstract class AbstractWebSocketSession implements NativeWebSocketSess @Override public final void sendMessage(WebSocketMessage message) throws IOException { - checkNativeSessionInitialized(); if (logger.isTraceEnabled()) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java index 0c79969b5cb..227fe6f8aac 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,6 @@ import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.sockjs.frame.SockJsFrame; -import org.springframework.web.socket.sockjs.frame.SockJsFrameType; import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec; /** @@ -44,20 +43,19 @@ import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec; * Sub-classes implement actual send as well as disconnect logic. * * @author Rossen Stoyanchev + * @author Juergen Hoeller * @since 4.1 */ public abstract class AbstractClientSockJsSession implements WebSocketSession { protected final Log logger = LogFactory.getLog(getClass()); - private final TransportRequest request; private final WebSocketHandler webSocketHandler; private final SettableListenableFuture connectFuture; - private final Map attributes = new ConcurrentHashMap<>(); private volatile State state = State.NEW; @@ -127,25 +125,31 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { @Override public boolean isOpen() { - return State.OPEN.equals(this.state); + return (this.state == State.OPEN); } public boolean isDisconnected() { - return (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state)); + return (this.state == State.CLOSING || this.state == State.CLOSED); } @Override public final void sendMessage(WebSocketMessage message) throws IOException { - Assert.state(State.OPEN.equals(this.state), this + " is not open, current state=" + this.state); - Assert.isInstanceOf(TextMessage.class, message, this + " supports text messages only."); + if (!(message instanceof TextMessage)) { + throw new IllegalArgumentException(this + " supports text messages only."); + } + if (this.state != State.OPEN) { + throw new IllegalStateException(this + " is not open: current state " + this.state); + } + String payload = ((TextMessage) message).getPayload(); - payload = getMessageCodec().encode(new String[] { payload }); - payload = payload.substring(1); // the client-side doesn't need message framing (letter "a") - message = new TextMessage(payload); + payload = getMessageCodec().encode(payload); + payload = payload.substring(1); // the client-side doesn't need message framing (letter "a") + + TextMessage messageToSend = new TextMessage(payload); if (logger.isTraceEnabled()) { - logger.trace("Sending message " + message + " in " + this); + logger.trace("Sending message " + messageToSend + " in " + this); } - sendInternal((TextMessage) message); + sendInternal(messageToSend); } protected abstract void sendInternal(TextMessage textMessage) throws IOException; @@ -173,10 +177,13 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { logger.warn("Ignoring close since connect() was never invoked"); return; } - if (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state)) { - logger.debug("Ignoring close (already closing or closed), current state=" + this.state); + if (isDisconnected()) { + if (logger.isDebugEnabled()) { + logger.debug("Ignoring close (already closing or closed): current state " + this.state); + } return; } + this.state = State.CLOSING; this.closeStatus = status; try { @@ -193,23 +200,20 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { public void handleFrame(String payload) { SockJsFrame frame = new SockJsFrame(payload); - if (SockJsFrameType.OPEN.equals(frame.getType())) { - handleOpenFrame(); - } - else if (SockJsFrameType.MESSAGE.equals(frame.getType())) { - handleMessageFrame(frame); - } - else if (SockJsFrameType.CLOSE.equals(frame.getType())) { - handleCloseFrame(frame); - } - else if (SockJsFrameType.HEARTBEAT.equals(frame.getType())) { - if (logger.isTraceEnabled()) { - logger.trace("Received heartbeat in " + this); - } - } - else { - // should never happen - throw new IllegalStateException("Unknown SockJS frame type " + frame + " in " + this); + switch (frame.getType()) { + case OPEN: + handleOpenFrame(); + break; + case HEARTBEAT: + if (logger.isTraceEnabled()) { + logger.trace("Received heartbeat in " + this); + } + break; + case MESSAGE: + handleMessageFrame(frame); + break; + case CLOSE: + handleCloseFrame(frame); } } @@ -217,7 +221,7 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { if (logger.isDebugEnabled()) { logger.debug("Processing SockJS open frame in " + this); } - if (State.NEW.equals(state)) { + if (this.state == State.NEW) { this.state = State.OPEN; try { this.webSocketHandler.afterConnectionEstablished(this); @@ -225,16 +229,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { } catch (Throwable ex) { if (logger.isErrorEnabled()) { - Class type = this.webSocketHandler.getClass(); - logger.error(type + ".afterConnectionEstablished threw exception in " + this, ex); + logger.error("WebSocketHandler.afterConnectionEstablished threw exception in " + this, ex); } } } else { if (logger.isDebugEnabled()) { - logger.debug("Open frame received in " + getId() + " but we're not" + - "connecting (current state=" + this.state + "). The server might " + - "have been restarted and lost track of the session."); + logger.debug("Open frame received in " + getId() + " but we're not connecting (current state " + + this.state + "). The server might have been restarted and lost track of the session."); } closeInternal(new CloseStatus(1006, "Server lost session")); } @@ -243,10 +245,11 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { private void handleMessageFrame(SockJsFrame frame) { if (!isOpen()) { if (logger.isErrorEnabled()) { - logger.error("Ignoring received message due to state=" + this.state + " in " + this); + logger.error("Ignoring received message due to state " + this.state + " in " + this); } return; } + String[] messages; try { messages = getMessageCodec().decode(frame.getFrameData()); @@ -258,18 +261,18 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { closeInternal(CloseStatus.BAD_DATA); return; } + if (logger.isTraceEnabled()) { logger.trace("Processing SockJS message frame " + frame.getContent() + " in " + this); } for (String message : messages) { - try { - if (isOpen()) { + if (isOpen()) { + try { this.webSocketHandler.handleMessage(this, new TextMessage(message)); } - } - catch (Throwable ex) { - Class type = this.webSocketHandler.getClass(); - logger.error(type + ".handleMessage threw an exception on " + frame + " in " + this, ex); + catch (Throwable ex) { + logger.error("WebSocketHandler.handleMessage threw an exception on " + frame + " in " + this, ex); + } } } } @@ -300,18 +303,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { } this.webSocketHandler.handleTransportError(this, error); } - catch (Exception ex) { - Class type = this.webSocketHandler.getClass(); - if (logger.isErrorEnabled()) { - logger.error(type + ".handleTransportError threw an exception", ex); - } + catch (Throwable ex) { + logger.error("WebSocketHandler.handleTransportError threw an exception", ex); } } public void afterTransportClosed(CloseStatus closeStatus) { this.closeStatus = (this.closeStatus != null ? this.closeStatus : closeStatus); Assert.state(this.closeStatus != null, "CloseStatus not available"); - if (logger.isDebugEnabled()) { logger.debug("Transport closed with " + this.closeStatus + " in " + this); } @@ -320,11 +319,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { try { this.webSocketHandler.afterConnectionClosed(this, this.closeStatus); } - catch (Exception ex) { - if (logger.isErrorEnabled()) { - Class type = this.webSocketHandler.getClass(); - logger.error(type + ".afterConnectionClosed threw an exception", ex); - } + catch (Throwable ex) { + logger.error("WebSocketHandler.afterConnectionClosed threw an exception", ex); } }