diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java index 7e9948973cc..2e1d31d4294 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java @@ -54,13 +54,21 @@ import reactor.tcp.netty.NettyTcpClient; * @author Rossen Stoyanchev * @since 4.0 */ -public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler - implements SmartLifecycle { +public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler implements SmartLifecycle { private static final String STOMP_RELAY_SYSTEM_SESSION_ID = "stompRelaySystemSessionId"; + private MessageChannel clientChannel; + private String relayHost = "127.0.0.1"; + + private int relayPort = 61613; + + private String systemLogin = "guest"; + + private String systemPasscode = "guest"; + private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); private MessageConverter payloadConverter; @@ -83,6 +91,67 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler this.payloadConverter = new CompositeMessageConverter(null); } + + /** + * Set the STOMP message broker host. + */ + public void setRelayHost(String relayHost) { + Assert.hasText(relayHost, "relayHost must not be empty"); + this.relayHost = relayHost; + } + + /** + * @return the STOMP message broker host. + */ + public String getRelayHost() { + return this.relayHost; + } + + /** + * Set the STOMP message broker port. + */ + public void setRelayPort(int relayPort) { + this.relayPort = relayPort; + } + + /** + * @return the STOMP message broker port. + */ + public int getRelayPort() { + return this.relayPort; + } + + /** + * Set the login for a "system" TCP connection used to send messages to the STOMP + * broker without having a client session (e.g. REST/HTTP request handling method). + */ + public void setSystemLogin(String systemLogin) { + Assert.hasText(systemLogin, "systemLogin must not be empty"); + this.systemLogin = systemLogin; + } + + /** + * @return the login for a shared, "system" connection to the STOMP message broker. + */ + public String getSystemLogin() { + return this.systemLogin; + } + + /** + * Set the passcode for a "system" TCP connection used to send messages to the STOMP + * broker without having a client session (e.g. REST/HTTP request handling method). + */ + public void setSystemPasscode(String systemPasscode) { + this.systemPasscode = systemPasscode; + } + + /** + * @return the passcode for a shared, "system" connection to the STOMP message broker. + */ + public String getSystemPasscode() { + return this.systemPasscode; + } + public void setMessageConverters(List converters) { this.payloadConverter = new CompositeMessageConverter(converters); } @@ -113,19 +182,17 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler public void start() { synchronized (this.lifecycleMonitor) { - // TODO: make this configurable - this.tcpClient = new TcpClient.Spec(NettyTcpClient.class) .using(new Environment()) .codec(new DelimitedCodec((byte) 0, true, StandardCodecs.STRING_CODEC)) - .connect("127.0.0.1", 61616) + .connect(this.relayHost, this.relayPort) .get(); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); headers.setAcceptVersion("1.1,1.2"); - headers.setLogin("guest"); - headers.setPasscode("guest"); - headers.setHeartbeat(0, 0); + headers.setLogin(this.systemLogin); + headers.setPasscode(this.systemPasscode); + headers.setHeartbeat(0,0); // TODO Message message = MessageBuilder.withPayload( new byte[0]).copyHeaders(headers.toNativeHeaderMap()).build(); @@ -270,7 +337,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler readStompFrame(stompFrame); } }); - stompHeaders.setHeartbeat(0, 0); // TODO + stompHeaders.setHeartbeat(0,0); // TODO forwardInternal(message, stompHeaders, connection); } });