Browse Source

Improve WebClient observations handling of CANCEL signal

Prior to this commit, `WebClient` observations would be recorded as
aborted (with tags "outcome":"UNKNOWN", "status":"CLIENT_ERROR")
for use cases like this:

```
Flux<String> result = client.get()
    .uri("/path")
    .retrieve()
    .bodyToFlux(String.class)
    .take(1);
```

This is due to operators like `take` or `next` that consume *some*
`onNext` signals and then cancels the subscription before completion.
This means the subscriber is only partially interested in the response
and we should not count this as a client error.

This commit ensures that observations are only recorded as aborted if
the response was not published at the time the CANCEL signal was
received.

The code snippet above will now publish observations with
"outcome":"SUCCESS" and "status":"200" tags, for example.

Closes gh-30070
pull/30308/head
Brian Clozel 3 years ago
parent
commit
01f97887ea
  1. 11
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
  2. 46
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java

11
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -26,6 +26,7 @@ import java.util.Collections; @@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntPredicate;
@ -37,6 +38,7 @@ import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccess @@ -37,6 +38,7 @@ import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccess
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context;
import org.springframework.core.ParameterizedTypeReference;
@ -455,13 +457,16 @@ class DefaultWebClient implements WebClient { @@ -455,13 +457,16 @@ class DefaultWebClient implements WebClient {
if (this.contextModifier != null) {
responseMono = responseMono.contextWrite(this.contextModifier);
}
final AtomicBoolean responseReceived = new AtomicBoolean();
return responseMono
.doOnNext(response -> responseReceived.set(true))
.doOnError(observationContext::setError)
.doOnCancel(() -> {
observationContext.setAborted(true);
.doFinally(signalType -> {
if (signalType == SignalType.CANCEL && !responseReceived.get()) {
observationContext.setAborted(true);
}
observation.stop();
})
.doOnTerminate(observation::stop)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));
});
}

46
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java

@ -18,6 +18,9 @@ package org.springframework.web.reactive.function.client; @@ -18,6 +18,9 @@ package org.springframework.web.reactive.function.client;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationHandler;
@ -27,10 +30,13 @@ import io.micrometer.observation.tck.TestObservationRegistryAssert; @@ -27,10 +30,13 @@ import io.micrometer.observation.tck.TestObservationRegistryAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
@ -59,7 +65,10 @@ class WebClientObservationTests { @@ -59,7 +65,10 @@ class WebClientObservationTests {
void setup() {
ClientResponse mockResponse = mock();
when(mockResponse.statusCode()).thenReturn(HttpStatus.OK);
when(mockResponse.headers()).thenReturn(new MockClientHeaders());
when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty());
when(mockResponse.bodyToFlux(String.class)).thenReturn(Flux.just("first", "second"));
when(mockResponse.releaseBody()).thenReturn(Mono.empty());
given(this.exchangeFunction.exchange(this.request.capture())).willReturn(Mono.just(mockResponse));
this.builder = WebClient.builder().baseUrl("/base").exchangeFunction(this.exchangeFunction).observationRegistry(this.observationRegistry);
this.observationRegistry.observationConfig().observationHandler(new HeaderInjectingHandler());
@ -114,6 +123,16 @@ class WebClientObservationTests { @@ -114,6 +123,16 @@ class WebClientObservationTests {
.hasLowCardinalityKeyValue("status", "CLIENT_ERROR");
}
@Test
void recordsObservationForCancelledExchangeDuringResponse() {
StepVerifier.create(this.builder.build().get().uri("/path").retrieve().bodyToFlux(String.class).take(1))
.expectNextCount(1)
.expectComplete()
.verify(Duration.ofSeconds(5));
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
.hasLowCardinalityKeyValue("status", "200");
}
@Test
void setsCurrentObservationInReactorContext() {
ExchangeFilterFunction assertionFilter = new ExchangeFilterFunction() {
@ -130,7 +149,7 @@ class WebClientObservationTests { @@ -130,7 +149,7 @@ class WebClientObservationTests {
this.builder.filter(assertionFilter).build().get().uri("/resource/{id}", 42)
.retrieve().bodyToMono(Void.class)
.block(Duration.ofSeconds(10));
verifyAndGetRequest();
verifyAndGetRequest();
}
@Test
@ -170,4 +189,29 @@ class WebClientObservationTests { @@ -170,4 +189,29 @@ class WebClientObservationTests {
}
}
static class MockClientHeaders implements ClientResponse.Headers {
private HttpHeaders headers = new HttpHeaders();
@Override
public OptionalLong contentLength() {
return OptionalLong.empty();
}
@Override
public Optional<MediaType> contentType() {
return Optional.empty();
}
@Override
public List<String> header(String headerName) {
return Collections.emptyList();
}
@Override
public HttpHeaders asHttpHeaders() {
return this.headers;
}
}
}

Loading…
Cancel
Save