12 changed files with 1509 additions and 146 deletions
@ -0,0 +1,195 @@
@@ -0,0 +1,195 @@
|
||||
/* |
||||
* Copyright 2002-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.jms.core; |
||||
|
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
|
||||
import jakarta.jms.ConnectionFactory; |
||||
import jakarta.jms.Destination; |
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
import org.springframework.jms.support.JmsAccessor; |
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.MessagingException; |
||||
import org.springframework.messaging.converter.MessageConverter; |
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* The default implementation of {@link JmsClient}, |
||||
* as created by the static factory methods. |
||||
* |
||||
* @author Juergen Hoeller |
||||
* @since 7.0 |
||||
* @see JmsClient#create(ConnectionFactory) |
||||
* @see JmsClient#create(ConnectionFactory, MessageConverter) |
||||
* @see JmsClient#create(JmsOperations) |
||||
* @see JmsClient#create(JmsOperations, MessageConverter) |
||||
*/ |
||||
class DefaultJmsClient implements JmsClient { |
||||
|
||||
private final JmsOperations jmsTemplate; |
||||
|
||||
private final @Nullable MessageConverter messageConverter; |
||||
|
||||
|
||||
public DefaultJmsClient(ConnectionFactory connectionFactory, @Nullable MessageConverter messageConverter) { |
||||
this.jmsTemplate = new JmsTemplate(connectionFactory); |
||||
this.messageConverter = messageConverter; |
||||
} |
||||
|
||||
public DefaultJmsClient(JmsOperations jmsTemplate, @Nullable MessageConverter messageConverter) { |
||||
Assert.notNull(jmsTemplate, "JmsTemplate must not be null"); |
||||
this.jmsTemplate = jmsTemplate; |
||||
this.messageConverter = messageConverter; |
||||
} |
||||
|
||||
|
||||
public OperationSpec destination(Destination destination) { |
||||
return new DefaultOperationSpec(destination); |
||||
} |
||||
|
||||
public OperationSpec destination(String destinationName) { |
||||
return new DefaultOperationSpec(destinationName); |
||||
} |
||||
|
||||
private JmsMessagingTemplate newDelegate() { |
||||
JmsMessagingTemplate delegate = new JmsMessagingTemplate(DefaultJmsClient.this.jmsTemplate); |
||||
MessageConverter converter = DefaultJmsClient.this.messageConverter; |
||||
if (converter != null) { |
||||
delegate.setMessageConverter(converter); |
||||
} |
||||
return delegate; |
||||
} |
||||
|
||||
|
||||
private class DefaultOperationSpec implements OperationSpec { |
||||
|
||||
private final JmsMessagingTemplate delegate; |
||||
|
||||
private @Nullable JmsTemplate customTemplate; |
||||
|
||||
public DefaultOperationSpec(Destination destination) { |
||||
this.delegate = newDelegate(); |
||||
this.delegate.setDefaultDestination(destination); |
||||
} |
||||
|
||||
public DefaultOperationSpec(String destinationName) { |
||||
this.delegate = newDelegate(); |
||||
this.delegate.setDefaultDestinationName(destinationName); |
||||
} |
||||
|
||||
private JmsTemplate enforceCustomTemplate(boolean qos) { |
||||
if (this.customTemplate == null) { |
||||
JmsOperations jmsOperations = DefaultJmsClient.this.jmsTemplate; |
||||
if (!(jmsOperations instanceof JmsAccessor original)) { |
||||
throw new IllegalStateException( |
||||
"Needs to be bound to a JmsAccessor for custom settings support: " + jmsOperations); |
||||
} |
||||
this.customTemplate = new JmsTemplate(original); |
||||
this.delegate.setJmsTemplate(this.customTemplate); |
||||
} |
||||
if (qos) { |
||||
this.customTemplate.setExplicitQosEnabled(true); |
||||
} |
||||
return this.customTemplate; |
||||
} |
||||
|
||||
@Override |
||||
public OperationSpec withReceiveTimeout(long receiveTimeout) { |
||||
enforceCustomTemplate(false).setReceiveTimeout(receiveTimeout); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public OperationSpec withDeliveryDelay(long deliveryDelay) { |
||||
enforceCustomTemplate(false).setDeliveryDelay(deliveryDelay); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public OperationSpec withDeliveryPersistent(boolean persistent) { |
||||
enforceCustomTemplate(true).setDeliveryPersistent(persistent); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public OperationSpec withPriority(int priority) { |
||||
enforceCustomTemplate(true).setPriority(priority); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public OperationSpec withTimeToLive(long timeToLive) { |
||||
enforceCustomTemplate(true).setTimeToLive(timeToLive); |
||||
return this; |
||||
} |
||||
|
||||
@Override |
||||
public void send(Message<?> message) throws MessagingException { |
||||
this.delegate.send(message); |
||||
} |
||||
|
||||
@Override |
||||
public void send(Object payload) throws MessagingException { |
||||
this.delegate.convertAndSend(payload); |
||||
} |
||||
|
||||
@Override |
||||
public void send(Object payload, Map<String, Object> headers) throws MessagingException { |
||||
this.delegate.convertAndSend(payload, headers); |
||||
} |
||||
|
||||
@Override |
||||
public Optional<Message<?>> receive() throws MessagingException { |
||||
return Optional.ofNullable(this.delegate.receive()); |
||||
} |
||||
|
||||
@Override |
||||
public <T> Optional<T> receive(Class<T> targetClass) throws MessagingException { |
||||
return Optional.ofNullable(this.delegate.receiveAndConvert(targetClass)); |
||||
} |
||||
|
||||
@Override |
||||
public Optional<Message<?>> receive(String messageSelector) throws MessagingException { |
||||
return Optional.ofNullable(this.delegate.receiveSelected(messageSelector)); |
||||
} |
||||
|
||||
@Override |
||||
public <T> Optional<T> receive(String messageSelector, Class<T> targetClass) throws MessagingException { |
||||
return Optional.ofNullable(this.delegate.receiveSelectedAndConvert(messageSelector, targetClass)); |
||||
} |
||||
|
||||
@Override |
||||
public Optional<Message<?>> sendAndReceive(Message<?> requestMessage) throws MessagingException { |
||||
return Optional.ofNullable(this.delegate.sendAndReceive(requestMessage)); |
||||
} |
||||
|
||||
@Override |
||||
public <T> Optional<T> sendAndReceive(Object request, Class<T> targetClass) throws MessagingException { |
||||
return Optional.ofNullable(this.delegate.convertSendAndReceive(request, targetClass)); |
||||
} |
||||
|
||||
@Override |
||||
public <T> Optional<T> sendAndReceive(Object request, Map<String, Object> headers, Class<T> targetClass) |
||||
throws MessagingException { |
||||
|
||||
return Optional.ofNullable(this.delegate.convertSendAndReceive(request, headers, targetClass)); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,248 @@
@@ -0,0 +1,248 @@
|
||||
/* |
||||
* Copyright 2002-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.jms.core; |
||||
|
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
|
||||
import jakarta.jms.ConnectionFactory; |
||||
import jakarta.jms.Destination; |
||||
|
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.MessagingException; |
||||
import org.springframework.messaging.converter.MessageConverter; |
||||
|
||||
/** |
||||
* A fluent {@code JmsClient} with common send and receive operations against a JMS |
||||
* destination. This is effectively an alternative to {@link JmsMessagingTemplate}, |
||||
* also delegating to Spring's {@link JmsTemplate} for performing actual operations. |
||||
* |
||||
* <p>Note: Operations in this interface throw {@link MessagingException} instead of |
||||
* the JMS-specific {@link org.springframework.jms.JmsException}, aligning with the |
||||
* {@code spring-messaging} module and its other client operation handles. |
||||
* |
||||
* <p>This client provides reusable operation handles which can be configured with |
||||
* custom QoS settings. Note that any use of such explicit settings will override |
||||
* administrative provider settings (see {@link JmsTemplate#setExplicitQosEnabled}). |
||||
* |
||||
* @author Juergen Hoeller |
||||
* @since 7.0 |
||||
* @see JmsTemplate |
||||
* @see JmsMessagingTemplate |
||||
*/ |
||||
interface JmsClient { |
||||
|
||||
/** |
||||
* Provide an operation handle for the given JMS destination. |
||||
* @param destination the JMS {@code Destination} object |
||||
* @return a reusable operation handle bound to the destination |
||||
*/ |
||||
OperationSpec destination(Destination destination); |
||||
|
||||
/** |
||||
* Provide an operation handle for the specified JMS destination. |
||||
* @param destinationName a name resolving to a JMS {@code Destination} |
||||
* @return a reusable operation handle bound to the destination |
||||
* @see org.springframework.jms.support.destination.DestinationResolver |
||||
*/ |
||||
OperationSpec destination(String destinationName); |
||||
|
||||
|
||||
// Static factory methods
|
||||
|
||||
/** |
||||
* Create a new {@code JmsClient} for the given {@link ConnectionFactory}. |
||||
* @param connectionFactory the factory to obtain JMS connections from |
||||
*/ |
||||
static JmsClient create(ConnectionFactory connectionFactory) { |
||||
return new DefaultJmsClient(connectionFactory, null); |
||||
} |
||||
|
||||
/** |
||||
* Create a new {@code JmsClient} for the given {@link ConnectionFactory}. |
||||
* @param connectionFactory the factory to obtain JMS connections from |
||||
* @param messageConverter the message converter for payload objects |
||||
*/ |
||||
static JmsClient create(ConnectionFactory connectionFactory, MessageConverter messageConverter) { |
||||
return new DefaultJmsClient(connectionFactory, messageConverter); |
||||
} |
||||
|
||||
/** |
||||
* Create a new {@code JmsClient} for the given {@link ConnectionFactory}. |
||||
* @param jmsTemplate the {@link JmsTemplate} to use for performing operations |
||||
* (can be a custom {@link JmsOperations} implementation as well) |
||||
*/ |
||||
static JmsClient create(JmsOperations jmsTemplate) { |
||||
return new DefaultJmsClient(jmsTemplate, null); |
||||
} |
||||
|
||||
/** |
||||
* Create a new {@code JmsClient} for the given {@link ConnectionFactory}. |
||||
* @param jmsTemplate the {@link JmsTemplate} to use for performing operations |
||||
* (can be a custom {@link JmsOperations} implementation as well) |
||||
* @param messageConverter the message converter for payload objects |
||||
*/ |
||||
static JmsClient create(JmsOperations jmsTemplate, MessageConverter messageConverter) { |
||||
return new DefaultJmsClient(jmsTemplate, messageConverter); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Common JMS send and receive operations with various settings. |
||||
*/ |
||||
interface OperationSpec { |
||||
|
||||
/** |
||||
* Apply the given timeout to any subsequent receive operations. |
||||
* @param receiveTimeout the timeout in milliseconds |
||||
* @see JmsTemplate#setReceiveTimeout |
||||
*/ |
||||
OperationSpec withReceiveTimeout(long receiveTimeout); |
||||
|
||||
/** |
||||
* Apply the given delivery delay to any subsequent send operations. |
||||
* @param deliveryDelay the delay in milliseconds |
||||
* @see JmsTemplate#setDeliveryDelay |
||||
*/ |
||||
OperationSpec withDeliveryDelay(long deliveryDelay); |
||||
|
||||
/** |
||||
* Set whether message delivery should be persistent or non-persistent. |
||||
* @param persistent to choose between delivery mode "PERSISTENT" |
||||
* ({@code true}) or "NON_PERSISTENT" ({@code false}) |
||||
* @see JmsTemplate#setDeliveryPersistent |
||||
*/ |
||||
OperationSpec withDeliveryPersistent(boolean persistent); |
||||
|
||||
/** |
||||
* Apply the given priority to any subsequent send operations. |
||||
* @param priority the priority value |
||||
* @see JmsTemplate#setPriority |
||||
*/ |
||||
OperationSpec withPriority(int priority); |
||||
|
||||
/** |
||||
* Apply the given time-to-live to any subsequent send operations. |
||||
* @param timeToLive the message lifetime in milliseconds |
||||
* @see JmsTemplate#setTimeToLive |
||||
*/ |
||||
OperationSpec withTimeToLive(long timeToLive); |
||||
|
||||
/** |
||||
* Send the given {@link Message} to the pre-bound destination. |
||||
* @param message the spring-messaging {@link Message} to send |
||||
* @see #withDeliveryDelay |
||||
* @see #withDeliveryPersistent |
||||
* @see #withPriority |
||||
* @see #withTimeToLive |
||||
*/ |
||||
void send(Message<?> message) throws MessagingException; |
||||
|
||||
/** |
||||
* Send a message with the given payload to the pre-bound destination. |
||||
* @param payload the payload to convert into a {@link Message} |
||||
* @see #withDeliveryDelay |
||||
* @see #withDeliveryPersistent |
||||
* @see #withPriority |
||||
* @see #withTimeToLive |
||||
*/ |
||||
void send(Object payload) throws MessagingException; |
||||
|
||||
/** |
||||
* Send a message with the given payload to the pre-bound destination. |
||||
* @param payload the payload to convert into a {@link Message} |
||||
* @param headers the message headers to apply to the {@link Message} |
||||
* @see #withDeliveryDelay |
||||
* @see #withDeliveryPersistent |
||||
* @see #withPriority |
||||
* @see #withTimeToLive |
||||
*/ |
||||
void send(Object payload, Map<String, Object> headers) throws MessagingException; |
||||
|
||||
/** |
||||
* Receive a {@link Message} from the pre-bound destination. |
||||
* @return the spring-messaging {@link Message} received, |
||||
* or {@link Optional#empty()} if none |
||||
* @see #withReceiveTimeout |
||||
*/ |
||||
Optional<Message<?>> receive() throws MessagingException; |
||||
|
||||
/** |
||||
* Receive a {@link Message} from the pre-bound destination, |
||||
* extracting and converting its payload. |
||||
* @param targetClass the class to convert the payload to |
||||
* @return the payload of the {@link Message} received, |
||||
* or {@link Optional#empty()} if none |
||||
* @see #withReceiveTimeout |
||||
*/ |
||||
<T> Optional<T> receive(Class<T> targetClass) throws MessagingException; |
||||
|
||||
/** |
||||
* Receive a {@link Message} from the pre-bound destination. |
||||
* @param messageSelector the JMS message selector to apply |
||||
* @return the spring-messaging {@link Message} received, |
||||
* or {@link Optional#empty()} if none |
||||
* @see #withReceiveTimeout |
||||
*/ |
||||
Optional<Message<?>> receive(String messageSelector) throws MessagingException; |
||||
|
||||
/** |
||||
* Receive a {@link Message} from the pre-bound destination, |
||||
* extracting and converting its payload. |
||||
* @param targetClass the class to convert the payload to |
||||
* @return the payload of the {@link Message} received, |
||||
* or {@link Optional#empty()} if none |
||||
* @param messageSelector the JMS message selector to apply |
||||
* @see #withReceiveTimeout |
||||
*/ |
||||
<T> Optional<T> receive(String messageSelector, Class<T> targetClass) throws MessagingException; |
||||
|
||||
/** |
||||
* Send a request message and receive the reply from the given destination. |
||||
* @param requestMessage the spring-messaging {@link Message} to send |
||||
* @return the spring-messaging {@link Message} received as a reply, |
||||
* or {@link Optional#empty()} if none |
||||
* @see #withReceiveTimeout |
||||
*/ |
||||
Optional<Message<?>> sendAndReceive(Message<?> requestMessage) throws MessagingException; |
||||
|
||||
/** |
||||
* Send a request message and receive the reply from the given destination. |
||||
* @param request the payload to convert into a request {@link Message} |
||||
* @param targetClass the class to convert the reply's payload to |
||||
* @return the payload of the {@link Message} received as a reply, |
||||
* or {@link Optional#empty()} if none |
||||
* @see #withTimeToLive |
||||
* @see #withReceiveTimeout |
||||
*/ |
||||
<T> Optional<T> sendAndReceive(Object request, Class<T> targetClass) throws MessagingException; |
||||
|
||||
/** |
||||
* Send a request message and receive the reply from the given destination. |
||||
* @param request the payload to convert into a request {@link Message} |
||||
* @param headers the message headers to apply to the request {@link Message} |
||||
* @param targetClass the class to convert the reply's payload to |
||||
* @return the payload of the {@link Message} received as a reply, |
||||
* or {@link Optional#empty()} if none |
||||
* @see #withTimeToLive |
||||
* @see #withReceiveTimeout |
||||
*/ |
||||
<T> Optional<T> sendAndReceive(Object request, Map<String, Object> headers, Class<T> targetClass) |
||||
throws MessagingException; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,513 @@
@@ -0,0 +1,513 @@
|
||||
/* |
||||
* Copyright 2002-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.jms.core; |
||||
|
||||
import java.io.Writer; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
|
||||
import jakarta.jms.Connection; |
||||
import jakarta.jms.ConnectionFactory; |
||||
import jakarta.jms.DeliveryMode; |
||||
import jakarta.jms.Destination; |
||||
import jakarta.jms.JMSException; |
||||
import jakarta.jms.MessageConsumer; |
||||
import jakarta.jms.MessageProducer; |
||||
import jakarta.jms.Queue; |
||||
import jakarta.jms.Session; |
||||
import jakarta.jms.TextMessage; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.junit.jupiter.api.extension.ExtendWith; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.Captor; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.jupiter.MockitoExtension; |
||||
import org.mockito.stubbing.Answer; |
||||
|
||||
import org.springframework.jms.InvalidDestinationException; |
||||
import org.springframework.jms.MessageNotReadableException; |
||||
import org.springframework.jms.StubTextMessage; |
||||
import org.springframework.jms.support.destination.DestinationResolutionException; |
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.MessagingException; |
||||
import org.springframework.messaging.converter.GenericMessageConverter; |
||||
import org.springframework.messaging.support.MessageBuilder; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.BDDMockito.given; |
||||
import static org.mockito.BDDMockito.willThrow; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
|
||||
/** |
||||
* Tests for {@link JmsClient}. |
||||
* |
||||
* @author Juergen Hoeller |
||||
* @since 7.0 |
||||
*/ |
||||
@ExtendWith(MockitoExtension.class) |
||||
class JmsClientTests { |
||||
|
||||
@Captor |
||||
private ArgumentCaptor<MessageCreator> messageCreator; |
||||
|
||||
@Mock |
||||
private JmsTemplate jmsTemplate; |
||||
|
||||
private JmsClient jmsClient; |
||||
|
||||
|
||||
@BeforeEach |
||||
void setup() { |
||||
this.jmsClient = JmsClient.create(this.jmsTemplate); |
||||
} |
||||
|
||||
@Test |
||||
void send() { |
||||
Destination destination = new Destination() {}; |
||||
Message<String> message = createTextMessage(); |
||||
|
||||
this.jmsClient.destination(destination).send(message); |
||||
verify(this.jmsTemplate).send(eq(destination), this.messageCreator.capture()); |
||||
assertTextMessage(this.messageCreator.getValue()); |
||||
} |
||||
|
||||
@Test |
||||
void sendName() { |
||||
Message<String> message = createTextMessage(); |
||||
|
||||
this.jmsClient.destination("myQueue").send(message); |
||||
verify(this.jmsTemplate).send(eq("myQueue"), this.messageCreator.capture()); |
||||
assertTextMessage(this.messageCreator.getValue()); |
||||
} |
||||
|
||||
@Test |
||||
void convertAndSendPayload() throws JMSException { |
||||
Destination destination = new Destination() {}; |
||||
|
||||
this.jmsClient.destination(destination).send("my Payload"); |
||||
verify(this.jmsTemplate).send(eq(destination), this.messageCreator.capture()); |
||||
TextMessage textMessage = createTextMessage(this.messageCreator.getValue()); |
||||
assertThat(textMessage.getText()).isEqualTo("my Payload"); |
||||
} |
||||
|
||||
@Test |
||||
void convertAndSendPayloadName() throws JMSException { |
||||
this.jmsClient.destination("myQueue").send("my Payload"); |
||||
verify(this.jmsTemplate).send(eq("myQueue"), this.messageCreator.capture()); |
||||
TextMessage textMessage = createTextMessage(this.messageCreator.getValue()); |
||||
assertThat(textMessage.getText()).isEqualTo("my Payload"); |
||||
} |
||||
|
||||
@Test |
||||
void convertAndSendPayloadAndHeaders() { |
||||
Destination destination = new Destination() {}; |
||||
Map<String, Object> headers = new HashMap<>(); |
||||
headers.put("foo", "bar"); |
||||
|
||||
this.jmsClient.destination(destination).send("Hello", headers); |
||||
verify(this.jmsTemplate).send(eq(destination), this.messageCreator.capture()); |
||||
assertTextMessage(this.messageCreator.getValue()); // see createTextMessage
|
||||
} |
||||
|
||||
@Test |
||||
void convertAndSendPayloadAndHeadersName() { |
||||
Map<String, Object> headers = new HashMap<>(); |
||||
headers.put("foo", "bar"); |
||||
|
||||
this.jmsClient.destination("myQueue").send("Hello", headers); |
||||
verify(this.jmsTemplate).send(eq("myQueue"), this.messageCreator.capture()); |
||||
assertTextMessage(this.messageCreator.getValue()); // see createTextMessage
|
||||
} |
||||
|
||||
@Test |
||||
void receive() { |
||||
Destination destination = new Destination() {}; |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage(); |
||||
given(this.jmsTemplate.receive(destination)).willReturn(jmsMessage); |
||||
|
||||
Message<?> message = this.jmsClient.destination(destination).receive().get(); |
||||
verify(this.jmsTemplate).receive(destination); |
||||
assertTextMessage(message); |
||||
} |
||||
|
||||
@Test |
||||
void receiveName() { |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage(); |
||||
given(this.jmsTemplate.receive("myQueue")).willReturn(jmsMessage); |
||||
|
||||
Message<?> message = this.jmsClient.destination("myQueue").receive().get(); |
||||
verify(this.jmsTemplate).receive("myQueue"); |
||||
assertTextMessage(message); |
||||
} |
||||
|
||||
@Test |
||||
void receiveSelected() { |
||||
Destination destination = new Destination() {}; |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage(); |
||||
given(this.jmsTemplate.receiveSelected(destination, "selector")).willReturn(jmsMessage); |
||||
|
||||
Message<?> message = this.jmsClient.destination(destination).receive("selector").get(); |
||||
verify(this.jmsTemplate).receiveSelected(destination, "selector"); |
||||
assertTextMessage(message); |
||||
} |
||||
|
||||
@Test |
||||
void receiveSelectedName() { |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage(); |
||||
given(this.jmsTemplate.receiveSelected("myQueue", "selector")).willReturn(jmsMessage); |
||||
|
||||
Message<?> message = this.jmsClient.destination("myQueue").receive("selector").get(); |
||||
verify(this.jmsTemplate).receiveSelected("myQueue", "selector"); |
||||
assertTextMessage(message); |
||||
} |
||||
|
||||
@Test |
||||
void receiveAndConvert() { |
||||
Destination destination = new Destination() {}; |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage("my Payload"); |
||||
given(this.jmsTemplate.receive(destination)).willReturn(jmsMessage); |
||||
|
||||
String payload = this.jmsClient.destination(destination).receive(String.class).get(); |
||||
assertThat(payload).isEqualTo("my Payload"); |
||||
verify(this.jmsTemplate).receive(destination); |
||||
} |
||||
|
||||
@Test |
||||
void receiveAndConvertName() { |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage("my Payload"); |
||||
given(this.jmsTemplate.receive("myQueue")).willReturn(jmsMessage); |
||||
|
||||
String payload = this.jmsClient.destination("myQueue").receive(String.class).get(); |
||||
assertThat(payload).isEqualTo("my Payload"); |
||||
verify(this.jmsTemplate).receive("myQueue"); |
||||
} |
||||
|
||||
@Test |
||||
void receiveAndConvertWithConversion() { |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage("123"); |
||||
given(this.jmsTemplate.receive("myQueue")).willReturn(jmsMessage); |
||||
|
||||
this.jmsClient = JmsClient.create(this.jmsTemplate, new GenericMessageConverter()); |
||||
|
||||
Integer payload = this.jmsClient.destination("myQueue").receive(Integer.class).get(); |
||||
assertThat(payload).isEqualTo(Integer.valueOf(123)); |
||||
verify(this.jmsTemplate).receive("myQueue"); |
||||
} |
||||
|
||||
@Test |
||||
void receiveAndConvertNoConverter() { |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage("Hello"); |
||||
given(this.jmsTemplate.receive("myQueue")).willReturn(jmsMessage); |
||||
|
||||
assertThatExceptionOfType(org.springframework.messaging.converter.MessageConversionException.class).isThrownBy(() -> |
||||
this.jmsClient.destination("myQueue").receive(Writer.class)); |
||||
} |
||||
|
||||
@Test |
||||
void receiveAndConvertNoInput() { |
||||
given(this.jmsTemplate.receive("myQueue")).willReturn(null); |
||||
|
||||
assertThat(this.jmsClient.destination("myQueue").receive(String.class)).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
void receiveSelectedAndConvert() { |
||||
Destination destination = new Destination() {}; |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage("my Payload"); |
||||
given(this.jmsTemplate.receiveSelected(destination, "selector")).willReturn(jmsMessage); |
||||
|
||||
String payload = this.jmsClient.destination(destination).receive("selector", String.class).get(); |
||||
assertThat(payload).isEqualTo("my Payload"); |
||||
verify(this.jmsTemplate).receiveSelected(destination, "selector"); |
||||
} |
||||
|
||||
@Test |
||||
void receiveSelectedAndConvertName() { |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage("my Payload"); |
||||
given(this.jmsTemplate.receiveSelected("myQueue", "selector")).willReturn(jmsMessage); |
||||
|
||||
String payload = this.jmsClient.destination("myQueue").receive("selector", String.class).get(); |
||||
assertThat(payload).isEqualTo("my Payload"); |
||||
verify(this.jmsTemplate).receiveSelected("myQueue", "selector"); |
||||
} |
||||
|
||||
@Test |
||||
void receiveSelectedAndConvertWithConversion() { |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage("123"); |
||||
given(this.jmsTemplate.receiveSelected("myQueue", "selector")).willReturn(jmsMessage); |
||||
|
||||
this.jmsClient = JmsClient.create(this.jmsTemplate, new GenericMessageConverter()); |
||||
|
||||
Integer payload = this.jmsClient.destination("myQueue").receive("selector", Integer.class).get(); |
||||
assertThat(payload).isEqualTo(Integer.valueOf(123)); |
||||
verify(this.jmsTemplate).receiveSelected("myQueue", "selector"); |
||||
} |
||||
|
||||
@Test |
||||
void receiveSelectedAndConvertNoConverter() { |
||||
jakarta.jms.Message jmsMessage = createJmsTextMessage("Hello"); |
||||
given(this.jmsTemplate.receiveSelected("myQueue", "selector")).willReturn(jmsMessage); |
||||
|
||||
assertThatExceptionOfType(org.springframework.messaging.converter.MessageConversionException.class).isThrownBy(() -> |
||||
this.jmsClient.destination("myQueue").receive("selector", Writer.class)); |
||||
} |
||||
|
||||
@Test |
||||
void receiveSelectedAndConvertNoInput() { |
||||
given(this.jmsTemplate.receiveSelected("myQueue", "selector")).willReturn(null); |
||||
|
||||
assertThat(this.jmsClient.destination("myQueue").receive("selector", String.class)).isEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
void sendAndReceive() { |
||||
Destination destination = new Destination() {}; |
||||
Message<String> request = createTextMessage(); |
||||
jakarta.jms.Message replyJmsMessage = createJmsTextMessage(); |
||||
given(this.jmsTemplate.sendAndReceive(eq(destination), any())).willReturn(replyJmsMessage); |
||||
|
||||
Message<?> actual = this.jmsClient.destination(destination).sendAndReceive(request).get(); |
||||
verify(this.jmsTemplate, times(1)).sendAndReceive(eq(destination), any()); |
||||
assertTextMessage(actual); |
||||
} |
||||
|
||||
@Test |
||||
void sendAndReceiveName() { |
||||
Message<String> request = createTextMessage(); |
||||
jakarta.jms.Message replyJmsMessage = createJmsTextMessage(); |
||||
given(this.jmsTemplate.sendAndReceive(eq("myQueue"), any())).willReturn(replyJmsMessage); |
||||
|
||||
Message<?> actual = this.jmsClient.destination("myQueue").sendAndReceive(request).get(); |
||||
verify(this.jmsTemplate, times(1)).sendAndReceive(eq("myQueue"), any()); |
||||
assertTextMessage(actual); |
||||
} |
||||
|
||||
@Test |
||||
void convertSendAndReceivePayload() { |
||||
Destination destination = new Destination() {}; |
||||
jakarta.jms.Message replyJmsMessage = createJmsTextMessage("My reply"); |
||||
given(this.jmsTemplate.sendAndReceive(eq(destination), any())).willReturn(replyJmsMessage); |
||||
|
||||
String reply = this.jmsClient.destination(destination).sendAndReceive("my Payload", String.class).get(); |
||||
verify(this.jmsTemplate, times(1)).sendAndReceive(eq(destination), any()); |
||||
assertThat(reply).isEqualTo("My reply"); |
||||
} |
||||
|
||||
@Test |
||||
void convertSendAndReceivePayloadName() { |
||||
jakarta.jms.Message replyJmsMessage = createJmsTextMessage("My reply"); |
||||
given(this.jmsTemplate.sendAndReceive(eq("myQueue"), any())).willReturn(replyJmsMessage); |
||||
|
||||
String reply = this.jmsClient.destination("myQueue").sendAndReceive("my Payload", String.class).get(); |
||||
verify(this.jmsTemplate, times(1)).sendAndReceive(eq("myQueue"), any()); |
||||
assertThat(reply).isEqualTo("My reply"); |
||||
} |
||||
|
||||
@Test |
||||
void convertMessageNotReadableException() { |
||||
willThrow(MessageNotReadableException.class).given(this.jmsTemplate).receive("myQueue"); |
||||
|
||||
assertThatExceptionOfType(MessagingException.class).isThrownBy(() -> |
||||
this.jmsClient.destination("myQueue").receive()); |
||||
} |
||||
|
||||
@Test |
||||
void convertDestinationResolutionExceptionOnSend() { |
||||
Destination destination = new Destination() {}; |
||||
willThrow(DestinationResolutionException.class).given(this.jmsTemplate).send(eq(destination), any()); |
||||
|
||||
assertThatExceptionOfType(org.springframework.messaging.core.DestinationResolutionException.class).isThrownBy(() -> |
||||
this.jmsClient.destination(destination).send(createTextMessage())); |
||||
} |
||||
|
||||
@Test |
||||
void convertDestinationResolutionExceptionOnReceive() { |
||||
Destination destination = new Destination() {}; |
||||
willThrow(DestinationResolutionException.class).given(this.jmsTemplate).receive(destination); |
||||
|
||||
assertThatExceptionOfType(org.springframework.messaging.core.DestinationResolutionException.class).isThrownBy(() -> |
||||
this.jmsClient.destination(destination).receive()); |
||||
} |
||||
|
||||
@Test |
||||
void convertInvalidDestinationExceptionOnSendAndReceiveWithName() { |
||||
willThrow(InvalidDestinationException.class).given(this.jmsTemplate).sendAndReceive(eq("unknownQueue"), any()); |
||||
|
||||
assertThatExceptionOfType(org.springframework.messaging.core.DestinationResolutionException.class).isThrownBy(() -> |
||||
this.jmsClient.destination("unknownQueue").sendAndReceive(createTextMessage())); |
||||
} |
||||
|
||||
@Test |
||||
void convertInvalidDestinationExceptionOnSendAndReceive() { |
||||
Destination destination = new Destination() {}; |
||||
willThrow(InvalidDestinationException.class).given(this.jmsTemplate).sendAndReceive(eq(destination), any()); |
||||
|
||||
assertThatExceptionOfType(org.springframework.messaging.core.DestinationResolutionException.class).isThrownBy(() -> |
||||
this.jmsClient.destination(destination).sendAndReceive(createTextMessage())); |
||||
} |
||||
|
||||
@Test |
||||
void sendWithDefaults() throws Exception { |
||||
ConnectionFactory connectionFactory = mock(); |
||||
Connection connection = mock(); |
||||
Session session = mock(); |
||||
Queue queue = mock(); |
||||
MessageProducer messageProducer = mock(); |
||||
TextMessage textMessage = mock(); |
||||
|
||||
given(connectionFactory.createConnection()).willReturn(connection); |
||||
given(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session); |
||||
given(session.createProducer(queue)).willReturn(messageProducer); |
||||
given(session.createTextMessage("just testing")).willReturn(textMessage); |
||||
|
||||
JmsClient.create(connectionFactory).destination(queue) |
||||
.send("just testing"); |
||||
|
||||
verify(messageProducer).send(textMessage); |
||||
verify(messageProducer).close(); |
||||
verify(session).close(); |
||||
verify(connection).close(); |
||||
} |
||||
|
||||
@Test |
||||
void sendWithCustomSettings() throws Exception { |
||||
ConnectionFactory connectionFactory = mock(); |
||||
Connection connection = mock(); |
||||
Session session = mock(); |
||||
Queue queue = mock(); |
||||
MessageProducer messageProducer = mock(); |
||||
TextMessage textMessage = mock(); |
||||
|
||||
given(connectionFactory.createConnection()).willReturn(connection); |
||||
given(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session); |
||||
given(session.createProducer(queue)).willReturn(messageProducer); |
||||
given(session.createTextMessage("just testing")).willReturn(textMessage); |
||||
|
||||
JmsClient.create(connectionFactory).destination(queue) |
||||
.withDeliveryDelay(0).withDeliveryPersistent(false).withPriority(2).withTimeToLive(3) |
||||
.send("just testing"); |
||||
|
||||
verify(messageProducer).setDeliveryDelay(0); |
||||
verify(messageProducer).send(textMessage, DeliveryMode.NON_PERSISTENT, 2, 3); |
||||
verify(messageProducer).close(); |
||||
verify(session).close(); |
||||
verify(connection).close(); |
||||
} |
||||
|
||||
@Test |
||||
void receiveWithDefaults() throws Exception { |
||||
ConnectionFactory connectionFactory = mock(); |
||||
Connection connection = mock(); |
||||
Session session = mock(); |
||||
Queue queue = mock(); |
||||
MessageConsumer messageConsumer = mock(); |
||||
TextMessage textMessage = mock(); |
||||
|
||||
given(connectionFactory.createConnection()).willReturn(connection); |
||||
given(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session); |
||||
given(session.createConsumer(queue, null)).willReturn(messageConsumer); |
||||
given(messageConsumer.receive()).willReturn(textMessage); |
||||
given(textMessage.getText()).willReturn("Hello World!"); |
||||
|
||||
String result = JmsClient.create(connectionFactory).destination(queue) |
||||
.receive(String.class).get(); |
||||
assertThat(result).isEqualTo("Hello World!"); |
||||
|
||||
verify(connection).start(); |
||||
verify(messageConsumer).close(); |
||||
verify(session).close(); |
||||
verify(connection).close(); |
||||
} |
||||
|
||||
@Test |
||||
void receiveWithCustomTimeout() throws Exception { |
||||
ConnectionFactory connectionFactory = mock(); |
||||
Connection connection = mock(); |
||||
Session session = mock(); |
||||
Queue queue = mock(); |
||||
MessageConsumer messageConsumer = mock(); |
||||
TextMessage textMessage = mock(); |
||||
|
||||
given(connectionFactory.createConnection()).willReturn(connection); |
||||
given(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session); |
||||
given(session.createConsumer(queue, null)).willReturn(messageConsumer); |
||||
given(messageConsumer.receive(10)).willReturn(textMessage); |
||||
given(textMessage.getText()).willReturn("Hello World!"); |
||||
|
||||
String result = JmsClient.create(connectionFactory).destination(queue) |
||||
.withReceiveTimeout(10).receive(String.class).get(); |
||||
assertThat(result).isEqualTo("Hello World!"); |
||||
|
||||
verify(connection).start(); |
||||
verify(messageConsumer).close(); |
||||
verify(session).close(); |
||||
verify(connection).close(); |
||||
} |
||||
|
||||
|
||||
private Message<String> createTextMessage(String payload) { |
||||
return MessageBuilder.withPayload(payload).setHeader("foo", "bar").build(); |
||||
} |
||||
|
||||
private Message<String> createTextMessage() { |
||||
return createTextMessage("Hello"); |
||||
} |
||||
|
||||
private jakarta.jms.Message createJmsTextMessage(String payload) { |
||||
StubTextMessage jmsMessage = new StubTextMessage(payload); |
||||
jmsMessage.setStringProperty("foo", "bar"); |
||||
return jmsMessage; |
||||
} |
||||
|
||||
private jakarta.jms.Message createJmsTextMessage() { |
||||
return createJmsTextMessage("Hello"); |
||||
} |
||||
|
||||
private void assertTextMessage(MessageCreator messageCreator) { |
||||
try { |
||||
TextMessage jmsMessage = createTextMessage(messageCreator); |
||||
assertThat(jmsMessage.getText()).as("Wrong body message").isEqualTo("Hello"); |
||||
assertThat(jmsMessage.getStringProperty("foo")).as("Invalid foo property").isEqualTo("bar"); |
||||
} |
||||
catch (JMSException e) { |
||||
throw new IllegalStateException("Wrong text message", e); |
||||
} |
||||
} |
||||
|
||||
private void assertTextMessage(Message<?> message) { |
||||
assertThat(message).as("message should not be null").isNotNull(); |
||||
assertThat(message.getPayload()).as("Wrong payload").isEqualTo("Hello"); |
||||
assertThat(message.getHeaders().get("foo")).as("Invalid foo property").isEqualTo("bar"); |
||||
} |
||||
|
||||
protected TextMessage createTextMessage(MessageCreator creator) throws JMSException { |
||||
Session mock = mock(); |
||||
given(mock.createTextMessage(any())).willAnswer( |
||||
(Answer<TextMessage>) invocation -> |
||||
new StubTextMessage((String) invocation.getArguments()[0])); |
||||
jakarta.jms.Message message = creator.createMessage(mock); |
||||
verify(mock).createTextMessage(any()); |
||||
return (TextMessage) message; |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue