Browse Source

Fix double accounting of JMS process observations

Prior to this commit, the instrumentation of the processing of JMS
messages would happen a different levels of the hierarchy, accounting
for alli known implementations, `SimpleMessageListenerContainer` and
`DefaultMessageListenerContainer` as well as various use cases and
`MessageListener` variants.

Unfortunately, this instrumentation could lead to observing JMS
processing twice in some cases, and would not be consistent about the
scope of what's observed.

This commit moves the instrumentation basics into the
`AbstractMessageListenerContainer` but leaves the actual observation
calls to the public implementations.

Fixes gh-33758
pull/33766/head
Brian Clozel 1 year ago
parent
commit
84e762b470
  1. 25
      spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java
  2. 2
      spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java
  3. 7
      spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java

25
spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java

@ -676,16 +676,21 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen @@ -676,16 +676,21 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
* @see #handleListenerException
*/
protected void executeListener(Session session, Message message) {
createObservation(message).observe(() -> {
try {
doExecuteListener(session, message);
}
catch (Throwable ex) {
handleListenerException(ex);
}
});
try {
doExecuteListener(session, message);
}
catch (Throwable ex) {
handleListenerException(ex);
}
}
/**
* Create, but do not start an {@link Observation} for JMS message processing.
* <p>This will return a "no-op" observation if Micrometer Jakarta instrumentation
* is not available or if no Observation Registry has been configured.
* @param message the message to be observed
* @since 6.1
*/
protected Observation createObservation(Message message) {
if (micrometerJakartaPresent && this.observationRegistry != null) {
return ObservationFactory.create(this.observationRegistry, message);
@ -770,7 +775,6 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen @@ -770,7 +775,6 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
Connection conToClose = null;
Session sessionToClose = null;
Observation observation = createObservation(message);
try {
Session sessionToUse = session;
if (micrometerJakartaPresent && this.observationRegistry != null) {
@ -782,7 +786,6 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen @@ -782,7 +786,6 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
sessionToClose = createSession(conToClose);
sessionToUse = sessionToClose;
}
observation.start();
// Actually invoke the message listener...
listener.onMessage(message, sessionToUse);
// Clean up specially exposed Session, if any.
@ -794,11 +797,9 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen @@ -794,11 +797,9 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
}
}
catch (JMSException exc) {
observation.error(exc);
throw exc;
}
finally {
observation.stop();
JmsUtils.closeSession(sessionToClose);
JmsUtils.closeConnection(conToClose);
}

2
spring-jms/src/main/java/org/springframework/jms/listener/SimpleMessageListenerContainer.java

@ -341,7 +341,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta @@ -341,7 +341,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
connectionFactory, new LocallyExposedJmsResourceHolder(session));
}
try {
executeListener(session, message);
createObservation(message).observe(() -> executeListener(session, message));
}
finally {
if (exposeResource) {

7
spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java

@ -75,7 +75,8 @@ class MessageListenerContainerObservationTests { @@ -75,7 +75,8 @@ class MessageListenerContainerObservationTests {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.convertAndSend("spring.test.observation", "message content");
latch.await(2, TimeUnit.SECONDS);
assertThat(registry).hasObservationWithNameEqualTo("jms.message.process")
assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 1)
.hasObservationWithNameEqualTo("jms.message.process")
.that()
.hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation");
assertThat(registry).hasNumberOfObservationsEqualTo(1);
@ -86,7 +87,6 @@ class MessageListenerContainerObservationTests { @@ -86,7 +87,6 @@ class MessageListenerContainerObservationTests {
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource("listenerContainers")
void shouldRecordJmsPublishObservations(AbstractMessageListenerContainer listenerContainer) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setObservationRegistry(registry);
listenerContainer.setDestinationName("spring.test.observation");
@ -100,8 +100,7 @@ class MessageListenerContainerObservationTests { @@ -100,8 +100,7 @@ class MessageListenerContainerObservationTests {
TextMessage response = (TextMessage) jmsTemplate.sendAndReceive("spring.test.observation",
session -> session.createTextMessage("test request"));
// request received by listener and response received by template
assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 2);
assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 1);
// response sent to the template
assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.publish", 1);

Loading…
Cancel
Save