|
|
|
|
@ -52,7 +52,7 @@ import org.springframework.util.Assert;
@@ -52,7 +52,7 @@ import org.springframework.util.Assert;
|
|
|
|
|
*/ |
|
|
|
|
class OrderedMessageSender implements MessageChannel { |
|
|
|
|
|
|
|
|
|
static final String COMPLETION_TASK_HEADER = "simpSendCompletionTask"; |
|
|
|
|
private static final CompletionTaskInterceptor COMPLETION_INTERCEPTOR = new CompletionTaskInterceptor(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final MessageChannel channel; |
|
|
|
|
@ -101,7 +101,7 @@ class OrderedMessageSender implements MessageChannel {
@@ -101,7 +101,7 @@ class OrderedMessageSender implements MessageChannel {
|
|
|
|
|
} |
|
|
|
|
this.control.removeMessage(message); |
|
|
|
|
try { |
|
|
|
|
getMutableAccessor(message).setHeader(COMPLETION_TASK_HEADER, (Runnable) () -> { |
|
|
|
|
CompletionTaskInterceptor.instrumentMessage(message, () -> { |
|
|
|
|
this.control.releaseSessionLock(sessionId); |
|
|
|
|
if (this.control.hasRemainingWork()) { |
|
|
|
|
trySend(); |
|
|
|
|
@ -130,13 +130,6 @@ class OrderedMessageSender implements MessageChannel {
@@ -130,13 +130,6 @@ class OrderedMessageSender implements MessageChannel {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private SimpMessageHeaderAccessor getMutableAccessor(Message<?> message) { |
|
|
|
|
SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); |
|
|
|
|
Assert.isTrue(accessor != null && accessor.isMutable(), "Expected mutable SimpMessageHeaderAccessor"); |
|
|
|
|
return accessor; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Install or remove an {@link ExecutorChannelInterceptor} that invokes a |
|
|
|
|
* completion task once the message is handled. |
|
|
|
|
@ -148,14 +141,14 @@ class OrderedMessageSender implements MessageChannel {
@@ -148,14 +141,14 @@ class OrderedMessageSender implements MessageChannel {
|
|
|
|
|
if (preservePublishOrder) { |
|
|
|
|
Assert.isInstanceOf(ExecutorSubscribableChannel.class, channel, |
|
|
|
|
"An ExecutorSubscribableChannel is required for `preservePublishOrder`"); |
|
|
|
|
ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel; |
|
|
|
|
if (execChannel.getInterceptors().stream().noneMatch(i -> i instanceof CompletionTaskInterceptor)) { |
|
|
|
|
execChannel.addInterceptor(0, new CompletionTaskInterceptor()); |
|
|
|
|
ExecutorSubscribableChannel executorChannel = (ExecutorSubscribableChannel) channel; |
|
|
|
|
if (executorChannel.getInterceptors().stream().noneMatch(i -> i == COMPLETION_INTERCEPTOR)) { |
|
|
|
|
executorChannel.addInterceptor(0, COMPLETION_INTERCEPTOR); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else if (channel instanceof ExecutorSubscribableChannel) { |
|
|
|
|
ExecutorSubscribableChannel execChannel = (ExecutorSubscribableChannel) channel; |
|
|
|
|
execChannel.getInterceptors().stream().filter(i -> i instanceof CompletionTaskInterceptor) |
|
|
|
|
execChannel.getInterceptors().stream().filter(i -> i == COMPLETION_INTERCEPTOR) |
|
|
|
|
.findFirst() |
|
|
|
|
.map(execChannel::removeInterceptor); |
|
|
|
|
|
|
|
|
|
@ -200,10 +193,7 @@ class OrderedMessageSender implements MessageChannel {
@@ -200,10 +193,7 @@ class OrderedMessageSender implements MessageChannel {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public boolean acquireSessionLock(String sessionId) { |
|
|
|
|
if (this.sessionsInProgress.put(sessionId, Boolean.TRUE) != null) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
return (this.sessionsInProgress.put(sessionId, Boolean.TRUE) == null); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void releaseSessionLock(String sessionId) { |
|
|
|
|
@ -223,15 +213,23 @@ class OrderedMessageSender implements MessageChannel {
@@ -223,15 +213,23 @@ class OrderedMessageSender implements MessageChannel {
|
|
|
|
|
|
|
|
|
|
private static class CompletionTaskInterceptor implements ExecutorChannelInterceptor { |
|
|
|
|
|
|
|
|
|
private static final String COMPLETION_TASK_HEADER = "simpSendCompletionTask"; |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void afterMessageHandled( |
|
|
|
|
Message<?> message, MessageChannel ch, MessageHandler handler, @Nullable Exception ex) { |
|
|
|
|
|
|
|
|
|
Runnable task = (Runnable) message.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER); |
|
|
|
|
Runnable task = (Runnable) message.getHeaders().get(COMPLETION_TASK_HEADER); |
|
|
|
|
if (task != null) { |
|
|
|
|
task.run(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static void instrumentMessage(Message<?> message, Runnable task) { |
|
|
|
|
SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class); |
|
|
|
|
Assert.isTrue(accessor != null && accessor.isMutable(), "Expected a mutable SimpMessageHeaderAccessor"); |
|
|
|
|
accessor.setHeader(COMPLETION_TASK_HEADER, task); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|