From 1e54916119c3efe108cfefe048383c7d6b424efa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Mon, 15 Apr 2024 14:20:48 +0200 Subject: [PATCH] Ensure multipart data is deleted in WebFlux when connection terminates Before this change temporary files would not consistently be deleted when the connection which uploads the multipart files closes naturally. This change uses the usingWhen Reactor operator to ensure that the termination of the connection doesn't prevent individual file parts from being deleted due to a cancellation signal. Closes gh-31217 --- .../adapter/DefaultServerWebExchange.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java b/spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java index 21a3193158e..2c6f73bfbfc 100644 --- a/spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java +++ b/spring-web/src/main/java/org/springframework/web/server/adapter/DefaultServerWebExchange.java @@ -24,7 +24,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import java.util.stream.Collectors; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.context.ApplicationContext; @@ -249,13 +251,10 @@ public class DefaultServerWebExchange implements ServerWebExchange { public Mono cleanupMultipart() { return Mono.defer(() -> { if (this.multipartRead) { - return getMultipartData() - .onErrorComplete() - .flatMapIterable(Map::values) - .flatMapIterable(Function.identity()) - .flatMap(part -> part.delete() - .onErrorComplete()) - .then(); + return Mono.usingWhen(getMultipartData().onErrorComplete().map(this::collectParts), + parts -> Mono.empty(), + parts -> Flux.fromIterable(parts).flatMap(part -> part.delete().onErrorComplete()) + ); } else { return Mono.empty(); @@ -263,6 +262,10 @@ public class DefaultServerWebExchange implements ServerWebExchange { }); } + private List collectParts(MultiValueMap multipartData) { + return multipartData.values().stream().flatMap(List::stream).collect(Collectors.toList()); + } + @Override public LocaleContext getLocaleContext() { return this.localeContextResolver.resolveLocaleContext(this);