Browse Source

Merge branch '6.2.x'

pull/34705/head
Brian Clozel 10 months ago
parent
commit
a787088df2
  1. 24
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
  2. 11
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java

24
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -436,22 +436,23 @@ final class DefaultWebClient implements WebClient { @@ -436,22 +436,23 @@ final class DefaultWebClient implements WebClient {
private Mono<ClientResponse> exchange() {
ClientRequest.Builder requestBuilder = initRequestBuilder();
ClientRequestObservationContext observationContext = new ClientRequestObservationContext(requestBuilder);
return Mono.deferContextual(contextView -> {
Observation observation = ClientHttpObservationDocumentation.HTTP_REACTIVE_CLIENT_EXCHANGES.observation(observationConvention,
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
DEFAULT_OBSERVATION_CONVENTION, () -> new ClientRequestObservationContext(requestBuilder), observationRegistry);
observation
.parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null))
.start();
ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext);
ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observation.getContext());
if (filterFunctions != null) {
filterFunction = filterFunctions.andThen(filterFunction);
}
contextView.getOrEmpty(COROUTINE_CONTEXT_ATTRIBUTE)
.ifPresent(context -> requestBuilder.attribute(COROUTINE_CONTEXT_ATTRIBUTE, context));
ClientRequest request = requestBuilder.build();
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
observationContext.setRequest(request);
if (observation.getContext() instanceof ClientRequestObservationContext observationContext) {
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
observationContext.setRequest(request);
}
final ExchangeFilterFunction finalFilterFunction = filterFunction;
Mono<ClientResponse> responseMono = Mono.defer(
() -> finalFilterFunction.apply(exchangeFunction).exchange(request))
@ -464,7 +465,8 @@ final class DefaultWebClient implements WebClient { @@ -464,7 +465,8 @@ final class DefaultWebClient implements WebClient {
.doOnNext(response -> responseReceived.set(true))
.doOnError(observation::error)
.doFinally(signalType -> {
if (signalType == SignalType.CANCEL && !responseReceived.get()) {
if (signalType == SignalType.CANCEL && !responseReceived.get() &&
observation.getContext() instanceof ClientRequestObservationContext observationContext) {
observationContext.setAborted(true);
}
observation.stop();
@ -728,15 +730,19 @@ final class DefaultWebClient implements WebClient { @@ -728,15 +730,19 @@ final class DefaultWebClient implements WebClient {
private static class ObservationFilterFunction implements ExchangeFilterFunction {
private final ClientRequestObservationContext observationContext;
private final Observation.Context observationContext;
ObservationFilterFunction(ClientRequestObservationContext observationContext) {
ObservationFilterFunction(Observation.Context observationContext) {
this.observationContext = observationContext;
}
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
return next.exchange(request).doOnNext(this.observationContext::setResponse);
Mono<ClientResponse> exchange = next.exchange(request);
if (this.observationContext instanceof ClientRequestObservationContext clientContext) {
exchange = exchange.doOnNext(clientContext::setResponse);
}
return exchange;
}
}

11
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java

@ -69,6 +69,7 @@ class WebClientObservationTests { @@ -69,6 +69,7 @@ class WebClientObservationTests {
when(mockResponse.statusCode()).thenReturn(HttpStatus.OK);
when(mockResponse.headers()).thenReturn(new MockClientHeaders());
when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty());
when(mockResponse.bodyToMono(String.class)).thenReturn(Mono.error(IllegalStateException::new), Mono.just("Hello"));
when(mockResponse.bodyToFlux(String.class)).thenReturn(Flux.just("first", "second"));
when(mockResponse.releaseBody()).thenReturn(Mono.empty());
when(this.exchangeFunction.exchange(this.request.capture())).thenReturn(Mono.just(mockResponse));
@ -141,6 +142,16 @@ class WebClientObservationTests { @@ -141,6 +142,16 @@ class WebClientObservationTests {
.hasLowCardinalityKeyValue("status", "200");
}
@Test
void recordsSingleObservationForRetries() {
StepVerifier.create(this.builder.build().get().uri("/path").retrieve().bodyToMono(String.class).retry(1))
.expectNextCount(1)
.expectComplete()
.verify(Duration.ofSeconds(2));
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
.hasLowCardinalityKeyValue("status", "200");
}
@Test
void setsCurrentObservationInReactorContext() {
ExchangeFilterFunction assertionFilter = (request, chain) -> chain.exchange(request).contextWrite(context -> {

Loading…
Cancel
Save