From 7c3749769a8d136eee5e3b01e156b9a5eac3ec49 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 14 Oct 2013 16:35:16 -0400 Subject: [PATCH] Add STOMP broker relay to configure "host" header Issue: SPR-10955 --- .../stomp/StompBrokerRelayMessageHandler.java | 65 +++++++++++++++++-- ...erRelayMessageHandlerIntegrationTests.java | 2 + 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 08da6d15a8a..7c4017d3b8a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -47,9 +47,24 @@ import reactor.tuple.Tuple2; /** - * A {@link MessageHandler} that handles messages by forwarding them to a STOMP broker and - * reversely sends any returned messages from the broker to the provided - * {@link MessageChannel}. + * A {@link MessageHandler} that handles messages by forwarding them to a STOMP broker. + * For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP + * connection to the broker is opened and used exclusively for all messages from the + * client that originated the CONNECT message. Messages from the same client are + * identified through the session id message header. Reversely, when the STOMP broker + * sends messages back on the TCP connection, those messages are enriched with the session + * id of the client and sent back downstream through the {@link MessageChannel} provided + * to the constructor. + *

+ * This class also automatically opens a default "system" TCP connection to the message + * broker that is used for sending messages that originate from the server application (as + * opposed to from a client). Such messages are recognized because they are not associated + * with any client and therefore do not have a session id header. The "system" connection + * is effectively shared and cannot be used to receive messages. Several properties are + * provided to configure the "system" session including the the + * {@link #setSystemLogin(String) login} {@link #setSystemPasscode(String) passcode}, + * heartbeat {@link #setSystemHeartbeatSendInterval(long) send} and + * {@link #setSystemHeartbeatReceiveInterval(long) receive} intervals. * * @author Rossen Stoyanchev * @author Andy Wilkinson @@ -71,6 +86,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private long systemHeartbeatReceiveInterval = 10000; + private String virtualHost; + private Environment environment; private TcpClient, Message> tcpClient; @@ -120,11 +137,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Set the interval, in milliseconds, at which the "system" relay session will, - * in the absence of any other data being sent, send a heartbeat to the STOMP broker. - * A value of zero will prevent heartbeats from being sent to the broker. + * Set the interval, in milliseconds, at which the "system" relay session will, in the + * absence of any other data being sent, send a heartbeat to the STOMP broker. A value + * of zero will prevent heartbeats from being sent to the broker. *

* The default value is 10000. + *

+ * See class-level documentation for more information on the "system" session. */ public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; @@ -145,6 +164,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * heartbeats from the broker. *

* The default value is 10000. + *

+ * See class-level documentation for more information on the "system" session. */ public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; @@ -161,6 +182,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Set the login for the "system" relay session used to send messages to the STOMP * broker without having a client session (e.g. REST/HTTP request handling method). + *

+ * See class-level documentation for more information on the "system" session. */ public void setSystemLogin(String systemLogin) { Assert.hasText(systemLogin, "systemLogin must not be empty"); @@ -177,6 +200,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Set the passcode for the "system" relay session used to send messages to the STOMP * broker without having a client session (e.g. REST/HTTP request handling method). + *

+ * See class-level documentation for more information on the "system" session. */ public void setSystemPasscode(String systemPasscode) { this.systemPasscode = systemPasscode; @@ -189,6 +214,26 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.systemPasscode; } + /** + * Set the value of the "host" header to use in STOMP CONNECT frames. When this + * property is configured, a "host" header will be added to every STOMP frame sent to + * the STOMP broker. This may be useful for example in a cloud environment where the + * actual host to which the TCP connection is established is different from the host + * providing the cloud-based STOMP service. + *

+ * By default this property is not set. + */ + public void setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + } + + /** + * @return the configured virtual host value. + */ + public String getVirtualHost() { + return this.virtualHost; + } + @Override protected void startInternal() { @@ -252,7 +297,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } if (SimpMessageType.CONNECT.equals(messageType)) { - message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); + if (getVirtualHost() != null) { + headers.setHost(getVirtualHost()); + message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); + } StompRelaySession session = new StompRelaySession(sessionId); this.relaySessions.put(sessionId, session); session.connect(message); @@ -516,6 +564,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler headers.setLogin(systemLogin); headers.setPasscode(systemPasscode); headers.setHeartbeat(systemHeartbeatSendInterval, systemHeartbeatReceiveInterval); + if (getVirtualHost() != null) { + headers.setHost(getVirtualHost()); + } Message connectMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); super.connect(connectMessage); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index ccd6b4bc54d..0dcd4c78dd9 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -113,6 +113,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } } + // test "host" header (virtualHost property) when TCP client is behind interface and configurable + @Test public void publishSubscribe() throws Exception {