Browse Source

MessageConversionException offers constructor without cause argument now, plus related polishing

Issue: SPR-11653
(cherry picked from commit 2888775)
pull/531/head
Juergen Hoeller 12 years ago
parent
commit
02e8198283
  1. 5
      spring-messaging/src/main/java/org/springframework/messaging/converter/MessageConversionException.java
  2. 25
      spring-messaging/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java
  3. 49
      spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageSendingTemplate.java
  4. 47
      spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java
  5. 38
      spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java
  6. 59
      spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java

5
spring-messaging/src/main/java/org/springframework/messaging/converter/MessageConversionException.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2014 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -28,6 +28,9 @@ import org.springframework.messaging.MessagingException;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class MessageConversionException extends MessagingException { public class MessageConversionException extends MessagingException {
public MessageConversionException(String description) {
super(description);
}
public MessageConversionException(String description, Throwable cause) { public MessageConversionException(String description, Throwable cause) {
super(description, cause); super(description, cause);

25
spring-messaging/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2014 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -25,9 +25,9 @@ import org.springframework.util.Assert;
* An extension of {@link AbstractMessagingTemplate} that adds operations for sending * An extension of {@link AbstractMessagingTemplate} that adds operations for sending
* messages to a resolvable destination name as defined by the following interfaces: * messages to a resolvable destination name as defined by the following interfaces:
* <ul> * <ul>
* <li>{@link DestinationResolvingMessageSendingOperations}</li> * <li>{@link DestinationResolvingMessageSendingOperations}</li>
* <li>{@link DestinationResolvingMessageReceivingOperations}</li> * <li>{@link DestinationResolvingMessageReceivingOperations}</li>
* <li>{@link DestinationResolvingMessageRequestReplyOperations}</li> * <li>{@link DestinationResolvingMessageRequestReplyOperations}</li>
* </ul> * </ul>
* *
* @author Mark Fisher * @author Mark Fisher
@ -65,36 +65,31 @@ public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends A
@Override @Override
public void send(String destinationName, Message<?> message) { public void send(String destinationName, Message<?> message) {
D destination = resolveDestination(destinationName); D destination = resolveDestination(destinationName);
this.doSend(destination, message); doSend(destination, message);
} }
protected final D resolveDestination(String destinationName) { protected final D resolveDestination(String destinationName) {
Assert.state(this.destinationResolver != null, "destinationResolver is required to resolve destination names"); Assert.state(this.destinationResolver != null, "DestinationResolver is required to resolve destination names");
return this.destinationResolver.resolveDestination(destinationName); return this.destinationResolver.resolveDestination(destinationName);
} }
@Override @Override
public <T> void convertAndSend(String destinationName, T payload) { public <T> void convertAndSend(String destinationName, T payload) {
Map<String, Object> headers = null; convertAndSend(destinationName, payload, null, null);
this.convertAndSend(destinationName, payload, headers);
} }
@Override @Override
public <T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers) { public <T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers) {
MessagePostProcessor postProcessor = null; convertAndSend(destinationName, payload, headers, null);
this.convertAndSend(destinationName, payload, headers, postProcessor);
} }
@Override @Override
public <T> void convertAndSend(String destinationName, T payload, MessagePostProcessor postProcessor) { public <T> void convertAndSend(String destinationName, T payload, MessagePostProcessor postProcessor) {
Map<String, Object> headers = null; convertAndSend(destinationName, payload, null, postProcessor);
this.convertAndSend(destinationName, payload, headers, postProcessor);
} }
@Override @Override
public <T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers, public <T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
MessagePostProcessor postProcessor) {
D destination = resolveDestination(destinationName); D destination = resolveDestination(destinationName);
super.convertAndSend(destination, payload, headers, postProcessor); super.convertAndSend(destination, payload, headers, postProcessor);
} }

49
spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageSendingTemplate.java

