|
|
|
|
@ -543,12 +543,6 @@ class DefaultWebClient implements WebClient {
@@ -543,12 +543,6 @@ class DefaultWebClient implements WebClient {
|
|
|
|
|
return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementTypeRef))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private <T> Mono<T> handleBodyMono(ClientResponse response, Mono<T> body) { |
|
|
|
|
body = body.onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, exceptionWrappingFunction(response)); |
|
|
|
|
Mono<T> result = statusHandlers(response); |
|
|
|
|
return (result != null ? result.switchIfEmpty(body) : body); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public <T> Flux<T> bodyToFlux(Class<T> elementClass) { |
|
|
|
|
Assert.notNull(elementClass, "Class must not be null"); |
|
|
|
|
@ -561,45 +555,6 @@ class DefaultWebClient implements WebClient {
@@ -561,45 +555,6 @@ class DefaultWebClient implements WebClient {
|
|
|
|
|
return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementTypeRef))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private <T> Publisher<T> handleBodyFlux(ClientResponse response, Flux<T> body) { |
|
|
|
|
body = body.onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, exceptionWrappingFunction(response)); |
|
|
|
|
Mono<T> result = statusHandlers(response); |
|
|
|
|
return (result != null ? result.flux().switchIfEmpty(body) : body); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Nullable |
|
|
|
|
private <T> Mono<T> statusHandlers(ClientResponse response) { |
|
|
|
|
int statusCode = response.rawStatusCode(); |
|
|
|
|
for (StatusHandler handler : this.statusHandlers) { |
|
|
|
|
if (handler.test(statusCode)) { |
|
|
|
|
Mono<? extends Throwable> exMono; |
|
|
|
|
try { |
|
|
|
|
exMono = handler.apply(response); |
|
|
|
|
exMono = exMono.flatMap(ex -> releaseIfNotConsumed(response, ex)); |
|
|
|
|
exMono = exMono.onErrorResume(ex -> releaseIfNotConsumed(response, ex)); |
|
|
|
|
} |
|
|
|
|
catch (Throwable ex2) { |
|
|
|
|
exMono = releaseIfNotConsumed(response, ex2); |
|
|
|
|
} |
|
|
|
|
Mono<T> result = exMono.flatMap(Mono::error); |
|
|
|
|
HttpRequest request = this.requestSupplier.get(); |
|
|
|
|
return insertCheckpoint(result, statusCode, request); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private <T> Mono<T> insertCheckpoint(Mono<T> result, int statusCode, HttpRequest request) { |
|
|
|
|
String httpMethod = request.getMethodValue(); |
|
|
|
|
URI uri = request.getURI(); |
|
|
|
|
String description = statusCode + " from " + httpMethod + " " + uri + " [DefaultWebClient]"; |
|
|
|
|
return result.checkpoint(description); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private <T> Function<Throwable, Mono<? extends T>> exceptionWrappingFunction(ClientResponse response) { |
|
|
|
|
return t -> response.createException().flatMap(ex -> Mono.error(ex.initCause(t))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public <T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyClass) { |
|
|
|
|
return this.responseMono.flatMap(response -> |
|
|
|
|
@ -652,6 +607,51 @@ class DefaultWebClient implements WebClient {
@@ -652,6 +607,51 @@ class DefaultWebClient implements WebClient {
|
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private <T> Mono<T> handleBodyMono(ClientResponse response, Mono<T> body) { |
|
|
|
|
body = body.onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, exceptionWrappingFunction(response)); |
|
|
|
|
Mono<T> result = applyStatusHandlers(response); |
|
|
|
|
return (result != null ? result.switchIfEmpty(body) : body); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private <T> Publisher<T> handleBodyFlux(ClientResponse response, Flux<T> body) { |
|
|
|
|
body = body.onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, exceptionWrappingFunction(response)); |
|
|
|
|
Mono<T> result = applyStatusHandlers(response); |
|
|
|
|
return (result != null ? result.flux().switchIfEmpty(body) : body); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private <T> Function<Throwable, Mono<? extends T>> exceptionWrappingFunction(ClientResponse response) { |
|
|
|
|
return t -> response.createException().flatMap(ex -> Mono.error(ex.initCause(t))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Nullable |
|
|
|
|
private <T> Mono<T> applyStatusHandlers(ClientResponse response) { |
|
|
|
|
int statusCode = response.rawStatusCode(); |
|
|
|
|
for (StatusHandler handler : this.statusHandlers) { |
|
|
|
|
if (handler.test(statusCode)) { |
|
|
|
|
Mono<? extends Throwable> exMono; |
|
|
|
|
try { |
|
|
|
|
exMono = handler.apply(response); |
|
|
|
|
exMono = exMono.flatMap(ex -> releaseIfNotConsumed(response, ex)); |
|
|
|
|
exMono = exMono.onErrorResume(ex -> releaseIfNotConsumed(response, ex)); |
|
|
|
|
} |
|
|
|
|
catch (Throwable ex2) { |
|
|
|
|
exMono = releaseIfNotConsumed(response, ex2); |
|
|
|
|
} |
|
|
|
|
Mono<T> result = exMono.flatMap(Mono::error); |
|
|
|
|
HttpRequest request = this.requestSupplier.get(); |
|
|
|
|
return insertCheckpoint(result, statusCode, request); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private <T> Mono<T> insertCheckpoint(Mono<T> result, int statusCode, HttpRequest request) { |
|
|
|
|
String httpMethod = request.getMethodValue(); |
|
|
|
|
URI uri = request.getURI(); |
|
|
|
|
String description = statusCode + " from " + httpMethod + " " + uri + " [DefaultWebClient]"; |
|
|
|
|
return result.checkpoint(description); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class StatusHandler { |
|
|
|
|
|
|
|
|
|
|