@ -17,7 +17,6 @@
@@ -17,7 +17,6 @@
package org.springframework.http.codec.multipart ;
import java.io.IOException ;
import java.io.UncheckedIOException ;
import java.nio.channels.Channels ;
import java.nio.channels.FileChannel ;
import java.nio.channels.ReadableByteChannel ;
@ -31,6 +30,7 @@ import java.util.List;
@@ -31,6 +30,7 @@ import java.util.List;
import java.util.Map ;
import java.util.Optional ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.function.Consumer ;
import org.synchronoss.cloud.nio.multipart.DefaultPartBodyStreamStorageFactory ;
@ -46,6 +46,7 @@ import reactor.core.publisher.Flux;
@@ -46,6 +46,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink ;
import reactor.core.publisher.Mono ;
import reactor.core.publisher.SignalType ;
import reactor.core.scheduler.Schedulers ;
import org.springframework.core.ResolvableType ;
import org.springframework.core.codec.DecodingException ;
@ -88,7 +89,7 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -88,7 +89,7 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
private int maxParts = - 1 ;
private Path fileStorageDirectory = createTempDirectory ( ) ;
private final AtomicReference < Path > fileStorageDirectory = new AtomicReference < > ( ) ;
/ * *
@ -163,7 +164,7 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -163,7 +164,7 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
if ( ! Files . exists ( fileStorageDirectory ) ) {
Files . createDirectory ( fileStorageDirectory ) ;
}
this . fileStorageDirectory = fileStorageDirectory ;
this . fileStorageDirectory . set ( fileStorageDirectory ) ;
}
@ -189,15 +190,16 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -189,15 +190,16 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@Override
public Flux < Part > read ( ResolvableType elementType , ReactiveHttpInputMessage message , Map < String , Object > hints ) {
return Flux . create ( new SynchronossPartGenerator ( message , this . fileStorageDirectory ) )
. doOnNext ( part - > {
if ( ! Hints . isLoggingSuppressed ( hints ) ) {
LogFormatUtils . traceDebug ( logger , traceOn - > Hints . getLogPrefix ( hints ) + "Parsed " +
( isEnableLoggingRequestDetails ( ) ?
LogFormatUtils . formatValue ( part , ! traceOn ) :
"parts '" + part . name ( ) + "' (content masked)" ) ) ;
}
} ) ;
return getFileStorageDirectory ( ) . flatMapMany ( directory - >
Flux . create ( new SynchronossPartGenerator ( message , directory ) )
. doOnNext ( part - > {
if ( ! Hints . isLoggingSuppressed ( hints ) ) {
LogFormatUtils . traceDebug ( logger , traceOn - > Hints . getLogPrefix ( hints ) + "Parsed " +
( isEnableLoggingRequestDetails ( ) ?
LogFormatUtils . formatValue ( part , ! traceOn ) :
"parts '" + part . name ( ) + "' (content masked)" ) ) ;
}
} ) ) ;
}
@Override
@ -205,13 +207,29 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -205,13 +207,29 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
return Mono . error ( new UnsupportedOperationException ( "Cannot read multipart request body into single Part" ) ) ;
}
private static Path createTempDirectory ( ) {
try {
return Files . createTempDirectory ( FILE_STORAGE_DIRECTORY_PREFIX ) ;
}
catch ( IOException ex ) {
throw new UncheckedIOException ( ex ) ;
}
private Mono < Path > getFileStorageDirectory ( ) {
return Mono . defer ( ( ) - > {
Path directory = this . fileStorageDirectory . get ( ) ;
if ( directory ! = null ) {
return Mono . just ( directory ) ;
}
else {
return Mono . fromCallable ( ( ) - > {
Path tempDirectory = Files . createTempDirectory ( FILE_STORAGE_DIRECTORY_PREFIX ) ;
if ( this . fileStorageDirectory . compareAndSet ( null , tempDirectory ) ) {
return tempDirectory ;
}
else {
try {
Files . delete ( tempDirectory ) ;
}
catch ( IOException ignored ) {
}
return this . fileStorageDirectory . get ( ) ;
}
} ) . subscribeOn ( Schedulers . boundedElastic ( ) ) ;
}
} ) ;
}