From 461ba53b39d25a88ae6ab298c642c9cee546551f Mon Sep 17 00:00:00 2001 From: CodeInDreams Date: Thu, 30 Jun 2022 17:52:51 +0800 Subject: [PATCH 1/3] Qualify channelExecutor and taskScheduler in WebSocket config See gh-28736 --- .../config/AbstractMessageBrokerConfiguration.java | 14 +++++++++----- ...WebSocketMessageBrokerConfigurationSupport.java | 7 +++++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index fb4150ad7b0..e1cd0989f1c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -24,6 +24,7 @@ import java.util.Map; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.BeanInitializationException; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; @@ -149,7 +150,8 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean - public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) { + public AbstractSubscribableChannel clientInboundChannel( + @Qualifier("clientInboundChannelExecutor") TaskExecutor clientInboundChannelExecutor) { ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor); channel.setLogger(SimpLogging.forLog(channel.getLogger())); ChannelRegistration reg = getClientInboundChannelRegistration(); @@ -185,7 +187,8 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } @Bean - public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) { + public AbstractSubscribableChannel clientOutboundChannel( + @Qualifier("clientOutboundChannelExecutor") TaskExecutor clientOutboundChannelExecutor) { ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor); channel.setLogger(SimpLogging.forLog(channel.getLogger())); ChannelRegistration reg = getClientOutboundChannelRegistration(); @@ -221,8 +224,9 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } @Bean - public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel, - AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) { + public AbstractSubscribableChannel brokerChannel( + AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, + @Qualifier("brokerChannelExecutor") TaskExecutor brokerChannelExecutor) { MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); ChannelRegistration registration = registry.getBrokerChannelRegistration(); @@ -411,7 +415,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC public MessageHandler userRegistryMessageHandler( AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, SimpUserRegistry userRegistry, SimpMessagingTemplate brokerMessagingTemplate, - TaskScheduler messageBrokerTaskScheduler) { + @Qualifier("messageBrokerTaskScheduler") TaskScheduler messageBrokerTaskScheduler) { MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); if (brokerRegistry.getUserRegistryBroadcast() == null) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index e42195502b5..74739ecbe6f 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -16,6 +16,7 @@ package org.springframework.web.socket.config.annotation; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.CustomScopeConfigurer; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; @@ -129,8 +130,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac @Bean public WebSocketMessageBrokerStats webSocketMessageBrokerStats( @Nullable AbstractBrokerMessageHandler stompBrokerRelayMessageHandler, - WebSocketHandler subProtocolWebSocketHandler, TaskExecutor clientInboundChannelExecutor, - TaskExecutor clientOutboundChannelExecutor, TaskScheduler messageBrokerTaskScheduler) { + WebSocketHandler subProtocolWebSocketHandler, + @Qualifier("clientInboundChannelExecutor") TaskExecutor clientInboundChannelExecutor, + @Qualifier("clientOutboundChannelExecutor") TaskExecutor clientOutboundChannelExecutor, + @Qualifier("messageBrokerTaskScheduler") TaskScheduler messageBrokerTaskScheduler) { WebSocketMessageBrokerStats stats = new WebSocketMessageBrokerStats(); stats.setSubProtocolWebSocketHandler((SubProtocolWebSocketHandler) subProtocolWebSocketHandler); From 22cc6c5918576de3eee624022b4fb0923b56cf05 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Mon, 4 Jul 2022 16:12:57 +0100 Subject: [PATCH 2/3] Polishing contribution Closes gh-28736 --- .../AbstractMessageBrokerConfiguration.java | 28 ++++++++++--------- .../WebSocketConfigurationSupport.java | 8 ++++-- ...cketMessageBrokerConfigurationSupport.java | 14 +++++----- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index e1cd0989f1c..fa7ef11fe04 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -151,8 +151,9 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel clientInboundChannel( - @Qualifier("clientInboundChannelExecutor") TaskExecutor clientInboundChannelExecutor) { - ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor); + @Qualifier("clientInboundChannelExecutor") TaskExecutor executor) { + + ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor); channel.setLogger(SimpLogging.forLog(channel.getLogger())); ChannelRegistration reg = getClientInboundChannelRegistration(); if (reg.hasInterceptors()) { @@ -188,8 +189,9 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel clientOutboundChannel( - @Qualifier("clientOutboundChannelExecutor") TaskExecutor clientOutboundChannelExecutor) { - ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor); + @Qualifier("clientOutboundChannelExecutor") TaskExecutor executor) { + + ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor); channel.setLogger(SimpLogging.forLog(channel.getLogger())); ChannelRegistration reg = getClientOutboundChannelRegistration(); if (reg.hasInterceptors()) { @@ -226,12 +228,12 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel brokerChannel( AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, - @Qualifier("brokerChannelExecutor") TaskExecutor brokerChannelExecutor) { + @Qualifier("brokerChannelExecutor") TaskExecutor executor) { MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); ChannelRegistration registration = registry.getBrokerChannelRegistration(); ExecutorSubscribableChannel channel = (registration.hasTaskExecutor() ? - new ExecutorSubscribableChannel(brokerChannelExecutor) : new ExecutorSubscribableChannel()); + new ExecutorSubscribableChannel(executor) : new ExecutorSubscribableChannel()); registration.interceptors(new ImmutableMessageChannelInterceptor()); channel.setLogger(SimpLogging.forLog(channel.getLogger())); channel.setInterceptors(registration.getInterceptors()); @@ -370,10 +372,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean @Nullable - public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler(AbstractSubscribableChannel clientInboundChannel, - AbstractSubscribableChannel clientOutboundChannel, AbstractSubscribableChannel brokerChannel, - UserDestinationMessageHandler userDestinationMessageHandler, @Nullable MessageHandler userRegistryMessageHandler, - UserDestinationResolver userDestinationResolver) { + public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler( + AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, + AbstractSubscribableChannel brokerChannel, UserDestinationMessageHandler userDestinationMessageHandler, + @Nullable MessageHandler userRegistryMessageHandler, UserDestinationResolver userDestinationResolver) { MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); StompBrokerRelayMessageHandler handler = registry.getStompBrokerRelay(brokerChannel); @@ -415,7 +417,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC public MessageHandler userRegistryMessageHandler( AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, SimpUserRegistry userRegistry, SimpMessagingTemplate brokerMessagingTemplate, - @Qualifier("messageBrokerTaskScheduler") TaskScheduler messageBrokerTaskScheduler) { + @Qualifier("messageBrokerTaskScheduler") TaskScheduler scheduler) { MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); if (brokerRegistry.getUserRegistryBroadcast() == null) { @@ -424,7 +426,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required"); return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry, brokerMessagingTemplate, brokerRegistry.getUserRegistryBroadcast(), - messageBrokerTaskScheduler); + scheduler); } // Expose alias for 4.1 compatibility diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java index 7a2cb8981b7..a0c57282ed1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.web.socket.config.annotation; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.lang.Nullable; import org.springframework.scheduling.TaskScheduler; @@ -40,10 +41,11 @@ public class WebSocketConfigurationSupport { @Bean - public HandlerMapping webSocketHandlerMapping(@Nullable TaskScheduler defaultSockJsTaskScheduler) { + public HandlerMapping webSocketHandlerMapping( + @Qualifier("defaultSockJsTaskScheduler") @Nullable TaskScheduler scheduler) { + ServletWebSocketHandlerRegistry registry = initHandlerRegistry(); if (registry.requiresTaskScheduler()) { - TaskScheduler scheduler = defaultSockJsTaskScheduler; Assert.notNull(scheduler, "Expected default TaskScheduler bean"); registry.setTaskScheduler(scheduler); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index 74739ecbe6f..162b3ab52f9 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -131,18 +131,18 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac public WebSocketMessageBrokerStats webSocketMessageBrokerStats( @Nullable AbstractBrokerMessageHandler stompBrokerRelayMessageHandler, WebSocketHandler subProtocolWebSocketHandler, - @Qualifier("clientInboundChannelExecutor") TaskExecutor clientInboundChannelExecutor, - @Qualifier("clientOutboundChannelExecutor") TaskExecutor clientOutboundChannelExecutor, - @Qualifier("messageBrokerTaskScheduler") TaskScheduler messageBrokerTaskScheduler) { + @Qualifier("clientInboundChannelExecutor") TaskExecutor inboundExecutor, + @Qualifier("clientOutboundChannelExecutor") TaskExecutor outboundExecutor, + @Qualifier("messageBrokerTaskScheduler") TaskScheduler scheduler) { WebSocketMessageBrokerStats stats = new WebSocketMessageBrokerStats(); stats.setSubProtocolWebSocketHandler((SubProtocolWebSocketHandler) subProtocolWebSocketHandler); if (stompBrokerRelayMessageHandler instanceof StompBrokerRelayMessageHandler) { stats.setStompBrokerRelay((StompBrokerRelayMessageHandler) stompBrokerRelayMessageHandler); } - stats.setInboundChannelExecutor(clientInboundChannelExecutor); - stats.setOutboundChannelExecutor(clientOutboundChannelExecutor); - stats.setSockJsTaskScheduler(messageBrokerTaskScheduler); + stats.setInboundChannelExecutor(inboundExecutor); + stats.setOutboundChannelExecutor(outboundExecutor); + stats.setSockJsTaskScheduler(scheduler); return stats; } From 007bdede4693332eb9fb857085fd87c389c3ebfa Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Mon, 4 Jul 2022 16:17:57 +0100 Subject: [PATCH 3/3] Add missing check to avoid re-initialization Noticed during review of #28736 that a check protecting against re-initialization was accidentally removed in commit 3d6e38bb43fe86cc58bc816eff13b968c1e0884f. --- .../annotation/WebSocketConfigurationSupport.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java index a0c57282ed1..37f5b917570 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java @@ -84,12 +84,14 @@ public class WebSocketConfigurationSupport { @Bean @Nullable public TaskScheduler defaultSockJsTaskScheduler() { - if (initHandlerRegistry().requiresTaskScheduler()) { - ThreadPoolTaskScheduler threadPoolScheduler = new ThreadPoolTaskScheduler(); - threadPoolScheduler.setThreadNamePrefix("SockJS-"); - threadPoolScheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); - threadPoolScheduler.setRemoveOnCancelPolicy(true); - this.scheduler = threadPoolScheduler; + if (this.scheduler == null) { + if (initHandlerRegistry().requiresTaskScheduler()) { + ThreadPoolTaskScheduler threadPoolScheduler = new ThreadPoolTaskScheduler(); + threadPoolScheduler.setThreadNamePrefix("SockJS-"); + threadPoolScheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); + threadPoolScheduler.setRemoveOnCancelPolicy(true); + this.scheduler = threadPoolScheduler; + } } return this.scheduler; }