From bded025d9f78aacdf7e543ebac1632009e318ac7 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Thu, 24 Apr 2014 15:33:24 +0300 Subject: [PATCH] @SendTo support for jms listener endpoints This commit replaces the "responseDestination" attribute on the JmsListener annotation by a support of the standard SendTo annotation. Issue: SPR-11707 --- .../jms/annotation/JmsListener.java | 14 +--- ...msListenerAnnotationBeanPostProcessor.java | 3 - .../jms/config/MethodJmsListenerEndpoint.java | 46 +++++++----- .../jms/support/JmsMessageHeaderAccessor.java | 45 ++++++++++-- .../AbstractJmsAnnotationDrivenTests.java | 2 +- ...tenerAnnotationBeanPostProcessorTests.java | 1 - .../MethodJmsListenerEndpointTests.java | 72 ++++++++++++++++++- .../JmsMessageHeaderAccessorTests.java | 5 +- .../messaging/handler/annotation/SendTo.java | 10 ++- 9 files changed, 153 insertions(+), 45 deletions(-) diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java index 02a4aa692f0..533973c46d9 100644 --- a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java +++ b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java @@ -61,7 +61,9 @@ import org.springframework.messaging.handler.annotation.MessageMapping; * *

Annotated method may have a non {@code void} return type. When they do, the result of the * method invocation is sent as a JMS reply to the destination defined by either the - * {@code JMSReplyTO} header of the incoming message or the value of {@link #responseDestination()}. + * {@code JMSReplyTO} header of the incoming message. When this value is not set, a default + * destination can be provided by adding @{@link org.springframework.messaging.handler.annotation.SendTo + * SendTo} to the method declaration. * * @author Stephane Nicoll * @since 4.1 @@ -111,14 +113,4 @@ public @interface JmsListener { */ String selector() default ""; - /** - * The name of the default response destination to send response messages to. - *

This will be applied in case of a request message that does not carry - * a "JMSReplyTo" field. The type of this destination will be determined - * by the listener-container's "destination-type" attribute. - *

Note: This only applies to a listener method with a return value, - * for which each result object will be converted into a response message. - */ - String responseDestination() default ""; - } diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java index a3157722686..390003cb029 100644 --- a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java +++ b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java @@ -182,9 +182,6 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor if (StringUtils.hasText(jmsListener.subscription())) { endpoint.setSubscription(jmsListener.subscription()); } - if (StringUtils.hasText(jmsListener.responseDestination())) { - endpoint.setResponseDestination(jmsListener.responseDestination()); - } JmsListenerContainerFactory factory = null; String containerFactoryBeanName = jmsListener.containerFactory(); diff --git a/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java b/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java index f8ff3e4173b..81d3f1f0751 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java @@ -17,12 +17,16 @@ package org.springframework.jms.config; import java.lang.reflect.Method; +import java.util.Arrays; +import org.springframework.core.annotation.AnnotationUtils; import org.springframework.jms.listener.MessageListenerContainer; import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.jms.support.converter.MessageConverter; +import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** * A {@link JmsListenerEndpoint} providing the method to invoke to process @@ -37,8 +41,6 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint { private Method method; - private String responseDestination; - private JmsHandlerMethodFactory jmsHandlerMethodFactory; /** @@ -64,20 +66,6 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint { return method; } - /** - * Set the name of the default response destination to send response messages to. - */ - public void setResponseDestination(String responseDestination) { - this.responseDestination = responseDestination; - } - - /** - * Return the name of the default response destination to send response messages to. - */ - public String getResponseDestination() { - return responseDestination; - } - /** * Set the {@link DefaultJmsHandlerMethodFactory} to use to build the * {@link InvocableHandlerMethod} responsible to manage the invocation @@ -91,12 +79,12 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint { protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) { Assert.state(jmsHandlerMethodFactory != null, "Could not create message listener, message listener factory not set."); - MessagingMessageListenerAdapter messageListener = new MessagingMessageListenerAdapter(); + MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(); InvocableHandlerMethod invocableHandlerMethod = jmsHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod()); messageListener.setHandlerMethod(invocableHandlerMethod); - String responseDestination = getResponseDestination(); - if (responseDestination != null) { + String responseDestination = getDefaultResponseDestination(); + if (StringUtils.hasText(responseDestination)) { if (isQueue()) { messageListener.setDefaultResponseQueueName(responseDestination); } @@ -111,6 +99,26 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint { return messageListener; } + /** + * Create an empty {@link MessagingMessageListenerAdapter} instance. + */ + protected MessagingMessageListenerAdapter createMessageListenerInstance() { + return new MessagingMessageListenerAdapter(); + } + + private String getDefaultResponseDestination() { + SendTo ann = AnnotationUtils.getAnnotation(getMethod(), SendTo.class); + if (ann != null) { + Object[] destinations = ann.value(); + if (destinations.length != 1) { + throw new IllegalStateException("Invalid @" + SendTo.class.getSimpleName() + " annotation on '" + + getMethod() + "' one destination must be set (got " + Arrays.toString(destinations) + ")"); + } + return (String) destinations[0]; + } + return null; + } + @Override protected StringBuilder getEndpointDescription() { return super.getEndpointDescription() diff --git a/spring-jms/src/main/java/org/springframework/jms/support/JmsMessageHeaderAccessor.java b/spring-jms/src/main/java/org/springframework/jms/support/JmsMessageHeaderAccessor.java index 0261b7131c8..6fa9a5dcfae 100644 --- a/spring-jms/src/main/java/org/springframework/jms/support/JmsMessageHeaderAccessor.java +++ b/spring-jms/src/main/java/org/springframework/jms/support/JmsMessageHeaderAccessor.java @@ -51,47 +51,82 @@ public class JmsMessageHeaderAccessor extends NativeMessageHeaderAccessor { } - @Override - public Object getReplyChannel() { - return getReplyTo(); - } - + /** + * Return the {@link JmsHeaders#CORRELATION_ID correlationId}. + * @see JmsHeaders#CORRELATION_ID + */ public String getCorrelationId() { return (String) getHeader(JmsHeaders.CORRELATION_ID); } + /** + * Return the {@link JmsHeaders#DESTINATION destination}. + * @see JmsHeaders#DESTINATION + */ public Destination getDestination() { return (Destination) getHeader(JmsHeaders.DESTINATION); } + /** + * Return the {@link JmsHeaders#DELIVERY_MODE delivery mode}. + * @see JmsHeaders#DELIVERY_MODE + */ public Integer getDeliveryMode() { return (Integer) getHeader(JmsHeaders.DELIVERY_MODE); } + /** + * Return the message {@link JmsHeaders#EXPIRATION expiration}. + * @see JmsHeaders#EXPIRATION + */ public Long getExpiration() { return (Long) getHeader(JmsHeaders.EXPIRATION); } + /** + * Return the {@link JmsHeaders#MESSAGE_ID message id}. + * @see JmsHeaders#MESSAGE_ID + */ public String getMessageId() { return (String) getHeader(JmsHeaders.MESSAGE_ID); } + /** + * Return the {@link JmsHeaders#PRIORITY}. + * @see JmsHeaders#PRIORITY + */ public Integer getPriority() { return (Integer) getHeader(JmsHeaders.PRIORITY); } + /** + * Return the {@link JmsHeaders#REPLY_TO reply to}. + * @see JmsHeaders#REPLY_TO + */ public Destination getReplyTo() { return (Destination) getHeader(JmsHeaders.REPLY_TO); } + /** + * Return the {@link JmsHeaders#REDELIVERED redelivered} flag. + * @see JmsHeaders#REDELIVERED + */ public Boolean getRedelivered() { return (Boolean) getHeader(JmsHeaders.REDELIVERED); } + /** + * Return the {@link JmsHeaders#TYPE type}. + * @see JmsHeaders#TYPE + */ public String getType() { return (String) getHeader(JmsHeaders.TYPE); } + /** + * Return the {@link JmsHeaders#TIMESTAMP timestamp}. + * @see JmsHeaders#TIMESTAMP + */ public Long getTimestamp() { return (Long) getHeader(JmsHeaders.TIMESTAMP); } diff --git a/spring-jms/src/test/java/org/springframework/jms/annotation/AbstractJmsAnnotationDrivenTests.java b/spring-jms/src/test/java/org/springframework/jms/annotation/AbstractJmsAnnotationDrivenTests.java index 4e7d2b726d4..3745ea0bdc9 100644 --- a/spring-jms/src/test/java/org/springframework/jms/annotation/AbstractJmsAnnotationDrivenTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/annotation/AbstractJmsAnnotationDrivenTests.java @@ -114,7 +114,7 @@ public abstract class AbstractJmsAnnotationDrivenTests { static class FullBean { @JmsListener(id = "listener1", containerFactory = "simpleFactory", destination = "queueIn", - responseDestination = "queueOut", selector = "mySelector", subscription = "mySubscription") + selector = "mySelector", subscription = "mySubscription") public String fullHandle(String msg) { return "reply"; } diff --git a/spring-jms/src/test/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessorTests.java b/spring-jms/src/test/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessorTests.java index 4d28a7a31ad..58e22c62099 100644 --- a/spring-jms/src/test/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessorTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessorTests.java @@ -58,7 +58,6 @@ public class JmsListenerAnnotationBeanPostProcessorTests { MethodJmsListenerEndpoint methodEndpoint = (MethodJmsListenerEndpoint) endpoint; assertNotNull(methodEndpoint.getBean()); assertNotNull(methodEndpoint.getMethod()); - assertNull(methodEndpoint.getResponseDestination()); assertTrue(methodEndpoint.isQueue()); assertTrue("Should have been started " + container, container.isStarted()); diff --git a/spring-jms/src/test/java/org/springframework/jms/config/MethodJmsListenerEndpointTests.java b/spring-jms/src/test/java/org/springframework/jms/config/MethodJmsListenerEndpointTests.java index 43bff74e8ef..88231cb01b5 100644 --- a/spring-jms/src/test/java/org/springframework/jms/config/MethodJmsListenerEndpointTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/config/MethodJmsListenerEndpointTests.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import javax.jms.Destination; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.ObjectMessage; import javax.jms.QueueSender; @@ -48,12 +49,14 @@ import org.springframework.jms.listener.adapter.ListenerExecutionFailedException import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.jms.support.JmsMessageHeaderAccessor; import org.springframework.jms.support.converter.JmsHeaders; +import org.springframework.jms.support.destination.DestinationResolver; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConversionException; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.messaging.handler.annotation.support.MethodArgumentTypeMismatchException; import org.springframework.util.ReflectionUtils; import org.springframework.validation.Errors; @@ -99,7 +102,6 @@ public class MethodJmsListenerEndpointTests { endpoint.setBean(this); endpoint.setMethod(getTestMethod()); endpoint.setJmsHandlerMethodFactory(factory); - endpoint.setResponseDestination("myResponseQueue"); assertNotNull(endpoint.createMessageListener(container)); } @@ -217,6 +219,56 @@ public class MethodJmsListenerEndpointTests { verify(queueSender).close(); } + @Test + public void processAndReplyWithSendTo() throws JMSException { + MessagingMessageListenerAdapter listener = createDefaultInstance(String.class); + String body = "echo text"; + String correlationId = "link-1234"; + Destination replyDestination = new Destination() {}; + + DestinationResolver destinationResolver = mock(DestinationResolver.class); + TextMessage reply = mock(TextMessage.class); + QueueSender queueSender = mock(QueueSender.class); + Session session = mock(Session.class); + + given(destinationResolver.resolveDestinationName(session, "replyDestination", false)) + .willReturn(replyDestination); + given(session.createTextMessage(body)).willReturn(reply); + given(session.createProducer(replyDestination)).willReturn(queueSender); + + listener.setDestinationResolver(destinationResolver); + StubTextMessage inputMessage = createSimpleJmsTextMessage(body); + inputMessage.setJMSCorrelationID(correlationId); + listener.onMessage(inputMessage, session); + assertDefaultListenerMethodInvocation(); + + verify(destinationResolver).resolveDestinationName(session, "replyDestination", false); + verify(reply).setJMSCorrelationID(correlationId); + verify(queueSender).send(reply); + verify(queueSender).close(); + } + + @Test + public void emptySendTo() throws JMSException { + MessagingMessageListenerAdapter listener = createDefaultInstance(String.class); + + TextMessage reply = mock(TextMessage.class); + Session session = mock(Session.class); + given(session.createTextMessage("content")).willReturn(reply); + + thrown.expect(ListenerExecutionFailedException.class); + thrown.expectCause(Matchers.isA(InvalidDestinationException.class)); + listener.onMessage(createSimpleJmsTextMessage("content"), session); + } + + @Test + public void invalidSendTo() { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("firstDestination"); + thrown.expectMessage("secondDestination"); + createDefaultInstance(String.class); + } + @Test public void validatePayloadValid() throws JMSException { String methodName = "validatePayload"; @@ -394,6 +446,24 @@ public class MethodJmsListenerEndpointTests { return content; } + @SendTo("replyDestination") + public String processAndReplyWithSendTo(String content) { + invocations.put("processAndReplyWithSendTo", true); + return content; + } + + @SendTo("") + public String emptySendTo(String content) { + invocations.put("emptySendTo", true); + return content; + } + + @SendTo({"firstDestination", "secondDestination"}) + public String invalidSendTo(String content) { + invocations.put("invalidSendTo", true); + return content; + } + public void validatePayload(@Validated String payload) { invocations.put("validatePayload", true); } diff --git a/spring-jms/src/test/java/org/springframework/jms/support/JmsMessageHeaderAccessorTests.java b/spring-jms/src/test/java/org/springframework/jms/support/JmsMessageHeaderAccessorTests.java index c89056d77bd..9e4b7133212 100644 --- a/spring-jms/src/test/java/org/springframework/jms/support/JmsMessageHeaderAccessorTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/support/JmsMessageHeaderAccessorTests.java @@ -65,9 +65,12 @@ public class JmsMessageHeaderAccessorTests { assertEquals("abcd-1234", headerAccessor.getMessageId()); assertEquals(Integer.valueOf(9), headerAccessor.getPriority()); assertEquals(replyTo, headerAccessor.getReplyTo()); - assertEquals(replyTo, headerAccessor.getReplyChannel()); assertEquals(true, headerAccessor.getRedelivered()); assertEquals("type", headerAccessor.getType()); assertEquals(4567L, headerAccessor.getTimestamp(), 0.0); + + // Making sure replyChannel is not mixed with replyTo + assertNull(headerAccessor.getReplyChannel()); + } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/SendTo.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/SendTo.java index 8cfc90dcef7..ee4ce4152e3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/SendTo.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/SendTo.java @@ -1,11 +1,11 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,7 +26,11 @@ import org.springframework.messaging.Message; /** * Annotation that indicates a method's return value should be converted to - * a {@link Message} and sent to the specified destination. + * a {@link Message} if necessary and sent to the specified destination. + * + *

In a typical request/reply scenario, the incoming {@link Message} may + * convey the destination to use for the reply. In that case, that destination + * should take precedence. * * @author Rossen Stoyanchev * @since 4.0