diff --git a/build.gradle b/build.gradle index 300a5423a86..68f90ace0c2 100644 --- a/build.gradle +++ b/build.gradle @@ -323,6 +323,7 @@ project("spring-messaging") { optional("com.lmax:disruptor:3.1.1") testCompile("commons-dbcp:commons-dbcp:1.2.2") testCompile("javax.inject:javax.inject-tck:1") + testCompile(project(":spring-test")) } repositories { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerAvailabilityEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerAvailabilityEvent.java new file mode 100644 index 00000000000..aec5e0533af --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerAvailabilityEvent.java @@ -0,0 +1,33 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.stomp; + +import org.springframework.context.ApplicationEvent; + + +/** + * Base class for application events relating to broker availability. + * + * @author Andy Wilkinson + */ +public abstract class BrokerAvailabilityEvent extends ApplicationEvent { + + + protected BrokerAvailabilityEvent(Object source) { + super(source); + } +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameAvailableEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameAvailableEvent.java new file mode 100644 index 00000000000..34b88985089 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameAvailableEvent.java @@ -0,0 +1,38 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.stomp; + +/** + * Event raised when the broker being used by a {@link + * StompBrokerRelayMessageHandler} becomes available + * + * @author Andy Wilkinson + */ +public class BrokerBecameAvailableEvent extends BrokerAvailabilityEvent { + + /** + * Creates a new BrokerBecameAvailableEvent + * + * @param source the {@code StompBrokerRelayMessageHandler} that is acting + * as a relay for the broker that has become available. Must not be {@code + * null}. + */ + public BrokerBecameAvailableEvent(StompBrokerRelayMessageHandler source) { + super(source); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameUnavailableEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameUnavailableEvent.java new file mode 100644 index 00000000000..c8f2cf839b5 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BrokerBecameUnavailableEvent.java @@ -0,0 +1,39 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.stomp; + + +/** + * Event raised when the broker being used by a {@link + * StompBrokerRelayMessageHandler} becomes unavailable + * + * @author Andy Wilkinson + */ +public class BrokerBecameUnavailableEvent extends BrokerAvailabilityEvent { + + /** + * Creates a new BrokerBecameUnavailableEvent + * + * @param source the {@code StompBrokerRelayMessageHandler} that is acting + * as a relay for the broker that has become unavailable. Must not be {@code + * null}. + */ + public BrokerBecameUnavailableEvent(StompBrokerRelayMessageHandler source) { + super(source); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index cb4b498063d..55d239c9ba8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -24,9 +24,13 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.SmartLifecycle; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -51,9 +55,11 @@ import reactor.tcp.spec.TcpClientSpec; /** * @author Rossen Stoyanchev + * @author Andy Wilkinson + * * @since 4.0 */ -public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLifecycle { +public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLifecycle, ApplicationEventPublisherAware { private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandler.class); @@ -63,6 +69,8 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife private final String[] destinationPrefixes; + private ApplicationEventPublisher applicationEventPublisher; + private String relayHost = "127.0.0.1"; private int relayPort = 61613; @@ -83,6 +91,7 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife private boolean running = false; + private AtomicBoolean brokerAvailable = new AtomicBoolean(false); /** * @param messageChannel the channel to send messages from the STOMP broker to @@ -197,6 +206,12 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife } } + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) + throws BeansException { + this.applicationEventPublisher = applicationEventPublisher; + } + /** * Open a "system" session for sending messages from parts of the application * not associated with a client STOMP session. @@ -342,6 +357,18 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife return false; } + private void brokerAvailable() { + if (this.brokerAvailable.compareAndSet(false, true)) { + this.applicationEventPublisher.publishEvent(new BrokerBecameAvailableEvent(this)); + } + } + + private void brokerUnavailable() { + if (this.brokerAvailable.compareAndSet(true, false)) { + this.applicationEventPublisher.publishEvent(new BrokerBecameUnavailableEvent(this)); + } + } + private class RelaySession { @@ -356,7 +383,7 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife private final Object monitor = new Object(); - public RelaySession(String sessionId) { + private RelaySession(String sessionId) { Assert.notNull(sessionId, "sessionId is required"); this.sessionId = sessionId; } @@ -404,6 +431,7 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife if (StompCommand.CONNECTED == headers.getCommand()) { synchronized(this.monitor) { this.isConnected = true; + brokerAvailable(); flushMessages(this.promise.get()); } return; @@ -419,6 +447,8 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife } private void sendError(String sessionId, String errorText) { + brokerUnavailable(); + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); headers.setSessionId(sessionId); headers.setMessage(errorText); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index fd00093ed58..92eaa501bcd 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -17,20 +17,34 @@ package org.springframework.messaging.simp.stomp; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.annotation.DirtiesContext.ClassMode; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.util.SocketUtils; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** @@ -38,20 +52,29 @@ import static org.junit.Assert.*; * * @author Andy Wilkinson */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {StompBrokerRelayMessageHandlerIntegrationTests.TestConfiguration.class}) +@DirtiesContext(classMode=ClassMode.AFTER_EACH_TEST_METHOD) public class StompBrokerRelayMessageHandlerIntegrationTests { - private final SubscribableChannel messageChannel = new ExecutorSubscribableChannel(); + @Autowired + private SubscribableChannel messageChannel; - private final StompBrokerRelayMessageHandler relay = - new StompBrokerRelayMessageHandler(messageChannel, Arrays.asList("/queue/", "/topic/")); + @Autowired + private StompBrokerRelayMessageHandler relay; + + @Autowired + private TestStompBroker stompBroker; + + @Autowired + private ApplicationContext applicationContext; + + @Autowired + private BrokerAvailabilityListener brokerAvailabilityListener; @Test public void basicPublishAndSubscribe() throws IOException, InterruptedException { - int port = SocketUtils.findAvailableTcpPort(); - - TestStompBroker stompBroker = new TestStompBroker(port); - stompBroker.start(); String client1SessionId = "abc123"; String client2SessionId = "def456"; @@ -70,9 +93,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { }); - relay.setRelayPort(port); - relay.start(); - relay.handleMessage(createConnectMessage(client1SessionId)); relay.handleMessage(createConnectMessage(client2SessionId)); relay.handleMessage(createSubscribeMessage(client1SessionId, "/topic/test")); @@ -83,17 +103,13 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { assertTrue(messageLatch.await(30, TimeUnit.SECONDS)); - this.relay.stop(); - stompBroker.stop(); + assertEquals(1, brokerAvailabilityListener.availabilityEvents.size()); + assertTrue(brokerAvailabilityListener.availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); } @Test public void whenConnectFailsDueToTheBrokerBeingUnavailableAnErrorFrameIsSentToTheClient() throws IOException, InterruptedException { - int port = SocketUtils.findAvailableTcpPort(); - - TestStompBroker stompBroker = new TestStompBroker(port); - stompBroker.start(); String sessionId = "abc123"; @@ -111,25 +127,25 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { }); - relay.setRelayPort(port); - relay.start(); - stompBroker.awaitMessages(1); + assertEquals(1, brokerAvailabilityListener.availabilityEvents.size()); + assertTrue(brokerAvailabilityListener.availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); + stompBroker.stop(); relay.handleMessage(createConnectMessage(sessionId)); errorLatch.await(30, TimeUnit.SECONDS); + + assertEquals(2, brokerAvailabilityListener.availabilityEvents.size()); + assertTrue(brokerAvailabilityListener.availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); + assertTrue(brokerAvailabilityListener.availabilityEvents.get(1) instanceof BrokerBecameUnavailableEvent); } @Test public void whenSendFailsDueToTheBrokerBeingUnavailableAnErrorFrameIsSentToTheClient() throws IOException, InterruptedException { - int port = SocketUtils.findAvailableTcpPort(); - - TestStompBroker stompBroker = new TestStompBroker(port); - stompBroker.start(); String sessionId = "abc123"; @@ -147,18 +163,22 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { }); - relay.setRelayPort(port); - relay.start(); - relay.handleMessage(createConnectMessage(sessionId)); stompBroker.awaitMessages(2); + assertEquals(1, brokerAvailabilityListener.availabilityEvents.size()); + assertTrue(brokerAvailabilityListener.availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); + stompBroker.stop(); relay.handleMessage(createSubscribeMessage(sessionId, "/topic/test/")); errorLatch.await(30, TimeUnit.SECONDS); + + assertEquals(2, brokerAvailabilityListener.availabilityEvents.size()); + assertTrue(brokerAvailabilityListener.availabilityEvents.get(0) instanceof BrokerBecameAvailableEvent); + assertTrue(brokerAvailabilityListener.availabilityEvents.get(1) instanceof BrokerBecameUnavailableEvent); } private Message createConnectMessage(String sessionId) { @@ -183,4 +203,44 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { return MessageBuilder.withPayloadAndHeaders(payload.getBytes(), headers).build(); } + + + @Configuration + public static class TestConfiguration { + + @Bean + public MessageChannel messageChannel() { + return new ExecutorSubscribableChannel(); + } + + @Bean + public StompBrokerRelayMessageHandler relay() { + StompBrokerRelayMessageHandler relay = + new StompBrokerRelayMessageHandler(messageChannel(), Arrays.asList("/queue/", "/topic/")); + relay.setRelayPort(SocketUtils.findAvailableTcpPort()); + return relay; + } + + @Bean + public TestStompBroker broker() throws IOException { + TestStompBroker broker = new TestStompBroker(relay().getRelayPort()); + return broker; + } + + @Bean + public BrokerAvailabilityListener availabilityListener() { + return new BrokerAvailabilityListener(); + } + } + + + private static class BrokerAvailabilityListener implements ApplicationListener { + + private final List availabilityEvents = new ArrayList(); + + @Override + public void onApplicationEvent(BrokerAvailabilityEvent event) { + this.availabilityEvents.add(event); + } + } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/TestStompBroker.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/TestStompBroker.java index fe5d7462dd5..f5f648ced53 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/TestStompBroker.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/TestStompBroker.java @@ -25,8 +25,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import org.springframework.context.SmartLifecycle; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.StringUtils; import reactor.core.Environment; import reactor.function.Consumer; @@ -40,7 +42,7 @@ import reactor.tcp.spec.TcpServerSpec; /** * @author Andy Wilkinson */ -class TestStompBroker { +class TestStompBroker implements SmartLifecycle { private final StompMessageConverter messageConverter = new StompMessageConverter(); @@ -60,11 +62,13 @@ class TestStompBroker { private volatile TcpServer tcpServer; + private volatile boolean running; + TestStompBroker(int port) { this.port = port; } - public void start() throws IOException { + public void start() { this.environment = new Environment(); this.tcpServer = new TcpServerSpec(NettyTcpServer.class) @@ -78,7 +82,9 @@ class TestStompBroker { connection.consume(new Consumer() { @Override public void accept(String stompFrame) { - handleMessage(messageConverter.toMessage(stompFrame), connection); + if (!StringUtils.isEmpty(stompFrame)) { + handleMessage(messageConverter.toMessage(stompFrame), connection); + } } }); } @@ -86,10 +92,16 @@ class TestStompBroker { .get(); this.tcpServer.start(); + this.running = true; } - public void stop() throws IOException, InterruptedException { - this.tcpServer.shutdown().await(); + public void stop() { + try { + this.tcpServer.shutdown().await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + this.running = false; } private void handleMessage(Message message, TcpConnection connection) { @@ -158,4 +170,25 @@ class TestStompBroker { } } + + @Override + public boolean isRunning() { + return this.running; + } + + @Override + public int getPhase() { + return Integer.MIN_VALUE; + } + + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public void stop(Runnable callback) { + this.stop(); + callback.run(); + } }