|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
/* |
|
|
|
* Copyright 2002-2019 the original author or authors. |
|
|
|
* Copyright 2002-2020 the original author or authors. |
|
|
|
* |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -75,11 +75,13 @@ public abstract class AbstractMessageChannel implements MessageChannel, Intercep |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void addInterceptor(ChannelInterceptor interceptor) { |
|
|
|
public void addInterceptor(ChannelInterceptor interceptor) { |
|
|
|
|
|
|
|
Assert.notNull(interceptor, "ChannelInterceptor must not be null"); |
|
|
|
this.interceptors.add(interceptor); |
|
|
|
this.interceptors.add(interceptor); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void addInterceptor(int index, ChannelInterceptor interceptor) { |
|
|
|
public void addInterceptor(int index, ChannelInterceptor interceptor) { |
|
|
|
|
|
|
|
Assert.notNull(interceptor, "ChannelInterceptor must not be null"); |
|
|
|
this.interceptors.add(index, interceptor); |
|
|
|
this.interceptors.add(index, interceptor); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -107,29 +109,30 @@ public abstract class AbstractMessageChannel implements MessageChannel, Intercep |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public final boolean send(Message<?> message, long timeout) { |
|
|
|
public final boolean send(Message<?> message, long timeout) { |
|
|
|
Assert.notNull(message, "Message must not be null"); |
|
|
|
Assert.notNull(message, "Message must not be null"); |
|
|
|
|
|
|
|
Message<?> messageToUse = message; |
|
|
|
ChannelInterceptorChain chain = new ChannelInterceptorChain(); |
|
|
|
ChannelInterceptorChain chain = new ChannelInterceptorChain(); |
|
|
|
boolean sent = false; |
|
|
|
boolean sent = false; |
|
|
|
try { |
|
|
|
try { |
|
|
|
message = chain.applyPreSend(message, this); |
|
|
|
messageToUse = chain.applyPreSend(messageToUse, this); |
|
|
|
if (message == null) { |
|
|
|
if (messageToUse == null) { |
|
|
|
return false; |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
sent = sendInternal(message, timeout); |
|
|
|
sent = sendInternal(messageToUse, timeout); |
|
|
|
chain.applyPostSend(message, this, sent); |
|
|
|
chain.applyPostSend(messageToUse, this, sent); |
|
|
|
chain.triggerAfterSendCompletion(message, this, sent, null); |
|
|
|
chain.triggerAfterSendCompletion(messageToUse, this, sent, null); |
|
|
|
return sent; |
|
|
|
return sent; |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Exception ex) { |
|
|
|
catch (Exception ex) { |
|
|
|
chain.triggerAfterSendCompletion(message, this, sent, ex); |
|
|
|
chain.triggerAfterSendCompletion(messageToUse, this, sent, ex); |
|
|
|
if (ex instanceof MessagingException) { |
|
|
|
if (ex instanceof MessagingException) { |
|
|
|
throw (MessagingException) ex; |
|
|
|
throw (MessagingException) ex; |
|
|
|
} |
|
|
|
} |
|
|
|
throw new MessageDeliveryException(message,"Failed to send message to " + this, ex); |
|
|
|
throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Throwable err) { |
|
|
|
catch (Throwable err) { |
|
|
|
MessageDeliveryException ex2 = |
|
|
|
MessageDeliveryException ex2 = |
|
|
|
new MessageDeliveryException(message, "Failed to send message to " + this, err); |
|
|
|
new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err); |
|
|
|
chain.triggerAfterSendCompletion(message, this, sent, ex2); |
|
|
|
chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2); |
|
|
|
throw ex2; |
|
|
|
throw ex2; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -200,13 +203,14 @@ public abstract class AbstractMessageChannel implements MessageChannel, Intercep |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public Message<?> applyPostReceive(Message<?> message, MessageChannel channel) { |
|
|
|
public Message<?> applyPostReceive(Message<?> message, MessageChannel channel) { |
|
|
|
|
|
|
|
Message<?> messageToUse = message; |
|
|
|
for (ChannelInterceptor interceptor : interceptors) { |
|
|
|
for (ChannelInterceptor interceptor : interceptors) { |
|
|
|
message = interceptor.postReceive(message, channel); |
|
|
|
messageToUse = interceptor.postReceive(messageToUse, channel); |
|
|
|
if (message == null) { |
|
|
|
if (messageToUse == null) { |
|
|
|
return null; |
|
|
|
return null; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return message; |
|
|
|
return messageToUse; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void triggerAfterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) { |
|
|
|
public void triggerAfterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) { |
|
|
|
|