@ -55,21 +55,21 @@ import org.springframework.util.concurrent.ListenableFutureTask;
@@ -55,21 +55,21 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* connection to the broker is opened and used exclusively for all messages from the
* client that originated the CONNECT message . Messages from the same client are
* identified through the session id message header . Reversely , when the STOMP broker
* sends messages back on the TCP connection , those messages are enriched with the session
* id of the client and sent back downstream through the { @link MessageChannel } provided
* to the constructor .
* sends messages back on the TCP connection , those messages are enriched with the
* session id of the client and sent back downstream through the { @link MessageChannel }
* provided to the constructor .
*
* < p > This class also automatically opens a default "system" TCP connection to the message
* broker that is used for sending messages that originate from the server application ( as
* opposed to from a client ) . Such messages are not associated with any client and
* therefore do not have a session id header . The "system" connection is effectively
* shared and cannot be used to receive messages . Several properties are provided to
* configure the "system" connection including :
* < p > This class also automatically opens a default "system" TCP connection to the
* message broker that is used for sending messages that originate from the server
* application ( as opposed to from a client ) . Such messages are not associated with
* any client and therefore do not have a session id header . The "system" connection
* is effectively shared and cannot be used to receive messages . Several properties
* are provided to configure the "system" connection including :
* < ul >
* < li > { @link # setSystemLogin ( String ) } < / li >
* < li > { @link # setSystemPasscode ( String ) } < / li >
* < li > { @link # setSystemHeartbeatSendInterval ( long ) } < / li >
* < li > { @link # setSystemHeartbeatReceiveInterval ( long ) } < / li >
* < li > { @link # setSystemLogin } < / li >
* < li > { @link # setSystemPasscode } < / li >
* < li > { @link # setSystemHeartbeatSendInterval } < / li >
* < li > { @link # setSystemHeartbeatReceiveInterval } < / li >
* < / ul >
*
* @author Rossen Stoyanchev
@ -80,22 +80,20 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@@ -80,22 +80,20 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public static final String SYSTEM_SESSION_ID = "_system_" ;
private static final byte [ ] EMPTY_PAYLOAD = new byte [ 0 ] ;
private static final ListenableFutureTask < Void > EMPTY_TASK = new ListenableFutureTask < > ( new VoidCallable ( ) ) ;
// STOMP recommends error of margin for receiving heartbeats
private static final long HEARTBEAT_MULTIPLIER = 3 ;
private static final Message < byte [ ] > HEARTBEAT_MESSAGE ;
/ * *
* A heartbeat is setup once a CONNECTED frame is received which contains
* the heartbeat settings we need . If we don ' t receive CONNECTED within
* a minute , the connection is closed proactively .
* A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings
* we need . If we don ' t receive CONNECTED within a minute , the connection is closed proactively .
* /
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000 ;
private static final byte [ ] EMPTY_PAYLOAD = new byte [ 0 ] ;
private static final ListenableFutureTask < Void > EMPTY_TASK = new ListenableFutureTask < > ( new VoidCallable ( ) ) ;
private static final Message < byte [ ] > HEARTBEAT_MESSAGE ;
static {
@ -121,19 +119,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@@ -121,19 +119,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private long systemHeartbeatReceiveInterval = 10000 ;
private String virtualHost ;
private final Map < String , MessageHandler > systemSubscriptions = new HashMap < > ( 4 ) ;
private String virtualHost ;
private TcpOperations < byte [ ] > tcpClient ;
private MessageHeaderInitializer headerInitializer ;
private final Map < String , StompConnectionHandler > connectionHandlers =
new ConcurrentHashMap < > ( ) ;
private final Stats stats = new Stats ( ) ;
private final Map < String , StompConnectionHandler > connectionHandlers = new ConcurrentHashMap < > ( ) ;
/ * *
* Create a StompBrokerRelayMessageHandler instance with the given message channels
@ -179,46 +176,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@@ -179,46 +176,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public int getRelayPort ( ) {
return this . relayPort ;
}
/ * *
* Set the interval , in milliseconds , at which the "system" connection will , in the
* absence of any other data being sent , send a heartbeat to the STOMP broker . A value
* of zero will prevent heartbeats from being sent to the broker .
* < p > The default value is 10000 .
* < p > See class - level documentation for more information on the "system" connection .
* /
public void setSystemHeartbeatSendInterval ( long systemHeartbeatSendInterval ) {
this . systemHeartbeatSendInterval = systemHeartbeatSendInterval ;
}
/ * *
* Return the interval , in milliseconds , at which the "system" connection will
* send heartbeats to the STOMP broker .
* /
public long getSystemHeartbeatSendInterval ( ) {
return this . systemHeartbeatSendInterval ;
}
/ * *
* Set the maximum interval , in milliseconds , at which the "system" connection
* expects , in the absence of any other data , to receive a heartbeat from the STOMP
* broker . A value of zero will configure the connection to expect not to receive
* heartbeats from the broker .
* < p > The default value is 10000 .
* < p > See class - level documentation for more information on the "system" connection .
* /
public void setSystemHeartbeatReceiveInterval ( long heartbeatReceiveInterval ) {
this . systemHeartbeatReceiveInterval = heartbeatReceiveInterval ;
}
/ * *
* Return the interval , in milliseconds , at which the "system" connection expects
* to receive heartbeats from the STOMP broker .
* /
public long getSystemHeartbeatReceiveInterval ( ) {
return this . systemHeartbeatReceiveInterval ;
}
/ * *
* Set the login to use when creating connections to the STOMP broker on
* behalf of connected clients .
@ -294,6 +251,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@@ -294,6 +251,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this . systemPasscode ;
}
/ * *
* Set the interval , in milliseconds , at which the "system" connection will , in the
* absence of any other data being sent , send a heartbeat to the STOMP broker . A value
* of zero will prevent heartbeats from being sent to the broker .
* < p > The default value is 10000 .
* < p > See class - level documentation for more information on the "system" connection .
* /
public void setSystemHeartbeatSendInterval ( long systemHeartbeatSendInterval ) {
this . systemHeartbeatSendInterval = systemHeartbeatSendInterval ;
}
/ * *
* Return the interval , in milliseconds , at which the "system" connection will
* send heartbeats to the STOMP broker .
* /
public long getSystemHeartbeatSendInterval ( ) {
return this . systemHeartbeatSendInterval ;
}
/ * *
* Set the maximum interval , in milliseconds , at which the "system" connection
* expects , in the absence of any other data , to receive a heartbeat from the STOMP
* broker . A value of zero will configure the connection to expect not to receive
* heartbeats from the broker .
* < p > The default value is 10000 .
* < p > See class - level documentation for more information on the "system" connection .
* /
public void setSystemHeartbeatReceiveInterval ( long heartbeatReceiveInterval ) {
this . systemHeartbeatReceiveInterval = heartbeatReceiveInterval ;
}
/ * *
* Return the interval , in milliseconds , at which the "system" connection expects
* to receive heartbeats from the STOMP broker .
* /
public long getSystemHeartbeatReceiveInterval ( ) {
return this . systemHeartbeatReceiveInterval ;
}
/ * *
* Configure one more destinations to subscribe to on the shared "system"
* connection along with MessageHandler ' s to handle received messages .
@ -336,28 +333,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@@ -336,28 +333,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/ * *
* Configure a TCP client for managing TCP connections to the STOMP broker .
* By default { @link ReactorNettyTcpClient } is used .
* < p > By default { @link ReactorNettyTcpClient } is used .
* /
public void setTcpClient ( TcpOperations < byte [ ] > tcpClient ) {
this . tcpClient = tcpClient ;
}
/ * *
* Get the configured TCP client . N ever { @code null } unless not configured
* Get the configured TCP client ( n ever { @code null } unless not configured
* invoked and this method is invoked before the handler is started and
* hence a default implementation initialized .
* hence a default implementation initialized ) .
* /
public TcpOperations < byte [ ] > getTcpClient ( ) {
return this . tcpClient ;
}
/ * *
* Return the current count of TCP connection to the broker .
* /
public int getConnectionCount ( ) {
return this . connectionHandlers . size ( ) ;
}
/ * *
* Configure a { @link MessageHeaderInitializer } to apply to the headers of all
* messages created through the { @code StompBrokerRelayMessageHandler } that
@ -382,6 +372,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@@ -382,6 +372,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this . stats . toString ( ) ;
}
/ * *
* Return the current count of TCP connection to the broker .
* /
public int getConnectionCount ( ) {
return this . connectionHandlers . size ( ) ;
}
@Override
protected void startInternal ( ) {
@ -872,6 +869,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@@ -872,6 +869,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private class SystemStompConnectionHandler extends StompConnectionHandler {
public SystemStompConnectionHandler ( StompHeaderAccessor connectHeaders ) {
@ -971,10 +969,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@@ -971,10 +969,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private static class VoidCallable implements Callable < Void > {
@Override
public Void call ( ) throws Exception {
public Void call ( ) {
return null ;
}
}
@ -1001,10 +1000,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@@ -1001,10 +1000,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
public String toString ( ) {
return connectionHandlers . size ( ) + " sessions, " + relayHost + ":" + relayPort +
return ( connectionHandlers . size ( ) + " sessions, " + relayHost + ":" + relayPort +
( isBrokerAvailable ( ) ? " (available)" : " (not available)" ) +
", processed CONNECT(" + this . connect . get ( ) + ")-CONNECTED(" +
this . connected . get ( ) + ")-DISCONNECT(" + this . disconnect . get ( ) + ")" ;
this . connected . get ( ) + ")-DISCONNECT(" + this . disconnect . get ( ) + ")" ) ;
}
}