@ -20,6 +20,7 @@ import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
@ -37,7 +38,7 @@ import org.springframework.util.Assert;
*/ */
public abstract class AbstractMessageSendingTemplate<D> implements MessageSendingOperations<D> { public abstract class AbstractMessageSendingTemplate<D> implements MessageSendingOperations<D> {
protected final Log logger = LogFactory.getLog(this.getClass()); protected final Log logger = LogFactory.getLog(getClass());
private volatile D defaultDestination; private volatile D defaultDestination;
@ -80,17 +81,17 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
@Override @Override
public void send(Message<?> message) { public void send(Message<?> message) {
this.send(getRequiredDefaultDestination(), message); send(getRequiredDefaultDestination(), message);
} }
protected final D getRequiredDefaultDestination() { protected final D getRequiredDefaultDestination() {
Assert.state(this.defaultDestination != null, "No 'defaultDestination' configured."); Assert.state(this.defaultDestination != null, "No 'defaultDestination' configured");
return this.defaultDestination; return this.defaultDestination;
} }
@Override @Override
public void send(D destination, Message<?> message) { public void send(D destination, Message<?> message) {
this.doSend(destination, message); doSend(destination, message);
} }
protected abstract void doSend(D destination, Message<?> message); protected abstract void doSend(D destination, Message<?> message);
@ -98,61 +99,57 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
@Override @Override
public void convertAndSend(Object payload) throws MessagingException { public void convertAndSend(Object payload) throws MessagingException {
this.convertAndSend(getRequiredDefaultDestination(), payload); convertAndSend(getRequiredDefaultDestination(), payload);
} }
@Override @Override
public void convertAndSend(D destination, Object payload) throws MessagingException { public void convertAndSend(D destination, Object payload) throws MessagingException {
this.convertAndSend(destination, payload, (Map<String, Object>) null); convertAndSend(destination, payload, (Map<String, Object>) null);
} }
@Override @Override
public void convertAndSend(D destination, Object payload, Map<String, Object> headers) throws MessagingException { public void convertAndSend(D destination, Object payload, Map<String, Object> headers) throws MessagingException {
MessagePostProcessor postProcessor = null; convertAndSend(destination, payload, headers, null);
this.convertAndSend(destination, payload, headers, postProcessor);
} }
@Override @Override
public void convertAndSend(Object payload, MessagePostProcessor postProcessor) throws MessagingException { public void convertAndSend(Object payload, MessagePostProcessor postProcessor) throws MessagingException {
this.convertAndSend(getRequiredDefaultDestination(), payload, postProcessor); convertAndSend(getRequiredDefaultDestination(), payload, postProcessor);
} }
@Override @Override
public void convertAndSend(D destination, Object payload, MessagePostProcessor postProcessor) public void convertAndSend(D destination, Object payload, MessagePostProcessor postProcessor)
throws MessagingException { throws MessagingException {
Map<String, Object> headers = null; convertAndSend(destination, payload, null, postProcessor);
this.convertAndSend(destination, payload, headers, postProcessor);
} }
@Override @Override
public void convertAndSend(D destination, Object payload, Map<String, Object> headers, public void convertAndSend(D destination, Object payload, Map<String, Object> headers,
MessagePostProcessor postProcessor) throws MessagingException { MessagePostProcessor postProcessor) throws MessagingException {
headers = processHeadersToSend(headers); Map<String, Object> headersToUse = processHeadersToSend(headers);
MessageHeaders messageHeaders = (headers != null) ? new MessageHeaders(headers) : null; MessageHeaders messageHeaders = (headersToUse != null ? new MessageHeaders(headersToUse) : null);
Message<?> message = this.converter.toMessage(payload, messageHeaders);
Message<?> message = getMessageConverter().toMessage(payload, messageHeaders);
if (message == null) { if (message == null) {
String payloadType = (payload != null) ? payload.getClass().getName() : null; String payloadType = (payload != null ? payload.getClass().getName() : null);
throw new MessageConversionException("Unable to convert payload type '" Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
+ payloadType + "', Content-Type=" + messageHeaders.get(MessageHeaders.CONTENT_TYPE) throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
+ ", converter=" + this.converter, null); "', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]");
} }
if (postProcessor != null) { if (postProcessor != null) {
message = postProcessor.postProcessMessage(message); message = postProcessor.postProcessMessage(message);
} }
this.send(destination, message); send(destination, message);
} }
/** /**
* Provides access to the map of headers before a send operation. * Provides access to the map of input headers before a send operation. Sub-classes
* Implementations can modify the headers by returning a different map. * can modify the headers and then return the same or a different map.
* This implementation returns the map that was passed in (i.e. without any changes). * <p>This default implementation in this class returns the input map.
* * @param headers the headers to send or {@code null}
* @param headers the headers to send, possibly {@code null} * @return the actual headers to send or {@code null}
* @return the actual headers to send
*/ */
protected Map<String, Object> processHeadersToSend(Map<String, Object> headers) { protected Map<String, Object> processHeadersToSend(Map<String, Object> headers) {
return headers; return headers;

47
spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java

@ -37,12 +37,12 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
@Override @Override
public Message<?> receive() { public Message<?> receive() {
return this.receive(getRequiredDefaultDestination()); return receive(getRequiredDefaultDestination());
} }
@Override @Override
public Message<?> receive(D destination) { public Message<?> receive(D destination) {
return this.doReceive(destination); return doReceive(destination);
} }
protected abstract Message<?> doReceive(D destination); protected abstract Message<?> doReceive(D destination);
@ -50,13 +50,13 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
@Override @Override
public <T> T receiveAndConvert(Class<T> targetClass) { public <T> T receiveAndConvert(Class<T> targetClass) {
return this.receiveAndConvert(getRequiredDefaultDestination(), targetClass); return receiveAndConvert(getRequiredDefaultDestination(), targetClass);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <T> T receiveAndConvert(D destination, Class<T> targetClass) { public <T> T receiveAndConvert(D destination, Class<T> targetClass) {
Message<?> message = this.doReceive(destination); Message<?> message = doReceive(destination);
if (message != null) { if (message != null) {
return (T) getMessageConverter().fromMessage(message, targetClass); return (T) getMessageConverter().fromMessage(message, targetClass);
} }
@ -67,12 +67,12 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
@Override @Override
public Message<?> sendAndReceive(Message<?> requestMessage) { public Message<?> sendAndReceive(Message<?> requestMessage) {
return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage); return sendAndReceive(getRequiredDefaultDestination(), requestMessage);
} }
@Override @Override
public Message<?> sendAndReceive(D destination, Message<?> requestMessage) { public Message<?> sendAndReceive(D destination, Message<?> requestMessage) {
return this.doSendAndReceive(destination, requestMessage); return doSendAndReceive(destination, requestMessage);
} }
protected abstract Message<?> doSendAndReceive(D destination, Message<?> requestMessage); protected abstract Message<?> doSendAndReceive(D destination, Message<?> requestMessage);
@ -80,34 +80,27 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
@Override @Override
public <T> T convertSendAndReceive(Object request, Class<T> targetClass) { public <T> T convertSendAndReceive(Object request, Class<T> targetClass) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass); return convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass);
} }
@Override @Override
public <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass) { public <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass) {
Map<String, Object> headers = null; return convertSendAndReceive(destination, request, null, targetClass);
return this.convertSendAndReceive(destination, request, headers, targetClass);
} }
@Override @Override
public <T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers, public <T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers, Class<T> targetClass) {
Class<T> targetClass) { return convertSendAndReceive(destination, request, headers, targetClass, null);
MessagePostProcessor postProcessor = null;
return this.convertSendAndReceive(destination, request, headers, targetClass, postProcessor);
} }
@Override @Override
public <T> T convertSendAndReceive(Object request, Class<T> targetClass, MessagePostProcessor postProcessor) { public <T> T convertSendAndReceive(Object request, Class<T> targetClass, MessagePostProcessor postProcessor) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass, postProcessor); return convertSendAndReceive(getRequiredDefaultDestination(), request, targetClass, postProcessor);
} }
@Override @Override
public <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass, public <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass, MessagePostProcessor postProcessor) {
MessagePostProcessor postProcessor) { return convertSendAndReceive(destination, request, null, targetClass, postProcessor);
Map<String, Object> headers = null;
return this.convertSendAndReceive(destination, request, headers, targetClass, postProcessor);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -115,22 +108,22 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
public <T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers, public <T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers,
Class<T> targetClass, MessagePostProcessor postProcessor) { Class<T> targetClass, MessagePostProcessor postProcessor) {
MessageHeaders messageHeaders = (headers != null) ? new MessageHeaders(headers) : null; MessageHeaders messageHeaders = (headers != null ? new MessageHeaders(headers) : null);
Message<?> requestMessage = getMessageConverter().toMessage(request, messageHeaders); Message<?> requestMessage = getMessageConverter().toMessage(request, messageHeaders);
if (requestMessage == null) { if (requestMessage == null) {
String payloadType = (request != null) ? request.getClass().getName() : null; String payloadType = (request != null ? request.getClass().getName() : null);
throw new MessageConversionException("Unable to convert payload type '" Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
+ payloadType + "', Content-Type=" + messageHeaders.get(MessageHeaders.CONTENT_TYPE) throw new MessageConversionException("Unable to convert payload with type '" + payloadType +
+ ", converter=" + getMessageConverter(), null); "', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]");
} }
if (postProcessor != null) { if (postProcessor != null) {
requestMessage = postProcessor.postProcessMessage(requestMessage); requestMessage = postProcessor.postProcessMessage(requestMessage);
} }
Message<?> replyMessage = this.sendAndReceive(destination, requestMessage); Message<?> replyMessage = sendAndReceive(destination, requestMessage);
return (replyMessage != null) ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null; return (replyMessage != null ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null);
} }
} }

