From 469aaa875492f9b86bd92b80f5f87d18233829c5 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 26 Sep 2013 16:04:31 -0400 Subject: [PATCH] Polish --- .../stomp/StompBrokerRelayMessageHandler.java | 71 +++++++++---------- 1 file changed, 33 insertions(+), 38 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index b65a032d726..beaef084538 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -70,7 +70,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private TcpClient, Message> tcpClient; - private final Map relaySessions = new ConcurrentHashMap(); + private final Map relaySessions = new ConcurrentHashMap(); /** @@ -158,7 +158,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (logger.isDebugEnabled()) { logger.debug("Initializing \"system\" TCP connection"); } - SystemRelaySession session = new SystemRelaySession(); + SystemStompRelaySession session = new SystemStompRelaySession(); this.relaySessions.put(session.getId(), session); session.connect(); } @@ -189,7 +189,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler SimpMessageType messageType = headers.getMessageType(); if (SimpMessageType.MESSAGE.equals(messageType)) { - sessionId = (sessionId == null) ? SystemRelaySession.ID : sessionId; + sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId; headers.setSessionId(sessionId); command = (command == null) ? StompCommand.SEND : command; headers.setCommandIfNotSet(command); @@ -208,12 +208,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler try { if (SimpMessageType.CONNECT.equals(messageType)) { message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build(); - RelaySession session = new RelaySession(sessionId); + StompRelaySession session = new StompRelaySession(sessionId); this.relaySessions.put(sessionId, session); session.connect(message); } else if (SimpMessageType.DISCONNECT.equals(messageType)) { - RelaySession session = this.relaySessions.remove(sessionId); + StompRelaySession session = this.relaySessions.remove(sessionId); if (session == null) { if (logger.isTraceEnabled()) { logger.trace("Session already removed, sessionId=" + sessionId); @@ -223,7 +223,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler session.forward(message); } else { - RelaySession session = this.relaySessions.get(sessionId); + StompRelaySession session = this.relaySessions.get(sessionId); if (session == null) { logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message); return; @@ -237,14 +237,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } - private class RelaySession { + private class StompRelaySession { private final String sessionId; private volatile StompConnection stompConnection = new StompConnection(); - private RelaySession(String sessionId) { + private StompRelaySession(String sessionId) { Assert.notNull(sessionId, "sessionId is required"); this.sessionId = sessionId; } @@ -257,11 +257,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler public void connect(final Message connectMessage) { Assert.notNull(connectMessage, "connectMessage is required"); - Composable, Message>> promise = openTcpConnection(); + Composable, Message>> promise = initConnection(); promise.consume(new Consumer, Message>>() { @Override public void accept(TcpConnection, Message> connection) { - handleTcpConnection(connection, connectMessage); + handleConnectionReady(connection, connectMessage); } }); promise.when(Throwable.class, new Consumer() { @@ -273,11 +273,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler }); } - protected Composable, Message>> openTcpConnection() { + protected Composable, Message>> initConnection() { return tcpClient.open(); } - protected void handleTcpConnection(TcpConnection, Message> tcpConn, final Message connectMessage) { + protected void handleConnectionReady( + TcpConnection, Message> tcpConn, final Message connectMessage) { + this.stompConnection.setTcpConnection(tcpConn); tcpConn.on().close(new Runnable() { @Override @@ -294,6 +296,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler forwardInternal(tcpConn, connectMessage); } + protected void connectionClosed() { + relaySessions.remove(this.sessionId); + if (this.stompConnection.isReady()) { + sendError("Lost connection to the broker"); + } + } + private void readStompFrame(Message message) { if (logger.isTraceEnabled()) { logger.trace("Reading message " + message); @@ -340,40 +349,33 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } public void forward(Message message) { - - if (!this.stompConnection.isReady()) { - logger.warn("Message sent to relay before it was CONNECTED. Discarding message: " + message); - return; - } - - forwardInternal(message); - } - - private boolean forwardInternal(final Message message) { TcpConnection, Message> tcpConnection = this.stompConnection.getReadyConnection(); if (tcpConnection == null) { - return false; + logger.warn("Connection to STOMP broker is not active, discarding message: " + message); + return; } - return forwardInternal(tcpConnection, message); + forwardInternal(tcpConnection, message); } - @SuppressWarnings("unchecked") - private boolean forwardInternal(TcpConnection, Message> tcpConnection, final Message message) { + private boolean forwardInternal( + TcpConnection, Message> tcpConnection, Message message) { Assert.isInstanceOf(byte[].class, message.getPayload(), "Message's payload must be a byte[]"); + @SuppressWarnings("unchecked") + Message byteMessage = (Message) message; + if (logger.isTraceEnabled()) { logger.trace("Forwarding to STOMP broker, message: " + message); } StompCommand command = StompHeaderAccessor.wrap(message).getCommand(); - if (command == StompCommand.DISCONNECT) { this.stompConnection.setDisconnected(); } final Deferred> deferred = new DeferredPromiseSpec().get(); - tcpConnection.send((Message)message, new Consumer() { + tcpConnection.send(byteMessage, new Consumer() { @Override public void accept(Boolean success) { deferred.accept(success); @@ -398,13 +400,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } return (success != null) ? success : false; } - - protected void connectionClosed() { - relaySessions.remove(this.sessionId); - if (this.stompConnection.isReady()) { - sendError("Lost connection to the broker"); - } - } } private static class StompConnection { @@ -452,7 +447,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } - private class SystemRelaySession extends RelaySession { + private class SystemStompRelaySession extends StompRelaySession { private static final long HEARTBEAT_RECEIVE_MULTIPLIER = 3; @@ -465,7 +460,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private final byte[] heartbeatPayload = new byte[] {'\n'}; - public SystemRelaySession() { + public SystemStompRelaySession() { super(ID); } @@ -480,7 +475,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } @Override - protected Composable, Message>> openTcpConnection() { + protected Composable, Message>> initConnection() { return tcpClient.open(new Reconnect() { @Override public Tuple2 reconnect(InetSocketAddress address, int attempt) {