@ -54,13 +54,21 @@ import reactor.tcp.netty.NettyTcpClient;
@@ -54,13 +54,21 @@ import reactor.tcp.netty.NettyTcpClient;
* @author Rossen Stoyanchev
* @since 4 . 0
* /
public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
implements SmartLifecycle {
public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler implements SmartLifecycle {
private static final String STOMP_RELAY_SYSTEM_SESSION_ID = "stompRelaySystemSessionId" ;
private MessageChannel clientChannel ;
private String relayHost = "127.0.0.1" ;
private int relayPort = 61613 ;
private String systemLogin = "guest" ;
private String systemPasscode = "guest" ;
private final StompMessageConverter stompMessageConverter = new StompMessageConverter ( ) ;
private MessageConverter payloadConverter ;
@ -83,6 +91,67 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
@@ -83,6 +91,67 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
this . payloadConverter = new CompositeMessageConverter ( null ) ;
}
/ * *
* Set the STOMP message broker host .
* /
public void setRelayHost ( String relayHost ) {
Assert . hasText ( relayHost , "relayHost must not be empty" ) ;
this . relayHost = relayHost ;
}
/ * *
* @return the STOMP message broker host .
* /
public String getRelayHost ( ) {
return this . relayHost ;
}
/ * *
* Set the STOMP message broker port .
* /
public void setRelayPort ( int relayPort ) {
this . relayPort = relayPort ;
}
/ * *
* @return the STOMP message broker port .
* /
public int getRelayPort ( ) {
return this . relayPort ;
}
/ * *
* Set the login for a "system" TCP connection used to send messages to the STOMP
* broker without having a client session ( e . g . REST / HTTP request handling method ) .
* /
public void setSystemLogin ( String systemLogin ) {
Assert . hasText ( systemLogin , "systemLogin must not be empty" ) ;
this . systemLogin = systemLogin ;
}
/ * *
* @return the login for a shared , "system" connection to the STOMP message broker .
* /
public String getSystemLogin ( ) {
return this . systemLogin ;
}
/ * *
* Set the passcode for a "system" TCP connection used to send messages to the STOMP
* broker without having a client session ( e . g . REST / HTTP request handling method ) .
* /
public void setSystemPasscode ( String systemPasscode ) {
this . systemPasscode = systemPasscode ;
}
/ * *
* @return the passcode for a shared , "system" connection to the STOMP message broker .
* /
public String getSystemPasscode ( ) {
return this . systemPasscode ;
}
public void setMessageConverters ( List < MessageConverter > converters ) {
this . payloadConverter = new CompositeMessageConverter ( converters ) ;
}
@ -113,19 +182,17 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
@@ -113,19 +182,17 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
public void start ( ) {
synchronized ( this . lifecycleMonitor ) {
// TODO: make this configurable
this . tcpClient = new TcpClient . Spec < String , String > ( NettyTcpClient . class )
. using ( new Environment ( ) )
. codec ( new DelimitedCodec < String , String > ( ( byte ) 0 , true , StandardCodecs . STRING_CODEC ) )
. connect ( "127.0.0.1" , 61616 )
. connect ( this . relayHost , this . relayPort )
. get ( ) ;
StompHeaderAccessor headers = StompHeaderAccessor . create ( StompCommand . CONNECT ) ;
headers . setAcceptVersion ( "1.1,1.2" ) ;
headers . setLogin ( "guest" ) ;
headers . setPasscode ( "guest" ) ;
headers . setHeartbeat ( 0 , 0 ) ;
headers . setLogin ( this . systemLogin ) ;
headers . setPasscode ( this . systemPasscode ) ;
headers . setHeartbeat ( 0 , 0 ) ; // TODO
Message < ? > message = MessageBuilder . withPayload (
new byte [ 0 ] ) . copyHeaders ( headers . toNativeHeaderMap ( ) ) . build ( ) ;
@ -270,7 +337,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
@@ -270,7 +337,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
readStompFrame ( stompFrame ) ;
}
} ) ;
stompHeaders . setHeartbeat ( 0 , 0 ) ; // TODO
stompHeaders . setHeartbeat ( 0 , 0 ) ; // TODO
forwardInternal ( message , stompHeaders , connection ) ;
}
} ) ;