Browse Source

Polish

pull/364/merge
Rossen Stoyanchev 13 years ago
parent
commit
469aaa8754
  1. 71
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

71
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -70,7 +70,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -70,7 +70,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private TcpClient<Message<byte[]>, Message<byte[]>> tcpClient;
private final Map<String, RelaySession> relaySessions = new ConcurrentHashMap<String, RelaySession>();
private final Map<String, StompRelaySession> relaySessions = new ConcurrentHashMap<String, StompRelaySession>();
/**
@ -158,7 +158,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -158,7 +158,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (logger.isDebugEnabled()) {
logger.debug("Initializing \"system\" TCP connection");
}
SystemRelaySession session = new SystemRelaySession();
SystemStompRelaySession session = new SystemStompRelaySession();
this.relaySessions.put(session.getId(), session);
session.connect();
}
@ -189,7 +189,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -189,7 +189,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
SimpMessageType messageType = headers.getMessageType();
if (SimpMessageType.MESSAGE.equals(messageType)) {
sessionId = (sessionId == null) ? SystemRelaySession.ID : sessionId;
sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId;
headers.setSessionId(sessionId);
command = (command == null) ? StompCommand.SEND : command;
headers.setCommandIfNotSet(command);
@ -208,12 +208,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -208,12 +208,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
try {
if (SimpMessageType.CONNECT.equals(messageType)) {
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
RelaySession session = new RelaySession(sessionId);
StompRelaySession session = new StompRelaySession(sessionId);
this.relaySessions.put(sessionId, session);
session.connect(message);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
RelaySession session = this.relaySessions.remove(sessionId);
StompRelaySession session = this.relaySessions.remove(sessionId);
if (session == null) {
if (logger.isTraceEnabled()) {
logger.trace("Session already removed, sessionId=" + sessionId);
@ -223,7 +223,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -223,7 +223,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
session.forward(message);
}
else {
RelaySession session = this.relaySessions.get(sessionId);
StompRelaySession session = this.relaySessions.get(sessionId);
if (session == null) {
logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
return;
@ -237,14 +237,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -237,14 +237,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
private class RelaySession {
private class StompRelaySession {
private final String sessionId;
private volatile StompConnection stompConnection = new StompConnection();
private RelaySession(String sessionId) {
private StompRelaySession(String sessionId) {
Assert.notNull(sessionId, "sessionId is required");
this.sessionId = sessionId;
}
@ -257,11 +257,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -257,11 +257,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public void connect(final Message<?> connectMessage) {
Assert.notNull(connectMessage, "connectMessage is required");
Composable<TcpConnection<Message<byte[]>, Message<byte[]>>> promise = openTcpConnection();
Composable<TcpConnection<Message<byte[]>, Message<byte[]>>> promise = initConnection();
promise.consume(new Consumer<TcpConnection<Message<byte[]>, Message<byte[]>>>() {
@Override
public void accept(TcpConnection<Message<byte[]>, Message<byte[]>> connection) {
handleTcpConnection(connection, connectMessage);
handleConnectionReady(connection, connectMessage);
}
});
promise.when(Throwable.class, new Consumer<Throwable>() {
@ -273,11 +273,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -273,11 +273,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
});
}
protected Composable<TcpConnection<Message<byte[]>, Message<byte[]>>> openTcpConnection() {
protected Composable<TcpConnection<Message<byte[]>, Message<byte[]>>> initConnection() {
return tcpClient.open();
}
protected void handleTcpConnection(TcpConnection<Message<byte[]>, Message<byte[]>> tcpConn, final Message<?> connectMessage) {
protected void handleConnectionReady(
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConn, final Message<?> connectMessage) {
this.stompConnection.setTcpConnection(tcpConn);
tcpConn.on().close(new Runnable() {
@Override
@ -294,6 +296,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -294,6 +296,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
forwardInternal(tcpConn, connectMessage);
}
protected void connectionClosed() {
relaySessions.remove(this.sessionId);
if (this.stompConnection.isReady()) {
sendError("Lost connection to the broker");
}
}
private void readStompFrame(Message<byte[]> message) {
if (logger.isTraceEnabled()) {
logger.trace("Reading message " + message);
@ -340,40 +349,33 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -340,40 +349,33 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
public void forward(Message<?> message) {
if (!this.stompConnection.isReady()) {
logger.warn("Message sent to relay before it was CONNECTED. Discarding message: " + message);
return;
}
forwardInternal(message);
}
private boolean forwardInternal(final Message<?> message) {
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConnection = this.stompConnection.getReadyConnection();
if (tcpConnection == null) {
return false;
logger.warn("Connection to STOMP broker is not active, discarding message: " + message);
return;
}
return forwardInternal(tcpConnection, message);
forwardInternal(tcpConnection, message);
}
@SuppressWarnings("unchecked")
private boolean forwardInternal(TcpConnection<Message<byte[]>, Message<byte[]>> tcpConnection, final Message<?> message) {
private boolean forwardInternal(
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConnection, Message<?> message) {
Assert.isInstanceOf(byte[].class, message.getPayload(), "Message's payload must be a byte[]");
@SuppressWarnings("unchecked")
Message<byte[]> byteMessage = (Message<byte[]>) message;
if (logger.isTraceEnabled()) {
logger.trace("Forwarding to STOMP broker, message: " + message);
}
StompCommand command = StompHeaderAccessor.wrap(message).getCommand();
if (command == StompCommand.DISCONNECT) {
this.stompConnection.setDisconnected();
}
final Deferred<Boolean, Promise<Boolean>> deferred = new DeferredPromiseSpec<Boolean>().get();
tcpConnection.send((Message<byte[]>)message, new Consumer<Boolean>() {
tcpConnection.send(byteMessage, new Consumer<Boolean>() {
@Override
public void accept(Boolean success) {
deferred.accept(success);
@ -398,13 +400,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -398,13 +400,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
return (success != null) ? success : false;
}
protected void connectionClosed() {
relaySessions.remove(this.sessionId);
if (this.stompConnection.isReady()) {
sendError("Lost connection to the broker");
}
}
}
private static class StompConnection {
@ -452,7 +447,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -452,7 +447,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private class SystemRelaySession extends RelaySession {
private class SystemStompRelaySession extends StompRelaySession {
private static final long HEARTBEAT_RECEIVE_MULTIPLIER = 3;
@ -465,7 +460,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -465,7 +460,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private final byte[] heartbeatPayload = new byte[] {'\n'};
public SystemRelaySession() {
public SystemStompRelaySession() {
super(ID);
}
@ -480,7 +475,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -480,7 +475,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
@Override
protected Composable<TcpConnection<Message<byte[]>, Message<byte[]>>> openTcpConnection() {
protected Composable<TcpConnection<Message<byte[]>, Message<byte[]>>> initConnection() {
return tcpClient.open(new Reconnect() {
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {

Loading…
Cancel
Save