diff --git a/spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java b/spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java index 324ca6667d7..fd2a9fb0129 100644 --- a/spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java +++ b/spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,10 @@ package org.springframework.jms.connection; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -25,6 +29,9 @@ import javax.jms.QueueSender; import javax.jms.Topic; import javax.jms.TopicPublisher; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; + /** * JMS MessageProducer decorator that adapts calls to a shared MessageProducer * instance underneath, managing QoS settings locally within the decorator. @@ -34,12 +41,42 @@ import javax.jms.TopicPublisher; */ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublisher { + private static final Method setDeliveryDelayMethod = + ClassUtils.getMethodIfAvailable(MessageProducer.class, "setDeliveryDelay", long.class); + + private static final Method getDeliveryDelayMethod = + ClassUtils.getMethodIfAvailable(MessageProducer.class, "getDeliveryDelay"); + + private static Class completionListenerClass; + + private static Method sendWithCompletionListenerMethod; + + private static Method sendWithDestinationAndCompletionListenerMethod; + + static { + try { + completionListenerClass = ClassUtils.forName( + "javax.jms.CompletionListener", CachedMessageProducer.class.getClassLoader()); + sendWithCompletionListenerMethod = MessageProducer.class.getMethod( + "send", Message.class, int.class, int.class, long.class, completionListenerClass); + sendWithDestinationAndCompletionListenerMethod = MessageProducer.class.getMethod( + "send", Destination.class, Message.class, int.class, int.class, long.class, completionListenerClass); + } + catch (Exception ex) { + // No JMS 2.0 API available + completionListenerClass = null; + } + } + + private final MessageProducer target; private Boolean originalDisableMessageID; private Boolean originalDisableMessageTimestamp; + private Long originalDeliveryDelay; + private int deliveryMode; private int priority; @@ -57,7 +94,7 @@ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublis public void setDisableMessageID(boolean disableMessageID) throws JMSException { if (this.originalDisableMessageID == null) { - this.originalDisableMessageID = Boolean.valueOf(this.target.getDisableMessageID()); + this.originalDisableMessageID = this.target.getDisableMessageID(); } this.target.setDisableMessageID(disableMessageID); } @@ -68,7 +105,7 @@ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublis public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException { if (this.originalDisableMessageTimestamp == null) { - this.originalDisableMessageTimestamp = Boolean.valueOf(this.target.getDisableMessageTimestamp()); + this.originalDisableMessageTimestamp = this.target.getDisableMessageTimestamp(); } this.target.setDisableMessageTimestamp(disableMessageTimestamp); } @@ -77,6 +114,17 @@ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublis return this.target.getDisableMessageTimestamp(); } + public void setDeliveryDelay(long deliveryDelay) { + if (this.originalDeliveryDelay == null) { + this.originalDeliveryDelay = (Long) ReflectionUtils.invokeMethod(getDeliveryDelayMethod, this.target); + } + ReflectionUtils.invokeMethod(setDeliveryDelayMethod, this.target, deliveryDelay); + } + + public long getDeliveryDelay() { + return (Long) ReflectionUtils.invokeMethod(getDeliveryDelayMethod, this.target); + } + public void setDeliveryMode(int deliveryMode) { this.deliveryMode = deliveryMode; } @@ -156,18 +204,66 @@ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublis public void close() throws JMSException { // It's a cached MessageProducer... reset properties only. if (this.originalDisableMessageID != null) { - this.target.setDisableMessageID(this.originalDisableMessageID.booleanValue()); + this.target.setDisableMessageID(this.originalDisableMessageID); this.originalDisableMessageID = null; } if (this.originalDisableMessageTimestamp != null) { - this.target.setDisableMessageTimestamp(this.originalDisableMessageTimestamp.booleanValue()); + this.target.setDisableMessageTimestamp(this.originalDisableMessageTimestamp); this.originalDisableMessageTimestamp = null; } + if (this.originalDeliveryDelay != null) { + ReflectionUtils.invokeMethod(setDeliveryDelayMethod, this.target, this.originalDeliveryDelay); + this.originalDeliveryDelay = null; + } } - public String toString() { return "Cached JMS MessageProducer: " + this.target; } + + /** + * Build a dynamic proxy that reflectively adapts to JMS 2.0 API methods, if necessary. + * Otherwise simply return this CachedMessageProducer instance itself. + */ + public MessageProducer getProxyIfNecessary() { + if (completionListenerClass != null) { + return (MessageProducer) Proxy.newProxyInstance(CachedMessageProducer.class.getClassLoader(), + new Class[] {MessageProducer.class, QueueSender.class, TopicPublisher.class}, + new Jms2MessageProducerInvocationHandler()); + } + else { + return this; + } + } + + + /** + * Reflective InvocationHandler which adapts to JMS 2.0 API methods that we + * cannot statically compile against while preserving JMS 1.1 compatibility + * (due to the new {@code javax.jms.CompletionListener} type in the signatures). + */ + private class Jms2MessageProducerInvocationHandler implements InvocationHandler { + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + try { + if (method.getName().equals("send") && args != null && + completionListenerClass.equals(method.getParameterTypes()[args.length - 1])) { + if (args.length == 2) { + return sendWithCompletionListenerMethod.invoke( + target, args[0], deliveryMode, priority, timeToLive, args[1]); + } + else if (args.length == 3) { + return sendWithDestinationAndCompletionListenerMethod.invoke( + target, args[0], args[1], deliveryMode, priority, timeToLive, args[2]); + } + } + return method.invoke(target, args); + } + catch (InvocationTargetException ex) { + throw ex.getTargetException(); + } + } + } + } diff --git a/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java b/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java index 367d8b804a7..368f87586b0 100644 --- a/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +40,9 @@ import javax.jms.Topic; import javax.jms.TopicSession; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; +import org.springframework.util.ReflectionUtils; /** * {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session} @@ -80,6 +82,10 @@ import org.springframework.util.ObjectUtils; */ public class CachingConnectionFactory extends SingleConnectionFactory { + private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable( + Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class); + + private int sessionCacheSize = 1; private boolean cacheProducers = true; @@ -333,7 +339,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory { null); } } - else if (methodName.equals("createDurableSubscriber")) { + else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) { Destination dest = (Destination) args[0]; if (dest != null) { return getCachedConsumer(dest, @@ -342,6 +348,15 @@ public class CachingConnectionFactory extends SingleConnectionFactory { (String) args[1]); } } + else if (methodName.equals("createSharedDurableConsumer")) { + Destination dest = (Destination) args[0]; + if (dest != null) { + return getCachedConsumer(dest, + (args.length > 2 ? (String) args[2] : null), + null, + (String) args[1]); + } + } } } try { @@ -367,11 +382,11 @@ public class CachingConnectionFactory extends SingleConnectionFactory { } this.cachedProducers.put(cacheKey, producer); } - return new CachedMessageProducer(producer); + return new CachedMessageProducer(producer).getProxyIfNecessary(); } private MessageConsumer getCachedConsumer( - Destination dest, String selector, boolean noLocal, String subscription) throws JMSException { + Destination dest, String selector, Boolean noLocal, String subscription) throws JMSException { ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription); MessageConsumer consumer = this.cachedConsumers.get(cacheKey); @@ -382,9 +397,27 @@ public class CachingConnectionFactory extends SingleConnectionFactory { } else { if (dest instanceof Topic) { - consumer = (subscription != null ? - this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) : - this.target.createConsumer(dest, selector, noLocal)); + if (noLocal == null) { + // createSharedDurableConsumer((Topic) dest, subscription, selector); + try { + consumer = (MessageConsumer) createSharedDurableConsumerMethod.invoke + (this.target, dest, subscription, selector); + } + catch (InvocationTargetException ex) { + if (ex.getTargetException() instanceof JMSException) { + throw (JMSException) ex.getTargetException(); + } + ReflectionUtils.handleInvocationTargetException(ex); + } + catch (IllegalAccessException ex) { + throw new IllegalStateException("Could not access JMS 2.0 API method: " + ex.getMessage()); + } + } + else { + consumer = (subscription != null ? + this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) : + this.target.createConsumer(dest, selector, noLocal)); + } } else { consumer = this.target.createConsumer(dest, selector); @@ -499,11 +532,11 @@ public class CachingConnectionFactory extends SingleConnectionFactory { private final String selector; - private final boolean noLocal; + private final Boolean noLocal; private final String subscription; - public ConsumerCacheKey(Destination destination, String selector, boolean noLocal, String subscription) { + public ConsumerCacheKey(Destination destination, String selector, Boolean noLocal, String subscription) { super(destination); this.selector = selector; this.noLocal = noLocal; @@ -517,7 +550,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory { ConsumerCacheKey otherKey = (ConsumerCacheKey) other; return (destinationEquals(otherKey) && ObjectUtils.nullSafeEquals(this.selector, otherKey.selector) && - this.noLocal == otherKey.noLocal && + ObjectUtils.nullSafeEquals(this.noLocal, otherKey.noLocal) && ObjectUtils.nullSafeEquals(this.subscription, otherKey.subscription)); } } diff --git a/spring-jms/src/main/java/org/springframework/jms/connection/SingleConnectionFactory.java b/spring-jms/src/main/java/org/springframework/jms/connection/SingleConnectionFactory.java index ed298a1c294..ce1669c5aaa 100644 --- a/spring-jms/src/main/java/org/springframework/jms/connection/SingleConnectionFactory.java +++ b/spring-jms/src/main/java/org/springframework/jms/connection/SingleConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -528,9 +528,20 @@ public class SingleConnectionFactory } else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") || method.getName().equals("createTopicSession")) { - boolean transacted = (Boolean) args[0]; - Integer ackMode = (Integer) args[1]; - Integer mode = (transacted ? Session.SESSION_TRANSACTED : ackMode); + // Default: JMS 2.0 createSession() method + Integer mode = Session.AUTO_ACKNOWLEDGE; + if (args != null) { + if (args.length == 1) { + // JMS 2.0 createSession(int) method + mode = (Integer) args[0]; + } + else if (args.length == 2) { + // JMS 1.1 createSession(boolean, int) method + boolean transacted = (Boolean) args[0]; + Integer ackMode = (Integer) args[1]; + mode = (transacted ? Session.SESSION_TRANSACTED : ackMode); + } + } Session session = getSession(this.target, mode); if (session != null) { if (!method.getReturnType().isInstance(session)) { diff --git a/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java b/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java index c2b7ca8a77d..e4bc172ca78 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.jms.core; +import java.lang.reflect.Method; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; @@ -37,6 +38,8 @@ import org.springframework.jms.support.converter.SimpleMessageConverter; import org.springframework.jms.support.destination.JmsDestinationAccessor; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; /** * Helper class that simplifies synchronous JMS access code. @@ -96,6 +99,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations public static final long RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0; + private static final Method setDeliveryDelayMethod = + ClassUtils.getMethodIfAvailable(MessageProducer.class, "setDeliveryDelay", long.class); + /** Internal ResourceFactory adapter for interacting with ConnectionFactoryUtils */ private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory(); @@ -113,6 +119,8 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations private long receiveTimeout = RECEIVE_TIMEOUT_INDEFINITE_WAIT; + private long deliveryDelay = 0; + private boolean explicitQosEnabled = false; @@ -321,6 +329,22 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations return this.receiveTimeout; } + /** + * Set the delivery delay to use for send calls (in milliseconds). + *
The default is 0 (no delivery delay). + * Note that this feature requires JMS 2.0. + */ + public void setDeliveryDelay(long deliveryDelay) { + this.deliveryDelay = deliveryDelay; + } + + /** + * Return the delivery delay to use for send calls (in milliseconds). + */ + public long getDeliveryDelay() { + return this.deliveryDelay; + } + /** * Set if the QOS values (deliveryMode, priority, timeToLive) @@ -585,6 +609,12 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations * @throws JMSException if thrown by JMS API methods */ protected void doSend(MessageProducer producer, Message message) throws JMSException { + if (this.deliveryDelay > 0) { + if (setDeliveryDelayMethod == null) { + throw new IllegalStateException("setDeliveryDelay requires JMS 2.0"); + } + ReflectionUtils.invokeMethod(setDeliveryDelayMethod, producer, this.deliveryDelay); + } if (isExplicitQosEnabled()) { producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); }