Browse Source

Reactor2TcpClient constructor with address supplier

Issue: SPR-12452
pull/1745/head
Rossen Stoyanchev 8 years ago
parent
commit
88a17a4b10
  1. 21
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java
  2. 9
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  3. 44
      src/docs/asciidoc/web/websocket.adoc

21
spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import org.springframework.lang.Nullable; @@ -20,6 +20,7 @@ import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
/**
@ -51,6 +52,9 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { @@ -51,6 +52,9 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
@Nullable
private String virtualHost;
@Nullable
private TcpOperations<byte[]> tcpClient;
private boolean autoStartup = true;
@Nullable
@ -166,6 +170,18 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { @@ -166,6 +170,18 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
return this;
}
/**
* Configure a TCP client for managing TCP connections to the STOMP broker.
* <p>By default {@code ReactorNettyTcpClient} is used.
* <p><strong>Note:</strong> when this property is used, any
* {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
* specified are effectively ignored.
* @since 4.3.15
*/
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient;
}
/**
* Configure whether the {@link StompBrokerRelayMessageHandler} should start
* automatically when the Spring ApplicationContext is refreshed.
@ -239,6 +255,9 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { @@ -239,6 +255,9 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
if (this.virtualHost != null) {
handler.setVirtualHost(this.virtualHost);
}
if (this.tcpClient != null) {
handler.setTcpClient(this.tcpClient);
}
handler.setAutoStartup(this.autoStartup);

9
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -340,6 +340,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -340,6 +340,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Configure a TCP client for managing TCP connections to the STOMP broker.
* <p>By default {@link ReactorNettyTcpClient} is used.
* <p><strong>Note:</strong> when this property is used, any
* {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
* specified are effectively ignored.
*/
public void setTcpClient(@Nullable TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient;
@ -613,8 +616,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -613,8 +616,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* the TCP connection, failure to send a message, missed heartbeat, etc.
*/
protected void handleTcpConnectionFailure(String error, @Nullable Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex);
if (logger.isWarnEnabled()) {
logger.warn("TCP connection failure in session " + this.sessionId + ": " + error, ex);
}
try {
sendStompErrorFrameToClient(error);

44
src/docs/asciidoc/web/websocket.adoc

@ -1440,9 +1440,9 @@ values ``guest``/``guest``. @@ -1440,9 +1440,9 @@ values ``guest``/``guest``.
====
The STOMP broker relay always sets the `login` and `passcode` headers on every `CONNECT`
frame that it forwards to the broker on behalf of clients. Therefore WebSocket clients
need not set those headers; they will be ignored. As the following section explains,
instead WebSocket clients should rely on HTTP authentication to protect the WebSocket
endpoint and establish the client identity.
need not set those headers; they will be ignored. As the <<websocket-stomp-authentication>>
section explains, instead WebSocket clients should rely on HTTP authentication to protect
the WebSocket endpoint and establish the client identity.
====
The STOMP broker relay also sends and receives heartbeats to and from the message
@ -1451,13 +1451,43 @@ and receiving heartbeats (10 seconds each by default). If connectivity to the br @@ -1451,13 +1451,43 @@ and receiving heartbeats (10 seconds each by default). If connectivity to the br
is lost, the broker relay will continue to try to reconnect, every 5 seconds,
until it succeeds.
[NOTE]
====
A Spring bean can implement `ApplicationListener<BrokerAvailabilityEvent>` in order
Any Spring bean can implement `ApplicationListener<BrokerAvailabilityEvent>` in order
to receive notifications when the "system" connection to the broker is lost and
re-established. For example a Stock Quote service broadcasting stock quotes can
stop trying to send messages when there is no active "system" connection.
====
By default, the STOMP broker relay always connects, and reconnects as needed if
connectivity is lost, to the same host and port. If you wish to supply multiple addresses,
on each attempt to connect, you can configure a supplier of addresses, instead of a
fixed host and port. For example:
[source,java,indent=0]
[subs="verbatim,quotes"]
----
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
registry.setApplicationDestinationPrefixes("/app");
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
builder.connectAddress(()-> {
// Select address to connect to ...
});
};
return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
}
}
----
The STOMP broker relay can also be configured with a `virtualHost` property.
The value of this property will be set as the `host` header of every `CONNECT` frame

Loading…
Cancel
Save