Browse Source

Add ActiveMQ-based STOMP relay integration tests

pull/339/head
Rossen Stoyanchev 13 years ago
parent
commit
94fefec0f9
  1. 1
      .gitignore
  2. 12
      build.gradle
  3. 76
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  4. 7
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCommand.java
  5. 28
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaderAccessor.java
  6. 589
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java
  7. 193
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/TestStompBroker.java
  8. 4
      spring-messaging/src/test/resources/log4j.xml

1
.gitignore vendored

@ -15,6 +15,7 @@ spring-test/test-output/ @@ -15,6 +15,7 @@ spring-test/test-output/
.gradle
argfile*
pom.xml
activemq-data/
/build
buildSrc/build

12
build.gradle

@ -321,10 +321,18 @@ project("spring-messaging") { @@ -321,10 +321,18 @@ project("spring-messaging") {
optional("org.projectreactor:reactor-core:1.0.0.BUILD-SNAPSHOT")
optional("org.projectreactor:reactor-tcp:1.0.0.BUILD-SNAPSHOT")
optional("com.lmax:disruptor:3.1.1")
testCompile(project(":spring-test"))
testCompile("com.thoughtworks.xstream:xstream:1.4.4")
testCompile("commons-dbcp:commons-dbcp:1.2.2")
testCompile("javax.inject:javax.inject-tck:1")
testCompile(project(":spring-test"))
}
testCompile("org.apache.activemq:activemq-broker:5.8.0")
testCompile("org.apache.activemq:activemq-kahadb-store:5.8.0") {
exclude group: "org.springframework", module: "spring-context"
}
testCompile("org.apache.activemq:activemq-stomp:5.8.0")
testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}")
testCompile("log4j:log4j:1.2.17")
}
repositories {
maven { url 'http://repo.springsource.org/libs-milestone' } // reactor

76
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -376,13 +376,19 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife @@ -376,13 +376,19 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
}
private void brokerAvailable() {
if (this.brokerAvailable.compareAndSet(false, true)) {
if ((this.applicationEventPublisher != null) && this.brokerAvailable.compareAndSet(false, true)) {
if (logger.isTraceEnabled()) {
logger.trace("Publishing BrokerAvailabilityEvent (available)");
}
this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this));
}
}
private void brokerUnavailable() {
if (this.brokerAvailable.compareAndSet(true, false)) {
if ((this.applicationEventPublisher != null) && this.brokerAvailable.compareAndSet(true, false)) {
if (logger.isTraceEnabled()) {
logger.trace("Publishing BrokerAvailabilityEvent (unavailable)");
}
this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this));
}
}
@ -518,52 +524,42 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife @@ -518,52 +524,42 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
private boolean forwardInternal(final Message<?> message) {
TcpConnection<String, String> localConnection = this.connection;
if (localConnection == null) {
return false;
}
if (localConnection != null) {
if (logger.isTraceEnabled()) {
logger.trace("Forwarding message to STOMP broker, message id=" + message.getHeaders().getId());
}
byte[] bytes = stompMessageConverter.fromMessage(message);
final Deferred<Boolean, Promise<Boolean>> deferred = new DeferredPromiseSpec<Boolean>().get();
String payload = new String(bytes, Charset.forName("UTF-8"));
localConnection.send(payload, new Consumer<Boolean>() {
@Override
public void accept(Boolean success) {
if (!success && StompHeaderAccessor.wrap(message).getCommand() != StompCommand.DISCONNECT) {
deferred.accept(false);
} else {
deferred.accept(true);
}
}
});
Boolean success = null;
if (logger.isTraceEnabled()) {
logger.trace("Forwarding to STOMP broker, message: " + message);
}
try {
success = deferred.compose().await();
byte[] bytes = stompMessageConverter.fromMessage(message);
String payload = new String(bytes, Charset.forName("UTF-8"));
if (success == null) {
sendError(sessionId, "Timed out waiting for message to be forwarded to the broker");
}
else if (!success) {
sendError(sessionId, "Failed to forward message to the broker");
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
sendError(sessionId, "Interrupted while forwarding message to the broker");
final Deferred<Boolean, Promise<Boolean>> deferred = new DeferredPromiseSpec<Boolean>().get();
localConnection.send(payload, new Consumer<Boolean>() {
@Override
public void accept(Boolean success) {
deferred.accept(success);
}
});
Boolean success = null;
try {
success = deferred.compose().await();
if (success == null) {
success = false;
sendError(sessionId, "Timed out waiting for message to be forwarded to the broker");
}
return success;
} else {
return false;
else if (!success) {
if (StompHeaderAccessor.wrap(message).getCommand() != StompCommand.DISCONNECT) {
sendError(sessionId, "Failed to forward message to the broker");
}
}
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
sendError(sessionId, "Interrupted while forwarding message to the broker");
}
return (success != null) ? success : false;
}
private void flushMessages() {

7
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCommand.java

@ -57,6 +57,9 @@ public enum StompCommand { @@ -57,6 +57,9 @@ public enum StompCommand {
private static Set<StompCommand> destinationRequiredLookup =
new HashSet<StompCommand>(Arrays.asList(SEND, SUBSCRIBE, MESSAGE));
private static Set<StompCommand> subscriptionIdRequiredLookup =
new HashSet<StompCommand>(Arrays.asList(SUBSCRIBE, UNSUBSCRIBE, MESSAGE));
static {
messageTypeLookup.put(StompCommand.CONNECT, SimpMessageType.CONNECT);
messageTypeLookup.put(StompCommand.STOMP, SimpMessageType.CONNECT);
@ -76,4 +79,8 @@ public enum StompCommand { @@ -76,4 +79,8 @@ public enum StompCommand {
return destinationRequiredLookup.contains(this);
}
public boolean requiresSubscriptionId() {
return subscriptionIdRequiredLookup.contains(this);
}
}

28
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaderAccessor.java

@ -54,7 +54,9 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor { @@ -54,7 +54,9 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
public static final String STOMP_MESSAGE_ID_HEADER = "message-id";
public static final String STOMP_RECEIPT_ID_HEADER = "receipt-id";
public static final String STOMP_RECEIPT_HEADER = "receipt"; // any client frame except CONNECT
public static final String STOMP_RECEIPT_ID_HEADER = "receipt-id"; // RECEIPT frame
public static final String STOMP_SUBSCRIPTION_HEADER = "subscription";
@ -176,20 +178,22 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor { @@ -176,20 +178,22 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
result.put(STOMP_CONTENT_TYPE_HEADER, Arrays.asList(contentType.toString()));
}
if (StompCommand.MESSAGE.equals(getCommand())) {
if (getCommand().requiresSubscriptionId()) {
String subscriptionId = getSubscriptionId();
if (subscriptionId != null) {
result.put(STOMP_SUBSCRIPTION_HEADER, Arrays.asList(subscriptionId));
String name = StompCommand.MESSAGE.equals(getCommand()) ? STOMP_SUBSCRIPTION_HEADER : STOMP_ID_HEADER;
result.put(name, Arrays.asList(subscriptionId));
}
else {
logger.warn("STOMP MESSAGE frame should have a subscription: " + this.toString());
}
if ((getMessageId() == null)) {
String messageId = getSessionId() + "-" + messageIdCounter.getAndIncrement();
result.put(STOMP_MESSAGE_ID_HEADER, Arrays.asList(messageId));
logger.warn(getCommand() + " frame should have a subscription: " + this.toString());
}
}
if (StompCommand.MESSAGE.equals(getCommand()) && ((getMessageId() == null))) {
String messageId = getSessionId() + "-" + messageIdCounter.getAndIncrement();
result.put(STOMP_MESSAGE_ID_HEADER, Arrays.asList(messageId));
}
return result;
}
@ -302,6 +306,14 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor { @@ -302,6 +306,14 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
return getFirstNativeHeader(STOMP_RECEIPT_ID_HEADER);
}
public void setReceipt(String receiptId) {
setNativeHeader(STOMP_RECEIPT_HEADER, receiptId);
}
public String getReceipt() {
return getFirstNativeHeader(STOMP_RECEIPT_HEADER);
}
public String getMessage() {
return getFirstNativeHeader(STOMP_MESSAGE_HEADER);
}

589
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

@ -16,285 +16,538 @@ @@ -16,285 +16,538 @@
package org.springframework.messaging.simp.stomp;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.BrokerAvailabilityEvent;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.SocketUtils;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import reactor.util.Assert;
import static org.junit.Assert.*;
/**
* Integration tests for {@link StompBrokerRelayMessageHandler}
*
* @author Andy Wilkinson
* @author Rossen Stoyanchev
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {StompBrokerRelayMessageHandlerIntegrationTests.TestConfiguration.class})
@DirtiesContext(classMode=ClassMode.AFTER_EACH_TEST_METHOD)
public class StompBrokerRelayMessageHandlerIntegrationTests {
@Autowired
private SubscribableChannel messageChannel;
private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandlerIntegrationTests.class);
private static final Charset UTF_8 = Charset.forName("UTF-8");
@Autowired
private StompBrokerRelayMessageHandler relay;
@Autowired
private TestStompBroker stompBroker;
private BrokerService activeMQBroker;
private ExecutorSubscribableChannel responseChannel;
private ExpectationMatchingMessageHandler responseHandler;
@Autowired
private ApplicationContext applicationContext;
private ExpectationMatchingEventPublisher eventPublisher;
@Autowired
private BrokerAvailabilityListener brokerAvailabilityListener;
@Before
public void setUp() throws Exception {
int port = SocketUtils.findAvailableTcpPort(61613);
this.activeMQBroker = new BrokerService();
this.activeMQBroker.addConnector("stomp://localhost:" + port);
this.activeMQBroker.setStartAsync(false);
this.activeMQBroker.setDeleteAllMessagesOnStartup(true);
this.activeMQBroker.start();
this.responseChannel = new ExecutorSubscribableChannel();
this.responseHandler = new ExpectationMatchingMessageHandler();
this.responseChannel.subscribe(this.responseHandler);
this.eventPublisher = new ExpectationMatchingEventPublisher();
this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/"));
this.relay.setRelayPort(port);
this.relay.setApplicationEventPublisher(this.eventPublisher);
this.relay.start();
}
@After
public void tearDown() throws Exception {
try {
this.relay.stop();
}
finally {
stopBrokerAndAwait();
}
}
@Test
public void basicPublishAndSubscribe() throws IOException, InterruptedException {
public void publishSubscribe() throws Exception {
String client1SessionId = "abc123";
String client2SessionId = "def456";
String sess1 = "sess1";
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
this.relay.handleMessage(conn1.message);
final CountDownLatch messageLatch = new CountDownLatch(1);
String sess2 = "sess2";
MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build();
this.relay.handleMessage(conn2.message);
this.messageChannel.subscribe(new MessageHandler() {
String subs1 = "subs1";
String destination = "/topic/test";
@Override
public void handleMessage(Message<?> message) throws MessagingException {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (headers.getCommand() == StompCommand.MESSAGE) {
messageLatch.countDown();
}
}
MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
this.responseHandler.expect(subscribe);
});
this.relay.handleMessage(subscribe.message);
this.responseHandler.awaitAndAssert();
this.relay.handleMessage(createConnectMessage(client1SessionId));
this.relay.handleMessage(createConnectMessage(client2SessionId));
this.relay.handleMessage(createSubscribeMessage(client1SessionId, "/topic/test"));
MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build();
this.responseHandler.reset();
this.responseHandler.expect(send);
this.stompBroker.awaitMessages(4);
this.relay.handleMessage(send.message);
this.responseHandler.awaitAndAssert();
}
@Test
public void brokerUnvailableErrorFrameOnConnect() throws Exception {
this.relay.handleMessage(createSendMessage(client2SessionId, "/topic/test", "fromClient2"));
stopBrokerAndAwait();
assertTrue(messageLatch.await(30, TimeUnit.SECONDS));
MessageExchange connect = MessageExchangeBuilder.connect("sess1").andExpectError().build();
this.responseHandler.expect(connect);
List<BrokerAvailabilityEvent> availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1);
assertTrue(availabilityEvents.get(0).isBrokerAvailable());
this.relay.handleMessage(connect.message);
this.responseHandler.awaitAndAssert();
}
@Test
public void whenConnectFailsDueToTheBrokerBeingUnavailableAnErrorFrameIsSentToTheClient()
throws IOException, InterruptedException {
public void brokerUnvailableErrorFrameOnSend() throws Exception {
String sessionId = "abc123";
String sess1 = "sess1";
MessageExchange connect = MessageExchangeBuilder.connect(sess1).build();
this.relay.handleMessage(connect.message);
final CountDownLatch errorLatch = new CountDownLatch(1);
// TODO: expect CONNECTED
Thread.sleep(2000);
this.messageChannel.subscribe(new MessageHandler() {
stopBrokerAndAwait();
@Override
public void handleMessage(Message<?> message) throws MessagingException {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (headers.getCommand() == StompCommand.ERROR) {
errorLatch.countDown();
}
}
MessageExchange subscribe = MessageExchangeBuilder.subscribe(sess1, "s1", "/topic/a").andExpectError().build();
this.responseHandler.expect(subscribe);
});
this.relay.handleMessage(subscribe.message);
this.responseHandler.awaitAndAssert();
}
this.stompBroker.awaitMessages(1);
@Test
public void brokerAvailabilityEvents() throws Exception {
List<BrokerAvailabilityEvent> availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1);
assertTrue(availabilityEvents.get(0).isBrokerAvailable());
// TODO: expect CONNECTED
Thread.sleep(2000);
this.stompBroker.stop();
this.eventPublisher.expect(true, false);
this.relay.handleMessage(createConnectMessage(sessionId));
stopBrokerAndAwait();
errorLatch.await(30, TimeUnit.SECONDS);
// TODO: remove when stop is detecteded
this.relay.handleMessage(MessageExchangeBuilder.connect("sess1").build().message);
availabilityEvents = brokerAvailabilityListener.awaitAvailabilityEvents(2);
assertTrue(availabilityEvents.get(0).isBrokerAvailable());
assertFalse(availabilityEvents.get(1).isBrokerAvailable());
this.eventPublisher.awaitAndAssert();
}
@Test
public void whenSendFailsDueToTheBrokerBeingUnavailableAnErrorFrameIsSentToTheClient()
throws IOException, InterruptedException {
public void relayReconnectsIfBrokerComesBackUp() throws Exception {
String sessionId = "abc123";
String sess1 = "sess1";
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
this.relay.handleMessage(conn1.message);
final CountDownLatch errorLatch = new CountDownLatch(1);
String subs1 = "subs1";
String destination = "/topic/test";
MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
this.responseHandler.expect(subscribe);
this.messageChannel.subscribe(new MessageHandler() {
this.relay.handleMessage(subscribe.message);
this.responseHandler.awaitAndAssert();
@Override
public void handleMessage(Message<?> message) throws MessagingException {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (headers.getCommand() == StompCommand.ERROR) {
errorLatch.countDown();
}
}
stopBrokerAndAwait();
// TODO:
// 1st message will see ERROR frame (broker shutdown is not but should be detected)
// 2nd message will be queued (a side effect of CONNECT/CONNECTED-buffering, likely to be removed)
// Finish this once the above changes are made.
/* MessageExchange send = MessageExchangeBuilder.send(destination, "foo").build();
this.responseHandler.reset();
this.relay.handleMessage(send.message);
Thread.sleep(2000);
this.activeMQBroker.start();
Thread.sleep(5000);
send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build();
this.responseHandler.reset();
this.responseHandler.expect(send);
this.relay.handleMessage(send.message);
this.responseHandler.awaitAndAssert();
*/
}
private void stopBrokerAndAwait() throws Exception {
logger.debug("Stopping ActiveMQ broker and will await shutdown");
if (!this.activeMQBroker.isStarted()) {
logger.debug("Broker not running");
return;
}
final CountDownLatch latch = new CountDownLatch(1);
this.activeMQBroker.addShutdownHook(new Runnable() {
public void run() {
latch.countDown();
}
});
this.activeMQBroker.stop();
assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS));
logger.debug("Broker stopped");
}
/**
* Handles messages by matching them to expectations including a latch to wait for
* the completion of expected messages.
*/
private static class ExpectationMatchingMessageHandler implements MessageHandler {
private final List<MessageExchange> expected;
private final List<MessageExchange> actual = new CopyOnWriteArrayList<>();
this.relay.handleMessage(createConnectMessage(sessionId));
private final List<Message<?>> unexpected = new CopyOnWriteArrayList<>();
this.stompBroker.awaitMessages(2);
private CountDownLatch latch = new CountDownLatch(1);
List<BrokerAvailabilityEvent> availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1);
assertTrue(availabilityEvents.get(0).isBrokerAvailable());
this.stompBroker.stop();
public ExpectationMatchingMessageHandler(MessageExchange... expected) {
this.expected = new CopyOnWriteArrayList<>(expected);
}
public void expect(MessageExchange... expected) {
this.expected.addAll(Arrays.asList(expected));
}
this.relay.handleMessage(createSubscribeMessage(sessionId, "/topic/test/"));
public void awaitAndAssert() throws InterruptedException {
boolean result = this.latch.await(5000, TimeUnit.MILLISECONDS);
assertTrue(getAsString(), result && this.unexpected.isEmpty());
}
public void reset() {
this.latch = new CountDownLatch(1);
this.expected.clear();
this.actual.clear();
this.unexpected.clear();
}
errorLatch.await(30, TimeUnit.SECONDS);
@Override
public void handleMessage(Message<?> message) throws MessagingException {
for (MessageExchange exch : this.expected) {
if (exch.matchMessage(message)) {
if (exch.isDone()) {
this.expected.remove(exch);
this.actual.add(exch);
if (this.expected.isEmpty()) {
this.latch.countDown();
}
}
return;
}
}
this.unexpected.add(message);
}
availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1);
assertTrue(availabilityEvents.get(0).isBrokerAvailable());
assertFalse(availabilityEvents.get(1).isBrokerAvailable());
public String getAsString() {
StringBuilder sb = new StringBuilder("\n");
sb.append("INCOMPLETE:\n").append(this.expected).append("\n");
sb.append("COMPLETE:\n").append(this.actual).append("\n");
sb.append("UNMATCHED MESSAGES:\n").append(this.unexpected).append("\n");
return sb.toString();
}
}
@Test
public void relayReconnectsIfTheBrokerComesBackUp() throws InterruptedException {
List<BrokerAvailabilityEvent> availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1);
assertTrue(availabilityEvents.get(0).isBrokerAvailable());
/**
* Holds a message as well as expected and actual messages matched against expectations.
*/
private static class MessageExchange {
private final Message<?> message;
List<Message<?>> messages = this.stompBroker.awaitMessages(1);
assertEquals(1, messages.size());
assertStompCommand(messages.get(0), StompCommand.CONNECT);
private final MessageMatcher[] expected;
this.stompBroker.stop();
private final Message<?>[] actual;
this.relay.handleMessage(createSendMessage(null, "/topic/test", "test"));
availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(2);
assertFalse(availabilityEvents.get(1).isBrokerAvailable());
public MessageExchange(Message<?> message, MessageMatcher... expected) {
this.message = message;
this.expected = expected;
this.actual = new Message<?>[expected.length];
}
this.relay.handleMessage(createSendMessage(null, "/topic/test", "test-again"));
this.stompBroker.start();
public boolean isDone() {
for (int i=0 ; i < actual.length; i++) {
if (actual[i] == null) {
return false;
}
}
return true;
}
messages = this.stompBroker.awaitMessages(3);
assertEquals(3, messages.size());
assertStompCommand(messages.get(1), StompCommand.CONNECT);
assertStompCommandAndPayload(messages.get(2), StompCommand.SEND, "test-again");
public boolean matchMessage(Message<?> message) {
for (int i=0 ; i < this.expected.length; i++) {
if (this.expected[i].match(message)) {
this.actual[i] = message;
return true;
}
}
return false;
}
availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(3);
assertTrue(availabilityEvents.get(2).isBrokerAvailable());
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Forwarded message:\n").append(this.message).append("\n");
sb.append("Should receive back:\n").append(Arrays.toString(this.expected)).append("\n");
sb.append("Actually received:\n").append(Arrays.toString(this.actual)).append("\n");
return sb.toString();
}
}
private Message<?> createConnectMessage(String sessionId) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
return MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
}
private static class MessageExchangeBuilder {
private Message<?> createSubscribeMessage(String sessionId, String destination) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId(sessionId);
headers.setDestination(destination);
headers.setNativeHeader(StompHeaderAccessor.STOMP_ID_HEADER, sessionId);
private final Message<?> message;
return MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
private final StompHeaderAccessor headers;
private final List<MessageMatcher> expected = new ArrayList<>();
private MessageExchangeBuilder(Message<?> message) {
this.message = message;
this.headers = StompHeaderAccessor.wrap(message);
}
public static MessageExchangeBuilder connect(String sessionId) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
Message<?> message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
return new MessageExchangeBuilder(message);
}
public static MessageExchangeBuilder subscribe(String sessionId, String subscriptionId, String destination) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId(sessionId);
headers.setSubscriptionId(subscriptionId);
headers.setDestination(destination);
Message<?> message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
return new MessageExchangeBuilder(message);
}
public static MessageExchangeBuilder subscribeWithReceipt(String sessionId, String subscriptionId,
String destination, String receiptId) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId(sessionId);
headers.setSubscriptionId(subscriptionId);
headers.setDestination(destination);
headers.setReceipt(receiptId);
Message<?> message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
builder.expected.add(new StompReceiptFrameMessageMatcher(sessionId, receiptId));
return builder;
}
public static MessageExchangeBuilder send(String destination, String payload) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setDestination(destination);
Message<?> message = MessageBuilder.withPayloadAndHeaders(payload.getBytes(UTF_8), headers).build();
return new MessageExchangeBuilder(message);
}
public MessageExchangeBuilder andExpectMessage(String sessionId, String subscriptionId) {
Assert.isTrue(StompCommand.SEND.equals(headers.getCommand()), "MESSAGE can only be expected after SEND");
String destination = this.headers.getDestination();
Object payload = this.message.getPayload();
this.expected.add(new StompMessageFrameMessageMatcher(sessionId, subscriptionId, destination, payload));
return this;
}
public MessageExchangeBuilder andExpectError() {
String sessionId = this.headers.getSessionId();
Assert.notNull(sessionId, "No sessionId to match the ERROR frame to");
return andExpectError(sessionId);
}
public MessageExchangeBuilder andExpectError(String sessionId) {
this.expected.add(new StompFrameMessageMatcher(StompCommand.ERROR, sessionId));
return this;
}
public MessageExchange build() {
return new MessageExchange(this.message, this.expected.toArray(new MessageMatcher[this.expected.size()]));
}
}
private Message<?> createSendMessage(String sessionId, String destination, String payload) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setSessionId(sessionId);
headers.setDestination(destination);
private static interface MessageMatcher {
boolean match(Message<?> message);
return MessageBuilder.withPayloadAndHeaders(payload.getBytes(), headers).build();
}
private void assertStompCommand(Message<?> message, StompCommand expectedCommand) {
assertEquals(expectedCommand, StompHeaderAccessor.wrap(message).getCommand());
private static class StompFrameMessageMatcher implements MessageMatcher {
private final StompCommand command;
private final String sessionId;
public StompFrameMessageMatcher(StompCommand command, String sessionId) {
this.command = command;
this.sessionId = sessionId;
}
@Override
public final boolean match(Message<?> message) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (!this.command.equals(headers.getCommand()) || (this.sessionId != headers.getSessionId())) {
return false;
}
return matchInternal(headers, message.getPayload());
}
protected boolean matchInternal(StompHeaderAccessor headers, Object payload) {
return true;
}
@Override
public String toString() {
return "command=" + this.command + ", session=\"" + this.sessionId + "\"";
}
}
private void assertStompCommandAndPayload(Message<?> message, StompCommand expectedCommand,
String expectedPayload) {
assertStompCommand(message, expectedCommand);
assertEquals(expectedPayload, new String(((byte[])message.getPayload())));
private static class StompReceiptFrameMessageMatcher extends StompFrameMessageMatcher {
private final String receiptId;
public StompReceiptFrameMessageMatcher(String sessionId, String receipt) {
super(StompCommand.RECEIPT, sessionId);
this.receiptId = receipt;
}
@Override
protected boolean matchInternal(StompHeaderAccessor headers, Object payload) {
return (this.receiptId.equals(headers.getReceiptId()));
}
@Override
public String toString() {
return super.toString() + ", receiptId=\"" + this.receiptId + "\"";
}
}
private static class StompMessageFrameMessageMatcher extends StompFrameMessageMatcher {
@Configuration
public static class TestConfiguration {
private final String subscriptionId;
@Bean
public MessageChannel messageChannel() {
return new ExecutorSubscribableChannel();
private final String destination;
private final Object payload;
public StompMessageFrameMessageMatcher(String sessionId, String subscriptionId, String destination, Object payload) {
super(StompCommand.MESSAGE, sessionId);
this.subscriptionId = subscriptionId;
this.destination = destination;
this.payload = payload;
}
@Bean
public StompBrokerRelayMessageHandler relay() {
StompBrokerRelayMessageHandler relay =
new StompBrokerRelayMessageHandler(messageChannel(), Arrays.asList("/queue/", "/topic/"));
relay.setRelayPort(SocketUtils.findAvailableTcpPort());
return relay;
@Override
protected boolean matchInternal(StompHeaderAccessor headers, Object payload) {
if (!this.subscriptionId.equals(headers.getSubscriptionId()) || !this.destination.equals(headers.getDestination())) {
return false;
}
if (payload instanceof byte[] && this.payload instanceof byte[]) {
return Arrays.equals((byte[]) payload, (byte[]) this.payload);
}
else {
return this.payload.equals(payload);
}
}
@Bean
public TestStompBroker broker() throws IOException {
TestStompBroker broker = new TestStompBroker(relay().getRelayPort());
return broker;
@Override
public String toString() {
return super.toString() + ", subscriptionId=\"" + this.subscriptionId
+ "\", destination=\"" + this.destination + "\", payload=\"" + getPayloadAsText() + "\"";
}
@Bean
public BrokerAvailabilityListener availabilityListener() {
return new BrokerAvailabilityListener();
protected String getPayloadAsText() {
return (this.payload instanceof byte[])
? new String((byte[]) this.payload, UTF_8) : payload.toString();
}
}
private static class BrokerAvailabilityListener implements ApplicationListener<BrokerAvailabilityEvent> {
private static class ExpectationMatchingEventPublisher implements ApplicationEventPublisher {
private final List<BrokerAvailabilityEvent> availabilityEvents = new ArrayList<BrokerAvailabilityEvent>();
private final List<Boolean> expected = new CopyOnWriteArrayList<>();
private final Object monitor = new Object();
private final List<Boolean> actual = new CopyOnWriteArrayList<>();
@Override
public void onApplicationEvent(BrokerAvailabilityEvent event) {
synchronized (this.monitor) {
this.availabilityEvents.add(event);
this.monitor.notifyAll();
private CountDownLatch latch = new CountDownLatch(1);
public void expect(Boolean... expected) {
this.expected.addAll(Arrays.asList(expected));
}
public void awaitAndAssert() throws InterruptedException {
if (this.expected.size() == this.actual.size()) {
assertEquals(this.expected, this.actual);
}
else {
assertTrue("Expected=" + this.expected + ", actual=" + this.actual,
this.latch.await(5, TimeUnit.SECONDS));
}
}
private List<BrokerAvailabilityEvent> awaitAvailabilityEvents(int eventCount) throws InterruptedException {
synchronized (this.monitor) {
while (this.availabilityEvents.size() < eventCount) {
this.monitor.wait();
@Override
public void publishEvent(ApplicationEvent event) {
if (event instanceof BrokerAvailabilityEvent) {
this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable());
if (this.actual.size() == this.expected.size()) {
this.latch.countDown();
}
return new ArrayList<BrokerAvailabilityEvent>(this.availabilityEvents);
}
}
}
}

193
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/TestStompBroker.java

@ -1,193 +0,0 @@ @@ -1,193 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
import reactor.core.Environment;
import reactor.function.Consumer;
import reactor.tcp.TcpConnection;
import reactor.tcp.TcpServer;
import reactor.tcp.encoding.DelimitedCodec;
import reactor.tcp.encoding.StandardCodecs;
import reactor.tcp.netty.NettyTcpServer;
import reactor.tcp.spec.TcpServerSpec;
/**
* @author Andy Wilkinson
*/
class TestStompBroker implements SmartLifecycle {
private final StompMessageConverter messageConverter = new StompMessageConverter();
private final List<Message<?>> messages = new ArrayList<Message<?>>();
private final Object messageMonitor = new Object();
private final Object subscriberMonitor = new Object();
private final Map<String, Set<Subscription>> subscribers = new HashMap<String, Set<Subscription>>();
private final AtomicLong messageIdCounter = new AtomicLong();
private final int port;
private volatile Environment environment;
private volatile TcpServer tcpServer;
private volatile boolean running;
TestStompBroker(int port) {
this.port = port;
}
public void start() {
this.environment = new Environment();
this.tcpServer = new TcpServerSpec<String, String>(NettyTcpServer.class)
.env(this.environment)
.codec(new DelimitedCodec<String, String>((byte) 0, true, StandardCodecs.STRING_CODEC))
.listen(port)
.consume(new Consumer<TcpConnection<String, String>>() {
@Override
public void accept(final TcpConnection<String, String> connection) {
connection.consume(new Consumer<String>() {
@Override
public void accept(String stompFrame) {
if (!StringUtils.isEmpty(stompFrame)) {
handleMessage(messageConverter.toMessage(stompFrame), connection);
}
}
});
}
})
.get();
this.tcpServer.start();
this.running = true;
}
public void stop() {
try {
this.tcpServer.shutdown().await();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
this.running = false;
}
private void handleMessage(Message<?> message, TcpConnection<String, String> connection) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (headers.getCommand() == StompCommand.CONNECT) {
StompHeaderAccessor responseHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED);
MessageBuilder<byte[]> response = MessageBuilder.withPayloadAndHeaders(new byte[0], responseHeaders);
connection.send(new String(messageConverter.fromMessage(response.build())));
}
else if (headers.getCommand() == StompCommand.SUBSCRIBE) {
String destination = headers.getDestination();
synchronized (this.subscriberMonitor) {
Set<Subscription> subscribers = this.subscribers.get(destination);
if (subscribers == null) {
subscribers = new HashSet<Subscription>();
this.subscribers.put(destination, subscribers);
}
String subscriptionId = headers.getFirstNativeHeader(StompHeaderAccessor.STOMP_ID_HEADER);
subscribers.add(new Subscription(subscriptionId, connection));
}
}
else if (headers.getCommand() == StompCommand.SEND) {
String destination = headers.getDestination();
synchronized (this.subscriberMonitor) {
Set<Subscription> subscriptions = this.subscribers.get(destination);
if (subscriptions != null) {
for (Subscription subscription: subscriptions) {
StompHeaderAccessor outboundHeaders = StompHeaderAccessor.create(StompCommand.MESSAGE);
outboundHeaders.setSubscriptionId(subscription.subscriptionId);
outboundHeaders.setMessageId(Long.toString(messageIdCounter.incrementAndGet()));
Message<?> outbound =
MessageBuilder.withPayloadAndHeaders(message.getPayload(), outboundHeaders).build();
subscription.tcpConnection.send(new String(this.messageConverter.fromMessage(outbound)));
}
}
}
}
addMessage(message);
}
private void addMessage(Message<?> message) {
synchronized (this.messageMonitor) {
this.messages.add(message);
this.messageMonitor.notifyAll();
}
}
public List<Message<?>> awaitMessages(int messageCount) throws InterruptedException {
synchronized (this.messageMonitor) {
while (this.messages.size() < messageCount) {
this.messageMonitor.wait();
}
return this.messages;
}
}
private static final class Subscription {
private final String subscriptionId;
private final TcpConnection<String, String> tcpConnection;
public Subscription(String subscriptionId, TcpConnection<String, String> tcpConnection) {
this.subscriptionId = subscriptionId;
this.tcpConnection = tcpConnection;
}
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public int getPhase() {
return Integer.MIN_VALUE;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
this.stop();
callback.run();
}
}

4
spring-messaging/src/test/resources/log4j.xml

@ -15,6 +15,10 @@ @@ -15,6 +15,10 @@
<level value="info" />
</logger>
<logger name="org.apache.activemq">
<level value="info" />
</logger>
<!-- Root Logger -->
<root>
<priority value="warn" />

Loading…
Cancel
Save