From 84e762b470a2838a3b0c01f776bfaebf0d6bc583 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Mon, 21 Oct 2024 16:09:58 +0200 Subject: [PATCH] Fix double accounting of JMS process observations Prior to this commit, the instrumentation of the processing of JMS messages would happen a different levels of the hierarchy, accounting for alli known implementations, `SimpleMessageListenerContainer` and `DefaultMessageListenerContainer` as well as various use cases and `MessageListener` variants. Unfortunately, this instrumentation could lead to observing JMS processing twice in some cases, and would not be consistent about the scope of what's observed. This commit moves the instrumentation basics into the `AbstractMessageListenerContainer` but leaves the actual observation calls to the public implementations. Fixes gh-33758 --- .../AbstractMessageListenerContainer.java | 25 ++++++++++--------- .../SimpleMessageListenerContainer.java | 2 +- ...sageListenerContainerObservationTests.java | 7 +++--- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java index 6362a4ba823..035805fa821 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java @@ -676,16 +676,21 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen * @see #handleListenerException */ protected void executeListener(Session session, Message message) { - createObservation(message).observe(() -> { - try { - doExecuteListener(session, message); - } - catch (Throwable ex) { - handleListenerException(ex); - } - }); + try { + doExecuteListener(session, message); + } + catch (Throwable ex) { + handleListenerException(ex); + } } + /** + * Create, but do not start an {@link Observation} for JMS message processing. + *

This will return a "no-op" observation if Micrometer Jakarta instrumentation + * is not available or if no Observation Registry has been configured. + * @param message the message to be observed + * @since 6.1 + */ protected Observation createObservation(Message message) { if (micrometerJakartaPresent && this.observationRegistry != null) { return ObservationFactory.create(this.observationRegistry, message); @@ -770,7 +775,6 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen Connection conToClose = null; Session sessionToClose = null; - Observation observation = createObservation(message); try { Session sessionToUse = session; if (micrometerJakartaPresent && this.observationRegistry != null) { @@ -782,7 +786,6 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen sessionToClose = createSession(conToClose); sessionToUse = sessionToClose; } - observation.start(); // Actually invoke the message listener... listener.onMessage(message, sessionToUse); // Clean up specially exposed Session, if any. @@ -794,11 +797,9 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen } } catch (JMSException exc) { - observation.error(exc); throw exc; } finally { - observation.stop(); JmsUtils.closeSession(sessionToClose); JmsUtils.closeConnection(conToClose); } diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java index 9530b236586..27a72421f64 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java @@ -341,7 +341,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta connectionFactory, new LocallyExposedJmsResourceHolder(session)); } try { - executeListener(session, message); + createObservation(message).observe(() -> executeListener(session, message)); } finally { if (exposeResource) { diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java index 42109512d57..f3879fe72e8 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java @@ -75,7 +75,8 @@ class MessageListenerContainerObservationTests { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.convertAndSend("spring.test.observation", "message content"); latch.await(2, TimeUnit.SECONDS); - assertThat(registry).hasObservationWithNameEqualTo("jms.message.process") + assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 1) + .hasObservationWithNameEqualTo("jms.message.process") .that() .hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation"); assertThat(registry).hasNumberOfObservationsEqualTo(1); @@ -86,7 +87,6 @@ class MessageListenerContainerObservationTests { @ParameterizedTest(name = "[{index}] {0}") @MethodSource("listenerContainers") void shouldRecordJmsPublishObservations(AbstractMessageListenerContainer listenerContainer) throws Exception { - CountDownLatch latch = new CountDownLatch(1); listenerContainer.setConnectionFactory(connectionFactory); listenerContainer.setObservationRegistry(registry); listenerContainer.setDestinationName("spring.test.observation"); @@ -100,8 +100,7 @@ class MessageListenerContainerObservationTests { TextMessage response = (TextMessage) jmsTemplate.sendAndReceive("spring.test.observation", session -> session.createTextMessage("test request")); - // request received by listener and response received by template - assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 2); + assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 1); // response sent to the template assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.publish", 1);