@ -155,29 +155,22 @@ public class PartEventHttpMessageReader extends LoggingCodecSupport implements H
@@ -155,29 +155,22 @@ public class PartEventHttpMessageReader extends LoggingCodecSupport implements H
AtomicInteger partCount = new AtomicInteger ( ) ;
return allPartsTokens
. windowUntil ( t - > t instanceof MultipartParser . HeadersToken , true )
. concatMap ( partTokens - > {
if ( tooManyParts ( partCount ) ) {
return Mono . error ( new DecodingException ( "Too many parts (" + partCount . get ( ) + "/" +
this . maxParts + " allowed)" ) ) ;
}
else {
return partTokens . switchOnFirst ( ( signal , flux ) - > {
if ( signal . hasValue ( ) ) {
MultipartParser . HeadersToken headersToken = ( MultipartParser . HeadersToken ) signal . get ( ) ;
Assert . state ( headersToken ! = null , "Signal should be headers token" ) ;
HttpHeaders headers = headersToken . headers ( ) ;
Flux < MultipartParser . BodyToken > bodyTokens = flux . ofType (
MultipartParser . BodyToken . class ) ;
return createEvents ( headers , bodyTokens ) ;
}
else {
. concatMap ( partTokens - > partTokens
. switchOnFirst ( ( signal , flux ) - > {
if ( ! signal . hasValue ( ) ) {
// complete or error signal
return flux . cast ( PartEvent . class ) ;
}
} ) ;
}
} ) ;
else if ( tooManyParts ( partCount ) ) {
return Mono . error ( new DecodingException ( "Too many parts (" + partCount . get ( ) +
"/" + this . maxParts + " allowed)" ) ) ;
}
MultipartParser . HeadersToken headersToken = ( MultipartParser . HeadersToken ) signal . get ( ) ;
Assert . state ( headersToken ! = null , "Signal should be headers token" ) ;
HttpHeaders headers = headersToken . headers ( ) ;
return createEvents ( headers , flux . ofType ( MultipartParser . BodyToken . class ) ) ;
} ) ) ;
} ) ;
}