38
spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2014 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -53,7 +53,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
/** /**
* Configure the timeout value to use for send operations. * Configure the timeout value to use for send operations.
*
* @param sendTimeout the send timeout in milliseconds * @param sendTimeout the send timeout in milliseconds
*/ */
public void setSendTimeout(long sendTimeout) { public void setSendTimeout(long sendTimeout) {
@ -69,7 +68,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
/** /**
* Configure the timeout value to use for receive operations. * Configure the timeout value to use for receive operations.
*
* @param receiveTimeout the receive timeout in milliseconds * @param receiveTimeout the receive timeout in milliseconds
*/ */
public void setReceiveTimeout(long receiveTimeout) { public void setReceiveTimeout(long receiveTimeout) {
@ -88,11 +86,9 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
* receiving thread isn't going to receive the reply either because it timed out, * receiving thread isn't going to receive the reply either because it timed out,
* or because it already received a reply, or because it got an exception while * or because it already received a reply, or because it got an exception while
* sending the request message. * sending the request message.
* <p> * <p>The default value is {@code false} in which case only a WARN message is logged.
* The default value is {@code false} in which case only a WARN message is logged.
* If set to {@code true} a {@link MessageDeliveryException} is raised in addition * If set to {@code true} a {@link MessageDeliveryException} is raised in addition
* to the log message. * to the log message.
*
* @param throwExceptionOnLateReply whether to throw an exception or not * @param throwExceptionOnLateReply whether to throw an exception or not
*/ */
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) { public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
@ -107,11 +103,10 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override @Override
protected final void doSend(MessageChannel channel, Message<?> message) { protected final void doSend(MessageChannel channel, Message<?> message) {
Assert.notNull(channel, "'channel' is required");
Assert.notNull(channel, "channel must not be null");
long timeout = this.sendTimeout; long timeout = this.sendTimeout;
boolean sent = (timeout >= 0) ? channel.send(message, timeout) : channel.send(message); boolean sent = (timeout >= 0 ? channel.send(message, timeout) : channel.send(message));
if (!sent) { if (!sent) {
throw new MessageDeliveryException(message, throw new MessageDeliveryException(message,
@ -121,15 +116,14 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override @Override
protected final Message<?> doReceive(MessageChannel channel) { protected final Message<?> doReceive(MessageChannel channel) {
Assert.notNull(channel, "'channel' is required"); Assert.notNull(channel, "'channel' is required");
Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages."); Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");
long timeout = this.receiveTimeout; long timeout = this.receiveTimeout;
Message<?> message = (timeout >= 0) ? Message<?> message = (timeout >= 0 ?
((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive(); ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive());
if ((message == null) && this.logger.isTraceEnabled()) { if (message == null && this.logger.isTraceEnabled()) {
this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout); this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
} }
@ -138,20 +132,16 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override @Override
protected final Message<?> doSendAndReceive(MessageChannel channel, Message<?> requestMessage) { protected final Message<?> doSendAndReceive(MessageChannel channel, Message<?> requestMessage) {
Assert.notNull(channel, "'channel' is required"); Assert.notNull(channel, "'channel' is required");
Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel(); Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel(); Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel(); TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel();
requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel).
requestMessage = MessageBuilder.fromMessage(requestMessage) setErrorChannel(tempReplyChannel).build();
.setReplyChannel(tempReplyChannel)
.setErrorChannel(tempReplyChannel).build();
try { try {
this.doSend(channel, requestMessage); doSend(channel, requestMessage);
} }
catch (RuntimeException e) { catch (RuntimeException e) {
tempReplyChannel.setSendFailed(true); tempReplyChannel.setSendFailed(true);
@ -177,10 +167,10 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class); private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
private volatile Message<?> replyMessage;
private final CountDownLatch replyLatch = new CountDownLatch(1); private final CountDownLatch replyLatch = new CountDownLatch(1);
private volatile Message<?> replyMessage;
private volatile boolean hasReceived; private volatile boolean hasReceived;
private volatile boolean hasTimedOut; private volatile boolean hasTimedOut;
@ -192,7 +182,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
this.hasSendFailed = hasSendError; this.hasSendFailed = hasSendError;
} }
@Override @Override
public Message<?> receive() { public Message<?> receive() {
return this.receive(-1); return this.receive(-1);
@ -227,7 +216,6 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override @Override
public boolean send(Message<?> message, long timeout) { public boolean send(Message<?> message, long timeout) {
this.replyMessage = message; this.replyMessage = message;
boolean alreadyReceivedReply = this.hasReceived; boolean alreadyReceivedReply = this.hasReceived;
this.replyLatch.countDown(); this.replyLatch.countDown();

59
spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java

@ -65,6 +65,13 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
} }
/**
* Return the configured message channel.
*/
public MessageChannel getMessageChannel() {
return this.messageChannel;
}
/** /**
* Configure the prefix to use for destinations targeting a specific user. * Configure the prefix to use for destinations targeting a specific user.
* <p>The default value is "/user/". * <p>The default value is "/user/".
@ -76,30 +83,21 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
} }
/** /**
* @return the userDestinationPrefix * Return the configured user destination prefix.
*/ */
public String getUserDestinationPrefix() { public String getUserDestinationPrefix() {
return this.userDestinationPrefix; return this.userDestinationPrefix;
} }
/** /**
* @return the messageChannel * Specify the timeout value to use for send operations (in milliseconds).
*/
public MessageChannel getMessageChannel() {
return this.messageChannel;
}
/**
* Specify the timeout value to use for send operations.
*
* @param sendTimeout the send timeout in milliseconds
*/ */
public void setSendTimeout(long sendTimeout) { public void setSendTimeout(long sendTimeout) {
this.sendTimeout = sendTimeout; this.sendTimeout = sendTimeout;
} }
/** /**
* @return the sendTimeout * Return the configured send timeout (in milliseconds).
*/ */
public long getSendTimeout() { public long getSendTimeout() {
return this.sendTimeout; return this.sendTimeout;
@ -124,13 +122,11 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
long timeout = this.sendTimeout; long timeout = this.sendTimeout;
boolean sent = (timeout >= 0) boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
? this.messageChannel.send(message, timeout)
: this.messageChannel.send(message);
if (!sent) { if (!sent) {
throw new MessageDeliveryException(message, throw new MessageDeliveryException(message,
"failed to send message to destination '" + destination + "' within timeout: " + timeout); "Failed to send message to destination '" + destination + "' within timeout: " + timeout);
} }
} }
@ -138,21 +134,21 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
@Override @Override
public void convertAndSendToUser(String user, String destination, Object payload) throws MessagingException { public void convertAndSendToUser(String user, String destination, Object payload) throws MessagingException {
this.convertAndSendToUser(user, destination, payload, (MessagePostProcessor) null); convertAndSendToUser(user, destination, payload, (MessagePostProcessor) null);
} }
@Override @Override
public void convertAndSendToUser(String user, String destination, Object payload, public void convertAndSendToUser(String user, String destination, Object payload,
Map<String, Object> headers) throws MessagingException { Map<String, Object> headers) throws MessagingException {
this.convertAndSendToUser(user, destination, payload, headers, null); convertAndSendToUser(user, destination, payload, headers, null);
} }
@Override @Override
public void convertAndSendToUser(String user, String destination, Object payload, public void convertAndSendToUser(String user, String destination, Object payload,
MessagePostProcessor postProcessor) throws MessagingException { MessagePostProcessor postProcessor) throws MessagingException {
this.convertAndSendToUser(user, destination, payload, null, postProcessor); convertAndSendToUser(user, destination, payload, null, postProcessor);
} }
@Override @Override
@ -169,31 +165,28 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
* {@link org.springframework.messaging.support.NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS}. * {@link org.springframework.messaging.support.NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS}.
* Effectively this treats all given headers as headers to be sent out to the * Effectively this treats all given headers as headers to be sent out to the
* external source. * external source.
* <p> * <p>If the given headers already contain the key
* If the given headers already contain the key
* {@link org.springframework.messaging.support.NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS} * {@link org.springframework.messaging.support.NativeMessageHeaderAccessor#NATIVE_HEADERS NATIVE_HEADERS}
* then the same header map is returned (i.e. without any changes). * then the same header map is returned (i.e. without any changes).
*/ */
@Override @Override
protected Map<String, Object> processHeadersToSend(Map<String, Object> headers) { protected Map<String, Object> processHeadersToSend(Map<String, Object> headers) {
if (headers == null) { if (headers == null) {
return null; return null;
} }
else if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) { if (headers.containsKey(NativeMessageHeaderAccessor.NATIVE_HEADERS)) {
return headers; return headers;
} }
else {
MultiValueMap<String, String> nativeHeaders = new LinkedMultiValueMap<String, String>(headers.size()); MultiValueMap<String, String> nativeHeaders = new LinkedMultiValueMap<String, String>(headers.size());
for (String key : headers.keySet()) { for (String key : headers.keySet()) {
Object value = headers.get(key); Object value = headers.get(key);
nativeHeaders.set(key, (value != null ? value.toString() : null)); nativeHeaders.set(key, (value != null ? value.toString() : null));
}
headers = new HashMap<String, Object>(1);
headers.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders);
return headers;
} }
headers = new HashMap<String, Object>(1);
headers.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders);
return headers;
} }
} }

Loading…
Cancel
Save