From 6d5af60a7ca39389efe99aafecdc4f7d877729f0 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Wed, 27 Jul 2016 12:11:58 +0200 Subject: [PATCH] Expose DefaultSubscriptionRegistry's cache limit through SimpleBrokerMessageHandler and MessageBrokerRegistry Issue: SPR-14516 --- .../broker/DefaultSubscriptionRegistry.java | 36 ++++++------- .../broker/SimpleBrokerMessageHandler.java | 50 +++++++++++++++---- .../AbstractMessageBrokerConfiguration.java | 12 ++--- .../simp/config/MessageBrokerRegistry.java | 38 ++++++++++---- .../MessageBrokerConfigurationTests.java | 21 ++++++-- 5 files changed, 110 insertions(+), 47 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java index e679a46a0f9..bb0e289e650 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java @@ -50,7 +50,7 @@ import org.springframework.util.PathMatcher; * in memory and uses a {@link org.springframework.util.PathMatcher PathMatcher} * for matching destinations. * - *

As of 4.2 this class supports a {@link #setSelectorHeaderName selector} + *

As of 4.2, this class supports a {@link #setSelectorHeaderName selector} * header on subscription messages with Spring EL expressions evaluated against * the headers to filter out messages in addition to destination matching. * @@ -65,11 +65,10 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { public static final int DEFAULT_CACHE_LIMIT = 1024; - /** The maximum number of entries in the cache */ - private volatile int cacheLimit = DEFAULT_CACHE_LIMIT; - private PathMatcher pathMatcher = new AntPathMatcher(); + private volatile int cacheLimit = DEFAULT_CACHE_LIMIT; + private String selectorHeaderName = "selector"; private volatile boolean selectorHeaderInUse = false; @@ -82,32 +81,32 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { /** - * Specify the maximum number of entries for the resolved destination cache. - * Default is 1024. + * Specify the {@link PathMatcher} to use. */ - public void setCacheLimit(int cacheLimit) { - this.cacheLimit = cacheLimit; + public void setPathMatcher(PathMatcher pathMatcher) { + this.pathMatcher = pathMatcher; } /** - * Return the maximum number of entries for the resolved destination cache. + * Return the configured {@link PathMatcher}. */ - public int getCacheLimit() { - return this.cacheLimit; + public PathMatcher getPathMatcher() { + return this.pathMatcher; } /** - * Specify the {@link PathMatcher} to use. + * Specify the maximum number of entries for the resolved destination cache. + * Default is 1024. */ - public void setPathMatcher(PathMatcher pathMatcher) { - this.pathMatcher = pathMatcher; + public void setCacheLimit(int cacheLimit) { + this.cacheLimit = cacheLimit; } /** - * Return the configured {@link PathMatcher}. + * Return the maximum number of entries for the resolved destination cache. */ - public PathMatcher getPathMatcher() { - return this.pathMatcher; + public int getCacheLimit() { + return this.cacheLimit; } /** @@ -123,12 +122,13 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { * @since 4.2 */ public void setSelectorHeaderName(String selectorHeaderName) { - Assert.notNull(selectorHeaderName); + Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null"); this.selectorHeaderName = selectorHeaderName; } /** * Return the name for the selector header. + * @since 4.2 */ public String getSelectorHeaderName() { return this.selectorHeaderName; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java index 2b763970808..35722bd2fb6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java @@ -42,6 +42,7 @@ import org.springframework.util.PathMatcher; * {@link SubscriptionRegistry} and sends messages to subscribers. * * @author Rossen Stoyanchev + * @author Juergen Hoeller * @since 4.0 */ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { @@ -54,6 +55,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { private PathMatcher pathMatcher; + private Integer cacheLimit; + private TaskScheduler taskScheduler; private long[] heartbeatValue; @@ -90,14 +93,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { Assert.notNull(subscriptionRegistry, "SubscriptionRegistry must not be null"); this.subscriptionRegistry = subscriptionRegistry; initPathMatcherToUse(); - } - - private void initPathMatcherToUse() { - if (this.pathMatcher != null) { - if (this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) { - ((DefaultSubscriptionRegistry) this.subscriptionRegistry).setPathMatcher(this.pathMatcher); - } - } + initCacheLimitToUse(); } public SubscriptionRegistry getSubscriptionRegistry() { @@ -105,14 +101,46 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { } /** - * When configured, the given PathMatcher is passed down to the + * When configured, the given PathMatcher is passed down to the underlying * SubscriptionRegistry to use for matching destination to subscriptions. + *

Default is a standard {@link org.springframework.util.AntPathMatcher}. + * @since 4.1 + * @see #setSubscriptionRegistry + * @see DefaultSubscriptionRegistry#setPathMatcher + * @see org.springframework.util.AntPathMatcher */ public void setPathMatcher(PathMatcher pathMatcher) { this.pathMatcher = pathMatcher; initPathMatcherToUse(); } + private void initPathMatcherToUse() { + if (this.pathMatcher != null && this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) { + ((DefaultSubscriptionRegistry) this.subscriptionRegistry).setPathMatcher(this.pathMatcher); + } + } + + /** + * When configured, the specified cache limit is passed down to the + * underlying SubscriptionRegistry, overriding any default there. + *

With a standard {@link DefaultSubscriptionRegistry}, the default + * cache limit is 1024. + * @since 4.3.2 + * @see #setSubscriptionRegistry + * @see DefaultSubscriptionRegistry#setCacheLimit + * @see DefaultSubscriptionRegistry#DEFAULT_CACHE_LIMIT + */ + public void setCacheLimit(Integer cacheLimit) { + this.cacheLimit = cacheLimit; + initCacheLimitToUse(); + } + + private void initCacheLimitToUse() { + if (this.cacheLimit != null && this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) { + ((DefaultSubscriptionRegistry) this.subscriptionRegistry).setCacheLimit(this.cacheLimit); + } + } + /** * Configure the {@link org.springframework.scheduling.TaskScheduler} to * use for providing heartbeat support. Setting this property also sets the @@ -130,6 +158,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { /** * Return the configured TaskScheduler. + * @since 4.2 */ public TaskScheduler getTaskScheduler() { return this.taskScheduler; @@ -151,6 +180,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { /** * The configured value for the heart-beat settings. + * @since 4.2 */ public long[] getHeartbeatValue() { return this.heartbeatValue; @@ -160,6 +190,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { * Configure a {@link MessageHeaderInitializer} to apply to the headers * of all messages sent to the client outbound channel. *

By default this property is not set. + * @since 4.1 */ public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) { this.headerInitializer = headerInitializer; @@ -167,6 +198,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { /** * Return the configured header initializer. + * @since 4.1 */ public MessageHeaderInitializer getHeaderInitializer() { return this.headerInitializer; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index 1c3c59c51d2..97b2028d435 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -143,7 +143,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } /** - * A hook for sub-classes to customize the message channel for inbound messages + * A hook for subclasses to customize the message channel for inbound messages * from WebSocket clients. */ protected void configureClientInboundChannel(ChannelRegistration registration) { @@ -176,7 +176,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } /** - * A hook for sub-classes to customize the message channel for messages from + * A hook for subclasses to customize the message channel for messages from * the application or message broker to WebSocket clients. */ protected void configureClientOutboundChannel(ChannelRegistration registration) { @@ -224,7 +224,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } /** - * A hook for sub-classes to customize message broker configuration through the + * A hook for subclasses to customize message broker configuration through the * provided {@link MessageBrokerRegistry} instance. */ protected void configureMessageBroker(MessageBrokerRegistry registry) { @@ -253,7 +253,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC addReturnValueHandlers(returnValueHandlers); handler.setCustomReturnValueHandlers(returnValueHandlers); - PathMatcher pathMatcher = this.getBrokerRegistry().getPathMatcher(); + PathMatcher pathMatcher = getBrokerRegistry().getPathMatcher(); if (pathMatcher != null) { handler.setPathMatcher(pathMatcher); } @@ -261,7 +261,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } /** - * Protected method for plugging in a custom sub-class of + * Protected method for plugging in a custom subclass of * {@link org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler * SimpAnnotationMethodMessageHandler}. * @since 4.2 @@ -324,7 +324,6 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } // Expose alias for 4.1 compatibility - @Bean(name={"messageBrokerTaskScheduler", "messageBrokerSockJsTaskScheduler"}) public ThreadPoolTaskScheduler messageBrokerTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); @@ -461,6 +460,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } + private class NoOpBrokerMessageHandler extends AbstractBrokerMessageHandler { public NoOpBrokerMessageHandler() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java index 1eabf8fd610..3c8dd759d22 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,8 @@ public class MessageBrokerRegistry { private PathMatcher pathMatcher; + private Integer cacheLimit; + public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) { Assert.notNull(clientInboundChannel); @@ -96,6 +98,16 @@ public class MessageBrokerRegistry { return this.brokerChannelRegistration; } + protected String getUserDestinationBroadcast() { + return (this.brokerRelayRegistration != null ? + this.brokerRelayRegistration.getUserDestinationBroadcast() : null); + } + + protected String getUserRegistryBroadcast() { + return (this.brokerRelayRegistration != null ? + this.brokerRelayRegistration.getUserRegistryBroadcast() : null); + } + /** * Configure one or more prefixes to filter destinations targeting application * annotated methods. For example destinations prefixed with "/app" may be @@ -137,16 +149,6 @@ public class MessageBrokerRegistry { return this.userDestinationPrefix; } - protected String getUserDestinationBroadcast() { - return (this.brokerRelayRegistration != null ? - this.brokerRelayRegistration.getUserDestinationBroadcast() : null); - } - - protected String getUserRegistryBroadcast() { - return (this.brokerRelayRegistration != null ? - this.brokerRelayRegistration.getUserRegistryBroadcast() : null); - } - /** * Configure the PathMatcher to use to match the destinations of incoming * messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods. @@ -162,6 +164,7 @@ public class MessageBrokerRegistry { *

When the simple broker is enabled, the PathMatcher configured here is * also used to match message destinations when brokering messages. * @since 4.1 + * @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setPathMatcher */ public MessageBrokerRegistry setPathMatcher(PathMatcher pathMatcher) { this.pathMatcher = pathMatcher; @@ -172,6 +175,18 @@ public class MessageBrokerRegistry { return this.pathMatcher; } + /** + * Configure the cache limit to apply for registrations with the broker. + *

This is currently only applied for the destination cache in the + * subscription registry. The default cache limit there is 1024. + * @since 4.3.2 + * @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setCacheLimit + */ + public MessageBrokerRegistry setCacheLimit(int cacheLimit) { + this.cacheLimit = cacheLimit; + return this; + } + protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) { if (this.simpleBrokerRegistration == null && this.brokerRelayRegistration == null) { @@ -180,6 +195,7 @@ public class MessageBrokerRegistry { if (this.simpleBrokerRegistration != null) { SimpleBrokerMessageHandler handler = this.simpleBrokerRegistration.getMessageHandler(brokerChannel); handler.setPathMatcher(this.pathMatcher); + handler.setCacheLimit(this.cacheLimit); return handler; } return null; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index 2454289da28..5a868a75ef1 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -16,9 +16,6 @@ package org.springframework.messaging.simp.config; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -73,6 +70,9 @@ import org.springframework.validation.Errors; import org.springframework.validation.Validator; import org.springframework.validation.beanvalidation.OptionalValidatorFactoryBean; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + /** * Test fixture for {@link AbstractMessageBrokerConfiguration}. * @@ -392,6 +392,13 @@ public class MessageBrokerConfigurationTests { assertEquals(false, new DirectFieldAccessor(resolver).getPropertyValue("keepLeadingSlash")); } + @Test + public void customCacheLimit() { + SimpleBrokerMessageHandler broker = this.customContext.getBean(SimpleBrokerMessageHandler.class); + DefaultSubscriptionRegistry registry = (DefaultSubscriptionRegistry) broker.getSubscriptionRegistry(); + assertEquals(8192, registry.getCacheLimit()); + } + @Test public void userBroadcasts() throws Exception { SimpUserRegistry userRegistry = this.brokerRelayContext.getBean(SimpUserRegistry.class); @@ -441,6 +448,7 @@ public class MessageBrokerConfigurationTests { } } + static class BaseTestMessageBrokerConfig extends AbstractMessageBrokerConfiguration { @Override @@ -449,6 +457,7 @@ public class MessageBrokerConfigurationTests { } } + @SuppressWarnings("unused") @Configuration static class SimpleBrokerConfig extends BaseTestMessageBrokerConfig { @@ -477,6 +486,7 @@ public class MessageBrokerConfigurationTests { } } + @Configuration static class BrokerRelayConfig extends SimpleBrokerConfig { @@ -488,10 +498,12 @@ public class MessageBrokerConfigurationTests { } } + @Configuration static class DefaultConfig extends BaseTestMessageBrokerConfig { } + @Configuration static class CustomConfig extends BaseTestMessageBrokerConfig { @@ -525,6 +537,7 @@ public class MessageBrokerConfigurationTests { registry.configureBrokerChannel().setInterceptors(this.interceptor, this.interceptor, this.interceptor); registry.configureBrokerChannel().taskExecutor().corePoolSize(31).maxPoolSize(32).keepAliveSeconds(33).queueCapacity(34); registry.setPathMatcher(new AntPathMatcher(".")).enableSimpleBroker("/topic", "/queue"); + registry.setCacheLimit(8192); } } @@ -540,6 +553,7 @@ public class MessageBrokerConfigurationTests { } } + private static class TestValidator implements Validator { @Override @@ -552,6 +566,7 @@ public class MessageBrokerConfigurationTests { } } + @SuppressWarnings("serial") private static class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { }