@ -292,10 +292,36 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
@@ -292,10 +292,36 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
try {
if ( ( destination = = null ) | | supportsDestination ( destination ) ) {
if ( logger . isTraceEnabled ( ) ) {
logger . trace ( "Processing message: " + message ) ;
}
handleInternal ( message , messageType , sessionId ) ;
if ( SimpMessageType . CONNECT . equals ( messageType ) ) {
headers . setHeartbeat ( 0 , 0 ) ; // TODO: disable for now
message = MessageBuilder . withPayloadAndHeaders ( message . getPayload ( ) , headers ) . build ( ) ;
RelaySession session = new RelaySession ( sessionId ) ;
this . relaySessions . put ( sessionId , session ) ;
session . open ( message ) ;
}
else if ( SimpMessageType . DISCONNECT . equals ( messageType ) ) {
RelaySession session = this . relaySessions . remove ( sessionId ) ;
if ( session = = null ) {
if ( logger . isTraceEnabled ( ) ) {
logger . trace ( "Session already removed, sessionId=" + sessionId ) ;
}
return ;
}
session . forward ( message ) ;
}
else {
RelaySession session = this . relaySessions . get ( sessionId ) ;
if ( session = = null ) {
logger . warn ( "Session id=" + sessionId + " not found. Ignoring message: " + message ) ;
return ;
}
session . forward ( message ) ;
}
}
}
catch ( Throwable t ) {
@ -312,32 +338,6 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
@@ -312,32 +338,6 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
return false ;
}
protected void handleInternal ( Message < ? > message , SimpMessageType messageType , String sessionId ) {
if ( SimpMessageType . CONNECT . equals ( messageType ) ) {
RelaySession session = new RelaySession ( sessionId ) ;
this . relaySessions . put ( sessionId , session ) ;
session . open ( message ) ;
}
else if ( SimpMessageType . DISCONNECT . equals ( messageType ) ) {
RelaySession session = this . relaySessions . remove ( sessionId ) ;
if ( session = = null ) {
if ( logger . isTraceEnabled ( ) ) {
logger . trace ( "Session already removed, sessionId=" + sessionId ) ;
}
return ;
}
session . forward ( message ) ;
}
else {
RelaySession session = this . relaySessions . get ( sessionId ) ;
if ( session = = null ) {
logger . warn ( "Session id=" + sessionId + " not found. Ignoring message: " + message ) ;
return ;
}
session . forward ( message ) ;
}
}
private class RelaySession {