Browse Source
Prior to this commit, the `JmsTemplate` would use `MessagePostProcessor` for mutating JMS messages before they are being sent, but only if the method takes a post processor as an argument. The main use case so far is to mutate messages after they've been created by a `MessageConverter` from a payload. This commit updates the `JmsClient` to use `MessagePostProcessor` more broadly, for all outgoing messages (converted or not). This brings an interception-like mechanism for clients to enrich the message before being sent. This change also updates the `JmsClient` static factories and introduces a Builder, allowing for more configuration options: multiple message converters and message post processors. Closes gh-35271pull/34146/merge
11 changed files with 495 additions and 106 deletions
@ -0,0 +1,48 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-2024 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.springframework.docs.integration.jms.jmssending; |
||||||
|
|
||||||
|
import jakarta.jms.ConnectionFactory; |
||||||
|
import jakarta.jms.JMSException; |
||||||
|
import jakarta.jms.Message; |
||||||
|
import jakarta.jms.Queue; |
||||||
|
import jakarta.jms.Session; |
||||||
|
|
||||||
|
import org.springframework.jms.core.MessageCreator; |
||||||
|
import org.springframework.jms.core.JmsTemplate; |
||||||
|
|
||||||
|
public class JmsQueueSender { |
||||||
|
|
||||||
|
private JmsTemplate jmsTemplate; |
||||||
|
private Queue queue; |
||||||
|
|
||||||
|
public void setConnectionFactory(ConnectionFactory cf) { |
||||||
|
this.jmsTemplate = new JmsTemplate(cf); |
||||||
|
} |
||||||
|
|
||||||
|
public void setQueue(Queue queue) { |
||||||
|
this.queue = queue; |
||||||
|
} |
||||||
|
|
||||||
|
public void simpleSend() { |
||||||
|
this.jmsTemplate.send(this.queue, new MessageCreator() { |
||||||
|
public Message createMessage(Session session) throws JMSException { |
||||||
|
return session.createTextMessage("hello queue world"); |
||||||
|
} |
||||||
|
}); |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,45 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-2024 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.springframework.docs.integration.jms.jmssendingconversion; |
||||||
|
|
||||||
|
import java.util.HashMap; |
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
import jakarta.jms.JMSException; |
||||||
|
import jakarta.jms.Message; |
||||||
|
|
||||||
|
import org.springframework.jms.core.JmsTemplate; |
||||||
|
import org.springframework.jms.core.MessagePostProcessor; |
||||||
|
|
||||||
|
public class JmsSenderWithConversion { |
||||||
|
|
||||||
|
private JmsTemplate jmsTemplate; |
||||||
|
|
||||||
|
public void sendWithConversion() { |
||||||
|
Map<String, Object> map = new HashMap<>(); |
||||||
|
map.put("Name", "Mark"); |
||||||
|
map.put("Age", 47); |
||||||
|
jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() { |
||||||
|
public Message postProcessMessage(Message message) throws JMSException { |
||||||
|
message.setIntProperty("AccountID", 1234); |
||||||
|
message.setJMSCorrelationID("123-00001"); |
||||||
|
return message; |
||||||
|
} |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,46 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-present the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.springframework.docs.integration.jms.jmssendingjmsclient; |
||||||
|
|
||||||
|
import jakarta.jms.ConnectionFactory; |
||||||
|
|
||||||
|
import org.springframework.jms.core.JmsClient; |
||||||
|
import org.springframework.messaging.Message; |
||||||
|
import org.springframework.messaging.support.MessageBuilder; |
||||||
|
|
||||||
|
public class JmsClientSample { |
||||||
|
|
||||||
|
private final JmsClient jmsClient; |
||||||
|
|
||||||
|
public JmsClientSample(ConnectionFactory connectionFactory) { |
||||||
|
// For custom options, use JmsClient.builder(ConnectionFactory)
|
||||||
|
this.jmsClient = JmsClient.create(connectionFactory); |
||||||
|
} |
||||||
|
|
||||||
|
public void sendWithConversion() { |
||||||
|
this.jmsClient.destination("myQueue") |
||||||
|
.withTimeToLive(1000) |
||||||
|
.send("myPayload"); // optionally with a headers Map next to the payload
|
||||||
|
} |
||||||
|
|
||||||
|
public void sendCustomMessage() { |
||||||
|
Message<?> message = MessageBuilder.withPayload("myPayload").build(); // optionally with headers
|
||||||
|
this.jmsClient.destination("myQueue") |
||||||
|
.withTimeToLive(1000) |
||||||
|
.send(message); |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,58 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-2024 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.springframework.docs.integration.jms.jmssendingpostprocessor; |
||||||
|
|
||||||
|
|
||||||
|
import jakarta.jms.ConnectionFactory; |
||||||
|
|
||||||
|
import org.springframework.jms.core.JmsClient; |
||||||
|
import org.springframework.messaging.Message; |
||||||
|
import org.springframework.messaging.core.MessagePostProcessor; |
||||||
|
import org.springframework.messaging.support.MessageBuilder; |
||||||
|
|
||||||
|
public class JmsClientWithPostProcessor { |
||||||
|
|
||||||
|
private final JmsClient jmsClient; |
||||||
|
|
||||||
|
public JmsClientWithPostProcessor(ConnectionFactory connectionFactory) { |
||||||
|
this.jmsClient = JmsClient.builder(connectionFactory) |
||||||
|
.messagePostProcessor(new TenantIdMessageInterceptor("42")) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
|
||||||
|
public void sendWithPostProcessor() { |
||||||
|
this.jmsClient.destination("myQueue") |
||||||
|
.withTimeToLive(1000) |
||||||
|
.send("myPayload"); |
||||||
|
} |
||||||
|
|
||||||
|
static class TenantIdMessageInterceptor implements MessagePostProcessor { |
||||||
|
|
||||||
|
private final String tenantId; |
||||||
|
|
||||||
|
public TenantIdMessageInterceptor(String tenantId) { |
||||||
|
this.tenantId = tenantId; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Message<?> postProcessMessage(Message<?> message) { |
||||||
|
return MessageBuilder.fromMessage(message) |
||||||
|
.setHeader("tenantId", this.tenantId) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,88 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-present the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.springframework.jms.core; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import jakarta.jms.ConnectionFactory; |
||||||
|
import org.jspecify.annotations.Nullable; |
||||||
|
|
||||||
|
import org.springframework.messaging.converter.CompositeMessageConverter; |
||||||
|
import org.springframework.messaging.converter.MessageConverter; |
||||||
|
import org.springframework.messaging.core.CompositeMessagePostProcessor; |
||||||
|
import org.springframework.messaging.core.MessagePostProcessor; |
||||||
|
import org.springframework.util.Assert; |
||||||
|
|
||||||
|
/** |
||||||
|
* Default implementation of {@link JmsClient.Builder}. |
||||||
|
* @author Brian Clozel |
||||||
|
* @since 7.0 |
||||||
|
* @see JmsClient#builder(ConnectionFactory) |
||||||
|
* @see JmsClient#builder(JmsOperations) |
||||||
|
*/ |
||||||
|
class DefaultJmsClientBuilder implements JmsClient.Builder { |
||||||
|
|
||||||
|
private final DefaultJmsClient jmsClient; |
||||||
|
|
||||||
|
private @Nullable List<MessageConverter> messageConverters; |
||||||
|
|
||||||
|
private @Nullable List<MessagePostProcessor> messagePostProcessors; |
||||||
|
|
||||||
|
|
||||||
|
DefaultJmsClientBuilder(ConnectionFactory connectionFactory) { |
||||||
|
Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); |
||||||
|
this.jmsClient = new DefaultJmsClient(connectionFactory); |
||||||
|
} |
||||||
|
|
||||||
|
DefaultJmsClientBuilder(JmsOperations jmsTemplate) { |
||||||
|
Assert.notNull(jmsTemplate, "JmsOperations must not be null"); |
||||||
|
this.jmsClient = new DefaultJmsClient(jmsTemplate); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public JmsClient.Builder messageConverter(MessageConverter messageConverter) { |
||||||
|
Assert.notNull(messageConverter, "MessageConverter must not be null"); |
||||||
|
if (this.messageConverters == null) { |
||||||
|
this.messageConverters = new ArrayList<>(); |
||||||
|
} |
||||||
|
this.messageConverters.add(messageConverter); |
||||||
|
return this; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public JmsClient.Builder messagePostProcessor(MessagePostProcessor messagePostProcessor) { |
||||||
|
Assert.notNull(messagePostProcessor, "MessagePostProcessor must not be null"); |
||||||
|
if (this.messagePostProcessors == null) { |
||||||
|
this.messagePostProcessors = new ArrayList<>(); |
||||||
|
} |
||||||
|
this.messagePostProcessors.add(messagePostProcessor); |
||||||
|
return this; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public JmsClient build() { |
||||||
|
if (this.messageConverters != null) { |
||||||
|
this.jmsClient.setMessageConverter(new CompositeMessageConverter(this.messageConverters)); |
||||||
|
} |
||||||
|
if (this.messagePostProcessors != null) { |
||||||
|
this.jmsClient.setMessagePostProcessor(new CompositeMessagePostProcessor(this.messagePostProcessors)); |
||||||
|
} |
||||||
|
return this.jmsClient; |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,48 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-present the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.springframework.messaging.core; |
||||||
|
|
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import org.springframework.messaging.Message; |
||||||
|
|
||||||
|
/** |
||||||
|
* Composite {@link MessagePostProcessor} implementation that iterates over |
||||||
|
* a given collection of delegate {@link MessagePostProcessor} instances. |
||||||
|
* @author Brian Clozel |
||||||
|
* @since 7.0 |
||||||
|
*/ |
||||||
|
public class CompositeMessagePostProcessor implements MessagePostProcessor { |
||||||
|
|
||||||
|
private final List<MessagePostProcessor> messagePostProcessors; |
||||||
|
|
||||||
|
/** |
||||||
|
* Construct a CompositeMessagePostProcessor from the given delegate MessagePostProcessors. |
||||||
|
* @param messagePostProcessors the MessagePostProcessors to delegate to |
||||||
|
*/ |
||||||
|
public CompositeMessagePostProcessor(List<MessagePostProcessor> messagePostProcessors) { |
||||||
|
this.messagePostProcessors = messagePostProcessors; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Message<?> postProcessMessage(Message<?> message) { |
||||||
|
for (MessagePostProcessor messagePostProcessor : this.messagePostProcessors) { |
||||||
|
message = messagePostProcessor.postProcessMessage(message); |
||||||
|
} |
||||||
|
return message; |
||||||
|
} |
||||||
|
} |
||||||
Loading…
Reference in new issue