diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java index e21725519b6..1abc3fa86ee 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java @@ -161,8 +161,7 @@ public abstract class AbstractBrokerMessageHandler * ThreadPoolExecutor that in turn does not guarantee processing in order. *

When this flag is set to {@code true} messages within the same session * will be sent to the {@code "clientOutboundChannel"} one at a time in - * order to preserve the order of publication. Enable this only if needed - * since there is some performance overhead to keep messages in order. + * order to preserve the order of publication. * @param preservePublishOrder whether to publish in order * @since 5.1 */ diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageChannelDecorator.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageChannelDecorator.java index 63c1830e871..98ea4e422cb 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageChannelDecorator.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageChannelDecorator.java @@ -89,11 +89,7 @@ public class OrderedMessageChannelDecorator implements MessageChannel { Message message = this.messages.peek(); if (message != null) { try { - addNextMessageTaskHeader(message, () -> { - if (removeMessage(message)) { - sendNextMessage(); - } - }); + setTaskHeader(message, new PostHandleTask(message)); if (this.channel.send(message)) { return; } @@ -124,19 +120,12 @@ public class OrderedMessageChannelDecorator implements MessageChannel { } } - private static void addNextMessageTaskHeader(Message message, Runnable task) { + private static void setTaskHeader(Message message, Runnable task) { SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); Assert.isTrue(accessor != null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor"); accessor.setHeader(NEXT_MESSAGE_TASK_HEADER, task); } - /** - * Obtain the task to release the next message, if found. - */ - @Nullable - public static Runnable getNextMessageTask(Message message) { - return (Runnable) message.getHeaders().get(OrderedMessageChannelDecorator.NEXT_MESSAGE_TASK_HEADER); - } /** * Install or remove an {@link ExecutorChannelInterceptor} that invokes a @@ -150,19 +139,47 @@ public class OrderedMessageChannelDecorator implements MessageChannel { Assert.isInstanceOf(ExecutorSubscribableChannel.class, channel, "An ExecutorSubscribableChannel is required for 'preservePublishOrder'"); ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel; - if (execChannel.getInterceptors().stream().noneMatch(CallbackInterceptor.class::isInstance)) { - execChannel.addInterceptor(0, new CallbackInterceptor()); + if (execChannel.getInterceptors().stream().noneMatch(CallbackTaskInterceptor.class::isInstance)) { + execChannel.addInterceptor(0, new CallbackTaskInterceptor()); } } else if (channel instanceof ExecutorSubscribableChannel execChannel) { - execChannel.getInterceptors().stream().filter(CallbackInterceptor.class::isInstance) + execChannel.getInterceptors().stream().filter(CallbackTaskInterceptor.class::isInstance) .findFirst().map(execChannel::removeInterceptor); } } + /** + * Obtain the task to release the next message, if found. + */ + @Nullable + public static Runnable getNextMessageTask(Message message) { + return (Runnable) message.getHeaders().get(OrderedMessageChannelDecorator.NEXT_MESSAGE_TASK_HEADER); + } + + + /** + * Remove handled message from queue, and send next message. + */ + private class PostHandleTask implements Runnable { + + private final Message message; + + private PostHandleTask(Message message) { + this.message = message; + } + + @Override + public void run() { + if (OrderedMessageChannelDecorator.this.removeMessage(message)) { + sendNextMessage(); + } + } + } + - private static class CallbackInterceptor implements ExecutorChannelInterceptor { + private static class CallbackTaskInterceptor implements ExecutorChannelInterceptor { @Override public void afterMessageHandled( diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java index 72a084cedf6..c16d51e2ba4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -228,8 +228,7 @@ public class MessageBrokerRegistry { * ThreadPoolExecutor that in turn does not guarantee processing in order. *

When this flag is set to {@code true} messages within the same session * will be sent to the {@code "clientOutboundChannel"} one at a time in - * order to preserve the order of publication. Enable this only if needed - * since there is some performance overhead to keep messages in order. + * order to preserve the order of publication. * @since 5.1 */ public MessageBrokerRegistry setPreservePublishOrder(boolean preservePublishOrder) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java index 420291bb94b..e8b577a1a28 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel public Set getSubscribers() { - return Collections.unmodifiableSet(this.handlers); + return Collections.unmodifiableSet(this.handlers); } public boolean hasSubscription(MessageHandler handler) { diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/OrderedMessageSendingIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/OrderedMessageSendingIntegrationTests.java index 71f4f5b405f..735619cff20 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/OrderedMessageSendingIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/OrderedMessageSendingIntegrationTests.java @@ -79,9 +79,9 @@ public class OrderedMessageSendingIntegrationTests { private BlockingWebSocketSession blockingSession; - private ExecutorSubscribableChannel subscribableChannel; + private ExecutorSubscribableChannel clientOutChannel; - private OrderedMessageChannelDecorator orderedMessageChannel; + private OrderedMessageChannelDecorator orderedClientOutChannel; private ThreadPoolTaskExecutor executor; @@ -98,10 +98,10 @@ public class OrderedMessageSendingIntegrationTests { this.executor.setAllowCoreThreadTimeOut(true); this.executor.afterPropertiesSet(); - this.subscribableChannel = new ExecutorSubscribableChannel(this.executor); - OrderedMessageChannelDecorator.configureInterceptor(this.subscribableChannel, true); + this.clientOutChannel = new ExecutorSubscribableChannel(this.executor); + OrderedMessageChannelDecorator.configureInterceptor(this.clientOutChannel, true); - this.orderedMessageChannel = new OrderedMessageChannelDecorator(this.subscribableChannel, logger); + this.orderedClientOutChannel = new OrderedMessageChannelDecorator(this.clientOutChannel, logger); } @AfterEach @@ -119,14 +119,14 @@ public class OrderedMessageSendingIntegrationTests { this.blockingSession, 60 * 1000, messageCount * MESSAGE_SIZE); TestMessageHandler handler = new TestMessageHandler(concurrentSessionDecorator); - subscribableChannel.subscribe(handler); + this.clientOutChannel.subscribe(handler); List> expectedMessages = new ArrayList<>(messageCount); // Send one to block Message message = createMessage(0); expectedMessages.add(message); - this.orderedMessageChannel.send(message); + this.orderedClientOutChannel.send(message); CountDownLatch latch = new CountDownLatch(messageCount); handler.setMessageLatch(latch); @@ -134,10 +134,10 @@ public class OrderedMessageSendingIntegrationTests { for (int i = 1; i <= messageCount; i++) { message = createMessage(i); expectedMessages.add(message); - this.orderedMessageChannel.send(message); + this.orderedClientOutChannel.send(message); } - latch.await(5, TimeUnit.SECONDS); + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); assertThat(concurrentSessionDecorator.getTimeSinceSendStarted()).isGreaterThan(0); assertThat(concurrentSessionDecorator.getBufferSize()).isEqualTo((messageCount * MESSAGE_SIZE)); @@ -152,10 +152,10 @@ public class OrderedMessageSendingIntegrationTests { new ConcurrentWebSocketSessionDecorator(this.blockingSession, 100, 1024); TestMessageHandler messageHandler = new TestMessageHandler(concurrentSessionDecorator); - subscribableChannel.subscribe(messageHandler); + this.clientOutChannel.subscribe(messageHandler); // Send one to block - this.orderedMessageChannel.send(createMessage(0)); + this.orderedClientOutChannel.send(createMessage(0)); // Exceed send time.. Thread.sleep(200); @@ -164,10 +164,9 @@ public class OrderedMessageSendingIntegrationTests { messageHandler.setMessageLatch(messageLatch); // Send one more - this.orderedMessageChannel.send(createMessage(1)); - - messageLatch.await(5, TimeUnit.SECONDS); + this.orderedClientOutChannel.send(createMessage(1)); + assertThat(messageLatch.await(5, TimeUnit.SECONDS)).isTrue(); assertThat(messageHandler.getSavedException()).hasMessageMatching( "Send time [\\d]+ \\(ms\\) for session '1' exceeded the allowed limit 100"); } @@ -179,23 +178,23 @@ public class OrderedMessageSendingIntegrationTests { new ConcurrentWebSocketSessionDecorator(this.blockingSession, 60 * 1000, 2 * MESSAGE_SIZE); TestMessageHandler messageHandler = new TestMessageHandler(concurrentSessionDecorator); - subscribableChannel.subscribe(messageHandler); + this.clientOutChannel.subscribe(messageHandler); // Send one to block - this.orderedMessageChannel.send(createMessage(0)); + this.orderedClientOutChannel.send(createMessage(0)); int messageCount = 3; CountDownLatch messageLatch = new CountDownLatch(messageCount); messageHandler.setMessageLatch(messageLatch); for (int i = 1; i <= messageCount; i++) { - this.orderedMessageChannel.send(createMessage(i)); + this.orderedClientOutChannel.send(createMessage(i)); } - messageLatch.await(5, TimeUnit.SECONDS); - + assertThat(messageLatch.await(5, TimeUnit.SECONDS)).isTrue(); assertThat(messageHandler.getSavedException()).hasMessage( - "Buffer size " + 3 * MESSAGE_SIZE + " bytes for session '1' exceeds the allowed limit " + 2 * MESSAGE_SIZE); + "Buffer size " + 3 * MESSAGE_SIZE + " bytes for session '1' " + + "exceeds the allowed limit " + 2 * MESSAGE_SIZE); } private static Message createMessage(int index) { @@ -218,9 +217,9 @@ public class OrderedMessageSendingIntegrationTests { @Nullable private CountDownLatch messageLatch; - private Queue> messages = new LinkedBlockingQueue<>(); + private final Queue> messages = new LinkedBlockingQueue<>(); - private AtomicReference exception = new AtomicReference<>(); + private final AtomicReference exception = new AtomicReference<>(); public TestMessageHandler(WebSocketSession session) {