Browse Source

Support multiple boundary buffers in MultipartParser

In a small minority of cases, the multipart boundary can spread across
three incoming buffers.

Prior to this commit, MultipartParser.BodyState only supported two
buffers. If the boundary is spread across three buffers, the first
buffer of the three is sent as a whole, even though it contains the
first bytes of the boundary.

This commit fixes this bug, by enqueuing all prior buffers in a queue,
and emitting the ones that cannot contain boundary bytes.

Closes gh-27939
pull/28119/head
Arjen Poutsma 4 years ago
parent
commit
caa13690e8
  1. 91
      spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java
  2. 3
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java

91
spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java

@ -17,8 +17,12 @@
package org.springframework.http.codec.multipart; package org.springframework.http.codec.multipart;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -477,11 +481,14 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
private final DataBufferUtils.Matcher boundary; private final DataBufferUtils.Matcher boundary;
private final AtomicReference<DataBuffer> previous = new AtomicReference<>(); private final int boundaryLength;
private final Deque<DataBuffer> queue = new ConcurrentLinkedDeque<>();
public BodyState() { public BodyState() {
this.boundary = DataBufferUtils.matcher( byte[] delimiter = MultipartUtils.concat(CR_LF, TWO_HYPHENS, MultipartParser.this.boundary);
MultipartUtils.concat(CR_LF, TWO_HYPHENS, MultipartParser.this.boundary)); this.boundary = DataBufferUtils.matcher(delimiter);
this.boundaryLength = delimiter.length;
} }
/** /**
@ -499,31 +506,38 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Boundary found @" + endIdx + " in " + buffer); logger.trace("Boundary found @" + endIdx + " in " + buffer);
} }
int len = endIdx - buffer.readPosition() - this.boundary.delimiter().length + 1; int len = endIdx - buffer.readPosition() - this.boundaryLength + 1;
if (len > 0) { if (len > 0) {
// buffer contains complete delimiter, let's slice it and flush it // whole boundary in buffer.
// slice off the body part, and flush
DataBuffer body = buffer.retainedSlice(buffer.readPosition(), len); DataBuffer body = buffer.retainedSlice(buffer.readPosition(), len);
enqueue(body); enqueue(body);
enqueue(null); flush();
} }
else if (len < 0) { else if (len < 0) {
// buffer starts with the end of the delimiter, let's slice the previous buffer and flush it // boundary spans multiple buffers, and we've just found the end
DataBuffer previous = this.previous.get(); // iterate over buffers in reverse order
int prevLen = previous.readableByteCount() + len; DataBuffer prev;
if (prevLen > 0) { while ((prev = this.queue.pollLast()) != null) {
DataBuffer body = previous.retainedSlice(previous.readPosition(), prevLen); int prevLen = prev.readableByteCount() + len;
DataBufferUtils.release(previous); if (prevLen > 0) {
this.previous.set(body); // slice body part of previous buffer, and flush it
enqueue(null); DataBuffer body = prev.retainedSlice(prev.readPosition(), prevLen);
} DataBufferUtils.release(prev);
else { enqueue(body);
DataBufferUtils.release(previous); flush();
this.previous.set(null); break;
}
else {
// previous buffer only contains boundary bytes
DataBufferUtils.release(prev);
len += prev.readableByteCount();
}
} }
} }
else /* if (sliceLength == 0) */ { else /* if (len == 0) */ {
// buffer starts with complete delimiter, flush out the previous buffer // buffer starts with complete delimiter, flush out the previous buffers
enqueue(null); flush();
} }
DataBuffer remainder = MultipartUtils.sliceFrom(buffer, endIdx); DataBuffer remainder = MultipartUtils.sliceFrom(buffer, endIdx);
@ -538,13 +552,32 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
} }
/** /**
* Stores the given buffer and sends out the previous buffer. * Store the given buffer. Emit buffers that cannot contain boundary bytes,
* by iterating over the queue in reverse order, and summing buffer sizes.
* The first buffer that passes the boundary length and subsequent buffers
* are emitted (in the correct, non-reverse order).
*/ */
private void enqueue(@Nullable DataBuffer buf) { private void enqueue(DataBuffer buf) {
DataBuffer previous = this.previous.getAndSet(buf); this.queue.add(buf);
if (previous != null) {
emitBody(previous); int len = 0;
Deque<DataBuffer> emit = new ArrayDeque<>();
for (Iterator<DataBuffer> iterator = this.queue.descendingIterator(); iterator.hasNext(); ) {
DataBuffer previous = iterator.next();
if (len > this.boundaryLength) {
// addFirst to negate iterating in reverse order
emit.addFirst(previous);
iterator.remove();
}
len += previous.readableByteCount();
} }
emit.forEach(MultipartParser.this::emitBody);
}
private void flush() {
this.queue.forEach(MultipartParser.this::emitBody);
this.queue.clear();
} }
@Override @Override
@ -556,10 +589,8 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
@Override @Override
public void dispose() { public void dispose() {
DataBuffer previous = this.previous.getAndSet(null); this.queue.forEach(DataBufferUtils::release);
if (previous != null) { this.queue.clear();
DataBufferUtils.release(previous);
}
} }
@Override @Override

3
spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MultipartIntegrationTests.java

@ -211,10 +211,9 @@ class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTests {
private static void verifyContents(Path tempFile, Resource resource) { private static void verifyContents(Path tempFile, Resource resource) {
try { try {
byte[] tempBytes = Files.readAllBytes(tempFile);
// Use FileCopyUtils since the resource might reside in a JAR instead of in the file system. // Use FileCopyUtils since the resource might reside in a JAR instead of in the file system.
byte[] resourceBytes = FileCopyUtils.copyToByteArray(resource.getInputStream()); byte[] resourceBytes = FileCopyUtils.copyToByteArray(resource.getInputStream());
assertThat(tempBytes).isEqualTo(resourceBytes); assertThat(tempFile).hasBinaryContent(resourceBytes);
} }
catch (IOException ex) { catch (IOException ex) {
throw new AssertionError(ex); throw new AssertionError(ex);

Loading…
Cancel
Save