From ab4864da2a136fbdcdb9af9b4e043ae769652b06 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 4 Jul 2014 15:08:52 -0400 Subject: [PATCH] Add STOMP/WebSocket stats collection This change adds collection of stats in key infrastructure components of the WebSocket message broker config setup and exposes the gathered information for logging and viewing (e.g. via JMX). WebSocketMessageBrokerStats is a single class that assembles all gathered information and by default logs it once every 15 minutes. Application can also easily expose to JMX through an MBeanExporter. A new section in the reference documentation provides a summary of the available information. Issue: SPR-11739 --- .../stomp/StompBrokerRelayMessageHandler.java | 43 ++++ ...erRelayMessageHandlerIntegrationTests.java | 3 +- .../MessageBrokerBeanDefinitionParser.java | 64 ++++-- .../config/WebSocketMessageBrokerStats.java | 196 ++++++++++++++++++ ...cketMessageBrokerConfigurationSupport.java | 21 ++ .../messaging/StompSubProtocolHandler.java | 44 ++++ .../SubProtocolWebSocketHandler.java | 84 +++++++- ...essageBrokerBeanDefinitionParserTests.java | 11 + ...essageBrokerConfigurationSupportTests.java | 16 ++ src/asciidoc/index.adoc | 57 +++++ 10 files changed, 523 insertions(+), 16 deletions(-) create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 8451c05268c..a2a267284b1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -118,6 +119,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private final Map connectionHandlers = new ConcurrentHashMap(); + private final Stats stats = new Stats(); + /** * Create a StompBrokerRelayMessageHandler instance with the given message channels @@ -352,6 +355,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.headerInitializer; } + /** + * Return a String describing internal state and counters. + */ + public String getStatsInfo() { + return this.stats.toString(); + } + @Override protected void startInternal() { @@ -380,6 +390,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler SystemStompConnectionHandler handler = new SystemStompConnectionHandler(headers); this.connectionHandlers.put(handler.getSessionId(), handler); + this.stats.incrementConnectCount(); this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000)); } @@ -469,6 +480,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor); this.connectionHandlers.put(sessionId, handler); + this.stats.incrementConnectCount(); this.tcpClient.connect(handler); } else if (StompCommand.DISCONNECT.equals(command)) { @@ -479,6 +491,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } return; } + stats.incrementDisconnectCount(); handler.forward(message, stompAccessor); } else { @@ -615,6 +628,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler */ protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { this.isStompConnected = true; + stats.incrementConnectedCount(); initHeartbeats(connectedHeaders); } @@ -877,4 +891,33 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private class Stats { + + private final AtomicInteger connect = new AtomicInteger(); + + private final AtomicInteger connected = new AtomicInteger(); + + private final AtomicInteger disconnect = new AtomicInteger(); + + + public void incrementConnectCount() { + this.connect.incrementAndGet(); + } + + public void incrementConnectedCount() { + this.connected.incrementAndGet(); + } + + public void incrementDisconnectCount() { + this.disconnect.incrementAndGet(); + } + + public String toString() { + return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + + (isBrokerAvailable() ? " (available)" : " (not available)") + + ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + + this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; + } + } + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index 62a40055ff0..45771ea27b6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -80,7 +80,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Before public void setUp() throws Exception { - logger.debug("Setting up '" + this.testName.getMethodName() + "'"); + logger.debug("Setting up before '" + this.testName.getMethodName() + "'"); this.port = SocketUtils.findAvailableTcpPort(61613); this.responseChannel = new ExecutorSubscribableChannel(); this.responseHandler = new TestMessageHandler(); @@ -116,6 +116,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @After public void tearDown() throws Exception { try { + logger.debug("STOMP broker relay stats: " + this.relay.getStatsInfo()); this.relay.stop(); } finally { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java index 1b78a8d2dfb..f0e45814990 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java @@ -128,13 +128,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { beanName = registerBeanDef(beanDef, parserCxt, source); RuntimeBeanReference userSessionRegistry = new RuntimeBeanReference(beanName); - RuntimeBeanReference subProtocolWsHandler = registerSubProtocolWebSocketHandler( + RuntimeBeanReference subProtocolHandlerDef = registerSubProtocolWebSocketHandler( element, clientInChannel, clientOutChannel, userSessionRegistry, parserCxt, source); for(Element stompEndpointElem : DomUtils.getChildElementsByTagName(element, "stomp-endpoint")) { RuntimeBeanReference httpRequestHandler = registerHttpRequestHandler( - stompEndpointElem, subProtocolWsHandler, parserCxt, source); + stompEndpointElem, subProtocolHandlerDef, parserCxt, source); String pathAttribute = stompEndpointElem.getAttribute("path"); Assert.state(StringUtils.hasText(pathAttribute), "Invalid (no path mapping)"); @@ -155,7 +155,8 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { beanName = "brokerChannel"; channelElem = DomUtils.getChildElementByTagName(element, "broker-channel"); RuntimeBeanReference brokerChannel = getMessageChannel(beanName, channelElem, parserCxt, source); - registerMessageBroker(element, clientInChannel, clientOutChannel, brokerChannel, parserCxt, source); + RootBeanDefinition brokerDef = registerMessageBroker(element, clientInChannel, + clientOutChannel, brokerChannel, parserCxt, source); RuntimeBeanReference messageConverter = registerBrokerMessageConverter(element, parserCxt, source); @@ -176,6 +177,9 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { scopeConfigurerDef.getPropertyValues().add("scopes", scopeMap); registerBeanDefByName("webSocketScopeConfigurer", scopeConfigurerDef, parserCxt, source); + registerWebSocketMessageBrokerStats(subProtocolHandlerDef, brokerDef, clientInChannel, + clientOutChannel, parserCxt, source); + parserCxt.popAndRegisterContainingComponent(); return null; @@ -309,7 +313,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { return new RuntimeBeanReference(httpRequestHandlerBeanName); } - private void registerMessageBroker(Element messageBrokerElement, RuntimeBeanReference clientInChannelDef, + private RootBeanDefinition registerMessageBroker(Element messageBrokerElement, RuntimeBeanReference clientInChannelDef, RuntimeBeanReference clientOutChannelDef, RuntimeBeanReference brokerChannelDef, ParserContext parserCxt, Object source) { @@ -321,15 +325,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { cavs.addIndexedArgumentValue(1, clientOutChannelDef); cavs.addIndexedArgumentValue(2, brokerChannelDef); + RootBeanDefinition brokerDef; if (simpleBrokerElem != null) { - String prefix = simpleBrokerElem.getAttribute("prefix"); cavs.addIndexedArgumentValue(3, Arrays.asList(StringUtils.tokenizeToStringArray(prefix, ","))); - RootBeanDefinition brokerDef = new RootBeanDefinition(SimpleBrokerMessageHandler.class, cavs, null); - registerBeanDef(brokerDef, parserCxt, source); + brokerDef = new RootBeanDefinition(SimpleBrokerMessageHandler.class, cavs, null); } else if (brokerRelayElem != null) { - String prefix = brokerRelayElem.getAttribute("prefix"); cavs.addIndexedArgumentValue(3, Arrays.asList(StringUtils.tokenizeToStringArray(prefix, ","))); @@ -370,12 +372,15 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { if(!attrValue.isEmpty()) { mpvs.add("virtualHost", attrValue); } - Class handlerType = StompBrokerRelayMessageHandler.class; - RootBeanDefinition messageBrokerDef = new RootBeanDefinition(handlerType, cavs, mpvs); - registerBeanDef(messageBrokerDef, parserCxt, source); + brokerDef = new RootBeanDefinition(handlerType, cavs, mpvs); } - + else { + // Should not happen + throw new IllegalStateException("Neither nor elements found."); + } + registerBeanDef(brokerDef, parserCxt, source); + return brokerDef; } private RuntimeBeanReference registerBrokerMessageConverter(Element element, @@ -407,8 +412,8 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { ConstructorArgumentValues cavs = new ConstructorArgumentValues(); cavs.addIndexedArgumentValue(0, convertersDef); - RootBeanDefinition brokerMessage = new RootBeanDefinition(CompositeMessageConverter.class, cavs, null); - return new RuntimeBeanReference(registerBeanDef(brokerMessage, parserCxt, source)); + RootBeanDefinition messageConverterDef = new RootBeanDefinition(CompositeMessageConverter.class, cavs, null); + return new RuntimeBeanReference(registerBeanDef(messageConverterDef, parserCxt, source)); } private RuntimeBeanReference registerBrokerMessagingTemplate( @@ -481,6 +486,37 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { return new RuntimeBeanReference(userDestinationMessageHandleName); } + private void registerWebSocketMessageBrokerStats(RuntimeBeanReference subProtocolHandlerDef, + RootBeanDefinition brokerDef, RuntimeBeanReference clientInChannel, + RuntimeBeanReference clientOutChannel, ParserContext parserCxt, Object source) { + + RootBeanDefinition statsDef = new RootBeanDefinition(WebSocketMessageBrokerStats.class); + statsDef.getPropertyValues().add("subProtocolWebSocketHandler", subProtocolHandlerDef); + + if (StompBrokerRelayMessageHandler.class.equals(brokerDef.getBeanClass())) { + statsDef.getPropertyValues().add("stompBrokerRelay", brokerDef); + } + + String beanName = clientInChannel.getBeanName() + "Executor"; + if (parserCxt.getRegistry().containsBeanDefinition(beanName)) { + BeanDefinition beanDef = parserCxt.getRegistry().getBeanDefinition(beanName); + statsDef.getPropertyValues().add("inboundChannelExecutor", beanDef); + } + + beanName = clientOutChannel.getBeanName() + "Executor"; + if (parserCxt.getRegistry().containsBeanDefinition(beanName)) { + BeanDefinition beanDef = parserCxt.getRegistry().getBeanDefinition(beanName); + statsDef.getPropertyValues().add("outboundChannelExecutor", beanDef); + } + + beanName = SOCKJS_SCHEDULER_BEAN_NAME; + if (parserCxt.getRegistry().containsBeanDefinition(beanName)) { + BeanDefinition beanDef = parserCxt.getRegistry().getBeanDefinition(beanName); + statsDef.getPropertyValues().add("sockJsTaskScheduler", beanDef); + } + registerBeanDefByName("webSocketMessageBrokerStats", statsDef, parserCxt, source); + } + private static String registerBeanDef(RootBeanDefinition beanDef, ParserContext parserCxt, Object source) { String beanName = parserCxt.getReaderContext().generateBeanName(beanDef); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java new file mode 100644 index 00000000000..c5ede88445b --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java @@ -0,0 +1,196 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.config; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.web.socket.messaging.StompSubProtocolHandler; +import org.springframework.web.socket.messaging.SubProtocolHandler; +import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; + +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** + * A central class for aggregating information about internal state and counters + * from key infrastructure components of the setup that comes with + * {@code @EnableWebSocketMessageBroker} for Java config and + * {@code } for XML. + * + *

By default aggregated information is logged every 15 minutes at INFO level. + * The frequency of logging can be changed via {@link #setLoggingPeriod(long)}. + * + *

This class is declared as a Spring bean by the above configuration with the + * name "webSocketMessageBrokerStats" and can be easily exported to JMX, e.g. with + * the {@link org.springframework.jmx.export.MBeanExporter MBeanExporter}. + * + * @author Rossen Stoyanchev + * @since 4.1 + */ +public class WebSocketMessageBrokerStats { + + private static Log logger = LogFactory.getLog(WebSocketMessageBrokerStats.class); + + + private SubProtocolWebSocketHandler webSocketHandler; + + private StompSubProtocolHandler stompSubProtocolHandler; + + private StompBrokerRelayMessageHandler stompBrokerRelay; + + private ThreadPoolExecutor inboundChannelExecutor; + + private ThreadPoolExecutor outboundChannelExecutor; + + private ScheduledThreadPoolExecutor sockJsTaskScheduler; + + private ScheduledFuture loggingTask; + + private long loggingPeriod = 15 * 60 * 1000; + + + public void setSubProtocolWebSocketHandler(SubProtocolWebSocketHandler webSocketHandler) { + this.webSocketHandler = webSocketHandler; + this.stompSubProtocolHandler = initStompSubProtocolHandler(); + } + + private StompSubProtocolHandler initStompSubProtocolHandler() { + for (SubProtocolHandler handler : this.webSocketHandler.getProtocolHandlers()) { + if (handler instanceof StompSubProtocolHandler) { + return (StompSubProtocolHandler) handler; + } + } + SubProtocolHandler defaultHandler = this.webSocketHandler.getDefaultProtocolHandler(); + if (defaultHandler != null && defaultHandler instanceof StompSubProtocolHandler) { + return (StompSubProtocolHandler) defaultHandler; + } + return null; + } + + public void setStompBrokerRelay(StompBrokerRelayMessageHandler stompBrokerRelay) { + this.stompBrokerRelay = stompBrokerRelay; + } + + public void setInboundChannelExecutor(ThreadPoolTaskExecutor inboundChannelExecutor) { + this.inboundChannelExecutor = inboundChannelExecutor.getThreadPoolExecutor(); + } + + public void setOutboundChannelExecutor(ThreadPoolTaskExecutor outboundChannelExecutor) { + this.outboundChannelExecutor = outboundChannelExecutor.getThreadPoolExecutor(); + } + + public void setSockJsTaskScheduler(ThreadPoolTaskScheduler sockJsTaskScheduler) { + this.sockJsTaskScheduler = sockJsTaskScheduler.getScheduledThreadPoolExecutor(); + this.loggingTask = initLoggingTask(3 * 60 * 1000); + } + + private ScheduledFuture initLoggingTask(long initialDelay) { + if (logger.isInfoEnabled() && this.loggingPeriod > 0) { + return this.sockJsTaskScheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + logger.info(WebSocketMessageBrokerStats.this.toString()); + } + }, initialDelay, this.loggingPeriod, TimeUnit.MILLISECONDS); + } + return null; + } + + /** + * Set the frequency for logging information at INFO level in milliseconds. + * If set 0 or less than 0, the logging task is cancelled. + *

By default this property is set to 30 minutes (30 * 60 * 1000). + */ + public void setLoggingPeriod(long period) { + if (this.loggingTask != null) { + this.loggingTask.cancel(true); + } + this.loggingPeriod = period; + this.loggingTask = initLoggingTask(0); + } + + /** + * Return the configured logging period frequency in milliseconds. + */ + public long getLoggingPeriod() { + return this.loggingPeriod; + } + + /** + * Get stats about WebSocket sessions. + */ + public String getWebSocketSessionStatsInfo() { + return (this.webSocketHandler != null ? this.webSocketHandler.getStatsInfo() : "null"); + } + + /** + * Get stats about STOMP-related WebSocket message processing. + */ + public String getStompSubProtocolStatsInfo() { + return (this.stompSubProtocolHandler != null ? this.stompSubProtocolHandler.getStatsInfo() : "null"); + } + + /** + * Get stats about STOMP broker relay (when using a full-featured STOMP broker). + */ + public String getStompBrokerRelayStatsInfo() { + return (this.stompBrokerRelay != null ? this.stompBrokerRelay.getStatsInfo() : "null"); + } + + /** + * Get stats about the executor processing incoming messages from WebSocket clients. + */ + public String getClientInboundExecutorStatsInfo() { + return (this.inboundChannelExecutor != null ? getExecutorStatsInfo(this.inboundChannelExecutor) : "null"); + } + + /** + * Get stats about the executor processing outgoing messages to WebSocket clients. + */ + public String getClientOutboundExecutorStatsInfo() { + return (this.outboundChannelExecutor != null ? getExecutorStatsInfo(this.outboundChannelExecutor) : "null"); + } + + /** + * Get stats about the SockJS task scheduler. + */ + public String getSockJsTaskSchedulerStatsInfo() { + return (this.sockJsTaskScheduler != null ? getExecutorStatsInfo(this.sockJsTaskScheduler) : "null"); + } + + private String getExecutorStatsInfo(Executor executor) { + String s = executor.toString(); + return s.substring(s.indexOf("pool"), s.length() - 1); + } + + public String toString() { + return "WebSocketSession[" + getWebSocketSessionStatsInfo() + "]" + + ", stompSubProtocol[" + getStompSubProtocolStatsInfo() + "]" + + ", stompBrokerRelay[" + getStompBrokerRelayStatsInfo() + "]" + + ", inboundChannel[" + getClientInboundExecutorStatsInfo() + "]" + + ", outboundChannel" + getClientOutboundExecutorStatsInfo() + "]" + + ", sockJsScheduler[" + getSockJsTaskSchedulerStatsInfo() + "]"; + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index 0af443a4709..98f2812e0bd 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -22,9 +22,11 @@ import org.springframework.beans.factory.config.CustomScopeConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.messaging.simp.SimpSessionScope; import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration; +import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.servlet.HandlerMapping; import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.config.WebSocketMessageBrokerStats; import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; /** @@ -47,6 +49,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac protected WebSocketMessageBrokerConfigurationSupport() { } + @Bean public HandlerMapping stompWebSocketHandlerMapping() { @@ -109,4 +112,22 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac return configurer; } + @Bean + public WebSocketMessageBrokerStats webSocketMessageBrokerStats() { + StompBrokerRelayMessageHandler brokerRelay = + stompBrokerRelayMessageHandler() instanceof StompBrokerRelayMessageHandler ? + (StompBrokerRelayMessageHandler) stompBrokerRelayMessageHandler() : null; + + // Ensure STOMP endpoints are registered + stompWebSocketHandlerMapping(); + + WebSocketMessageBrokerStats stats = new WebSocketMessageBrokerStats(); + stats.setSubProtocolWebSocketHandler((SubProtocolWebSocketHandler) subProtocolWebSocketHandler()); + stats.setStompBrokerRelay(brokerRelay); + stats.setInboundChannelExecutor(clientInboundChannelExecutor()); + stats.setOutboundChannelExecutor(clientOutboundChannelExecutor()); + stats.setSockJsTaskScheduler(messageBrokerSockJsTaskScheduler()); + return stats; + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 9c42025ab48..a11a8aa2c9b 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -99,6 +100,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE private ApplicationEventPublisher eventPublisher; + private final Stats stats = new Stats(); + /** * Configure the maximum size allowed for an incoming STOMP message. @@ -167,6 +170,13 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE this.eventPublisher = applicationEventPublisher; } + /** + * Return a String describing internal state and counters. + */ + public String getStatsInfo() { + return this.stats.toString(); + } + /** * Handle incoming WebSocket messages from clients. @@ -221,8 +231,12 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE if (this.eventPublisher != null) { if (StompCommand.CONNECT.equals(headerAccessor.getCommand())) { + this.stats.incrementConnectCount(); publishEvent(new SessionConnectEvent(this, message)); } + else if (StompCommand.DISCONNECT.equals(headerAccessor.getCommand())) { + this.stats.incrementDisconnectCount(); + } else if (StompCommand.SUBSCRIBE.equals(headerAccessor.getCommand())) { publishEvent(new SessionSubscribeEvent(this, message)); } @@ -298,6 +312,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE } } else if (StompCommand.CONNECTED.equals(command)) { + this.stats.incrementConnectedCount(); stompAccessor = afterStompSessionConnected(message, stompAccessor, session); if (this.eventPublisher != null && StompCommand.CONNECTED.equals(command)) { publishEvent(new SessionConnectedEvent(this, (Message) message)); @@ -467,4 +482,33 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE return MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders()); } + + private class Stats { + + private final AtomicInteger connect = new AtomicInteger(); + + private final AtomicInteger connected = new AtomicInteger(); + + private final AtomicInteger disconnect = new AtomicInteger(); + + + public void incrementConnectCount() { + this.connect.incrementAndGet(); + } + + public void incrementConnectedCount() { + this.connected.incrementAndGet(); + } + + public void incrementDisconnectCount() { + this.disconnect.incrementAndGet(); + } + + + public String toString() { + return "processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + + this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; + } + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java index 534681fc09c..48ebc195ceb 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; @@ -44,6 +45,8 @@ import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; import org.springframework.web.socket.handler.SessionLimitExceededException; +import org.springframework.web.socket.sockjs.transport.session.PollingSockJsSession; +import org.springframework.web.socket.sockjs.transport.session.StreamingSockJsSession; /** * An implementation of {@link WebSocketHandler} that delegates incoming WebSocket @@ -97,6 +100,8 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, private final ReentrantLock sessionCheckLock = new ReentrantLock(); + private final Stats stats = new Stats(); + private final Object lifecycleMonitor = new Object(); private volatile boolean running = false; @@ -214,6 +219,14 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } } + /** + * Return a String describing internal state and counters. + */ + public String getStatsInfo() { + return this.stats.toString(); + } + + @Override public final void start() { Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers"); @@ -249,6 +262,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { + this.stats.incrementSessionCount(session); session = new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit()); this.sessions.put(session.getId(), new WebSocketSessionHolder(session)); if (logger.isDebugEnabled()) { @@ -323,6 +337,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, catch (SessionLimitExceededException ex) { try { logger.error("Terminating '" + session + "'", ex); + this.stats.incrementLimitExceededCount(); clearSession(session, ex.getStatus()); // clear first, session may be unresponsive session.close(ex.getStatus()); } @@ -381,6 +396,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, "Closing " + holder.getSession() + "."); } try { + this.stats.incrementNoMessagesReceivedCount(); session.close(CloseStatus.SESSION_NOT_RELIABLE); } catch (Throwable t) { @@ -396,6 +412,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + this.stats.incrementTransportError(); } @Override @@ -407,7 +424,9 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, if (logger.isDebugEnabled()) { logger.debug("Clearing session " + session.getId() + " (" + this.sessions.size() + " remain)"); } - this.sessions.remove(session.getId()); + if (this.sessions.remove(session.getId()) != null) { + this.stats.decrementSessionCount(session); + } findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.clientInboundChannel); } @@ -453,4 +472,67 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } } + private class Stats { + + private final AtomicInteger total = new AtomicInteger(); + + private final AtomicInteger webSocket = new AtomicInteger(); + + private final AtomicInteger httpStreaming = new AtomicInteger(); + + private final AtomicInteger httpPolling = new AtomicInteger(); + + private final AtomicInteger limitExceeded = new AtomicInteger(); + + private final AtomicInteger noMessagesReceived = new AtomicInteger(); + + private final AtomicInteger transportError = new AtomicInteger(); + + + public void incrementSessionCount(WebSocketSession session) { + getCountFor(session).incrementAndGet(); + this.total.incrementAndGet(); + } + + public void decrementSessionCount(WebSocketSession session) { + getCountFor(session).decrementAndGet(); + } + + public void incrementLimitExceededCount() { + this.limitExceeded.incrementAndGet(); + } + + public void incrementNoMessagesReceivedCount() { + this.noMessagesReceived.incrementAndGet(); + } + + public void incrementTransportError() { + this.transportError.incrementAndGet(); + } + + private AtomicInteger getCountFor(WebSocketSession session) { + if (session instanceof PollingSockJsSession) { + return this.httpPolling; + } + else if (session instanceof StreamingSockJsSession) { + return this.httpStreaming; + } + else { + return this.webSocket; + } + } + + public String toString() { + return SubProtocolWebSocketHandler.this.sessions.size() + + " current WS(" + this.webSocket.get() + + ")-HttpStream(" + this.httpStreaming.get() + + ")-HttpPoll(" + this.httpPolling.get() + "), " + + this.total.get() + " total, " + + (this.limitExceeded.get() + this.noMessagesReceived.get()) + " closed abnormally (" + + this.noMessagesReceived.get() + " connect failure, " + + this.limitExceeded.get() + " send limit, " + + this.transportError.get() + " transport error)"; + } + } + } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index 465dcb3fda7..9bb15d1b60c 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -245,6 +245,17 @@ public class MessageBrokerBeanDefinitionParserTests { catch (NoSuchBeanDefinitionException ex) { // expected } + + String name = "webSocketMessageBrokerStats"; + WebSocketMessageBrokerStats stats = this.appContext.getBean(name, WebSocketMessageBrokerStats.class); + assertEquals("WebSocketSession[0 current WS(0)-HttpStream(0)-HttpPoll(0), " + + "0 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], " + + "stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], " + + "stompBrokerRelay[0 sessions, relayhost:1234 (not available), processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], " + + "inboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " + + "outboundChannelpool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " + + "sockJsScheduler[pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 0]", + stats.toString()); } @Test diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java index 0f16354d22c..0cb1e5f3e27 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java @@ -44,6 +44,7 @@ import org.springframework.web.servlet.HandlerMapping; import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.config.WebSocketMessageBrokerStats; import org.springframework.web.socket.handler.TestWebSocketSession; import org.springframework.web.socket.messaging.StompSubProtocolHandler; import org.springframework.web.socket.messaging.StompTextMessageBuilder; @@ -51,6 +52,7 @@ import org.springframework.web.socket.messaging.SubProtocolHandler; import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; /** * Test fixture for @@ -134,6 +136,20 @@ public class WebSocketMessageBrokerConfigurationSupportTests { assertTrue(executor.getRemoveOnCancelPolicy()); } + @Test + public void webSocketMessageBrokerStats() { + String name = "webSocketMessageBrokerStats"; + WebSocketMessageBrokerStats stats = this.config.getBean(name, WebSocketMessageBrokerStats.class); + assertEquals("WebSocketSession[0 current WS(0)-HttpStream(0)-HttpPoll(0), " + + "0 total, 0 closed abnormally (0 connect failure, 0 send limit, 0 transport error)], " + + "stompSubProtocol[processed CONNECT(0)-CONNECTED(0)-DISCONNECT(0)], " + + "stompBrokerRelay[null], " + + "inboundChannel[pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " + + "outboundChannelpool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], " + + "sockJsScheduler[pool size = 1, active threads = 0, queued tasks = 1, completed tasks = 0]", + stats.toString()); + } + @Controller static class TestController { diff --git a/src/asciidoc/index.adoc b/src/asciidoc/index.adoc index 969b2abedb5..c78466ffe9b 100644 --- a/src/asciidoc/index.adoc +++ b/src/asciidoc/index.adoc @@ -38692,6 +38692,63 @@ through any other application instances. +[[websocket-stomp-stats]] +==== Runtime Monitoring + +When using `@EnableWebSocketMessageBroker` or `` key +infrastructure components automatically gather stats and counters that provide +important insight into the internal state of the application. The configuration +also declares a bean of type `WebSocketMessageBrokerStats` that gathers all +available information in one place and by default logs it at INFO once +every 15 minutes. This bean can be exported to JMX through Spring's +`MBeanExporter` for viewing at runtime for example through JDK's jconsole. +Below is a summary of the available information. + +Client WebSocket Sessions:: + Current::: indicates how many client sessions there are + currently with the count further broken down by WebSocket vs HTTP + streaming and polling SockJS sessions. + Total::: indicates how many total sessions have been established. + Abnormally Closed::: + Connect Failures:::: these are sessions that got established but were + closed after not having received any messages within 60 seconds. This is + usually an indication of proxy or network issues. + Send Limit Exceeded:::: sessions closed after exceeding the configured send + timeout or the send buffer limits which can occur with slow clients + (see previous section). + Transport Errors:::: sessions closed after a transport error such as + failure to read or write to a WebSocket connection or + HTTP request/response. + STOMP Frames::: the total number of CONNECT, CONNECTED, and DISCONNECT frames + processed indicating how many clients connected on the STOMP level. Note that + the DISCONNECT count may be lower when sessions get closed abnormally or when + clients close without sending a DISCONNECT frame. +STOMP Broker Relay:: + TCP Connections::: indicates how many TCP connections on behalf of client + WebSocket sessions are established to the broker. This should be equal to the + number of client WebSocket sessions + 1 additional shared "system" connection + for sending messages from within the application. + STOMP Frames::: the total number of CONNECT, CONNECTED, and DISCONNECT frames + forwarded to or received from the broker on behalf of clients. Note that a + DISCONNECT frame is sent to the broker regardless of how the client WebSocket + session was closed. Therefore a lower DISCONNECT frame count is an indication + that the broker is pro-actively closing connections, may be because of a + heartbeat that didn't arrive in time, an invalid input frame, or other. +Client Inbound Channel:: stats from thread pool backing the "clientInboundChannel" + providing insight into the health of incoming message processing. Tasks queueing + up here is an indication the application may be too slow to handle messages. + If there I/O bound tasks (e.g. slow database query, HTTP request to 3rd party + REST API, etc) consider increasing the thread pool size. +Client Outbound Channel:: stats from the thread pool backing the "clientOutboundChannel" + providing insight into the health of broadcasting messages to clients. Tasks + queueing up here is an indication clients are too slow to consume messages. + One way to address this is to increase the thread pool size to accommodate the + number of concurrent slow clients expected. Another option is to reduce the + send timeout and send buffer size limits (see the previous section). +SockJS Task Scheduler:: stats from thread pool of the SockJS task scheduler which + is used to send heartbeats. Note that when heartbeats are negotiated on the + STOMP level the SockJS heartbeats are disabled. + [[websocket-stomp-testing]] ==== Testing Annotated Controller Methods