From 35fcbae8c67d413cc7df8b96aec2769d0fe6bd94 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Wed, 29 Nov 2023 14:39:56 +0100 Subject: [PATCH] Fix reactive HTTP server Observation instrumentation Prior to this commit, regressions were introduced with gh-31417: 1. the observation keyvalues would be inconsistent with the HTTP response 2. the observation scope would not cover all controller handlers, causing traceIds to be missing The first issue is caused by the fact that in case of error signals, the observation was stopped before the response was fully committed, which means further processing could happen and update the response status. This commit delays the stop event until the response is committed in case of errors. The second problem is caused by the change from a `contextWrite` operator to using the `tap` operator with a `SignalListener`. The observation was started in the `doOnSubscription` callback, which is too late in some cases. If the WebFlux controller handler is synchronous non-blocking, the execution of the handler is performed before the subscription happens. This means that for those handlers, the observation was not started, even if the current observation was present in the reactor context. This commit changes the `doOnSubscription` to `doFirst` to ensure that the observation is started at the right time. Fixes gh-31703 Fixes gh-31706 --- .../reactive/ServerHttpObservationFilter.java | 33 +++++++++++-------- .../server/adapter/HttpWebHandlerAdapter.java | 33 +++++++++++-------- .../ServerHttpObservationFilterTests.java | 26 ++++++++++++++- ...tpWebHandlerAdapterObservabilityTests.java | 12 ++++--- 4 files changed, 71 insertions(+), 33 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java index 174cde705ac..5cc36eeeed8 100644 --- a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java +++ b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java @@ -121,16 +121,17 @@ public class ServerHttpObservationFilter implements WebFilter { DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry); } - @Override - public void doOnSubscription() throws Throwable { - this.observation.start(); - } @Override public Context addToContext(Context originalContext) { return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation); } + @Override + public void doFirst() throws Throwable { + this.observation.start(); + } + @Override public void doOnCancel() throws Throwable { if (this.observationRecorded.compareAndSet(false, true)) { @@ -142,16 +143,7 @@ public class ServerHttpObservationFilter implements WebFilter { @Override public void doOnComplete() throws Throwable { if (this.observationRecorded.compareAndSet(false, true)) { - ServerHttpResponse response = this.observationContext.getResponse(); - if (response.isCommitted()) { - this.observation.stop(); - } - else { - response.beforeCommit(() -> { - this.observation.stop(); - return Mono.empty(); - }); - } + doOnTerminate(this.observationContext); } } @@ -162,8 +154,21 @@ public class ServerHttpObservationFilter implements WebFilter { this.observationContext.setConnectionAborted(true); } this.observationContext.setError(error); + doOnTerminate(this.observationContext); + } + } + + private void doOnTerminate(ServerRequestObservationContext context) { + ServerHttpResponse response = context.getResponse(); + if (response.isCommitted()) { this.observation.stop(); } + else { + response.beforeCommit(() -> { + this.observation.stop(); + return Mono.empty(); + }); + } } } diff --git a/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java b/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java index 0eacc76fdfe..34f4f754c04 100644 --- a/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java @@ -374,13 +374,13 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa } @Override - public void doOnSubscription() throws Throwable { - this.observation.start(); + public Context addToContext(Context originalContext) { + return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation); } @Override - public Context addToContext(Context originalContext) { - return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation); + public void doFirst() throws Throwable { + this.observation.start(); } @Override @@ -394,21 +394,12 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa @Override public void doOnComplete() throws Throwable { if (this.observationRecorded.compareAndSet(false, true)) { - ServerHttpResponse response = this.observationContext.getResponse(); Throwable throwable = (Throwable) this.observationContext.getAttributes() .get(ExceptionHandlingWebHandler.HANDLED_WEB_EXCEPTION); if (throwable != null) { this.observation.error(throwable); } - if (response.isCommitted()) { - this.observation.stop(); - } - else { - response.beforeCommit(() -> { - this.observation.stop(); - return Mono.empty(); - }); - } + doOnTerminate(this.observationContext); } } @@ -416,8 +407,22 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa public void doOnError(Throwable error) throws Throwable { if (this.observationRecorded.compareAndSet(false, true)) { this.observationContext.setError(error); + doOnTerminate(this.observationContext); + } + } + + + private void doOnTerminate(ServerRequestObservationContext context) { + ServerHttpResponse response = context.getResponse(); + if (response.isCommitted()) { this.observation.stop(); } + else { + response.beforeCommit(() -> { + this.observation.stop(); + return Mono.empty(); + }); + } } } diff --git a/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java b/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java index b6a6d98cce0..fc1d3e75f7c 100644 --- a/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java +++ b/spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java @@ -19,6 +19,7 @@ package org.springframework.web.filter.reactive; import java.util.Optional; +import io.micrometer.observation.Observation; import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.micrometer.observation.tck.TestObservationRegistry; import io.micrometer.observation.tck.TestObservationRegistryAssert; @@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.observation.ServerRequestObservationContext; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; @@ -66,7 +68,10 @@ class ServerHttpObservationFilterTests { ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource")); exchange.getResponse().setRawStatusCode(200); WebFilterChain filterChain = webExchange -> Mono.deferContextual(contextView -> { - assertThat(contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent(); + Observation observation = contextView.get(ObservationThreadLocalAccessor.KEY); + assertThat(observation).isNotNull(); + // check that the observation was started + assertThat(observation.getContext().getLowCardinalityKeyValue("outcome")).isNotNull(); return Mono.empty(); }); this.filter.filter(exchange, filterChain).block(); @@ -100,6 +105,25 @@ class ServerHttpObservationFilterTests { assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "UNKNOWN"); } + @Test + void filterShouldStopObservationOnResponseCommit() { + ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource")); + WebFilterChain filterChain = createFilterChain(filterExchange -> { + throw new IllegalArgumentException("server error"); + }); + StepVerifier.create(this.filter.filter(exchange, filterChain).doOnError(throwable -> { + ServerHttpResponse response = exchange.getResponse(); + response.setRawStatusCode(500); + response.setComplete().block(); + })) + .expectError(IllegalArgumentException.class) + .verify(); + Optional observationContext = ServerHttpObservationFilter.findObservationContext(exchange); + assertThat(observationContext.get().getError()).isInstanceOf(IllegalArgumentException.class); + assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SERVER_ERROR"); + } + + private WebFilterChain createFilterChain(ThrowingConsumer exchangeConsumer) { return filterExchange -> { try { diff --git a/spring-web/src/test/java/org/springframework/web/server/adapter/HttpWebHandlerAdapterObservabilityTests.java b/spring-web/src/test/java/org/springframework/web/server/adapter/HttpWebHandlerAdapterObservabilityTests.java index 7e44daad549..8f83da58f54 100644 --- a/spring-web/src/test/java/org/springframework/web/server/adapter/HttpWebHandlerAdapterObservabilityTests.java +++ b/spring-web/src/test/java/org/springframework/web/server/adapter/HttpWebHandlerAdapterObservabilityTests.java @@ -20,13 +20,13 @@ package org.springframework.web.server.adapter; import java.util.List; import java.util.Optional; +import io.micrometer.observation.Observation; import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.micrometer.observation.tck.TestObservationRegistry; import io.micrometer.observation.tck.TestObservationRegistryAssert; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import reactor.util.context.ContextView; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.observation.ServerRequestObservationContext; @@ -66,7 +66,8 @@ class HttpWebHandlerAdapterObservabilityTests { void handlerShouldSetCurrentObservationInReactorContext() { ReactorContextWebHandler targetHandler = new ReactorContextWebHandler(); createWebHandler(targetHandler).handle(this.request, this.response).block(); - assertThat(targetHandler.contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent(); + assertThat(targetHandler.currentObservation).isNotNull(); + assertThat(targetHandler.observationStarted).isTrue(); assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS"); } @@ -120,13 +121,16 @@ class HttpWebHandlerAdapterObservabilityTests { private static class ReactorContextWebHandler implements WebHandler { - ContextView contextView; + Observation currentObservation; + + boolean observationStarted; @Override public Mono handle(ServerWebExchange exchange) { exchange.getResponse().setStatusCode(HttpStatus.OK); return Mono.deferContextual(contextView -> { - this.contextView = contextView; + this.currentObservation = contextView.get(ObservationThreadLocalAccessor.KEY); + this.observationStarted = this.currentObservation.getContext().getLowCardinalityKeyValue("outcome") != null; return Mono.empty(); }); }