From d451d6adccf82c410a249c69d13964d413dcce97 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Wed, 29 Mar 2023 17:41:28 +0200 Subject: [PATCH] Ensure that client responses are observed when filters fail Prior to this commit, an error thrown by a `ExchangeFilterFunction` configured on a `WebClient` instance would be recorded as such by the client observation, but the response details would be missing from the observation. All filter functions and the exchange function (performing the HTTP call) would be merged into a single `ExchangeFunction`; this instance was instrumented and osberved. As a result, the instrumentation would only get the error signal returned by the filter function and would not see the HTTP response even if it was received. This means that the recorded observation would not have the relevant information for the HTTP status. This commit ensures that between the configured `ExchangeFilterFunction` and the `ExchangeFunction`, an instrumentation `ExchangeFilterFunction` is inserted. This allows to set the client response to the observation context, even if a later error signal is thrown by a filter function. Note that with this change, an error signal sent by a filter function will be still recorded in the observation. See gh-30059 --- .../function/client/DefaultWebClient.java | 33 ++++++++++++++++--- .../client/DefaultWebClientBuilder.java | 9 ++--- .../client/WebClientObservationTests.java | 13 ++++++++ 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index cf3828cd70e..a675dba7019 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -78,6 +78,9 @@ class DefaultWebClient implements WebClient { private final ExchangeFunction exchangeFunction; + @Nullable + private final ExchangeFilterFunction filterFunctions; + private final UriBuilderFactory uriBuilderFactory; @Nullable @@ -93,19 +96,21 @@ class DefaultWebClient implements WebClient { private final ObservationRegistry observationRegistry; + @Nullable private final ClientRequestObservationConvention observationConvention; private final DefaultWebClientBuilder builder; - DefaultWebClient(ExchangeFunction exchangeFunction, UriBuilderFactory uriBuilderFactory, + DefaultWebClient(ExchangeFunction exchangeFunction, @Nullable ExchangeFilterFunction filterFunctions, UriBuilderFactory uriBuilderFactory, @Nullable HttpHeaders defaultHeaders, @Nullable MultiValueMap defaultCookies, @Nullable Consumer> defaultRequest, @Nullable Map, Function>> statusHandlerMap, - ObservationRegistry observationRegistry, ClientRequestObservationConvention observationConvention, + ObservationRegistry observationRegistry, @Nullable ClientRequestObservationConvention observationConvention, DefaultWebClientBuilder builder) { this.exchangeFunction = exchangeFunction; + this.filterFunctions = filterFunctions; this.uriBuilderFactory = uriBuilderFactory; this.defaultHeaders = defaultHeaders; this.defaultCookies = defaultCookies; @@ -438,16 +443,21 @@ class DefaultWebClient implements WebClient { observation .parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null)) .start(); + ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext); + if (filterFunctions != null) { + filterFunction = filterFunctions.andThen(filterFunction); + } ClientRequest request = requestBuilder.build(); observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null)); observationContext.setRequest(request); - Mono responseMono = exchangeFunction.exchange(request) + Mono responseMono = filterFunction.apply(exchangeFunction) + .exchange(request) .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]") .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR); if (this.contextModifier != null) { responseMono = responseMono.contextWrite(this.contextModifier); } - return responseMono.doOnNext(observationContext::setResponse) + return responseMono .doOnError(observationContext::setError) .doOnCancel(() -> { observationContext.setAborted(true); @@ -718,4 +728,19 @@ class DefaultWebClient implements WebClient { } } + private static class ObservationFilterFunction implements ExchangeFilterFunction { + + private final ClientRequestObservationContext observationContext; + + public ObservationFilterFunction(ClientRequestObservationContext observationContext) { + this.observationContext = observationContext; + } + + @Override + public Mono filter(ClientRequest request, ExchangeFunction next) { + return next.exchange(request) + .doOnNext(this.observationContext::setResponse); + } + } + } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java index a5493f2def1..4321e05abf5 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java @@ -314,16 +314,17 @@ final class DefaultWebClientBuilder implements WebClient.Builder { ExchangeFunctions.create(connectorToUse, initExchangeStrategies()) : this.exchangeFunction); - ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream() + ExchangeFilterFunction filterFunctions = (this.filters != null ? this.filters.stream() .reduce(ExchangeFilterFunction::andThen) - .map(filter -> filter.apply(exchange)) - .orElse(exchange) : exchange); + .orElse(null) : null); HttpHeaders defaultHeaders = copyDefaultHeaders(); MultiValueMap defaultCookies = copyDefaultCookies(); - return new DefaultWebClient(filteredExchange, initUriBuilderFactory(), + return new DefaultWebClient(exchange, + filterFunctions, + initUriBuilderFactory(), defaultHeaders, defaultCookies, this.defaultRequest, diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java index 3131cbda3e3..a94a5671b26 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java @@ -133,6 +133,19 @@ class WebClientObservationTests { verifyAndGetRequest(); } + @Test + void recordsObservationWithResponseDetailsWhenFilterFunctionErrors() { + ExchangeFilterFunction errorFunction = (req, next) -> next.exchange(req).then(Mono.error(new IllegalStateException())); + WebClient client = this.builder.filter(errorFunction).build(); + Mono responseMono = client.get().uri("/path").retrieve().bodyToMono(Void.class); + StepVerifier.create(responseMono) + .expectError(IllegalStateException.class) + .verify(Duration.ofSeconds(5)); + assertThatHttpObservation() + .hasLowCardinalityKeyValue("exception", "IllegalStateException") + .hasLowCardinalityKeyValue("status", "200"); + } + private TestObservationRegistryAssert.TestObservationRegistryAssertReturningObservationContextAssert assertThatHttpObservation() { return TestObservationRegistryAssert.assertThat(this.observationRegistry) .hasObservationWithNameEqualTo("http.client.requests").that();