|
|
|
@ -55,21 +55,21 @@ import org.springframework.util.concurrent.ListenableFutureTask; |
|
|
|
* connection to the broker is opened and used exclusively for all messages from the |
|
|
|
* connection to the broker is opened and used exclusively for all messages from the |
|
|
|
* client that originated the CONNECT message. Messages from the same client are |
|
|
|
* client that originated the CONNECT message. Messages from the same client are |
|
|
|
* identified through the session id message header. Reversely, when the STOMP broker |
|
|
|
* identified through the session id message header. Reversely, when the STOMP broker |
|
|
|
* sends messages back on the TCP connection, those messages are enriched with the session |
|
|
|
* sends messages back on the TCP connection, those messages are enriched with the |
|
|
|
* id of the client and sent back downstream through the {@link MessageChannel} provided |
|
|
|
* session id of the client and sent back downstream through the {@link MessageChannel} |
|
|
|
* to the constructor. |
|
|
|
* provided to the constructor. |
|
|
|
* |
|
|
|
* |
|
|
|
* <p>This class also automatically opens a default "system" TCP connection to the message |
|
|
|
* <p>This class also automatically opens a default "system" TCP connection to the |
|
|
|
* broker that is used for sending messages that originate from the server application (as |
|
|
|
* message broker that is used for sending messages that originate from the server |
|
|
|
* opposed to from a client). Such messages are not associated with any client and |
|
|
|
* application (as opposed to from a client). Such messages are not associated with |
|
|
|
* therefore do not have a session id header. The "system" connection is effectively |
|
|
|
* any client and therefore do not have a session id header. The "system" connection |
|
|
|
* shared and cannot be used to receive messages. Several properties are provided to |
|
|
|
* is effectively shared and cannot be used to receive messages. Several properties |
|
|
|
* configure the "system" connection including: |
|
|
|
* are provided to configure the "system" connection including: |
|
|
|
* <ul> |
|
|
|
* <ul> |
|
|
|
* <li>{@link #setSystemLogin(String)}</li> |
|
|
|
* <li>{@link #setSystemLogin}</li> |
|
|
|
* <li>{@link #setSystemPasscode(String)}</li> |
|
|
|
* <li>{@link #setSystemPasscode}</li> |
|
|
|
* <li>{@link #setSystemHeartbeatSendInterval(long)}</li> |
|
|
|
* <li>{@link #setSystemHeartbeatSendInterval}</li> |
|
|
|
* <li>{@link #setSystemHeartbeatReceiveInterval(long)}</li> |
|
|
|
* <li>{@link #setSystemHeartbeatReceiveInterval}</li> |
|
|
|
* </ul> |
|
|
|
* </ul> |
|
|
|
* |
|
|
|
* |
|
|
|
* @author Rossen Stoyanchev |
|
|
|
* @author Rossen Stoyanchev |
|
|
|
@ -80,22 +80,20 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler |
|
|
|
|
|
|
|
|
|
|
|
public static final String SYSTEM_SESSION_ID = "_system_"; |
|
|
|
public static final String SYSTEM_SESSION_ID = "_system_"; |
|
|
|
|
|
|
|
|
|
|
|
private static final byte[] EMPTY_PAYLOAD = new byte[0]; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// STOMP recommends error of margin for receiving heartbeats
|
|
|
|
// STOMP recommends error of margin for receiving heartbeats
|
|
|
|
private static final long HEARTBEAT_MULTIPLIER = 3; |
|
|
|
private static final long HEARTBEAT_MULTIPLIER = 3; |
|
|
|
|
|
|
|
|
|
|
|
private static final Message<byte[]> HEARTBEAT_MESSAGE; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* A heartbeat is setup once a CONNECTED frame is received which contains |
|
|
|
* A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings |
|
|
|
* the heartbeat settings we need. If we don't receive CONNECTED within |
|
|
|
* we need. If we don't receive CONNECTED within a minute, the connection is closed proactively. |
|
|
|
* a minute, the connection is closed proactively. |
|
|
|
|
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000; |
|
|
|
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static final byte[] EMPTY_PAYLOAD = new byte[0]; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static final Message<byte[]> HEARTBEAT_MESSAGE; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static { |
|
|
|
static { |
|
|
|
@ -121,19 +119,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler |
|
|
|
|
|
|
|
|
|
|
|
private long systemHeartbeatReceiveInterval = 10000; |
|
|
|
private long systemHeartbeatReceiveInterval = 10000; |
|
|
|
|
|
|
|
|
|
|
|
private String virtualHost; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final Map<String, MessageHandler> systemSubscriptions = new HashMap<>(4); |
|
|
|
private final Map<String, MessageHandler> systemSubscriptions = new HashMap<>(4); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private String virtualHost; |
|
|
|
|
|
|
|
|
|
|
|
private TcpOperations<byte[]> tcpClient; |
|
|
|
private TcpOperations<byte[]> tcpClient; |
|
|
|
|
|
|
|
|
|
|
|
private MessageHeaderInitializer headerInitializer; |
|
|
|
private MessageHeaderInitializer headerInitializer; |
|
|
|
|
|
|
|
|
|
|
|
private final Map<String, StompConnectionHandler> connectionHandlers = |
|
|
|
|
|
|
|
new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final Stats stats = new Stats(); |
|
|
|
private final Stats stats = new Stats(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final Map<String, StompConnectionHandler> connectionHandlers = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Create a StompBrokerRelayMessageHandler instance with the given message channels |
|
|
|
* Create a StompBrokerRelayMessageHandler instance with the given message channels |
|
|
|
@ -179,46 +176,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler |
|
|
|
public int getRelayPort() { |
|
|
|
public int getRelayPort() { |
|
|
|
return this.relayPort; |
|
|
|
return this.relayPort; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Set the interval, in milliseconds, at which the "system" connection will, in the |
|
|
|
|
|
|
|
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value |
|
|
|
|
|
|
|
* of zero will prevent heartbeats from being sent to the broker. |
|
|
|
|
|
|
|
* <p>The default value is 10000. |
|
|
|
|
|
|
|
* <p>See class-level documentation for more information on the "system" connection. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { |
|
|
|
|
|
|
|
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Return the interval, in milliseconds, at which the "system" connection will |
|
|
|
|
|
|
|
* send heartbeats to the STOMP broker. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public long getSystemHeartbeatSendInterval() { |
|
|
|
|
|
|
|
return this.systemHeartbeatSendInterval; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Set the maximum interval, in milliseconds, at which the "system" connection |
|
|
|
|
|
|
|
* expects, in the absence of any other data, to receive a heartbeat from the STOMP |
|
|
|
|
|
|
|
* broker. A value of zero will configure the connection to expect not to receive |
|
|
|
|
|
|
|
* heartbeats from the broker. |
|
|
|
|
|
|
|
* <p>The default value is 10000. |
|
|
|
|
|
|
|
* <p>See class-level documentation for more information on the "system" connection. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { |
|
|
|
|
|
|
|
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Return the interval, in milliseconds, at which the "system" connection expects |
|
|
|
|
|
|
|
* to receive heartbeats from the STOMP broker. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public long getSystemHeartbeatReceiveInterval() { |
|
|
|
|
|
|
|
return this.systemHeartbeatReceiveInterval; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Set the login to use when creating connections to the STOMP broker on |
|
|
|
* Set the login to use when creating connections to the STOMP broker on |
|
|
|
* behalf of connected clients. |
|
|
|
* behalf of connected clients. |
|
|
|
@ -294,6 +251,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler |
|
|
|
return this.systemPasscode; |
|
|
|
return this.systemPasscode; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Set the interval, in milliseconds, at which the "system" connection will, in the |
|
|
|
|
|
|
|
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value |
|
|
|
|
|
|
|
* of zero will prevent heartbeats from being sent to the broker. |
|
|
|
|
|
|
|
* <p>The default value is 10000. |
|
|
|
|
|
|
|
* <p>See class-level documentation for more information on the "system" connection. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { |
|
|
|
|
|
|
|
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Return the interval, in milliseconds, at which the "system" connection will |
|
|
|
|
|
|
|
* send heartbeats to the STOMP broker. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public long getSystemHeartbeatSendInterval() { |
|
|
|
|
|
|
|
return this.systemHeartbeatSendInterval; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Set the maximum interval, in milliseconds, at which the "system" connection |
|
|
|
|
|
|
|
* expects, in the absence of any other data, to receive a heartbeat from the STOMP |
|
|
|
|
|
|
|
* broker. A value of zero will configure the connection to expect not to receive |
|
|
|
|
|
|
|
* heartbeats from the broker. |
|
|
|
|
|
|
|
* <p>The default value is 10000. |
|
|
|
|
|
|
|
* <p>See class-level documentation for more information on the "system" connection. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { |
|
|
|
|
|
|
|
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Return the interval, in milliseconds, at which the "system" connection expects |
|
|
|
|
|
|
|
* to receive heartbeats from the STOMP broker. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public long getSystemHeartbeatReceiveInterval() { |
|
|
|
|
|
|
|
return this.systemHeartbeatReceiveInterval; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Configure one more destinations to subscribe to on the shared "system" |
|
|
|
* Configure one more destinations to subscribe to on the shared "system" |
|
|
|
* connection along with MessageHandler's to handle received messages. |
|
|
|
* connection along with MessageHandler's to handle received messages. |
|
|
|
@ -336,28 +333,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Configure a TCP client for managing TCP connections to the STOMP broker. |
|
|
|
* Configure a TCP client for managing TCP connections to the STOMP broker. |
|
|
|
* By default {@link ReactorNettyTcpClient} is used. |
|
|
|
* <p>By default {@link ReactorNettyTcpClient} is used. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public void setTcpClient(TcpOperations<byte[]> tcpClient) { |
|
|
|
public void setTcpClient(TcpOperations<byte[]> tcpClient) { |
|
|
|
this.tcpClient = tcpClient; |
|
|
|
this.tcpClient = tcpClient; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Get the configured TCP client. Never {@code null} unless not configured |
|
|
|
* Get the configured TCP client (never {@code null} unless not configured |
|
|
|
* invoked and this method is invoked before the handler is started and |
|
|
|
* invoked and this method is invoked before the handler is started and |
|
|
|
* hence a default implementation initialized. |
|
|
|
* hence a default implementation initialized). |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public TcpOperations<byte[]> getTcpClient() { |
|
|
|
public TcpOperations<byte[]> getTcpClient() { |
|
|
|
return this.tcpClient; |
|
|
|
return this.tcpClient; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Return the current count of TCP connection to the broker. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public int getConnectionCount() { |
|
|
|
|
|
|
|
return this.connectionHandlers.size(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all |
|
|
|
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all |
|
|
|
* messages created through the {@code StompBrokerRelayMessageHandler} that |
|
|
|
* messages created through the {@code StompBrokerRelayMessageHandler} that |
|
|
|
@ -382,6 +372,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler |
|
|
|
return this.stats.toString(); |
|
|
|
return this.stats.toString(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Return the current count of TCP connection to the broker. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public int getConnectionCount() { |
|
|
|
|
|
|
|
return this.connectionHandlers.size(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
protected void startInternal() { |
|
|
|
protected void startInternal() { |
|
|
|
@ -872,6 +869,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private class SystemStompConnectionHandler extends StompConnectionHandler { |
|
|
|
private class SystemStompConnectionHandler extends StompConnectionHandler { |
|
|
|
|
|
|
|
|
|
|
|
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { |
|
|
|
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { |
|
|
|
@ -971,10 +969,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class VoidCallable implements Callable<Void> { |
|
|
|
private static class VoidCallable implements Callable<Void> { |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Void call() throws Exception { |
|
|
|
public Void call() { |
|
|
|
return null; |
|
|
|
return null; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -1001,10 +1000,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public String toString() { |
|
|
|
public String toString() { |
|
|
|
return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + |
|
|
|
return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + |
|
|
|
(isBrokerAvailable() ? " (available)" : " (not available)") + |
|
|
|
(isBrokerAvailable() ? " (available)" : " (not available)") + |
|
|
|
", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + |
|
|
|
", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + |
|
|
|
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; |
|
|
|
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|