diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index 443ba3018f9..62f93369561 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -449,20 +449,21 @@ final class DefaultWebClient implements WebClient { @Override public Mono exchange() { ClientRequest.Builder requestBuilder = initRequestBuilder(); - ClientRequestObservationContext observationContext = new ClientRequestObservationContext(requestBuilder); return Mono.deferContextual(contextView -> { Observation observation = ClientHttpObservationDocumentation.HTTP_REACTIVE_CLIENT_EXCHANGES.observation(observationConvention, - DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry); + DEFAULT_OBSERVATION_CONVENTION, () -> new ClientRequestObservationContext(requestBuilder), observationRegistry); observation .parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null)) .start(); - ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext); + ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observation.getContext()); if (filterFunctions != null) { filterFunction = filterFunctions.andThen(filterFunction); } ClientRequest request = requestBuilder.build(); - observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null)); - observationContext.setRequest(request); + if (observation.getContext() instanceof ClientRequestObservationContext observationContext) { + observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null)); + observationContext.setRequest(request); + } final ExchangeFilterFunction finalFilterFunction = filterFunction; Mono responseMono = Mono.defer( () -> finalFilterFunction.apply(exchangeFunction).exchange(request)) @@ -478,7 +479,8 @@ final class DefaultWebClient implements WebClient { .doOnNext(response -> responseReceived.set(true)) .doOnError(observation::error) .doFinally(signalType -> { - if (signalType == SignalType.CANCEL && !responseReceived.get()) { + if (signalType == SignalType.CANCEL && !responseReceived.get() && + observation.getContext() instanceof ClientRequestObservationContext observationContext) { observationContext.setAborted(true); } observation.stop(); @@ -734,15 +736,19 @@ final class DefaultWebClient implements WebClient { private static class ObservationFilterFunction implements ExchangeFilterFunction { - private final ClientRequestObservationContext observationContext; + private final Observation.Context observationContext; - ObservationFilterFunction(ClientRequestObservationContext observationContext) { + ObservationFilterFunction(Observation.Context observationContext) { this.observationContext = observationContext; } @Override public Mono filter(ClientRequest request, ExchangeFunction next) { - return next.exchange(request).doOnNext(this.observationContext::setResponse); + Mono exchange = next.exchange(request); + if (this.observationContext instanceof ClientRequestObservationContext clientContext) { + exchange = exchange.doOnNext(clientContext::setResponse); + } + return exchange; } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java index 96c6d3a3ff0..f42efc97f35 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java @@ -69,6 +69,7 @@ class WebClientObservationTests { when(mockResponse.statusCode()).thenReturn(HttpStatus.OK); when(mockResponse.headers()).thenReturn(new MockClientHeaders()); when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty()); + when(mockResponse.bodyToMono(String.class)).thenReturn(Mono.error(IllegalStateException::new), Mono.just("Hello")); when(mockResponse.bodyToFlux(String.class)).thenReturn(Flux.just("first", "second")); when(mockResponse.releaseBody()).thenReturn(Mono.empty()); when(this.exchangeFunction.exchange(this.request.capture())).thenReturn(Mono.just(mockResponse)); @@ -141,6 +142,16 @@ class WebClientObservationTests { .hasLowCardinalityKeyValue("status", "200"); } + @Test + void recordsSingleObservationForRetries() { + StepVerifier.create(this.builder.build().get().uri("/path").retrieve().bodyToMono(String.class).retry(1)) + .expectNextCount(1) + .expectComplete() + .verify(Duration.ofSeconds(2)); + assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS") + .hasLowCardinalityKeyValue("status", "200"); + } + @Test void setsCurrentObservationInReactorContext() { ExchangeFilterFunction assertionFilter = (request, chain) -> chain.exchange(request).contextWrite(context -> {