From bbdb72d8087449ffb8758ac40dfaab60e4bb7f47 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Mon, 24 Mar 2014 09:28:33 +0100 Subject: [PATCH] Add configuration for message buffer size limit BufferingStompDecoder message buffer size limit can now be configured with JavaConfig MessageBrokerRegistry.setMessageBufferSizeLimit() or with XML . Issue: SPR-11527 --- .../simp/config/MessageBrokerRegistry.java | 20 +++++++++++++++++- .../simp/stomp/BufferingStompDecoder.java | 2 +- .../MessageBrokerBeanDefinitionParser.java | 11 ++++++++-- .../WebMvcStompEndpointRegistry.java | 7 ++++++- ...cketMessageBrokerConfigurationSupport.java | 5 +++-- .../messaging/StompSubProtocolHandler.java | 8 +++---- .../socket/config/spring-websocket-4.0.xsd | 7 +++++++ ...essageBrokerBeanDefinitionParserTests.java | 3 +++ .../WebMvcStompEndpointRegistryTests.java | 12 +++++++---- ...essageBrokerConfigurationSupportTests.java | 21 +++++++++++++++++++ .../socket/handler/TestWebSocketSession.java | 6 ++++++ .../StompSubProtocolHandlerTests.java | 6 ++++-- .../config/websocket-config-broker-simple.xml | 2 +- src/asciidoc/index.adoc | 13 ++++++++++-- 14 files changed, 103 insertions(+), 20 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java index 0911ef23748..9e48f956ab9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,8 @@ public class MessageBrokerRegistry { private ChannelRegistration brokerChannelRegistration = new ChannelRegistration(); + private Integer messageBufferSizeLimit; + public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) { Assert.notNull(clientInboundChannel); @@ -119,6 +121,22 @@ public class MessageBrokerRegistry { return this.brokerChannelRegistration; } + /** + * Configure the message buffer size limit in bytes. + * @since 4.0.3 + */ + public MessageBrokerRegistry setMessageBufferSizeLimit(Integer messageBufferSizeLimit) { + this.messageBufferSizeLimit = messageBufferSizeLimit; + return this; + } + + /** + * Get the message buffer size limit in bytes. + * @since 4.0.3 + */ + public Integer getMessageBufferSizeLimit() { + return this.messageBufferSizeLimit; + } protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) { if ((this.simpleBrokerRegistration == null) && (this.brokerRelayRegistration == null)) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java index 10d46bf26da..25bc9e2c350 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java @@ -31,7 +31,7 @@ import java.util.concurrent.LinkedBlockingQueue; /** - * A an extension of {@link org.springframework.messaging.simp.stomp.StompDecoder} + * An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder} * that chunks any bytes remaining after a single full STOMP frame has been read. * The remaining bytes may contain more STOMP frames or an incomplete STOMP frame. * diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java index 68783e6ad31..2fcda4fe5e9 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java @@ -124,8 +124,11 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { beanName = registerBeanDef(beanDef, parserCxt, source); RuntimeBeanReference userSessionRegistry = new RuntimeBeanReference(beanName); + String frameBufferSizeAttribute = element.getAttribute("message-buffer-size"); + Integer messageBufferSizeLimit = frameBufferSizeAttribute.isEmpty() ? null : Integer.parseInt(frameBufferSizeAttribute); + RuntimeBeanReference subProtocolWsHandler = registerSubProtocolWebSocketHandler( - clientInChannel, clientOutChannel, userSessionRegistry, parserCxt, source); + clientInChannel, clientOutChannel, userSessionRegistry, messageBufferSizeLimit, parserCxt, source); for(Element stompEndpointElem : DomUtils.getChildElementsByTagName(element, "stomp-endpoint")) { @@ -228,10 +231,14 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { private RuntimeBeanReference registerSubProtocolWebSocketHandler( RuntimeBeanReference clientInChannel, RuntimeBeanReference clientOutChannel, - RuntimeBeanReference userSessionRegistry, ParserContext parserCxt, Object source) { + RuntimeBeanReference userSessionRegistry, Integer messageBufferSizeLimit, + ParserContext parserCxt, Object source) { RootBeanDefinition stompHandlerDef = new RootBeanDefinition(StompSubProtocolHandler.class); stompHandlerDef.getPropertyValues().add("userSessionRegistry", userSessionRegistry); + if(messageBufferSizeLimit != null) { + stompHandlerDef.getPropertyValues().add("messageBufferSizeLimit", messageBufferSizeLimit); + } registerBeanDef(stompHandlerDef, parserCxt, source); ConstructorArgumentValues cavs = new ConstructorArgumentValues(); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java index af1f9b693b6..9743025a9c8 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebMvcStompEndpointRegistry.java @@ -21,6 +21,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.user.UserSessionRegistry; import org.springframework.scheduling.TaskScheduler; import org.springframework.util.Assert; @@ -57,7 +58,8 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry { public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler, - UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler) { + UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler, + MessageBrokerRegistry brokerRegistry) { Assert.notNull(webSocketHandler); Assert.notNull(userSessionRegistry); @@ -67,6 +69,9 @@ public class WebMvcStompEndpointRegistry implements StompEndpointRegistry { this.stompHandler = new StompSubProtocolHandler(); this.stompHandler.setUserSessionRegistry(userSessionRegistry); this.sockJsScheduler = defaultSockJsTaskScheduler; + if(brokerRegistry.getMessageBufferSizeLimit() != null) { + this.stompHandler.setMessageBufferSizeLimit(brokerRegistry.getMessageBufferSizeLimit()); + } } private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index 5d3272bb39a..158b6b105c1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,8 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac @Bean public HandlerMapping stompWebSocketHandlerMapping() { WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry( - subProtocolWebSocketHandler(), userSessionRegistry(), messageBrokerSockJsTaskScheduler()); + subProtocolWebSocketHandler(), userSessionRegistry(), + messageBrokerSockJsTaskScheduler(), getBrokerRegistry()); registerStompEndpoints(registry); return registry.getHandlerMapping(); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 3827eed4436..7b44bda20b0 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -79,16 +79,16 @@ public class StompSubProtocolHandler implements SubProtocolHandler { /** - * TODO - * @param messageBufferSizeLimit + * Set the message buffer size limit in bytes. + * @since 4.0.3 */ public void setMessageBufferSizeLimit(int messageBufferSizeLimit) { this.messageBufferSizeLimit = messageBufferSizeLimit; } /** - * TODO - * @return + * Get the message buffer size limit in bytes. + * @since 4.0.3 */ public int getMessageBufferSizeLimit() { return this.messageBufferSizeLimit; diff --git a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd index 0b902c9d425..15941db2449 100644 --- a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd +++ b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd @@ -575,6 +575,13 @@ ]]> + + + + + protocolHandlers = subProtocolWebSocketHandler.getProtocolHandlers(); + for(SubProtocolHandler protocolHandler : protocolHandlers) { + assertTrue(protocolHandler instanceof StompSubProtocolHandler); + DirectFieldAccessor protocolHandlerFieldAccessor = new DirectFieldAccessor(protocolHandler); + assertEquals(123, protocolHandlerFieldAccessor.getPropertyValue("messageBufferSizeLimit")); + } + } + @Controller static class TestController { @@ -133,6 +149,11 @@ public class WebSocketMessageBrokerConfigurationSupportTests { registry.addEndpoint("/simpleBroker"); } + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + registry.setMessageBufferSizeLimit(123); + } + } @Configuration diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/handler/TestWebSocketSession.java b/spring-websocket/src/test/java/org/springframework/web/socket/handler/TestWebSocketSession.java index 7d495611909..1918e7110bc 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/handler/TestWebSocketSession.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/handler/TestWebSocketSession.java @@ -62,6 +62,12 @@ public class TestWebSocketSession implements WebSocketSession { private HttpHeaders headers; + public TestWebSocketSession() { + } + + public TestWebSocketSession(String id) { + this.id = id; + } @Override public String getId() { diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java index 28718b17164..9f0a292541f 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java @@ -146,8 +146,10 @@ public class StompSubProtocolHandlerTests { assertEquals(1, this.session.getSentMessages().size()); TextMessage textMessage = (TextMessage) this.session.getSentMessages().get(0); - List> message = new StompDecoder().decode(ByteBuffer.wrap(textMessage.getPayload().getBytes())); - StompHeaderAccessor replyHeaders = StompHeaderAccessor.wrap(message.get(0)); + + List> messages = new StompDecoder().decode(ByteBuffer.wrap(textMessage.getPayload().getBytes())); + assertEquals(1, messages.size()); + StompHeaderAccessor replyHeaders = StompHeaderAccessor.wrap(messages.get(0)); assertEquals(StompCommand.CONNECTED, replyHeaders.getCommand()); assertEquals("1.1", replyHeaders.getVersion()); diff --git a/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml b/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml index 66a8d5b57f7..e8970ebed35 100644 --- a/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml +++ b/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml @@ -4,7 +4,7 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd"> - + diff --git a/src/asciidoc/index.adoc b/src/asciidoc/index.adoc index 6297cfb65ed..606a295973e 100644 --- a/src/asciidoc/index.adoc +++ b/src/asciidoc/index.adoc @@ -37563,7 +37563,8 @@ The Spring Framework provides support for using STOMP over WebSocket through the +spring-messaging+ and +spring-websocket+ modules. It's easy to enable it. Here is an example of configuring a STOMP WebSocket endpoint with SockJS fallback -options. The endpoint is available for clients to connect to at URL path `/portfolio`: +options. The endpoint is available for clients to connect to at URL path `/app/portfolio`. +It is configured with a 1 Mbytes message buffer size limit (64 Kbytes by default): [source,java,indent=0] [subs="verbatim,quotes"] @@ -37575,6 +37576,13 @@ options. The endpoint is available for clients to connect to at URL path `/portf @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { + @Override + public void configureMessageBroker(MessageBrokerRegistry config) { + config.setApplicationDestinationPrefixes("/app") + .setMessageBufferSizeLimit(1024*1024) + .enableSimpleBroker("/queue", "/topic"); + } + @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/portfolio").withSockJS(); @@ -37599,10 +37607,11 @@ XML configuration equivalent: http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd"> - + + ...