diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 08da6d15a8a..7c4017d3b8a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -47,9 +47,24 @@ import reactor.tuple.Tuple2; /** - * A {@link MessageHandler} that handles messages by forwarding them to a STOMP broker and - * reversely sends any returned messages from the broker to the provided - * {@link MessageChannel}. + * A {@link MessageHandler} that handles messages by forwarding them to a STOMP broker. + * For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP + * connection to the broker is opened and used exclusively for all messages from the + * client that originated the CONNECT message. Messages from the same client are + * identified through the session id message header. Reversely, when the STOMP broker + * sends messages back on the TCP connection, those messages are enriched with the session + * id of the client and sent back downstream through the {@link MessageChannel} provided + * to the constructor. + *
+ * This class also automatically opens a default "system" TCP connection to the message
+ * broker that is used for sending messages that originate from the server application (as
+ * opposed to from a client). Such messages are recognized because they are not associated
+ * with any client and therefore do not have a session id header. The "system" connection
+ * is effectively shared and cannot be used to receive messages. Several properties are
+ * provided to configure the "system" session including the the
+ * {@link #setSystemLogin(String) login} {@link #setSystemPasscode(String) passcode},
+ * heartbeat {@link #setSystemHeartbeatSendInterval(long) send} and
+ * {@link #setSystemHeartbeatReceiveInterval(long) receive} intervals.
*
* @author Rossen Stoyanchev
* @author Andy Wilkinson
@@ -71,6 +86,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private long systemHeartbeatReceiveInterval = 10000;
+ private String virtualHost;
+
private Environment environment;
private TcpClient
* The default value is 10000.
+ *
+ * See class-level documentation for more information on the "system" session.
*/
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
@@ -145,6 +164,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* heartbeats from the broker.
*
* The default value is 10000.
+ *
+ * See class-level documentation for more information on the "system" session.
*/
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
@@ -161,6 +182,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Set the login for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
+ *
+ * See class-level documentation for more information on the "system" session.
*/
public void setSystemLogin(String systemLogin) {
Assert.hasText(systemLogin, "systemLogin must not be empty");
@@ -177,6 +200,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Set the passcode for the "system" relay session used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
+ *
+ * See class-level documentation for more information on the "system" session.
*/
public void setSystemPasscode(String systemPasscode) {
this.systemPasscode = systemPasscode;
@@ -189,6 +214,26 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.systemPasscode;
}
+ /**
+ * Set the value of the "host" header to use in STOMP CONNECT frames. When this
+ * property is configured, a "host" header will be added to every STOMP frame sent to
+ * the STOMP broker. This may be useful for example in a cloud environment where the
+ * actual host to which the TCP connection is established is different from the host
+ * providing the cloud-based STOMP service.
+ *
+ * By default this property is not set.
+ */
+ public void setVirtualHost(String virtualHost) {
+ this.virtualHost = virtualHost;
+ }
+
+ /**
+ * @return the configured virtual host value.
+ */
+ public String getVirtualHost() {
+ return this.virtualHost;
+ }
+
@Override
protected void startInternal() {
@@ -252,7 +297,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
if (SimpMessageType.CONNECT.equals(messageType)) {
- message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
+ if (getVirtualHost() != null) {
+ headers.setHost(getVirtualHost());
+ message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
+ }
StompRelaySession session = new StompRelaySession(sessionId);
this.relaySessions.put(sessionId, session);
session.connect(message);
@@ -516,6 +564,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
headers.setLogin(systemLogin);
headers.setPasscode(systemPasscode);
headers.setHeartbeat(systemHeartbeatSendInterval, systemHeartbeatReceiveInterval);
+ if (getVirtualHost() != null) {
+ headers.setHost(getVirtualHost());
+ }
Message> connectMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
super.connect(connectMessage);
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java
index ccd6b4bc54d..0dcd4c78dd9 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java
@@ -113,6 +113,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
}
+ // test "host" header (virtualHost property) when TCP client is behind interface and configurable
+
@Test
public void publishSubscribe() throws Exception {