diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java index b6fbfef02e8..dbc10510747 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ package org.springframework.messaging.simp.config; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; +import org.springframework.messaging.tcp.TcpOperations; +import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; import org.springframework.util.Assert; /** @@ -47,6 +49,8 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { private String virtualHost; + private TcpOperations tcpClient; + private boolean autoStartup = true; private String userDestinationBroadcast; @@ -160,6 +164,18 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { return this; } + /** + * Configure a TCP client for managing TCP connections to the STOMP broker. + * By default {@link Reactor2TcpClient} is used. + *

Note: when this property is used, any + * {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port} + * specified are effectively ignored. + * @since 4.3.15 + */ + public void setTcpClient(TcpOperations tcpClient) { + this.tcpClient = tcpClient; + } + /** * Configure whether the {@link StompBrokerRelayMessageHandler} should start * automatically when the Spring ApplicationContext is refreshed. @@ -231,6 +247,9 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { if (this.virtualHost != null) { handler.setVirtualHost(this.virtualHost); } + if (this.tcpClient != null) { + handler.setTcpClient(this.tcpClient); + } handler.setAutoStartup(this.autoStartup); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 84e8a055207..6f0bff4487f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -334,6 +334,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Configure a TCP client for managing TCP connections to the STOMP broker. * By default {@link Reactor2TcpClient} is used. + *

Note: when this property is used, any + * {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port} + * specified are effectively ignored. */ public void setTcpClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index 0b8106aa6f5..c44e3f80aca 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import reactor.core.config.ReactorConfiguration; import reactor.core.support.NamedDaemonThreadFactory; import reactor.fn.Consumer; import reactor.fn.Function; +import reactor.fn.Supplier; import reactor.fn.tuple.Tuple; import reactor.fn.tuple.Tuple2; import reactor.io.buffer.Buffer; @@ -65,10 +66,10 @@ import org.springframework.util.concurrent.ListenableFuture; /** * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} - * based on the TCP client support of the Reactor project. + * based on the TCP client support of project Reactor. * - *

This implementation wraps N (Reactor) clients for N {@link #connect} calls, - * i.e. a separate (Reactor) client instance for each connection. + *

This implementation wraps N Reactor {@code TcpClient} instances created + * for N {@link #connect} calls, i.e. once instance per connection. * * @author Rossen Stoyanchev * @author Stephane Maldini @@ -100,13 +101,28 @@ public class Reactor2TcpClient

implements TcpOperations

{ * relying on Netty threads. The number of Netty threads can be tweaked with * the {@code reactor.tcp.ioThreadCount} System property. The network I/O * threads will be shared amongst the active clients. - *

Also see the constructor accepting a ready Reactor - * {@link TcpClientSpec} {@link Function} factory. + * * @param host the host to connect to * @param port the port to connect to * @param codec the codec to use for encoding and decoding the TCP stream */ public Reactor2TcpClient(final String host, final int port, final Codec, Message

> codec) { + this(new FixedAddressSupplier(host, port), codec); + } + + /** + * A variant of {@link #Reactor2TcpClient(String, int, Codec)} that takes a + * supplier of any number of addresses instead of just one host and port. + * This can be used to {@link #connect(TcpConnectionHandler, ReconnectStrategy) + * reconnect} to a different address after the current host becomes unavailable. + * + * @param addressSupplier supplier of addresses to use for connecting + * @param codec the codec to use for encoding and decoding the TCP stream + * @since 4.3.15 + */ + public Reactor2TcpClient(final Supplier addressSupplier, + final Codec, Message

> codec) { + // Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup(); this.eventLoopGroup = nioEventLoopGroup; @@ -118,7 +134,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ return spec .env(environment) .codec(codec) - .connect(host, port) + .connect(addressSupplier) .options(createClientSocketOptions()); } @@ -133,10 +149,13 @@ public class Reactor2TcpClient

implements TcpOperations

{ * A constructor with a pre-configured {@link TcpClientSpec} {@link Function} * factory. This might be used to add SSL or specific network parameters to * the generated client configuration. + * *

NOTE: if the client is configured with a thread-creating - * dispatcher, you are responsible for cleaning them, e.g. using + * dispatcher, you are responsible for cleaning them, e.g. via * {@link reactor.core.Dispatcher#shutdown}. - * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation + * + * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for + * each client creation */ public Reactor2TcpClient(TcpClientFactory, Message

> tcpClientSpecFactory) { Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); @@ -295,6 +314,21 @@ public class Reactor2TcpClient

implements TcpOperations

{ } + private static class FixedAddressSupplier implements Supplier { + + private final InetSocketAddress address; + + FixedAddressSupplier(String host, int port) { + this.address = new InetSocketAddress(host, port); + } + + @Override + public InetSocketAddress get() { + return this.address; + } + } + + private static class SynchronousDispatcherConfigReader implements ConfigurationReader { @Override diff --git a/src/asciidoc/web-websocket.adoc b/src/asciidoc/web-websocket.adoc index a13c0d64f1b..53db293fbcf 100644 --- a/src/asciidoc/web-websocket.adoc +++ b/src/asciidoc/web-websocket.adoc @@ -1615,8 +1615,8 @@ values ``guest``/``guest``. ==== The STOMP broker relay always sets the `login` and `passcode` headers on every `CONNECT` frame that it forwards to the broker on behalf of clients. Therefore WebSocket clients -need not set those headers; they will be ignored. As the following section explains, -instead WebSocket clients should rely on HTTP authentication to protect the WebSocket +need not set those headers; they will be ignored. As the <> +explains, instead WebSocket clients should rely on HTTP authentication to protect the WebSocket endpoint and establish the client identity. ==== @@ -1626,13 +1626,47 @@ and receiving heartbeats (10 seconds each by default). If connectivity to the br is lost, the broker relay will continue to try to reconnect, every 5 seconds, until it succeeds. -[NOTE] -==== -A Spring bean can implement `ApplicationListener` in order +Any Spring bean can implement `ApplicationListener` in order to receive notifications when the "system" connection to the broker is lost and re-established. For example a Stock Quote service broadcasting stock quotes can stop trying to send messages when there is no active "system" connection. -==== + +By default, the STOMP broker relay always connects, and reconnects as needed if +connectivity is lost, to the same host and port. If you wish to supply multiple addresses, +on each attempt to connect, you can configure a supplier of addresses, instead of a +fixed host and port. For example: + +[source,java,indent=0] +[subs="verbatim,quotes"] +---- +@Configuration +@EnableWebSocketMessageBroker +public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { + + // ... + + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient()); + registry.setApplicationDestinationPrefixes("/app"); + } + + private Reactor2TcpClient createTcpClient() { + + Supplier addressSupplier = new Supplier() { + @Override + public InetSocketAddress get() { + // Select address to connect to ... + } + }; + + StompDecoder decoder = new StompDecoder(); + Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder); + return new Reactor2TcpClient<>(addressSupplier, codec); + } + +} +---- The STOMP broker relay can also be configured with a `virtualHost` property. The value of this property will be set as the `host` header of every `CONNECT` frame