From afd248da8a14bdc2aa6ef7569d3a34f8caa64882 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 19 Jan 2018 20:55:22 -0500 Subject: [PATCH] MultipartHttpMessageWriter consumes source once only The previous fix #09f1f7 did not actually address the issue but only moved it further down, so instead of the subscribe(), it was consuming it inside the MultipartHttpMessageWriter#write method which returned this.body.then(), and then again for the actual request body writing. In this commit MultipartHttpMessageWriter#write returns Mono.empty() since we don't actually want to write the part content from there, but only want to access it as soon as it is availabele, for writing to the request body. Issue: SPR-16402 --- .../multipart/MultipartHttpMessageWriter.java | 22 ++++++---- .../MultipartHttpMessageWriterTests.java | 41 +++++++++++++++++++ 2 files changed, 55 insertions(+), 8 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java index 63c0f9bfd40..8f024f7602f 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java @@ -269,13 +269,18 @@ public class MultipartHttpMessageWriter implements HttpMessageWriter bodyPublisher = body instanceof Publisher ? (Publisher) body : Mono.just(body); - Mono partWritten = ((HttpMessageWriter) writer.get()) + // The writer will call MultipartHttpOutputMessage#write which doesn't actually write + // but only stores the body Flux and returns Mono.empty(). + + Mono partContentReady = ((HttpMessageWriter) writer.get()) .write(bodyPublisher, resolvableType, contentType, outputMessage, Collections.emptyMap()); - return Flux.concat( - Mono.just(generateBoundaryLine(boundary)), - partWritten.thenMany(Flux.defer(outputMessage::getBody)), - Mono.just(generateNewLine())); + // After partContentReady, we can access the part content from MultipartHttpOutputMessage + // and use it for writing to the actual request body + + Flux partContent = partContentReady.thenMany(Flux.defer(outputMessage::getBody)); + + return Flux.concat(Mono.just(generateBoundaryLine(boundary)), partContent, Mono.just(generateNewLine())); } @@ -353,7 +358,9 @@ public class MultipartHttpMessageWriter implements HttpMessageWriter setComplete() { - return (this.body != null ? this.body.then() : - Mono.error(new IllegalStateException("Body has not been written yet"))); + return Mono.error(new UnsupportedOperationException()); } } diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java index 1d42d34d6f3..81594d3e2d9 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java @@ -16,6 +16,7 @@ package org.springframework.http.codec.multipart; +import java.io.IOException; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -25,6 +26,7 @@ import org.junit.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.UnicastProcessor; import org.springframework.core.ResolvableType; import org.springframework.core.codec.StringDecoder; @@ -146,9 +148,48 @@ public class MultipartHttpMessageWriterTests { Collections.emptyMap()).block(Duration.ZERO); assertEquals("foobarbaz", value); + } + + @Test // SPR-16402 + public void singleSubscriberWithResource() throws IOException { + UnicastProcessor processor = UnicastProcessor.create(); + Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg"); + Mono.just(logo).subscribe(processor); + + MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder(); + bodyBuilder.asyncPart("logo", processor, Resource.class); + + Mono>> result = Mono.just(bodyBuilder.build()); + + MockServerHttpResponse response = new MockServerHttpResponse(); + Map hints = Collections.emptyMap(); + this.writer.write(result, null, MediaType.MULTIPART_FORM_DATA, response, hints).block(); + + MultiValueMap requestParts = parse(response, hints); + assertEquals(1, requestParts.size()); + Part part = requestParts.getFirst("logo"); + assertEquals("logo", part.name()); +// TODO: a Resource written as an async part doesn't have a file name in the contentDisposition +// assertTrue(part instanceof FilePart); +// assertEquals("logo.jpg", ((FilePart) part).filename()); + assertEquals(MediaType.IMAGE_JPEG, part.headers().getContentType()); + assertEquals(logo.getFile().length(), part.headers().getContentLength()); + } + @Test // SPR-16402 + public void singleSubscriberWithStrings() { + UnicastProcessor processor = UnicastProcessor.create(); + Flux.just("foo", "bar", "baz").subscribe(processor); + MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder(); + bodyBuilder.asyncPart("name", processor, String.class); + + Mono>> result = Mono.just(bodyBuilder.build()); + + MockServerHttpResponse response = new MockServerHttpResponse(); + Map hints = Collections.emptyMap(); + this.writer.write(result, null, MediaType.MULTIPART_FORM_DATA, response, hints).block(); } private MultiValueMap parse(MockServerHttpResponse response, Map hints) {