diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerAvailabilityEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java similarity index 50% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerAvailabilityEvent.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java index aec5e0533af..e2df82f77a4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerAvailabilityEvent.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java @@ -14,20 +14,35 @@ * limitations under the License. */ -package org.springframework.messaging.simp.stomp; +package org.springframework.messaging.simp; import org.springframework.context.ApplicationEvent; /** - * Base class for application events relating to broker availability. + * Event raised when a broker's availabilty changes * * @author Andy Wilkinson */ -public abstract class BrokerAvailabilityEvent extends ApplicationEvent { +public class BrokerAvailabilityEvent extends ApplicationEvent { + private final boolean brokerAvailable; - protected BrokerAvailabilityEvent(Object source) { + /** + * Creates a new {@code BrokerAvailabilityEvent}. + * + * @param brokerAvailable {@code true} if the broker is available, {@code} + * false otherwise + * @param source the component that is acting as the broker, or as a relay + * for an external broker, that has changed availability. Must not be {@code + * null}. + */ + public BrokerAvailabilityEvent(boolean brokerAvailable, Object source) { super(source); + this.brokerAvailable = brokerAvailable; + } + + public boolean isBrokerAvailable() { + return this.brokerAvailable; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java index dbbcc0fa90e..cddc9ecc2c9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java @@ -20,10 +20,14 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.context.SmartLifecycle; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; +import org.springframework.messaging.simp.BrokerAvailabilityEvent; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.MessageBuilder; @@ -36,7 +40,8 @@ import org.springframework.util.MultiValueMap; * @author Rossen Stoyanchev * @since 4.0 */ -public class SimpleBrokerMessageHandler implements MessageHandler { +public class SimpleBrokerMessageHandler implements MessageHandler, ApplicationEventPublisherAware, + SmartLifecycle { private static final Log logger = LogFactory.getLog(SimpleBrokerMessageHandler.class); @@ -46,6 +51,10 @@ public class SimpleBrokerMessageHandler implements MessageHandler { private SubscriptionRegistry subscriptionRegistry = new DefaultSubscriptionRegistry(); + private ApplicationEventPublisher eventPublisher; + + private volatile boolean running = false; + /** * @param messageChannel the channel to broadcast messages to @@ -73,6 +82,10 @@ public class SimpleBrokerMessageHandler implements MessageHandler { return this.subscriptionRegistry; } + public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + @Override public void handleMessage(Message message) throws MessagingException { @@ -142,4 +155,38 @@ public class SimpleBrokerMessageHandler implements MessageHandler { } } } + + @Override + public void start() { + this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); + this.running = true; + } + + @Override + public void stop() { + this.running = false; + this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); + } + + @Override + public boolean isRunning() { + return this.running; + } + + @Override + public int getPhase() { + return 0; + } + + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public void stop(Runnable callback) { + callback.run(); + this.stop(); + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameAvailableEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameAvailableEvent.java deleted file mode 100644 index 34b88985089..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameAvailableEvent.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.stomp; - -/** - * Event raised when the broker being used by a {@link - * StompBrokerRelayMessageHandler} becomes available - * - * @author Andy Wilkinson - */ -public class BrokerBecameAvailableEvent extends BrokerAvailabilityEvent { - - /** - * Creates a new BrokerBecameAvailableEvent - * - * @param source the {@code StompBrokerRelayMessageHandler} that is acting - * as a relay for the broker that has become available. Must not be {@code - * null}. - */ - public BrokerBecameAvailableEvent(StompBrokerRelayMessageHandler source) { - super(source); - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameUnavailableEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameUnavailableEvent.java deleted file mode 100644 index c8f2cf839b5..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameUnavailableEvent.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.stomp; - - -/** - * Event raised when the broker being used by a {@link - * StompBrokerRelayMessageHandler} becomes unavailable - * - * @author Andy Wilkinson - */ -public class BrokerBecameUnavailableEvent extends BrokerAvailabilityEvent { - - /** - * Creates a new BrokerBecameUnavailableEvent - * - * @param source the {@code StompBrokerRelayMessageHandler} that is acting - * as a relay for the broker that has become unavailable. Must not be {@code - * null}. - */ - public BrokerBecameUnavailableEvent(StompBrokerRelayMessageHandler source) { - super(source); - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 2ad569dc72a..09e2e388eef 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -36,6 +36,7 @@ import org.springframework.context.SmartLifecycle; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.simp.BrokerAvailabilityEvent; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -376,13 +377,13 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife private void brokerAvailable() { if (this.brokerAvailable.compareAndSet(false, true)) { - this.applicationEventPublisher.publishEvent(new BrokerBecameAvailableEvent(this)); + this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); } } private void brokerUnavailable() { if (this.brokerAvailable.compareAndSet(true, false)) { - this.applicationEventPublisher.publishEvent(new BrokerBecameUnavailableEvent(this)); + this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); } } @@ -482,8 +483,12 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife private void disconnect() { this.isConnected = false; - this.connection.close(); - this.connection = null; + + TcpConnection localConnection = this.connection; + if (localConnection != null) { + localConnection.close(); + this.connection = null; + } brokerUnavailable(); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index de26562c40e..e5df5b89eab 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -35,6 +35,7 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.simp.BrokerAvailabilityEvent; import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; import org.springframework.test.annotation.DirtiesContext; @@ -45,6 +46,7 @@ import org.springframework.util.SocketUtils; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; /** @@ -104,7 +106,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { assertTrue(messageLatch.await(30, TimeUnit.SECONDS)); List availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); + assertTrue(availabilityEvents.get(0).isBrokerAvailable()); } @Test @@ -130,7 +132,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.stompBroker.awaitMessages(1); List availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); + assertTrue(availabilityEvents.get(0).isBrokerAvailable()); this.stompBroker.stop(); @@ -139,8 +141,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { errorLatch.await(30, TimeUnit.SECONDS); availabilityEvents = brokerAvailabilityListener.awaitAvailabilityEvents(2); - assertTrue(availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); - assertTrue(availabilityEvents.get(1) instanceof BrokerBecameUnavailableEvent); + assertTrue(availabilityEvents.get(0).isBrokerAvailable()); + assertFalse(availabilityEvents.get(1).isBrokerAvailable()); } @Test @@ -168,7 +170,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.stompBroker.awaitMessages(2); List availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); + assertTrue(availabilityEvents.get(0).isBrokerAvailable()); this.stompBroker.stop(); @@ -177,14 +179,14 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { errorLatch.await(30, TimeUnit.SECONDS); availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); - assertTrue(availabilityEvents.get(1) instanceof BrokerBecameUnavailableEvent); + assertTrue(availabilityEvents.get(0).isBrokerAvailable()); + assertFalse(availabilityEvents.get(1).isBrokerAvailable()); } @Test public void relayReconnectsIfTheBrokerComesBackUp() throws InterruptedException { List availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); + assertTrue(availabilityEvents.get(0).isBrokerAvailable()); List> messages = this.stompBroker.awaitMessages(1); assertEquals(1, messages.size()); @@ -195,7 +197,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.relay.handleMessage(createSendMessage(null, "/topic/test", "test")); availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(2); - assertTrue(availabilityEvents.get(1) instanceof BrokerBecameUnavailableEvent); + assertFalse(availabilityEvents.get(1).isBrokerAvailable()); this.relay.handleMessage(createSendMessage(null, "/topic/test", "test-again")); @@ -207,7 +209,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { assertStompCommandAndPayload(messages.get(2), StompCommand.SEND, "test-again"); availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(3); - assertTrue(availabilityEvents.get(2) instanceof BrokerBecameAvailableEvent); + assertTrue(availabilityEvents.get(2).isBrokerAvailable()); } private Message createConnectMessage(String sessionId) {