@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.LinkedBlockingQueue ;
import java.util.concurrent.LinkedBlockingQueue ;
import org.springframework.context.SmartLifecycle ;
import org.springframework.http.MediaType ;
import org.springframework.http.MediaType ;
import org.springframework.messaging.Message ;
import org.springframework.messaging.Message ;
import org.springframework.messaging.MessageChannel ;
import org.springframework.messaging.MessageChannel ;
@ -54,7 +55,10 @@ import reactor.tcp.netty.NettyTcpClient;
* @since 4 . 0
* @since 4 . 0
* /
* /
@SuppressWarnings ( "rawtypes" )
@SuppressWarnings ( "rawtypes" )
public class StompRelayPubSubMessageHandler < M extends Message > extends AbstractPubSubMessageHandler < M > {
public class StompRelayPubSubMessageHandler < M extends Message > extends AbstractPubSubMessageHandler < M >
implements SmartLifecycle {
private static final String STOMP_RELAY_SYSTEM_SESSION_ID = "stompRelaySystemSessionId" ;
private MessageChannel < M > clientChannel ;
private MessageChannel < M > clientChannel ;
@ -62,19 +66,56 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
private MessageConverter payloadConverter ;
private MessageConverter payloadConverter ;
private final TcpClient < String , String > tcpClient ;
private TcpClient < String , String > tcpClient ;
private final Map < String , RelaySession > relaySessions = new ConcurrentHashMap < String , RelaySession > ( ) ;
private final Map < String , RelaySession > relaySessions = new ConcurrentHashMap < String , RelaySession > ( ) ;
private Object lifecycleMonitor = new Object ( ) ;
private boolean running = false ;
/ * *
/ * *
* @param clientChannel a channel for sending messages from the remote message broker
* @param clientChannel a channel for sending messages from the remote message broker
* back to clients
* back to clients
* /
* /
public StompRelayPubSubMessageHandler ( PubSubChannelRegistry < M , ? > registry ) {
public StompRelayPubSubMessageHandler ( PubSubChannelRegistry < M , ? > registry ) {
Assert . notNull ( registry , "registry is required" ) ;
Assert . notNull ( registry , "registry is required" ) ;
this . clientChannel = registry . getClientOutputChannel ( ) ;
this . clientChannel = registry . getClientOutputChannel ( ) ;
this . payloadConverter = new CompositeMessageConverter ( null ) ;
}
public void setMessageConverters ( List < MessageConverter > converters ) {
this . payloadConverter = new CompositeMessageConverter ( converters ) ;
}
@Override
protected Collection < MessageType > getSupportedMessageTypes ( ) {
return null ;
}
@Override
public boolean isAutoStartup ( ) {
return true ;
}
@Override
public int getPhase ( ) {
return Integer . MAX_VALUE ;
}
@Override
public boolean isRunning ( ) {
synchronized ( this . lifecycleMonitor ) {
return this . running ;
}
}
@Override
public void start ( ) {
synchronized ( this . lifecycleMonitor ) {
// TODO: make this configurable
this . tcpClient = new TcpClient . Spec < String , String > ( NettyTcpClient . class )
this . tcpClient = new TcpClient . Spec < String , String > ( NettyTcpClient . class )
. using ( new Environment ( ) )
. using ( new Environment ( ) )
@ -82,16 +123,40 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
. connect ( "127.0.0.1" , 61613 )
. connect ( "127.0.0.1" , 61613 )
. get ( ) ;
. get ( ) ;
this . payloadConverter = new CompositeMessageConverter ( null ) ;
StompHeaderAccessor headers = StompHeaderAccessor . create ( StompCommand . CONNECT ) ;
headers . setAcceptVersion ( "1.1,1.2" ) ;
headers . setLogin ( "guest" ) ;
headers . setPasscode ( "guest" ) ;
headers . setHeartbeat ( 0 , 0 ) ;
@SuppressWarnings ( "unchecked" )
M message = ( M ) MessageBuilder . withPayload ( new byte [ 0 ] ) . copyHeaders ( headers . toStompMessageHeaders ( ) ) . build ( ) ;
RelaySession session = new RelaySession ( message , headers ) {
@Override
protected void sendMessageToClient ( M message ) {
// TODO: check for ERROR frame (reconnect?)
}
}
} ;
this . relaySessions . put ( STOMP_RELAY_SYSTEM_SESSION_ID , session ) ;
public void setMessageConverters ( List < MessageConverter > converters ) {
this . running = true ;
this . payloadConverter = new CompositeMessageConverter ( converters ) ;
}
}
}
@Override
@Override
protected Collection < MessageType > getSupportedMessageTypes ( ) {
public void stop ( ) {
return null ;
synchronized ( this . lifecycleMonitor ) {
this . running = false ;
this . tcpClient . close ( ) ;
}
}
@Override
public void stop ( Runnable callback ) {
synchronized ( this . lifecycleMonitor ) {
stop ( ) ;
callback . run ( ) ;
}
}
}
@Override
@Override
@ -146,11 +211,20 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
StompHeaderAccessor headers = StompHeaderAccessor . wrap ( message ) ;
StompHeaderAccessor headers = StompHeaderAccessor . wrap ( message ) ;
headers . setStompCommandIfNotSet ( command ) ;
headers . setStompCommandIfNotSet ( command ) ;
if ( headers . getSessionId ( ) = = null & & ( StompCommand . SEND . equals ( command ) ) ) {
}
String sessionId = headers . getSessionId ( ) ;
String sessionId = headers . getSessionId ( ) ;
if ( sessionId = = null ) {
if ( sessionId = = null ) {
if ( StompCommand . SEND . equals ( command ) ) {
sessionId = STOMP_RELAY_SYSTEM_SESSION_ID ;
}
else {
logger . error ( "No sessionId in message " + message ) ;
logger . error ( "No sessionId in message " + message ) ;
return ;
return ;
}
}
}
RelaySession session = this . relaySessions . get ( sessionId ) ;
RelaySession session = this . relaySessions . get ( sessionId ) ;
if ( session = = null ) {
if ( session = = null ) {
@ -163,7 +237,7 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
}
}
private final class RelaySession {
private class RelaySession {
private final String sessionId ;
private final String sessionId ;
@ -236,6 +310,10 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
}
}
relaySessions . remove ( this . sessionId ) ;
relaySessions . remove ( this . sessionId ) ;
}
}
sendMessageToClient ( message ) ;
}
protected void sendMessageToClient ( M message ) {
clientChannel . send ( message ) ;
clientChannel . send ( message ) ;
}
}
@ -245,7 +323,7 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
headers . setMessage ( errorText ) ;
headers . setMessage ( errorText ) ;
@SuppressWarnings ( "unchecked" )
@SuppressWarnings ( "unchecked" )
M errorMessage = ( M ) MessageBuilder . withPayload ( new byte [ 0 ] ) . copyHeaders ( headers . toHeaders ( ) ) . build ( ) ;
M errorMessage = ( M ) MessageBuilder . withPayload ( new byte [ 0 ] ) . copyHeaders ( headers . toHeaders ( ) ) . build ( ) ;
clientChannel . send ( errorMessage ) ;
sendMessageToClient ( errorMessage ) ;
}
}
public void forward ( M message , StompHeaderAccessor headers ) {
public void forward ( M message , StompHeaderAccessor headers ) {
@ -309,4 +387,5 @@ public class StompRelayPubSubMessageHandler<M extends Message> extends AbstractP
return true ;
return true ;
}
}
}
}
}
}