|
|
|
@ -52,8 +52,6 @@ import org.springframework.http.MediaType; |
|
|
|
import org.springframework.http.ReactiveHttpInputMessage; |
|
|
|
import org.springframework.http.ReactiveHttpInputMessage; |
|
|
|
import org.springframework.http.codec.HttpMessageReader; |
|
|
|
import org.springframework.http.codec.HttpMessageReader; |
|
|
|
import org.springframework.util.Assert; |
|
|
|
import org.springframework.util.Assert; |
|
|
|
import org.springframework.util.MimeType; |
|
|
|
|
|
|
|
import org.springframework.util.StreamUtils; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* {@code HttpMessageReader} for parsing {@code "multipart/form-data"} requests |
|
|
|
* {@code HttpMessageReader} for parsing {@code "multipart/form-data"} requests |
|
|
|
@ -71,6 +69,8 @@ import org.springframework.util.StreamUtils; |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> { |
|
|
|
public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public List<MediaType> getReadableMediaTypes() { |
|
|
|
public List<MediaType> getReadableMediaTypes() { |
|
|
|
@ -88,7 +88,7 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, |
|
|
|
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, |
|
|
|
Map<String, Object> hints) { |
|
|
|
Map<String, Object> hints) { |
|
|
|
|
|
|
|
|
|
|
|
return Flux.create(new SynchronossPartGenerator(message)); |
|
|
|
return Flux.create(new SynchronossPartGenerator(message, this.bufferFactory)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -109,9 +109,12 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
|
|
|
|
|
|
|
|
private final ReactiveHttpInputMessage inputMessage; |
|
|
|
private final ReactiveHttpInputMessage inputMessage; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final DataBufferFactory bufferFactory; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage) { |
|
|
|
SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage, DataBufferFactory factory) { |
|
|
|
this.inputMessage = inputMessage; |
|
|
|
this.inputMessage = inputMessage; |
|
|
|
|
|
|
|
this.bufferFactory = factory; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -119,7 +122,7 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
public void accept(FluxSink<Part> emitter) { |
|
|
|
public void accept(FluxSink<Part> emitter) { |
|
|
|
|
|
|
|
|
|
|
|
MultipartContext context = createMultipartContext(); |
|
|
|
MultipartContext context = createMultipartContext(); |
|
|
|
NioMultipartParserListener listener = new FluxSinkAdapterListener(emitter); |
|
|
|
NioMultipartParserListener listener = new FluxSinkAdapterListener(emitter, this.bufferFactory); |
|
|
|
NioMultipartParser parser = Multipart.multipart(context).forNIO(listener); |
|
|
|
NioMultipartParser parser = Multipart.multipart(context).forNIO(listener); |
|
|
|
|
|
|
|
|
|
|
|
this.inputMessage.getBody().subscribe(buffer -> { |
|
|
|
this.inputMessage.getBody().subscribe(buffer -> { |
|
|
|
@ -167,11 +170,14 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
|
|
|
|
|
|
|
|
private final FluxSink<Part> sink; |
|
|
|
private final FluxSink<Part> sink; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final DataBufferFactory bufferFactory; |
|
|
|
|
|
|
|
|
|
|
|
private final AtomicInteger terminated = new AtomicInteger(0); |
|
|
|
private final AtomicInteger terminated = new AtomicInteger(0); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
FluxSinkAdapterListener(FluxSink<Part> sink) { |
|
|
|
FluxSinkAdapterListener(FluxSink<Part> sink, DataBufferFactory bufferFactory) { |
|
|
|
this.sink = sink; |
|
|
|
this.sink = sink; |
|
|
|
|
|
|
|
this.bufferFactory = bufferFactory; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -179,14 +185,17 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
public void onPartFinished(StreamStorage storage, Map<String, List<String>> headers) { |
|
|
|
public void onPartFinished(StreamStorage storage, Map<String, List<String>> headers) { |
|
|
|
HttpHeaders httpHeaders = new HttpHeaders(); |
|
|
|
HttpHeaders httpHeaders = new HttpHeaders(); |
|
|
|
httpHeaders.putAll(headers); |
|
|
|
httpHeaders.putAll(headers); |
|
|
|
this.sink.next(new SynchronossPart(httpHeaders, storage)); |
|
|
|
Part part = MultipartUtils.getFileName(httpHeaders) != null ? |
|
|
|
|
|
|
|
new SynchronossFilePart(httpHeaders, storage, this.bufferFactory) : |
|
|
|
|
|
|
|
new DefaultSynchronossPart(httpHeaders, storage, this.bufferFactory); |
|
|
|
|
|
|
|
this.sink.next(part); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void onFormFieldPartFinished(String name, String value, Map<String, List<String>> headers) { |
|
|
|
public void onFormFieldPartFinished(String name, String value, Map<String, List<String>> headers) { |
|
|
|
HttpHeaders httpHeaders = new HttpHeaders(); |
|
|
|
HttpHeaders httpHeaders = new HttpHeaders(); |
|
|
|
httpHeaders.putAll(headers); |
|
|
|
httpHeaders.putAll(headers); |
|
|
|
this.sink.next(new SynchronossPart(httpHeaders, value)); |
|
|
|
this.sink.next(new SynchronossFormFieldPart(httpHeaders, this.bufferFactory, value)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
@ -213,31 +222,18 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class SynchronossPart implements Part { |
|
|
|
private static abstract class AbstractSynchronossPart implements Part { |
|
|
|
|
|
|
|
|
|
|
|
private final HttpHeaders headers; |
|
|
|
private final HttpHeaders headers; |
|
|
|
|
|
|
|
|
|
|
|
private final StreamStorage storage; |
|
|
|
private final DataBufferFactory bufferFactory; |
|
|
|
|
|
|
|
|
|
|
|
private final String content; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SynchronossPart(HttpHeaders headers, StreamStorage storage) { |
|
|
|
|
|
|
|
Assert.notNull(headers, "HttpHeaders is required"); |
|
|
|
|
|
|
|
Assert.notNull(storage, "'storage' is required"); |
|
|
|
|
|
|
|
this.headers = headers; |
|
|
|
|
|
|
|
this.storage = storage; |
|
|
|
|
|
|
|
this.content = null; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SynchronossPart(HttpHeaders headers, String content) { |
|
|
|
AbstractSynchronossPart(HttpHeaders headers, DataBufferFactory bufferFactory) { |
|
|
|
Assert.notNull(headers, "HttpHeaders is required"); |
|
|
|
Assert.notNull(headers, "HttpHeaders is required"); |
|
|
|
Assert.notNull(content, "'content' is required"); |
|
|
|
Assert.notNull(bufferFactory, "'bufferFactory' is required"); |
|
|
|
this.headers = headers; |
|
|
|
this.headers = headers; |
|
|
|
this.storage = null; |
|
|
|
this.bufferFactory = bufferFactory; |
|
|
|
this.content = content; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -251,52 +247,53 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
return this.headers; |
|
|
|
return this.headers; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
protected DataBufferFactory getBufferFactory() { |
|
|
|
public Optional<String> getFilename() { |
|
|
|
return this.bufferFactory; |
|
|
|
return Optional.ofNullable(MultipartUtils.getFileName(this.headers)); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
private static class DefaultSynchronossPart extends AbstractSynchronossPart { |
|
|
|
public Mono<String> getContentAsString() { |
|
|
|
|
|
|
|
if (this.content != null) { |
|
|
|
|
|
|
|
return Mono.just(this.content); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
InputStream inputStream = this.storage.getInputStream(); |
|
|
|
|
|
|
|
Charset charset = getCharset(); |
|
|
|
|
|
|
|
return Mono.just(StreamUtils.copyToString(inputStream, charset)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
catch (IOException e) { |
|
|
|
|
|
|
|
return Mono.error(new IllegalStateException( |
|
|
|
|
|
|
|
"Error while reading part content as a string", e)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Charset getCharset() { |
|
|
|
private final StreamStorage storage; |
|
|
|
return Optional.ofNullable(this.headers.getContentType()) |
|
|
|
|
|
|
|
.map(MimeType::getCharset).orElse(StandardCharsets.UTF_8); |
|
|
|
|
|
|
|
|
|
|
|
DefaultSynchronossPart(HttpHeaders headers, StreamStorage storage, DataBufferFactory factory) { |
|
|
|
|
|
|
|
super(headers, factory); |
|
|
|
|
|
|
|
Assert.notNull(storage, "'storage' is required"); |
|
|
|
|
|
|
|
this.storage = storage; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Flux<DataBuffer> getContent() { |
|
|
|
public Flux<DataBuffer> getContent() { |
|
|
|
if (this.content != null) { |
|
|
|
|
|
|
|
DataBuffer buffer = this.bufferFactory.allocateBuffer(this.content.length()); |
|
|
|
|
|
|
|
buffer.write(this.content.getBytes()); |
|
|
|
|
|
|
|
return Flux.just(buffer); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
InputStream inputStream = this.storage.getInputStream(); |
|
|
|
InputStream inputStream = this.storage.getInputStream(); |
|
|
|
return DataBufferUtils.read(inputStream, this.bufferFactory, 4096); |
|
|
|
return DataBufferUtils.read(inputStream, getBufferFactory(), 4096); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected StreamStorage getStorage() { |
|
|
|
|
|
|
|
return this.storage; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class SynchronossFilePart extends DefaultSynchronossPart implements FilePart { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public SynchronossFilePart(HttpHeaders headers, StreamStorage storage, DataBufferFactory factory) { |
|
|
|
|
|
|
|
super(headers, storage, factory); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public String getFilename() { |
|
|
|
|
|
|
|
return MultipartUtils.getFileName(getHeaders()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Mono<Void> transferTo(File destination) { |
|
|
|
public Mono<Void> transferTo(File destination) { |
|
|
|
if (this.storage == null || !getFilename().isPresent()) { |
|
|
|
|
|
|
|
return Mono.error(new IllegalStateException("The part does not represent a file.")); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
ReadableByteChannel input = null; |
|
|
|
ReadableByteChannel input = null; |
|
|
|
FileChannel output = null; |
|
|
|
FileChannel output = null; |
|
|
|
try { |
|
|
|
try { |
|
|
|
input = Channels.newChannel(this.storage.getInputStream()); |
|
|
|
input = Channels.newChannel(getStorage().getInputStream()); |
|
|
|
output = new FileOutputStream(destination).getChannel(); |
|
|
|
output = new FileOutputStream(destination).getChannel(); |
|
|
|
|
|
|
|
|
|
|
|
long size = (input instanceof FileChannel ? ((FileChannel) input).size() : Long.MAX_VALUE); |
|
|
|
long size = (input instanceof FileChannel ? ((FileChannel) input).size() : Long.MAX_VALUE); |
|
|
|
@ -332,4 +329,34 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class SynchronossFormFieldPart extends AbstractSynchronossPart implements FormFieldPart { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final String content; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SynchronossFormFieldPart(HttpHeaders headers, DataBufferFactory bufferFactory, String content) { |
|
|
|
|
|
|
|
super(headers, bufferFactory); |
|
|
|
|
|
|
|
this.content = content; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public String getValue() { |
|
|
|
|
|
|
|
return this.content; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public Flux<DataBuffer> getContent() { |
|
|
|
|
|
|
|
byte[] bytes = this.content.getBytes(getCharset()); |
|
|
|
|
|
|
|
DataBuffer buffer = getBufferFactory().allocateBuffer(bytes.length); |
|
|
|
|
|
|
|
buffer.write(bytes); |
|
|
|
|
|
|
|
return Flux.just(buffer); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Charset getCharset() { |
|
|
|
|
|
|
|
return Optional.ofNullable(MultipartUtils.getCharEncoding(getHeaders())) |
|
|
|
|
|
|
|
.map(Charset::forName).orElse(StandardCharsets.UTF_8); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|