headersToUse = processHeadersToSend(headers);
+ MessageHeaders messageHeaders = (headersToUse != null ? new MessageHeaders(headersToUse) : null);
+ Message> message = getMessageConverter().toMessage(payload, messageHeaders);
if (message == null) {
- String payloadType = (payload != null) ? payload.getClass().getName() : null;
- throw new MessageConversionException("Unable to convert payload type '"
- + payloadType + "', Content-Type=" + messageHeaders.get(MessageHeaders.CONTENT_TYPE)
- + ", converter=" + this.converter, null);
+ String payloadType = (payload != null ? payload.getClass().getName() : null);
+ Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
+ throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
+ "', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]");
}
-
if (postProcessor != null) {
message = postProcessor.postProcessMessage(message);
}
- this.send(destination, message);
+ send(destination, message);
}
/**
- * Provides access to the map of headers before a send operation.
- * Implementations can modify the headers by returning a different map.
- * This implementation returns the map that was passed in (i.e. without any changes).
- *
- * @param headers the headers to send, possibly {@code null}
- * @return the actual headers to send
+ * Provides access to the map of input headers before a send operation. Sub-classes
+ * can modify the headers and then return the same or a different map.
+ * This default implementation in this class returns the input map.
+ * @param headers the headers to send or {@code null}
+ * @return the actual headers to send or {@code null}
*/
protected Map processHeadersToSend(Map headers) {
return headers;
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java
index 222ef34fe5c..286e12062d2 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java
@@ -37,12 +37,12 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
@Override
public Message> receive() {
- return this.receive(getRequiredDefaultDestination());
+ return receive(getRequiredDefaultDestination());
}
@Override
public Message> receive(D destination) {
- return this.doReceive(destination);
+ return doReceive(destination);
}
protected abstract Message> doReceive(D destination);
@@ -50,13 +50,13 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
@Override
public T receiveAndConvert(Class targetClass) {
- return this.receiveAndConvert(getRequiredDefaultDestination(), targetClass);
+ return receiveAndConvert(getRequiredDefaultDestination(), targetClass);
}
@SuppressWarnings("unchecked")
@Override
public T receiveAndConvert(D destination, Class targetClass) {
- Message> message = this.doReceive(destination);
+ Message> message = doReceive(destination);
if (message != null) {
return (T) getMessageConverter().fromMessage(message, targetClass);
}
@@ -67,12 +67,12 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
@Override
public Message> sendAndReceive(Message> requestMessage) {
- return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage);
+ return sendAndReceive(getRequiredDefaultDestination(), requestMessage);
}
@Override
public Message> sendAndReceive(D destination, Message> requestMessage) {
- return this.doSendAndReceive(destination, requestMessage);
+ return doSendAndReceive(destination, requestMessage);
}
protected abstract Message> doSendAndReceive(D destination, Message> requestMessage);
@@ -80,34 +80,27 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
@Override
public T convertSendAndReceive(Object request, Class targetClass) {
- return this.convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass);
+ return convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass);
}
@Override
public T convertSendAndReceive(D destination, Object request, Class targetClass) {
- Map headers = null;
- return this.convertSendAndReceive(destination, request, headers, targetClass);
+ return convertSendAndReceive(destination, request, null, targetClass);
}
@Override
- public T convertSendAndReceive(D destination, Object request, Map headers,
- Class targetClass) {
-
- MessagePostProcessor postProcessor = null;
- return this.convertSendAndReceive(destination, request, headers, targetClass, postProcessor);
+ public T convertSendAndReceive(D destination, Object request, Map headers, Class targetClass) {
+ return convertSendAndReceive(destination, request, headers, targetClass, null);
}
@Override
public T convertSendAndReceive(Object request, Class targetClass, MessagePostProcessor postProcessor) {
- return this.convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass, postProcessor);
+ return convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass, postProcessor);
}
@Override
- public T convertSendAndReceive(D destination, Object request, Class targetClass,
- MessagePostProcessor postProcessor) {
-
- Map headers = null;
- return this.convertSendAndReceive(destination, request, headers, targetClass, postProcessor);
+ public T convertSendAndReceive(D destination, Object request, Class targetClass, MessagePostProcessor postProcessor) {
+ return convertSendAndReceive(destination, request, null, targetClass, postProcessor);
}
@SuppressWarnings("unchecked")
@@ -115,22 +108,22 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin
public T convertSendAndReceive(D destination, Object request, Map headers,
Class targetClass, MessagePostProcessor postProcessor) {
- MessageHeaders messageHeaders = (headers != null) ? new MessageHeaders(headers) : null;
+ MessageHeaders messageHeaders = (headers != null ? new MessageHeaders(headers) : null);
Message> requestMessage = getMessageConverter().toMessage(request, messageHeaders);
if (requestMessage == null) {
- String payloadType = (request != null) ? request.getClass().getName() : null;
- throw new MessageConversionException("Unable to convert payload type '"
- + payloadType + "', Content-Type=" + messageHeaders.get(MessageHeaders.CONTENT_TYPE)
- + ", converter=" + getMessageConverter(), null);
+ String payloadType = (request != null ? request.getClass().getName() : null);
+ Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
+ throw new MessageConversionException("Unable to convert payload with type '" + payloadType +
+ "', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]");
}
if (postProcessor != null) {
requestMessage = postProcessor.postProcessMessage(requestMessage);
}
- Message> replyMessage = this.sendAndReceive(destination, requestMessage);
- return (replyMessage != null) ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null;
+ Message> replyMessage = sendAndReceive(destination, requestMessage);
+ return (replyMessage != null ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null);
}
}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java
index 97dd813ee68..1148a376ffe 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2013 the original author or authors.
+ * Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,7 +53,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
/**
* Configure the timeout value to use for send operations.
- *
* @param sendTimeout the send timeout in milliseconds
*/
public void setSendTimeout(long sendTimeout) {
@@ -69,7 +68,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
/**
* Configure the timeout value to use for receive operations.
- *
* @param receiveTimeout the receive timeout in milliseconds
*/
public void setReceiveTimeout(long receiveTimeout) {
@@ -88,11 +86,9 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
* receiving thread isn't going to receive the reply either because it timed out,
* or because it already received a reply, or because it got an exception while
* sending the request message.
- *
- * The default value is {@code false} in which case only a WARN message is logged.
+ *
The default value is {@code false} in which case only a WARN message is logged.
* If set to {@code true} a {@link MessageDeliveryException} is raised in addition
* to the log message.
- *
* @param throwExceptionOnLateReply whether to throw an exception or not
*/
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
@@ -107,11 +103,10 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override
protected final void doSend(MessageChannel channel, Message> message) {
-
- Assert.notNull(channel, "channel must not be null");
+ Assert.notNull(channel, "'channel' is required");
long timeout = this.sendTimeout;
- boolean sent = (timeout >= 0) ? channel.send(message, timeout) : channel.send(message);
+ boolean sent = (timeout >= 0 ? channel.send(message, timeout) : channel.send(message));
if (!sent) {
throw new MessageDeliveryException(message,
@@ -121,15 +116,14 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override
protected final Message> doReceive(MessageChannel channel) {
-
Assert.notNull(channel, "'channel' is required");
- Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages.");
+ Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");
long timeout = this.receiveTimeout;
- Message> message = (timeout >= 0) ?
- ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive();
+ Message> message = (timeout >= 0 ?
+ ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive());
- if ((message == null) && this.logger.isTraceEnabled()) {
+ if (message == null && this.logger.isTraceEnabled()) {
this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
}
@@ -138,20 +132,16 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override
protected final Message> doSendAndReceive(MessageChannel channel, Message> requestMessage) {
-
Assert.notNull(channel, "'channel' is required");
-
Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel();
-
- requestMessage = MessageBuilder.fromMessage(requestMessage)
- .setReplyChannel(tempReplyChannel)
- .setErrorChannel(tempReplyChannel).build();
+ requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel).
+ setErrorChannel(tempReplyChannel).build();
try {
- this.doSend(channel, requestMessage);
+ doSend(channel, requestMessage);
}
catch (RuntimeException e) {
tempReplyChannel.setSendFailed(true);
@@ -177,10 +167,10 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
- private volatile Message> replyMessage;
-
private final CountDownLatch replyLatch = new CountDownLatch(1);
+ private volatile Message> replyMessage;
+
private volatile boolean hasReceived;
private volatile boolean hasTimedOut;
@@ -192,7 +182,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
this.hasSendFailed = hasSendError;
}
-
@Override
public Message> receive() {
return this.receive(-1);
@@ -227,7 +216,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override
public boolean send(Message> message, long timeout) {
-
this.replyMessage = message;
boolean alreadyReceivedReply = this.hasReceived;
this.replyLatch.countDown();
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java
index 39da6ca69d7..f6424fc0f9b 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java
@@ -65,6 +65,13 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplateThe default value is "/user/".
@@ -76,30 +83,21 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate= 0)
- ? this.messageChannel.send(message, timeout)
- : this.messageChannel.send(message);
+ boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
if (!sent) {
throw new MessageDeliveryException(message,
- "failed to send message to destination '" + destination + "' within timeout: " + timeout);
+ "Failed to send message to destination '" + destination + "' within timeout: " + timeout);
}
}
@@ -138,21 +134,21 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate headers) throws MessagingException {
- this.convertAndSendToUser(user, destination, payload, headers, null);
+ convertAndSendToUser(user, destination, payload, headers, null);
}
@Override
public void convertAndSendToUser(String user, String destination, Object payload,
MessagePostProcessor postProcessor) throws MessagingException {
- this.convertAndSendToUser(user, destination, payload, null, postProcessor);
+ convertAndSendToUser(user, destination, payload, null, postProcessor);
}
@Override
@@ -169,31 +165,28 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate
- * If the given headers already contain the key
+ * If the given headers already contain the key
* {@link org.springframework.messaging.support.NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS}
* then the same header map is returned (i.e. without any changes).
*/
@Override
protected Map processHeadersToSend(Map headers) {
-
if (headers == null) {
return null;
}
- else if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) {
+ if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) {
return headers;
}
- else {
- MultiValueMap nativeHeaders = new LinkedMultiValueMap(headers.size());
- for (String key : headers.keySet()) {
- Object value = headers.get(key);
- nativeHeaders.set(key, (value != null ? value.toString() : null));
- }
-
- headers = new HashMap(1);
- headers.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders);
- return headers;
+
+ MultiValueMap nativeHeaders = new LinkedMultiValueMap(headers.size());
+ for (String key : headers.keySet()) {
+ Object value = headers.get(key);
+ nativeHeaders.set(key, (value != null ? value.toString() : null));
}
+
+ headers = new HashMap(1);
+ headers.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders);
+ return headers;
}
}