|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
/* |
|
|
|
* Copyright 2002-2021 the original author or authors. |
|
|
|
* Copyright 2002-2022 the original author or authors. |
|
|
|
* |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -24,6 +24,7 @@ import java.util.Map; |
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.beans.BeanUtils; |
|
|
|
import org.springframework.beans.BeanUtils; |
|
|
|
import org.springframework.beans.factory.BeanInitializationException; |
|
|
|
import org.springframework.beans.factory.BeanInitializationException; |
|
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Qualifier; |
|
|
|
import org.springframework.context.ApplicationContext; |
|
|
|
import org.springframework.context.ApplicationContext; |
|
|
|
import org.springframework.context.ApplicationContextAware; |
|
|
|
import org.springframework.context.ApplicationContextAware; |
|
|
|
import org.springframework.context.annotation.Bean; |
|
|
|
import org.springframework.context.annotation.Bean; |
|
|
|
@ -149,8 +150,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Bean |
|
|
|
@Bean |
|
|
|
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) { |
|
|
|
public AbstractSubscribableChannel clientInboundChannel( |
|
|
|
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor); |
|
|
|
@Qualifier("clientInboundChannelExecutor") TaskExecutor executor) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor); |
|
|
|
channel.setLogger(SimpLogging.forLog(channel.getLogger())); |
|
|
|
channel.setLogger(SimpLogging.forLog(channel.getLogger())); |
|
|
|
ChannelRegistration reg = getClientInboundChannelRegistration(); |
|
|
|
ChannelRegistration reg = getClientInboundChannelRegistration(); |
|
|
|
if (reg.hasInterceptors()) { |
|
|
|
if (reg.hasInterceptors()) { |
|
|
|
@ -185,8 +188,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Bean |
|
|
|
@Bean |
|
|
|
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) { |
|
|
|
public AbstractSubscribableChannel clientOutboundChannel( |
|
|
|
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor); |
|
|
|
@Qualifier("clientOutboundChannelExecutor") TaskExecutor executor) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor); |
|
|
|
channel.setLogger(SimpLogging.forLog(channel.getLogger())); |
|
|
|
channel.setLogger(SimpLogging.forLog(channel.getLogger())); |
|
|
|
ChannelRegistration reg = getClientOutboundChannelRegistration(); |
|
|
|
ChannelRegistration reg = getClientOutboundChannelRegistration(); |
|
|
|
if (reg.hasInterceptors()) { |
|
|
|
if (reg.hasInterceptors()) { |
|
|
|
@ -221,13 +226,14 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Bean |
|
|
|
@Bean |
|
|
|
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel, |
|
|
|
public AbstractSubscribableChannel brokerChannel( |
|
|
|
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) { |
|
|
|
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, |
|
|
|
|
|
|
|
@Qualifier("brokerChannelExecutor") TaskExecutor executor) { |
|
|
|
|
|
|
|
|
|
|
|
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); |
|
|
|
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); |
|
|
|
ChannelRegistration registration = registry.getBrokerChannelRegistration(); |
|
|
|
ChannelRegistration registration = registry.getBrokerChannelRegistration(); |
|
|
|
ExecutorSubscribableChannel channel = (registration.hasTaskExecutor() ? |
|
|
|
ExecutorSubscribableChannel channel = (registration.hasTaskExecutor() ? |
|
|
|
new ExecutorSubscribableChannel(brokerChannelExecutor) : new ExecutorSubscribableChannel()); |
|
|
|
new ExecutorSubscribableChannel(executor) : new ExecutorSubscribableChannel()); |
|
|
|
registration.interceptors(new ImmutableMessageChannelInterceptor()); |
|
|
|
registration.interceptors(new ImmutableMessageChannelInterceptor()); |
|
|
|
channel.setLogger(SimpLogging.forLog(channel.getLogger())); |
|
|
|
channel.setLogger(SimpLogging.forLog(channel.getLogger())); |
|
|
|
channel.setInterceptors(registration.getInterceptors()); |
|
|
|
channel.setInterceptors(registration.getInterceptors()); |
|
|
|
@ -366,10 +372,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC |
|
|
|
|
|
|
|
|
|
|
|
@Bean |
|
|
|
@Bean |
|
|
|
@Nullable |
|
|
|
@Nullable |
|
|
|
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler(AbstractSubscribableChannel clientInboundChannel, |
|
|
|
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler( |
|
|
|
AbstractSubscribableChannel clientOutboundChannel, AbstractSubscribableChannel brokerChannel, |
|
|
|
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, |
|
|
|
UserDestinationMessageHandler userDestinationMessageHandler, @Nullable MessageHandler userRegistryMessageHandler, |
|
|
|
AbstractSubscribableChannel brokerChannel, UserDestinationMessageHandler userDestinationMessageHandler, |
|
|
|
UserDestinationResolver userDestinationResolver) { |
|
|
|
@Nullable MessageHandler userRegistryMessageHandler, UserDestinationResolver userDestinationResolver) { |
|
|
|
|
|
|
|
|
|
|
|
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); |
|
|
|
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); |
|
|
|
StompBrokerRelayMessageHandler handler = registry.getStompBrokerRelay(brokerChannel); |
|
|
|
StompBrokerRelayMessageHandler handler = registry.getStompBrokerRelay(brokerChannel); |
|
|
|
@ -411,7 +417,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC |
|
|
|
public MessageHandler userRegistryMessageHandler( |
|
|
|
public MessageHandler userRegistryMessageHandler( |
|
|
|
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, |
|
|
|
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, |
|
|
|
SimpUserRegistry userRegistry, SimpMessagingTemplate brokerMessagingTemplate, |
|
|
|
SimpUserRegistry userRegistry, SimpMessagingTemplate brokerMessagingTemplate, |
|
|
|
TaskScheduler messageBrokerTaskScheduler) { |
|
|
|
@Qualifier("messageBrokerTaskScheduler") TaskScheduler scheduler) { |
|
|
|
|
|
|
|
|
|
|
|
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); |
|
|
|
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); |
|
|
|
if (brokerRegistry.getUserRegistryBroadcast() == null) { |
|
|
|
if (brokerRegistry.getUserRegistryBroadcast() == null) { |
|
|
|
@ -420,7 +426,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC |
|
|
|
Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required"); |
|
|
|
Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required"); |
|
|
|
return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry, |
|
|
|
return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry, |
|
|
|
brokerMessagingTemplate, brokerRegistry.getUserRegistryBroadcast(), |
|
|
|
brokerMessagingTemplate, brokerRegistry.getUserRegistryBroadcast(), |
|
|
|
messageBrokerTaskScheduler); |
|
|
|
scheduler); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Expose alias for 4.1 compatibility
|
|
|
|
// Expose alias for 4.1 compatibility
|
|
|
|
|