headersToUse = processHeadersToSend(headers);
+ if (headersToUse != null) {
+ if (headersToUse instanceof MessageHeaders) {
+ messageHeaders = (MessageHeaders) headersToUse;
}
else {
- messageHeaders = new MessageHeaders(headers);
+ messageHeaders = new MessageHeaders(headersToUse);
}
}
- Message> message = this.converter.toMessage(payload, messageHeaders);
-
+ Message> message = getMessageConverter().toMessage(payload, messageHeaders);
if (message == null) {
- String payloadType = (payload != null) ? payload.getClass().getName() : null;
- Object contentType = (messageHeaders != null) ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null;
- throw new MessageConversionException("Unable to convert payload type '"
- + payloadType + "', Content-Type=" + contentType + ", converter=" + this.converter, null);
+ String payloadType = (payload != null ? payload.getClass().getName() : null);
+ Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
+ throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
+ "', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]");
}
-
if (postProcessor != null) {
message = postProcessor.postProcessMessage(message);
}
-
- this.send(destination, message);
+ send(destination, message);
}
/**
* Provides access to the map of input headers before a send operation. Sub-classes
* can modify the headers and then return the same or a different map.
- *
* This default implementation in this class returns the input map.
- *
- * @param headers the headers to send or {@code null}.
- * @return the actual headers to send or {@code null}.
+ * @param headers the headers to send or {@code null}
+ * @return the actual headers to send or {@code null}
*/
protected Map processHeadersToSend(Map headers) {
return headers;
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java
index 222ef34fe5c..286e12062d2 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java
@@ -37,12 +37,12 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
@Override
public Message> receive() {
- return this.receive(getRequiredDefaultDestination());
+ return receive(getRequiredDefaultDestination());
}
@Override
public Message> receive(D destination) {
- return this.doReceive(destination);
+ return doReceive(destination);
}
protected abstract Message> doReceive(D destination);
@@ -50,13 +50,13 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
@Override
public T receiveAndConvert(Class targetClass) {
- return this.receiveAndConvert(getRequiredDefaultDestination(), targetClass);
+ return receiveAndConvert(getRequiredDefaultDestination(), targetClass);
}
@SuppressWarnings("unchecked")
@Override
public T receiveAndConvert(D destination, Class targetClass) {
- Message> message = this.doReceive(destination);
+ Message> message = doReceive(destination);
if (message != null) {
return (T) getMessageConverter().fromMessage(message, targetClass);
}
@@ -67,12 +67,12 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
@Override
public Message> sendAndReceive(Message> requestMessage) {
- return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage);
+ return sendAndReceive(getRequiredDefaultDestination(), requestMessage);
}
@Override
public Message> sendAndReceive(D destination, Message> requestMessage) {
- return this.doSendAndReceive(destination, requestMessage);
+ return doSendAndReceive(destination, requestMessage);
}
protected abstract Message> doSendAndReceive(D destination, Message> requestMessage);
@@ -80,34 +80,27 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
@Override
public T convertSendAndReceive(Object request, Class targetClass) {
- return this.convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass);
+ return convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass);
}
@Override
public T convertSendAndReceive(D destination, Object request, Class targetClass) {
- Map headers = null;
- return this.convertSendAndReceive(destination, request, headers, targetClass);
+ return convertSendAndReceive(destination, request, null, targetClass);
}
@Override
- public T convertSendAndReceive(D destination, Object request, Map headers,
- Class targetClass) {
-
- MessagePostProcessor postProcessor = null;
- return this.convertSendAndReceive(destination, request, headers, targetClass, postProcessor);
+ public T convertSendAndReceive(D destination, Object request, Map headers, Class targetClass) {
+ return convertSendAndReceive(destination, request, headers, targetClass, null);
}
@Override
public T convertSendAndReceive(Object request, Class targetClass, MessagePostProcessor postProcessor) {
- return this.convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass, postProcessor);
+ return convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass, postProcessor);
}
@Override
- public T convertSendAndReceive(D destination, Object request, Class targetClass,
- MessagePostProcessor postProcessor) {
-
- Map headers = null;
- return this.convertSendAndReceive(destination, request, headers, targetClass, postProcessor);
+ public T convertSendAndReceive(D destination, Object request, Class targetClass, MessagePostProcessor postProcessor) {
+ return convertSendAndReceive(destination, request, null, targetClass, postProcessor);
}
@SuppressWarnings("unchecked")
@@ -115,22 +108,22 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
public T convertSendAndReceive(D destination, Object request, Map headers,
Class targetClass, MessagePostProcessor postProcessor) {
- MessageHeaders messageHeaders = (headers != null) ? new MessageHeaders(headers) : null;
+ MessageHeaders messageHeaders = (headers != null ? new MessageHeaders(headers) : null);
Message> requestMessage = getMessageConverter().toMessage(request, messageHeaders);
if (requestMessage == null) {
- String payloadType = (request != null) ? request.getClass().getName() : null;
- throw new MessageConversionException("Unable to convert payload type '"
- + payloadType + "', Content-Type=" + messageHeaders.get(MessageHeaders.CONTENT_TYPE)
- + ", converter=" + getMessageConverter(), null);
+ String payloadType = (request != null ? request.getClass().getName() : null);
+ Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
+ throw new MessageConversionException("Unable to convert payload with type '" + payloadType +
+ "', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]");
}
if (postProcessor != null) {
requestMessage = postProcessor.postProcessMessage(requestMessage);
}
- Message> replyMessage = this.sendAndReceive(destination, requestMessage);
- return (replyMessage != null) ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null;
+ Message> replyMessage = sendAndReceive(destination, requestMessage);
+ return (replyMessage != null ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null);
}
}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java
index 059534ac88b..653c8a70191 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2013 the original author or authors.
+ * Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,7 +54,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
/**
* Configure the timeout value to use for send operations.
- *
* @param sendTimeout the send timeout in milliseconds
*/
public void setSendTimeout(long sendTimeout) {
@@ -70,7 +69,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
/**
* Configure the timeout value to use for receive operations.
- *
* @param receiveTimeout the receive timeout in milliseconds
*/
public void setReceiveTimeout(long receiveTimeout) {
@@ -89,11 +87,9 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
* receiving thread isn't going to receive the reply either because it timed out,
* or because it already received a reply, or because it got an exception while
* sending the request message.
- *
- * The default value is {@code false} in which case only a WARN message is logged.
+ *
The default value is {@code false} in which case only a WARN message is logged.
* If set to {@code true} a {@link MessageDeliveryException} is raised in addition
* to the log message.
- *
* @param throwExceptionOnLateReply whether to throw an exception or not
*/
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
@@ -108,8 +104,7 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override
protected final void doSend(MessageChannel channel, Message> message) {
-
- Assert.notNull(channel, "channel must not be null");
+ Assert.notNull(channel, "'channel' is required");
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
if (accessor != null && accessor.isMutable()) {
@@ -117,7 +112,7 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
}
long timeout = this.sendTimeout;
- boolean sent = (timeout >= 0) ? channel.send(message, timeout) : channel.send(message);
+ boolean sent = (timeout >= 0 ? channel.send(message, timeout) : channel.send(message));
if (!sent) {
throw new MessageDeliveryException(message,
@@ -127,15 +122,14 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override
protected final Message> doReceive(MessageChannel channel) {
-
Assert.notNull(channel, "'channel' is required");
- Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages.");
+ Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");
long timeout = this.receiveTimeout;
- Message> message = (timeout >= 0) ?
- ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive();
+ Message> message = (timeout >= 0 ?
+ ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive());
- if ((message == null) && this.logger.isTraceEnabled()) {
+ if (message == null && this.logger.isTraceEnabled()) {
this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
}
@@ -144,20 +138,16 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override
protected final Message> doSendAndReceive(MessageChannel channel, Message> requestMessage) {
-
Assert.notNull(channel, "'channel' is required");
-
Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel();
-
- requestMessage = MessageBuilder.fromMessage(requestMessage)
- .setReplyChannel(tempReplyChannel)
- .setErrorChannel(tempReplyChannel).build();
+ requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel).
+ setErrorChannel(tempReplyChannel).build();
try {
- this.doSend(channel, requestMessage);
+ doSend(channel, requestMessage);
}
catch (RuntimeException e) {
tempReplyChannel.setSendFailed(true);
@@ -183,10 +173,10 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
- private volatile Message> replyMessage;
-
private final CountDownLatch replyLatch = new CountDownLatch(1);
+ private volatile Message> replyMessage;
+
private volatile boolean hasReceived;
private volatile boolean hasTimedOut;
@@ -198,7 +188,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
this.hasSendFailed = hasSendError;
}
-
@Override
public Message> receive() {
return this.receive(-1);
@@ -233,7 +222,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override
public boolean send(Message> message, long timeout) {
-
this.replyMessage = message;
boolean alreadyReceivedReply = this.hasReceived;
this.replyLatch.countDown();
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java
index 2b53358fd2c..7a56f05bb3a 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java
@@ -84,22 +84,21 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplateBy default this property is not set.
+ * By default, this property is not set.
*/
public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
this.headerInitializer = headerInitializer;
}
/**
- * @return the configured header initializer.
+ * Return the configured header initializer.
*/
public MessageHeaderInitializer getHeaderInitializer() {
return this.headerInitializer;
}
/**
- * @return the messageChannel
+ * Return the configured message channel.
*/
public MessageChannel getMessageChannel() {
return this.messageChannel;
@@ -107,7 +106,6 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplateIf a destination header is not already present ,the message is sent
* to the configured {@link #setDefaultDestination(Object) defaultDestination}
* or an exception an {@code IllegalStateException} is raised if that isn't
* configured.
- *
- * @param message the message to send, never {@code null}
+ * @param message the message to send (never {@code null})
*/
@Override
public void send(Message> message) {
@@ -149,7 +145,6 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate message) {
-
Assert.notNull(destination, "Destination must not be null");
SimpMessageHeaderAccessor simpAccessor =
@@ -181,14 +176,11 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate message) {
-
String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
Assert.notNull(destination);
long timeout = this.sendTimeout;
- boolean sent = (timeout >= 0)
- ? this.messageChannel.send(message, timeout)
- : this.messageChannel.send(message);
+ boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
if (!sent) {
throw new MessageDeliveryException(message,
@@ -204,21 +196,21 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate headers) throws MessagingException {
- this.convertAndSendToUser(user, destination, payload, headers, null);
+ convertAndSendToUser(user, destination, payload, headers, null);
}
@Override
public void convertAndSendToUser(String user, String destination, Object payload,
MessagePostProcessor postProcessor) throws MessagingException {
- this.convertAndSendToUser(user, destination, payload, null, postProcessor);
+ convertAndSendToUser(user, destination, payload, null, postProcessor);
}
@Override
@@ -235,11 +227,9 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplateHowever if the given headers already contain the key
* {@code NATIVE_HEADERS NATIVE_HEADERS} then the same headers instance is
* returned without changes.
- *
* Also if the given headers were prepared and obtained with
* {@link SimpMessageHeaderAccessor#getMessageHeaders()} then the same headers
* instance is also returned without changes.