diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 8451c05268c..a2a267284b1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -118,6 +119,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private final Map connectionHandlers = new ConcurrentHashMap(); + private final Stats stats = new Stats(); + /** * Create a StompBrokerRelayMessageHandler instance with the given message channels @@ -352,6 +355,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.headerInitializer; } + /** + * Return a String describing internal state and counters. + */ + public String getStatsInfo() { + return this.stats.toString(); + } + @Override protected void startInternal() { @@ -380,6 +390,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler SystemStompConnectionHandler handler = new SystemStompConnectionHandler(headers); this.connectionHandlers.put(handler.getSessionId(), handler); + this.stats.incrementConnectCount(); this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000)); } @@ -469,6 +480,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor); this.connectionHandlers.put(sessionId, handler); + this.stats.incrementConnectCount(); this.tcpClient.connect(handler); } else if (StompCommand.DISCONNECT.equals(command)) { @@ -479,6 +491,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } return; } + stats.incrementDisconnectCount(); handler.forward(message, stompAccessor); } else { @@ -615,6 +628,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler */ protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { this.isStompConnected = true; + stats.incrementConnectedCount(); initHeartbeats(connectedHeaders); } @@ -877,4 +891,33 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private class Stats { + + private final AtomicInteger connect = new AtomicInteger(); + + private final AtomicInteger connected = new AtomicInteger(); + + private final AtomicInteger disconnect = new AtomicInteger(); + + + public void incrementConnectCount() { + this.connect.incrementAndGet(); + } + + public void incrementConnectedCount() { + this.connected.incrementAndGet(); + } + + public void incrementDisconnectCount() { + this.disconnect.incrementAndGet(); + } + + public String toString() { + return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + + (isBrokerAvailable() ? " (available)" : " (not available)") + + ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + + this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; + } + } + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index 62a40055ff0..45771ea27b6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -80,7 +80,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Before public void setUp() throws Exception { - logger.debug("Setting up '" + this.testName.getMethodName() + "'"); + logger.debug("Setting up before '" + this.testName.getMethodName() + "'"); this.port = SocketUtils.findAvailableTcpPort(61613); this.responseChannel = new ExecutorSubscribableChannel(); this.responseHandler = new TestMessageHandler(); @@ -116,6 +116,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @After public void tearDown() throws Exception { try { + logger.debug("STOMP broker relay stats: " + this.relay.getStatsInfo()); this.relay.stop(); } finally { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java index 1b78a8d2dfb..f0e45814990 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java @@ -128,13 +128,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { beanName = registerBeanDef(beanDef, parserCxt, source); RuntimeBeanReference userSessionRegistry = new RuntimeBeanReference(beanName); - RuntimeBeanReference subProtocolWsHandler = registerSubProtocolWebSocketHandler( + RuntimeBeanReference subProtocolHandlerDef = registerSubProtocolWebSocketHandler( element, clientInChannel, clientOutChannel, userSessionRegistry, parserCxt, source); for(Element stompEndpointElem : DomUtils.getChildElementsByTagName(element, "stomp-endpoint")) { RuntimeBeanReference httpRequestHandler = registerHttpRequestHandler( - stompEndpointElem, subProtocolWsHandler, parserCxt, source); + stompEndpointElem, subProtocolHandlerDef, parserCxt, source); String pathAttribute = stompEndpointElem.getAttribute("path"); Assert.state(StringUtils.hasText(pathAttribute), "Invalid (no path mapping)"); @@ -155,7 +155,8 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { beanName = "brokerChannel"; channelElem = DomUtils.getChildElementByTagName(element, "broker-channel"); RuntimeBeanReference brokerChannel = getMessageChannel(beanName, channelElem, parserCxt, source); - registerMessageBroker(element, clientInChannel, clientOutChannel, brokerChannel, parserCxt, source); + RootBeanDefinition brokerDef = registerMessageBroker(element, clientInChannel, + clientOutChannel, brokerChannel, parserCxt, source); RuntimeBeanReference messageConverter = registerBrokerMessageConverter(element, parserCxt, source); @@ -176,6 +177,9 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { scopeConfigurerDef.getPropertyValues().add("scopes", scopeMap); registerBeanDefByName("webSocketScopeConfigurer", scopeConfigurerDef, parserCxt, source); + registerWebSocketMessageBrokerStats(subProtocolHandlerDef, brokerDef, clientInChannel, + clientOutChannel, parserCxt, source); + parserCxt.popAndRegisterContainingComponent(); return null; @@ -309,7 +313,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { return new RuntimeBeanReference(httpRequestHandlerBeanName); } - private void registerMessageBroker(Element messageBrokerElement, RuntimeBeanReference clientInChannelDef, + private RootBeanDefinition registerMessageBroker(Element messageBrokerElement, RuntimeBeanReference clientInChannelDef, RuntimeBeanReference clientOutChannelDef, RuntimeBeanReference brokerChannelDef, ParserContext parserCxt, Object source) { @@ -321,15 +325,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { cavs.addIndexedArgumentValue(1, clientOutChannelDef); cavs.addIndexedArgumentValue(2, brokerChannelDef); + RootBeanDefinition brokerDef; if (simpleBrokerElem != null) { - String prefix = simpleBrokerElem.getAttribute("prefix"); cavs.addIndexedArgumentValue(3, Arrays.asList(StringUtils.tokenizeToStringArray(prefix, ","))); - RootBeanDefinition brokerDef = new RootBeanDefinition(SimpleBrokerMessageHandler.class, cavs, null); - registerBeanDef(brokerDef, parserCxt, source); + brokerDef = new RootBeanDefinition(SimpleBrokerMessageHandler.class, cavs, null); } else if (brokerRelayElem != null) { - String prefix = brokerRelayElem.getAttribute("prefix"); cavs.addIndexedArgumentValue(3, Arrays.asList(StringUtils.tokenizeToStringArray(prefix, ","))); @@ -370,12 +372,15 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { if(!attrValue.isEmpty()) { mpvs.add("virtualHost", attrValue); } - Class handlerType = StompBrokerRelayMessageHandler.class; - RootBeanDefinition messageBrokerDef = new RootBeanDefinition(handlerType, cavs, mpvs); - registerBeanDef(messageBrokerDef, parserCxt, source); + brokerDef = new RootBeanDefinition(handlerType, cavs, mpvs); } - + else { + // Should not happen + throw new IllegalStateException("Neither nor elements found."); + } + registerBeanDef(brokerDef, parserCxt, source); + return brokerDef; } private RuntimeBeanReference registerBrokerMessageConverter(Element element, @@ -407,8 +412,8 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { ConstructorArgumentValues cavs = new ConstructorArgumentValues(); cavs.addIndexedArgumentValue(0, convertersDef); - RootBeanDefinition brokerMessage = new RootBeanDefinition(CompositeMessageConverter.class, cavs, null); - return new RuntimeBeanReference(registerBeanDef(brokerMessage, parserCxt, source)); + RootBeanDefinition messageConverterDef = new RootBeanDefinition(CompositeMessageConverter.class, cavs, null); + return new RuntimeBeanReference(registerBeanDef(messageConverterDef, parserCxt, source)); } private RuntimeBeanReference registerBrokerMessagingTemplate( @@ -481,6 +486,37 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { return new RuntimeBeanReference(userDestinationMessageHandleName); } + private void registerWebSocketMessageBrokerStats(RuntimeBeanReference subProtocolHandlerDef, + RootBeanDefinition brokerDef, RuntimeBeanReference clientInChannel, + RuntimeBeanReference clientOutChannel, ParserContext parserCxt, Object source) { + + RootBeanDefinition statsDef = new RootBeanDefinition(WebSocketMessageBrokerStats.class); + statsDef.getPropertyValues().add("subProtocolWebSocketHandler", subProtocolHandlerDef); + + if (StompBrokerRelayMessageHandler.class.equals(brokerDef.getBeanClass())) { + statsDef.getPropertyValues().add("stompBrokerRelay", brokerDef); + } + + String beanName = clientInChannel.getBeanName() + "Executor"; + if (parserCxt.getRegistry().containsBeanDefinition(beanName)) { + BeanDefinition beanDef = parserCxt.getRegistry().getBeanDefinition(beanName); + statsDef.getPropertyValues().add("inboundChannelExecutor", beanDef); + } + + beanName = clientOutChannel.getBeanName() + "Executor"; + if (parserCxt.getRegistry().containsBeanDefinition(beanName)) { + BeanDefinition beanDef = parserCxt.getRegistry().getBeanDefinition(beanName); + statsDef.getPropertyValues().add("outboundChannelExecutor", beanDef); + } + + beanName = SOCKJS_SCHEDULER_BEAN_NAME; + if (parserCxt.getRegistry().containsBeanDefinition(beanName)) { + BeanDefinition beanDef = parserCxt.getRegistry().getBeanDefinition(beanName); + statsDef.getPropertyValues().add("sockJsTaskScheduler", beanDef); + } + registerBeanDefByName("webSocketMessageBrokerStats", statsDef, parserCxt, source); + } + private static String registerBeanDef(RootBeanDefinition beanDef, ParserContext parserCxt, Object source) { String beanName = parserCxt.getReaderContext().generateBeanName(beanDef); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java new file mode 100644 index 00000000000..c5ede88445b --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java @@ -0,0 +1,196 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.config; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.web.socket.messaging.StompSubProtocolHandler; +import org.springframework.web.socket.messaging.SubProtocolHandler; +import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; + +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** + * A central class for aggregating information about internal state and counters + * from key infrastructure components of the setup that comes with + * {@code @EnableWebSocketMessageBroker} for Java config and + * {@code } for XML. + * + *

By default aggregated information is logged every 15 minutes at INFO level. + * The frequency of logging can be changed via {@link #setLoggingPeriod(long)}. + * + *

This class is declared as a Spring bean by the above configuration with the + * name "webSocketMessageBrokerStats" and can be easily exported to JMX, e.g. with + * the {@link org.springframework.jmx.export.MBeanExporter MBeanExporter}. + * + * @author Rossen Stoyanchev + * @since 4.1 + */ +public class WebSocketMessageBrokerStats { + + private static Log logger = LogFactory.getLog(WebSocketMessageBrokerStats.class); + + + private SubProtocolWebSocketHandler webSocketHandler; + + private StompSubProtocolHandler stompSubProtocolHandler; + + private StompBrokerRelayMessageHandler stompBrokerRelay; + + private ThreadPoolExecutor inboundChannelExecutor; + + private ThreadPoolExecutor outboundChannelExecutor; + + private ScheduledThreadPoolExecutor sockJsTaskScheduler; + + private ScheduledFuture loggingTask; + + private long loggingPeriod = 15 * 60 * 1000; + + + public void setSubProtocolWebSocketHandler(SubProtocolWebSocketHandler webSocketHandler) { + this.webSocketHandler = webSocketHandler; + this.stompSubProtocolHandler = initStompSubProtocolHandler(); + } + + private StompSubProtocolHandler initStompSubProtocolHandler() { + for (SubProtocolHandler handler : this.webSocketHandler.getProtocolHandlers()) { + if (handler instanceof StompSubProtocolHandler) { + return (StompSubProtocolHandler) handler; + } + } + SubProtocolHandler defaultHandler = this.webSocketHandler.getDefaultProtocolHandler(); + if (defaultHandler != null && defaultHandler instanceof StompSubProtocolHandler) { + return (StompSubProtocolHandler) defaultHandler; + } + return null; + } + + public void setStompBrokerRelay(StompBrokerRelayMessageHandler stompBrokerRelay) { + this.stompBrokerRelay = stompBrokerRelay; + } + + public void setInboundChannelExecutor(ThreadPoolTaskExecutor inboundChannelExecutor) { + this.inboundChannelExecutor = inboundChannelExecutor.getThreadPoolExecutor(); + } + + public void setOutboundChannelExecutor(ThreadPoolTaskExecutor outboundChannelExecutor) { + this.outboundChannelExecutor = outboundChannelExecutor.getThreadPoolExecutor(); + } + + public void setSockJsTaskScheduler(ThreadPoolTaskScheduler sockJsTaskScheduler) { + this.sockJsTaskScheduler = sockJsTaskScheduler.getScheduledThreadPoolExecutor(); + this.loggingTask = initLoggingTask(3 * 60 * 1000); + } + + private ScheduledFuture initLoggingTask(long initialDelay) { + if (logger.isInfoEnabled() && this.loggingPeriod > 0) { + return this.sockJsTaskScheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + logger.info(WebSocketMessageBrokerStats.this.toString()); + } + }, initialDelay, this.loggingPeriod, TimeUnit.MILLISECONDS); + } + return null; + } + + /** + * Set the frequency for logging information at INFO level in milliseconds. + * If set 0 or less than 0, the logging task is cancelled. + *

By default this property is set to 30 minutes (30 * 60 * 1000). + */ + public void setLoggingPeriod(long period) { + if (this.loggingTask != null) { + this.loggingTask.cancel(true); + } + this.loggingPeriod = period; + this.loggingTask = initLoggingTask(0); + } + + /** + * Return the configured logging period frequency in milliseconds. + */ + public long getLoggingPeriod() { + return this.loggingPeriod; + } + + /** + * Get stats about WebSocket sessions. + */ + public String getWebSocketSessionStatsInfo() { + return (this.webSocketHandler != null ? this.webSocketHandler.getStatsInfo() : "null"); + } + + /** + * Get stats about STOMP-related WebSocket message processing. + */ + public String getStompSubProtocolStatsInfo() { + return (this.stompSubProtocolHandler != null ? this.stompSubProtocolHandler.getStatsInfo() : "null"); + } + + /** + * Get stats about STOMP broker relay (when using a full-featured STOMP broker). + */ + public String getStompBrokerRelayStatsInfo() { + return (this.stompBrokerRelay != null ? this.stompBrokerRelay.getStatsInfo() : "null"); + } + + /** + * Get stats about the executor processing incoming messages from WebSocket clients. + */ + public String getClientInboundExecutorStatsInfo() { + return (this.inboundChannelExecutor != null ? getExecutorStatsInfo(this.inboundChannelExecutor) : "null"); + } + + /** + * Get stats about the executor processing outgoing messages to WebSocket clients. + */ + public String getClientOutboundExecutorStatsInfo() { + return (this.outboundChannelExecutor != null ? getExecutorStatsInfo(this.outboundChannelExecutor) : "null"); + } + + /** + * Get stats about the SockJS task scheduler. + */ + public String getSockJsTaskSchedulerStatsInfo() { + return (this.sockJsTaskScheduler != null ? getExecutorStatsInfo(this.sockJsTaskScheduler) : "null"); + } + + private String getExecutorStatsInfo(Executor executor) { + String s = executor.toString(); + return s.substring(s.indexOf("pool"), s.length() - 1); + } + + public String toString() { + return "WebSocketSession[" + getWebSocketSessionStatsInfo() + "]" + + ", stompSubProtocol[" + getStompSubProtocolStatsInfo() + "]" + + ", stompBrokerRelay[" + getStompBrokerRelayStatsInfo() + "]" + + ", inboundChannel[" + getClientInboundExecutorStatsInfo() + "]" + + ", outboundChannel" + getClientOutboundExecutorStatsInfo() + "]" + + ", sockJsScheduler[" + getSockJsTaskSchedulerStatsInfo() + "]"; + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index 0af443a4709..98f2812e0bd 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -22,9 +22,11 @@ import org.springframework.beans.factory.config.CustomScopeConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.messaging.simp.SimpSessionScope; import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration; +import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.servlet.HandlerMapping; import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.config.WebSocketMessageBrokerStats; import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; /** @@ -47,6 +49,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac protected WebSocketMessageBrokerConfigurationSupport() { } + @Bean public HandlerMapping stompWebSocketHandlerMapping() { @@ -109,4 +112,22 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac return configurer; } + @Bean + public WebSocketMessageBrokerStats webSocketMessageBrokerStats() { + StompBrokerRelayMessageHandler brokerRelay = + stompBrokerRelayMessageHandler() instanceof StompBrokerRelayMessageHandler ? + (StompBrokerRelayMessageHandler) stompBrokerRelayMessageHandler() : null; + + // Ensure STOMP endpoints are registered + stompWebSocketHandlerMapping(); + + WebSocketMessageBrokerStats stats = new WebSocketMessageBrokerStats(); + stats.setSubProtocolWebSocketHandler((SubProtocolWebSocketHandler) subProtocolWebSocketHandler()); + stats.setStompBrokerRelay(brokerRelay); + stats.setInboundChannelExecutor(clientInboundChannelExecutor()); + stats.setOutboundChannelExecutor(clientOutboundChannelExecutor()); + stats.setSockJsTaskScheduler(messageBrokerSockJsTaskScheduler()); + return stats; + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 9c42025ab48..a11a8aa2c9b 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -99,6 +100,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE private ApplicationEventPublisher eventPublisher; + private final Stats stats = new Stats(); + /** * Configure the maximum size allowed for an incoming STOMP message. @@ -167,6 +170,13 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE this.eventPublisher = applicationEventPublisher; } + /** + * Return a String describing internal state and counters. + */ + public String getStatsInfo() { + return this.stats.toString(); + } + /** * Handle incoming WebSocket messages from clients. @@ -221,8 +231,12 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE if (this.eventPublisher != null) { if (StompCommand.CONNECT.equals(headerAccessor.getCommand())) { + this.stats.incrementConnectCount(); publishEvent(new SessionConnectEvent(this, message)); } + else if (StompCommand.DISCONNECT.equals(headerAccessor.getCommand())) { + this.stats.incrementDisconnectCount(); + } else if (StompCommand.SUBSCRIBE.equals(headerAccessor.getCommand())) { publishEvent(new SessionSubscribeEvent(this, message)); } @@ -298,6 +312,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE } } else if (StompCommand.CONNECTED.equals(command)) { + this.stats.incrementConnectedCount(); stompAccessor = afterStompSessionConnected(message, stompAccessor, session); if (this.eventPublisher != null && StompCommand.CONNECTED.equals(command)) { publishEvent(new SessionConnectedEvent(this, (Message) message)); @@ -467,4 +482,33 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE return MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders()); } + + private class Stats { + + private final AtomicInteger connect = new AtomicInteger(); + + private final AtomicInteger connected = new AtomicInteger(); + + private final AtomicInteger disconnect = new AtomicInteger(); + + + public void incrementConnectCount() { + this.connect.incrementAndGet(); + } + + public void incrementConnectedCount() { + this.connected.incrementAndGet(); + } + + public void incrementDisconnectCount() { + this.disconnect.incrementAndGet(); + } + + + public String toString() { + return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + + this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; + } + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java index 534681fc09c..48ebc195ceb 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; @@ -44,6 +45,8 @@ import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; import org.springframework.web.socket.handler.SessionLimitExceededException; +import org.springframework.web.socket.sockjs.transport.session.PollingSockJsSession; +import org.springframework.web.socket.sockjs.transport.session.StreamingSockJsSession; /** * An implementation of {@link WebSocketHandler} that delegates incoming WebSocket @@ -97,6 +100,8 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, private final ReentrantLock sessionCheckLock = new ReentrantLock(); + private final Stats stats = new Stats(); + private final Object lifecycleMonitor = new Object(); private volatile boolean running = false; @@ -214,6 +219,14 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } } + /** + * Return a String describing internal state and counters. + */ + public String getStatsInfo() { + return this.stats.toString(); + } + + @Override public final void start() { Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers"); @@ -249,6 +262,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { + this.stats.incrementSessionCount(session); session = new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit()); this.sessions.put(session.getId(), new WebSocketSessionHolder(session)); if (logger.isDebugEnabled()) { @@ -323,6 +337,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, catch (SessionLimitExceededException ex) { try { logger.error("Terminating '" + session + "'", ex); + this.stats.incrementLimitExceededCount(); clearSession(session, ex.getStatus()); // clear first, session may be unresponsive session.close(ex.getStatus()); } @@ -381,6 +396,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, "Closing " + holder.getSession() + "."); } try { + this.stats.incrementNoMessagesReceivedCount(); session.close(CloseStatus.SESSION_NOT_RELIABLE); } catch (Throwable t) { @@ -396,6 +412,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + this.stats.incrementTransportError(); } @Override @@ -407,7 +424,9 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, if (logger.isDebugEnabled()) { logger.debug("Clearing session " + session.getId() + " (" + this.sessions.size() + " remain)"); } - this.sessions.remove(session.getId()); + if (this.sessions.remove(session.getId()) != null) { + this.stats.decrementSessionCount(session); + } findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.clientInboundChannel); } @@ -453,4 +472,67 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } } + private class Stats { + + private final AtomicInteger total = new AtomicInteger(); + + private final AtomicInteger webSocket = new AtomicInteger(); + + private final AtomicInteger httpStreaming = new AtomicInteger(); + + private final AtomicInteger httpPolling = new AtomicInteger(); + + private final AtomicInteger limitExceeded = new AtomicInteger(); + + private final AtomicInteger noMessagesReceived = new AtomicInteger(); + + private final AtomicInteger transportError = new AtomicInteger(); + + + public void incrementSessionCount(WebSocketSession session) { + getCountFor(session).incrementAndGet(); + this.total.incrementAndGet(); + } + + public void decrementSessionCount(WebSocketSession session) { + getCountFor(session).decrementAndGet(); + } + + public void incrementLimitExceededCount() { + this.limitExceeded.incrementAndGet(); + } + + public void incrementNoMessagesReceivedCount() { + this.noMessagesReceived.incrementAndGet(); + } + + public void incrementTransportError() { + this.transportError.incrementAndGet(); + } + + private AtomicInteger getCountFor(WebSocketSession session) { + if (session instanceof PollingSockJsSession) { + return this.httpPolling; + } + else if (session instanceof StreamingSockJsSession) { + return this.httpStreaming; + } + else { + return this.webSocket; + } + } + + public String toString() { + return SubProtocolWebSocketHandler.this.sessions.size() + + " current WS(" + this.webSocket.get() + + ")-HttpStream(" + this.httpStreaming.get() + + ")-HttpPoll(" + this.httpPolling.get() + "), " + + this.total.get() + " total, " + + (this.limitExceeded.get() + this.noMessagesReceived.get()) + " closed abnormally (" + + this.noMessagesReceived.get() + " connect failure, " + + this.limitExceeded.get() + " send limit, " + + this.transportError.get() + " transport error)"; + } + } + } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index 465dcb3fda7..9bb15d1b60c 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -245,6 +245,17 @@ public class MessageBrokerBeanDefinitionParserTests { catch (NoSuchBeanDefinitionException ex) { // expected } + + String name = "webSocketMessageBrokerStats"; + WebSocketMessageBrokerStats stats = this.appContext.getBean(name, WebSocketMessageBrokerStats.class); + assertEquals("WebSocketSession[0 current WS(0)-HttpStream(0)-HttpPoll(0), " + + "0 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], " + + "stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], " + + "stompBrokerRelay[0 sessions, relayhost:1234 (not available), processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], " + + "inboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " + + "outboundChannelpool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " + + "sockJsScheduler[pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 0]", + stats.toString()); } @Test diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java index 0f16354d22c..0cb1e5f3e27 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java @@ -44,6 +44,7 @@ import org.springframework.web.servlet.HandlerMapping; import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.config.WebSocketMessageBrokerStats; import org.springframework.web.socket.handler.TestWebSocketSession; import org.springframework.web.socket.messaging.StompSubProtocolHandler; import org.springframework.web.socket.messaging.StompTextMessageBuilder; @@ -51,6 +52,7 @@ import org.springframework.web.socket.messaging.SubProtocolHandler; import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; /** * Test fixture for @@ -134,6 +136,20 @@ public class WebSocketMessageBrokerConfigurationSupportTests { assertTrue(executor.getRemoveOnCancelPolicy()); } + @Test + public void webSocketMessageBrokerStats() { + String name = "webSocketMessageBrokerStats"; + WebSocketMessageBrokerStats stats = this.config.getBean(name, WebSocketMessageBrokerStats.class); + assertEquals("WebSocketSession[0 current WS(0)-HttpStream(0)-HttpPoll(0), " + + "0 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], " + + "stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], " + + "stompBrokerRelay[null], " + + "inboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " + + "outboundChannelpool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " + + "sockJsScheduler[pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 0]", + stats.toString()); + } + @Controller static class TestController { diff --git a/src/asciidoc/index.adoc b/src/asciidoc/index.adoc index 969b2abedb5..c78466ffe9b 100644 --- a/src/asciidoc/index.adoc +++ b/src/asciidoc/index.adoc @@ -38692,6 +38692,63 @@ through any other application instances. +[[websocket-stomp-stats]] +==== Runtime Monitoring + +When using `@EnableWebSocketMessageBroker` or `` key +infrastructure components automatically gather stats and counters that provide +important insight into the internal state of the application. The configuration +also declares a bean of type `WebSocketMessageBrokerStats` that gathers all +available information in one place and by default logs it at INFO once +every 15 minutes. This bean can be exported to JMX through Spring's +`MBeanExporter` for viewing at runtime for example through JDK's jconsole. +Below is a summary of the available information. + +Client WebSocket Sessions:: + Current::: indicates how many client sessions there are + currently with the count further broken down by WebSocket vs HTTP + streaming and polling SockJS sessions. + Total::: indicates how many total sessions have been established. + Abnormally Closed::: + Connect Failures:::: these are sessions that got established but were + closed after not having received any messages within 60 seconds. This is + usually an indication of proxy or network issues. + Send Limit Exceeded:::: sessions closed after exceeding the configured send + timeout or the send buffer limits which can occur with slow clients + (see previous section). + Transport Errors:::: sessions closed after a transport error such as + failure to read or write to a WebSocket connection or + HTTP request/response. + STOMP Frames::: the total number of CONNECT, CONNECTED, and DISCONNECT frames + processed indicating how many clients connected on the STOMP level. Note that + the DISCONNECT count may be lower when sessions get closed abnormally or when + clients close without sending a DISCONNECT frame. +STOMP Broker Relay:: + TCP Connections::: indicates how many TCP connections on behalf of client + WebSocket sessions are established to the broker. This should be equal to the + number of client WebSocket sessions + 1 additional shared "system" connection + for sending messages from within the application. + STOMP Frames::: the total number of CONNECT, CONNECTED, and DISCONNECT frames + forwarded to or received from the broker on behalf of clients. Note that a + DISCONNECT frame is sent to the broker regardless of how the client WebSocket + session was closed. Therefore a lower DISCONNECT frame count is an indication + that the broker is pro-actively closing connections, may be because of a + heartbeat that didn't arrive in time, an invalid input frame, or other. +Client Inbound Channel:: stats from thread pool backing the "clientInboundChannel" + providing insight into the health of incoming message processing. Tasks queueing + up here is an indication the application may be too slow to handle messages. + If there I/O bound tasks (e.g. slow database query, HTTP request to 3rd party + REST API, etc) consider increasing the thread pool size. +Client Outbound Channel:: stats from the thread pool backing the "clientOutboundChannel" + providing insight into the health of broadcasting messages to clients. Tasks + queueing up here is an indication clients are too slow to consume messages. + One way to address this is to increase the thread pool size to accommodate the + number of concurrent slow clients expected. Another option is to reduce the + send timeout and send buffer size limits (see the previous section). +SockJS Task Scheduler:: stats from thread pool of the SockJS task scheduler which + is used to send heartbeats. Note that when heartbeats are negotiated on the + STOMP level the SockJS heartbeats are disabled. + [[websocket-stomp-testing]] ==== Testing Annotated Controller Methods