Browse Source

toEntityFlux methods apply error status handling

Closes gh-26069
pull/26087/head
Rossen Stoyanchev 5 years ago
parent
commit
42d3bc47c9
  1. 36
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
  2. 23
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java

36
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -534,25 +534,29 @@ class DefaultWebClient implements WebClient { @@ -534,25 +534,29 @@ class DefaultWebClient implements WebClient {
@Override
public <T> Mono<T> bodyToMono(Class<T> elementClass) {
Assert.notNull(elementClass, "Class must not be null");
return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementClass)));
return this.responseMono.flatMap(response ->
handleBodyMono(response, response.bodyToMono(elementClass)));
}
@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> elementTypeRef) {
Assert.notNull(elementTypeRef, "ParameterizedTypeReference must not be null");
return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementTypeRef)));
return this.responseMono.flatMap(response ->
handleBodyMono(response, response.bodyToMono(elementTypeRef)));
}
@Override
public <T> Flux<T> bodyToFlux(Class<T> elementClass) {
Assert.notNull(elementClass, "Class must not be null");
return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementClass)));
return this.responseMono.flatMapMany(response ->
handleBodyFlux(response, response.bodyToFlux(elementClass)));
}
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementTypeRef) {
Assert.notNull(elementTypeRef, "ParameterizedTypeReference must not be null");
return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementTypeRef)));
return this.responseMono.flatMapMany(response ->
handleBodyFlux(response, response.bodyToFlux(elementTypeRef)));
}
@Override
@ -585,18 +589,14 @@ class DefaultWebClient implements WebClient { @@ -585,18 +589,14 @@ class DefaultWebClient implements WebClient {
@Override
public <T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(Class<T> elementType) {
return this.responseMono.map(response ->
ResponseEntity.status(response.rawStatusCode())
.headers(response.headers().asHttpHeaders())
.body(response.bodyToFlux(elementType)));
return this.responseMono.flatMap(response ->
handlerEntityFlux(response, response.bodyToFlux(elementType)));
}
@Override
public <T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(ParameterizedTypeReference<T> elementTypeReference) {
return this.responseMono.map(response ->
ResponseEntity.status(response.rawStatusCode())
.headers(response.headers().asHttpHeaders())
.body(response.bodyToFlux(elementTypeReference)));
public <T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(ParameterizedTypeReference<T> elementTypeRef) {
return this.responseMono.flatMap(response ->
handlerEntityFlux(response, response.bodyToFlux(elementTypeRef)));
}
@Override
@ -619,6 +619,16 @@ class DefaultWebClient implements WebClient { @@ -619,6 +619,16 @@ class DefaultWebClient implements WebClient {
return (result != null ? result.flux().switchIfEmpty(body) : body);
}
private <T> Mono<? extends ResponseEntity<Flux<T>>> handlerEntityFlux(ClientResponse response, Flux<T> body) {
ResponseEntity<Flux<T>> entity = new ResponseEntity<>(
body.onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, exceptionWrappingFunction(response)),
response.headers().asHttpHeaders(),
response.rawStatusCode());
Mono<ResponseEntity<Flux<T>>> result = applyStatusHandlers(response);
return (result != null ? result.defaultIfEmpty(entity) : Mono.just(entity));
}
private <T> Function<Throwable, Mono<? extends T>> exceptionWrappingFunction(ClientResponse response) {
return t -> response.createException().flatMap(ex -> Mono.error(ex.initCause(t)));
}

23
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultWebClientTests.java

@ -30,10 +30,12 @@ import org.mockito.Captor; @@ -30,10 +30,12 @@ import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.core.NamedThreadLocal;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@ -443,6 +445,27 @@ public class DefaultWebClientTests { @@ -443,6 +445,27 @@ public class DefaultWebClientTests {
verify(predicate2).test(HttpStatus.BAD_REQUEST);
}
@Test // gh-26069
public void onStatusHandlersApplyForToEntityMethods() {
ClientResponse response = ClientResponse.create(HttpStatus.BAD_REQUEST).build();
given(exchangeFunction.exchange(any())).willReturn(Mono.just(response));
WebClient.ResponseSpec spec = this.builder.build().get().uri("/path").retrieve();
testStatusHandlerForToEntity(spec.toEntity(String.class));
testStatusHandlerForToEntity(spec.toEntity(new ParameterizedTypeReference<String>() {}));
testStatusHandlerForToEntity(spec.toEntityList(String.class));
testStatusHandlerForToEntity(spec.toEntityList(new ParameterizedTypeReference<String>() {}));
testStatusHandlerForToEntity(spec.toEntityFlux(String.class));
testStatusHandlerForToEntity(spec.toEntityFlux(new ParameterizedTypeReference<String>() {}));
}
private void testStatusHandlerForToEntity(Publisher<?> responsePublisher) {
StepVerifier.create(responsePublisher).expectError(WebClientResponseException.class).verify();
}
private ClientRequest verifyAndGetRequest() {
ClientRequest request = this.captor.getValue();
verify(this.exchangeFunction).exchange(request);

Loading…
Cancel
Save