|
|
|
|
@ -17,7 +17,6 @@
@@ -17,7 +17,6 @@
|
|
|
|
|
package org.springframework.web.messaging.support; |
|
|
|
|
|
|
|
|
|
import java.util.HashSet; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Set; |
|
|
|
|
|
|
|
|
|
import org.springframework.messaging.Message; |
|
|
|
|
@ -45,12 +44,15 @@ public class PubSubChannelRegistryBuilder {
@@ -45,12 +44,15 @@ public class PubSubChannelRegistryBuilder {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public PubSubChannelRegistryBuilder( |
|
|
|
|
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientInputChannel, |
|
|
|
|
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientOutputChannel, |
|
|
|
|
MessageHandler<Message<?>> clientGateway) { |
|
|
|
|
|
|
|
|
|
Assert.notNull(clientInputChannel, "clientInputChannel is required"); |
|
|
|
|
Assert.notNull(clientOutputChannel, "clientOutputChannel is required"); |
|
|
|
|
Assert.notNull(clientGateway, "clientGateway is required"); |
|
|
|
|
|
|
|
|
|
this.clientInputChannel = clientInputChannel; |
|
|
|
|
this.clientOutputChannel = clientOutputChannel; |
|
|
|
|
this.clientOutputChannel.subscribe(clientGateway); |
|
|
|
|
this.messageHandlers.add(clientGateway); |
|
|
|
|
@ -58,25 +60,17 @@ public class PubSubChannelRegistryBuilder {
@@ -58,25 +60,17 @@ public class PubSubChannelRegistryBuilder {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static PubSubChannelRegistryBuilder clientGateway( |
|
|
|
|
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientInputChannel, |
|
|
|
|
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientOutputChannel, |
|
|
|
|
MessageHandler<Message<?>> clientGateway) { |
|
|
|
|
|
|
|
|
|
return new PubSubChannelRegistryBuilder(clientOutputChannel, clientGateway); |
|
|
|
|
return new PubSubChannelRegistryBuilder(clientInputChannel, clientOutputChannel, clientGateway); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public PubSubChannelRegistryBuilder clientMessageHandlers( |
|
|
|
|
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientInputChannel, |
|
|
|
|
List<MessageHandler<Message<?>>> handlers) { |
|
|
|
|
|
|
|
|
|
Assert.notNull(clientInputChannel, "clientInputChannel is required"); |
|
|
|
|
this.clientInputChannel = clientInputChannel; |
|
|
|
|
|
|
|
|
|
for (MessageHandler<Message<?>> handler : handlers) { |
|
|
|
|
this.clientInputChannel.subscribe(handler); |
|
|
|
|
this.messageHandlers.add(handler); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public PubSubChannelRegistryBuilder messageHandler(MessageHandler<Message<?>> handler) { |
|
|
|
|
this.clientInputChannel.subscribe(handler); |
|
|
|
|
this.messageHandlers.add(handler); |
|
|
|
|
return this; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -87,6 +81,10 @@ public class PubSubChannelRegistryBuilder {
@@ -87,6 +81,10 @@ public class PubSubChannelRegistryBuilder {
|
|
|
|
|
Assert.notNull(messageBrokerChannel, "messageBrokerChannel is required"); |
|
|
|
|
Assert.notNull(messageBrokerGateway, "messageBrokerGateway is required"); |
|
|
|
|
|
|
|
|
|
if (!this.messageHandlers.contains(messageBrokerGateway)) { |
|
|
|
|
this.clientInputChannel.subscribe(messageBrokerGateway); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.messageBrokerChannel = messageBrokerChannel; |
|
|
|
|
this.messageBrokerChannel.subscribe(messageBrokerGateway); |
|
|
|
|
this.messageHandlers.add(messageBrokerGateway); |
|
|
|
|
|