|
|
|
@ -42,18 +42,18 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Create a new {@link ExecutorSubscribableChannel} instance where messages will be sent |
|
|
|
* Create a new {@link ExecutorSubscribableChannel} instance |
|
|
|
* in the callers thread. |
|
|
|
* where messages will be sent in the callers thread. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public ExecutorSubscribableChannel() { |
|
|
|
public ExecutorSubscribableChannel() { |
|
|
|
this(null); |
|
|
|
this(null); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Create a new {@link ExecutorSubscribableChannel} instance where messages will be sent |
|
|
|
* Create a new {@link ExecutorSubscribableChannel} instance |
|
|
|
* via the specified executor. |
|
|
|
* where messages will be sent via the specified executor. |
|
|
|
* @param executor the executor used to send the message or {@code null} to execute in |
|
|
|
* @param executor the executor used to send the message, |
|
|
|
* the callers thread. |
|
|
|
* or {@code null} to execute in the callers thread. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public ExecutorSubscribableChannel(Executor executor) { |
|
|
|
public ExecutorSubscribableChannel(Executor executor) { |
|
|
|
this.executor = executor; |
|
|
|
this.executor = executor; |
|
|
|
@ -100,6 +100,45 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Helps with the invocation of configured executor channel interceptors. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private class ExecutorChannelInterceptorChain { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private int interceptorIndex = -1; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public Message<?> applyBeforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) { |
|
|
|
|
|
|
|
for (ExecutorChannelInterceptor interceptor : executorInterceptors) { |
|
|
|
|
|
|
|
message = interceptor.beforeHandle(message, channel, handler); |
|
|
|
|
|
|
|
if (message == null) { |
|
|
|
|
|
|
|
String name = interceptor.getClass().getSimpleName(); |
|
|
|
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
|
|
|
logger.debug(name + " returned null from beforeHandle, i.e. precluding the send."); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
triggerAfterMessageHandled(message, channel, handler, null); |
|
|
|
|
|
|
|
return null; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
this.interceptorIndex++; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return message; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void triggerAfterMessageHandled(Message<?> message, MessageChannel channel, |
|
|
|
|
|
|
|
MessageHandler handler, Exception ex) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (int i = this.interceptorIndex; i >= 0; i--) { |
|
|
|
|
|
|
|
ExecutorChannelInterceptor interceptor = executorInterceptors.get(i); |
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
interceptor.afterMessageHandled(message, channel, handler, ex); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
catch (Throwable ex2) { |
|
|
|
|
|
|
|
logger.error("Exception from afterMessageHandled in " + interceptor, ex2); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Helps with the invocation of the target MessageHandler and interceptors. |
|
|
|
* Helps with the invocation of the target MessageHandler and interceptors. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@ -113,7 +152,6 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { |
|
|
|
|
|
|
|
|
|
|
|
private final ExecutorChannelInterceptorChain chain; |
|
|
|
private final ExecutorChannelInterceptorChain chain; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public SendTask(Message<?> message, MessageChannel channel, MessageHandler handler, |
|
|
|
public SendTask(Message<?> message, MessageChannel channel, MessageHandler handler, |
|
|
|
ExecutorChannelInterceptorChain chain) { |
|
|
|
ExecutorChannelInterceptorChain chain) { |
|
|
|
|
|
|
|
|
|
|
|
@ -151,43 +189,4 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Helps with the invocation of configured executor channel interceptors. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private class ExecutorChannelInterceptorChain { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private int interceptorIndex = -1; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public Message<?> applyBeforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) { |
|
|
|
|
|
|
|
for (ExecutorChannelInterceptor interceptor : executorInterceptors) { |
|
|
|
|
|
|
|
message = interceptor.beforeHandle(message, channel, handler); |
|
|
|
|
|
|
|
if (message == null) { |
|
|
|
|
|
|
|
String name = interceptor.getClass().getSimpleName(); |
|
|
|
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
|
|
|
logger.debug(name + " returned null from beforeHandle, i.e. precluding the send."); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
triggerAfterMessageHandled(message, channel, handler, null); |
|
|
|
|
|
|
|
return null; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
this.interceptorIndex++; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return message; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void triggerAfterMessageHandled(Message<?> message, MessageChannel channel, |
|
|
|
|
|
|
|
MessageHandler handler, Exception ex) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (int i = this.interceptorIndex; i >= 0; i--) { |
|
|
|
|
|
|
|
ExecutorChannelInterceptor interceptor = executorInterceptors.get(i); |
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
interceptor.afterMessageHandled(message, channel, handler, ex); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
catch (Throwable ex2) { |
|
|
|
|
|
|
|
logger.error("Exception from afterMessageHandled in " + interceptor, ex2); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|