From 0bb309f433ce134cdf5f048df9eaa73b5cee5a63 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Fri, 19 Jul 2024 17:17:18 +0200 Subject: [PATCH] Instrument @JmsListener session for response messages Prior to this commit, the observation instrumentation for `@JmsListener` annotated methods (implemented in `AbstractMessageListenerContainer` would not instrument the JMS session using the Micrometer JMS support. This means that response messages returned from the listener method would be sent but no observation would be recorded. As a result, tracing message properties would be also missing. This commit ensures that the session provided to the listener method is instrumented beforehand, if Micrometer is on the classpath and an observation registry has been configured. Fixes gh-33221 --- .../ROOT/pages/integration/observability.adoc | 2 ++ .../AbstractMessageListenerContainer.java | 12 ++++++++ ...sageListenerContainerObservationTests.java | 29 +++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/framework-docs/modules/ROOT/pages/integration/observability.adoc b/framework-docs/modules/ROOT/pages/integration/observability.adoc index e4208a28d8f..64e5f230ddc 100644 --- a/framework-docs/modules/ROOT/pages/integration/observability.adoc +++ b/framework-docs/modules/ROOT/pages/integration/observability.adoc @@ -162,6 +162,8 @@ include-code::./JmsTemplatePublish[] It uses the `io.micrometer.jakarta9.instrument.jms.DefaultJmsPublishObservationConvention` by default, backed by the `io.micrometer.jakarta9.instrument.jms.JmsPublishObservationContext`. +Similar observations are recorded with `@JmsListener` annotated methods when response messages are returned from the listener method. + [[observability.jms.process]] === JMS message Processing instrumentation diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java index 6c0f78b743b..6362a4ba823 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java @@ -17,6 +17,7 @@ package org.springframework.jms.listener; import io.micrometer.jakarta9.instrument.jms.DefaultJmsProcessObservationConvention; +import io.micrometer.jakarta9.instrument.jms.JmsInstrumentation; import io.micrometer.jakarta9.instrument.jms.JmsObservationDocumentation; import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationContext; import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationConvention; @@ -772,6 +773,9 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen Observation observation = createObservation(message); try { Session sessionToUse = session; + if (micrometerJakartaPresent && this.observationRegistry != null) { + sessionToUse = MicrometerInstrumentation.instrumentSession(sessionToUse, this.observationRegistry); + } if (!isExposeListenerSession()) { // We need to expose a separate Session. conToClose = createConnection(); @@ -992,6 +996,14 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen private static class MessageRejectedWhileStoppingException extends RuntimeException { } + private abstract static class MicrometerInstrumentation { + + static Session instrumentSession(Session session, ObservationRegistry registry) { + return JmsInstrumentation.instrumentSession(session, registry); + } + + } + private abstract static class ObservationFactory { private static final JmsProcessObservationConvention DEFAULT_CONVENTION = new DefaultJmsProcessObservationConvention(); diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java index 1e175d46532..42109512d57 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java @@ -23,7 +23,9 @@ import java.util.stream.Stream; import io.micrometer.observation.Observation; import io.micrometer.observation.tck.TestObservationRegistry; +import jakarta.jms.Message; import jakarta.jms.MessageListener; +import jakarta.jms.TextMessage; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension; import org.assertj.core.api.Assertions; @@ -81,6 +83,33 @@ class MessageListenerContainerObservationTests { listenerContainer.shutdown(); } + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("listenerContainers") + void shouldRecordJmsPublishObservations(AbstractMessageListenerContainer listenerContainer) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + listenerContainer.setConnectionFactory(connectionFactory); + listenerContainer.setObservationRegistry(registry); + listenerContainer.setDestinationName("spring.test.observation"); + listenerContainer.setMessageListener((SessionAwareMessageListener) (message, session) -> { + Message response = session.createTextMessage("test response"); + session.createProducer(message.getJMSReplyTo()).send(response); + }); + listenerContainer.afterPropertiesSet(); + listenerContainer.start(); + JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); + TextMessage response = (TextMessage) jmsTemplate.sendAndReceive("spring.test.observation", + session -> session.createTextMessage("test request")); + + // request received by listener and response received by template + assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 2); + // response sent to the template + assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.publish", 1); + + Assertions.assertThat(response.getText()).isEqualTo("test response"); + listenerContainer.stop(); + listenerContainer.shutdown(); + } + @ParameterizedTest(name = "[{index}] {0}") @MethodSource("listenerContainers") void shouldHaveObservationScopeInErrorHandler(AbstractMessageListenerContainer listenerContainer) throws Exception {