diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java index 835467729e0..376a2b07608 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import org.springframework.lang.Nullable; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; +import org.springframework.messaging.tcp.TcpOperations; import org.springframework.util.Assert; /** @@ -51,6 +52,9 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { @Nullable private String virtualHost; + @Nullable + private TcpOperations tcpClient; + private boolean autoStartup = true; @Nullable @@ -166,6 +170,18 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { return this; } + /** + * Configure a TCP client for managing TCP connections to the STOMP broker. + *

By default {@code ReactorNettyTcpClient} is used. + *

Note: when this property is used, any + * {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port} + * specified are effectively ignored. + * @since 4.3.15 + */ + public void setTcpClient(TcpOperations tcpClient) { + this.tcpClient = tcpClient; + } + /** * Configure whether the {@link StompBrokerRelayMessageHandler} should start * automatically when the Spring ApplicationContext is refreshed. @@ -239,6 +255,9 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { if (this.virtualHost != null) { handler.setVirtualHost(this.virtualHost); } + if (this.tcpClient != null) { + handler.setTcpClient(this.tcpClient); + } handler.setAutoStartup(this.autoStartup); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index ff0733356f0..7a3b28c231f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -340,6 +340,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Configure a TCP client for managing TCP connections to the STOMP broker. *

By default {@link ReactorNettyTcpClient} is used. + *

Note: when this property is used, any + * {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port} + * specified are effectively ignored. */ public void setTcpClient(@Nullable TcpOperations tcpClient) { this.tcpClient = tcpClient; @@ -613,8 +616,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * the TCP connection, failure to send a message, missed heartbeat, etc. */ protected void handleTcpConnectionFailure(String error, @Nullable Throwable ex) { - if (logger.isErrorEnabled()) { - logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex); + if (logger.isWarnEnabled()) { + logger.warn("TCP connection failure in session " + this.sessionId + ": " + error, ex); } try { sendStompErrorFrameToClient(error); diff --git a/src/docs/asciidoc/web/websocket.adoc b/src/docs/asciidoc/web/websocket.adoc index 251784255af..c19407fb6e0 100644 --- a/src/docs/asciidoc/web/websocket.adoc +++ b/src/docs/asciidoc/web/websocket.adoc @@ -1440,9 +1440,9 @@ values ``guest``/``guest``. ==== The STOMP broker relay always sets the `login` and `passcode` headers on every `CONNECT` frame that it forwards to the broker on behalf of clients. Therefore WebSocket clients -need not set those headers; they will be ignored. As the following section explains, -instead WebSocket clients should rely on HTTP authentication to protect the WebSocket -endpoint and establish the client identity. +need not set those headers; they will be ignored. As the <> +section explains, instead WebSocket clients should rely on HTTP authentication to protect +the WebSocket endpoint and establish the client identity. ==== The STOMP broker relay also sends and receives heartbeats to and from the message @@ -1451,13 +1451,43 @@ and receiving heartbeats (10 seconds each by default). If connectivity to the br is lost, the broker relay will continue to try to reconnect, every 5 seconds, until it succeeds. -[NOTE] -==== -A Spring bean can implement `ApplicationListener` in order +Any Spring bean can implement `ApplicationListener` in order to receive notifications when the "system" connection to the broker is lost and re-established. For example a Stock Quote service broadcasting stock quotes can stop trying to send messages when there is no active "system" connection. -==== + +By default, the STOMP broker relay always connects, and reconnects as needed if +connectivity is lost, to the same host and port. If you wish to supply multiple addresses, +on each attempt to connect, you can configure a supplier of addresses, instead of a +fixed host and port. For example: + +[source,java,indent=0] +[subs="verbatim,quotes"] +---- +@Configuration +@EnableWebSocketMessageBroker +public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { + + // ... + + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient()); + registry.setApplicationDestinationPrefixes("/app"); + } + + private ReactorNettyTcpClient createTcpClient() { + + Consumer> builderConsumer = builder -> { + builder.connectAddress(()-> { + // Select address to connect to ... + }); + }; + + return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec()); + } +} +---- The STOMP broker relay can also be configured with a `virtualHost` property. The value of this property will be set as the `host` header of every `CONNECT` frame