From d014d88937356d21db3b10d3c2dc0650c7f168b5 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 27 Aug 2020 13:07:46 +0100 Subject: [PATCH] Polishing OrderedMessageSender See gh-25581 --- .../simp/broker/OrderedMessageSender.java | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java index c7652d4eb43..e9586327caf 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java @@ -52,7 +52,7 @@ import org.springframework.util.Assert; */ class OrderedMessageSender implements MessageChannel { - static final String COMPLETION_TASK_HEADER = "simpSendCompletionTask"; + private static final CompletionTaskInterceptor COMPLETION_INTERCEPTOR = new CompletionTaskInterceptor(); private final MessageChannel channel; @@ -101,7 +101,7 @@ class OrderedMessageSender implements MessageChannel { } this.control.removeMessage(message); try { - getMutableAccessor(message).setHeader(COMPLETION_TASK_HEADER, (Runnable) () -> { + CompletionTaskInterceptor.instrumentMessage(message, () -> { this.control.releaseSessionLock(sessionId); if (this.control.hasRemainingWork()) { trySend(); @@ -130,13 +130,6 @@ class OrderedMessageSender implements MessageChannel { } } - private SimpMessageHeaderAccessor getMutableAccessor(Message message) { - SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); - Assert.isTrue(accessor != null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor"); - return accessor; - } - - /** * Install or remove an {@link ExecutorChannelInterceptor} that invokes a * completion task once the message is handled. @@ -148,14 +141,14 @@ class OrderedMessageSender implements MessageChannel { if (preservePublishOrder) { Assert.isInstanceOf(ExecutorSubscribableChannel.class, channel, "An ExecutorSubscribableChannel is required for `preservePublishOrder`"); - ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel; - if (execChannel.getInterceptors().stream().noneMatch(i -> i instanceof CompletionTaskInterceptor)) { - execChannel.addInterceptor(0, new CompletionTaskInterceptor()); + ExecutorSubscribableChannel executorChannel = (ExecutorSubscribableChannel) channel; + if (executorChannel.getInterceptors().stream().noneMatch(i -> i == COMPLETION_INTERCEPTOR)) { + executorChannel.addInterceptor(0, COMPLETION_INTERCEPTOR); } } else if (channel instanceof ExecutorSubscribableChannel) { ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel; - execChannel.getInterceptors().stream().filter(i -> i instanceof CompletionTaskInterceptor) + execChannel.getInterceptors().stream().filter(i -> i == COMPLETION_INTERCEPTOR) .findFirst() .map(execChannel::removeInterceptor); @@ -200,10 +193,7 @@ class OrderedMessageSender implements MessageChannel { } public boolean acquireSessionLock(String sessionId) { - if (this.sessionsInProgress.put(sessionId, Boolean.TRUE) != null) { - return false; - } - return true; + return (this.sessionsInProgress.put(sessionId, Boolean.TRUE) == null); } public void releaseSessionLock(String sessionId) { @@ -223,15 +213,23 @@ class OrderedMessageSender implements MessageChannel { private static class CompletionTaskInterceptor implements ExecutorChannelInterceptor { + private static final String COMPLETION_TASK_HEADER = "simpSendCompletionTask"; + @Override public void afterMessageHandled( Message message, MessageChannel ch, MessageHandler handler, @Nullable Exception ex) { - Runnable task = (Runnable) message.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER); + Runnable task = (Runnable) message.getHeaders().get(COMPLETION_TASK_HEADER); if (task != null) { task.run(); } } + + public static void instrumentMessage(Message message, Runnable task) { + SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); + Assert.isTrue(accessor != null && accessor.isMutable(), "Expected a mutable SimpMessageHeaderAccessor"); + accessor.setHeader(COMPLETION_TASK_HEADER, task); + } } }