@ -39,6 +39,7 @@ import com.fasterxml.jackson.databind.ser.FilterProvider;
@@ -39,6 +39,7 @@ import com.fasterxml.jackson.databind.ser.FilterProvider;
import org.reactivestreams.Publisher ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
import reactor.util.context.ContextView ;
import org.springframework.core.MethodParameter ;
import org.springframework.core.ResolvableType ;
@ -53,7 +54,6 @@ import org.springframework.http.codec.HttpMessageEncoder;
@@ -53,7 +54,6 @@ import org.springframework.http.codec.HttpMessageEncoder;
import org.springframework.http.converter.json.MappingJacksonValue ;
import org.springframework.http.server.reactive.ServerHttpRequest ;
import org.springframework.http.server.reactive.ServerHttpResponse ;
import org.springframework.lang.NonNull ;
import org.springframework.lang.Nullable ;
import org.springframework.util.Assert ;
import org.springframework.util.CollectionUtils ;
@ -87,6 +87,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
@@ -87,6 +87,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
private final List < MediaType > streamingMediaTypes = new ArrayList < > ( 1 ) ;
/ * *
* Constructor with a Jackson { @link ObjectMapper } to use .
* /
@ -148,89 +149,95 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
@@ -148,89 +149,95 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
Assert . notNull ( bufferFactory , "'bufferFactory' must not be null" ) ;
Assert . notNull ( elementType , "'elementType' must not be null" ) ;
if ( inputStream instanceof Mono ) {
return Mono . from ( inputStream )
. flatMap ( value - > createEncodingToolsForStream ( value , elementType , mimeType , hints )
. map ( tools - > encodeValue ( value , tools . mapper ( ) , tools . writer ( ) ,
bufferFactory , mimeType , hints ) ) )
. flux ( ) ;
}
return Flux . deferContextual ( contextView - > {
try {
ObjectMapper mapper = selectObjectMapper ( elementType , mimeType ) ;
if ( mapper = = null ) {
throw new IllegalStateException ( "No ObjectMapper for " + elementType ) ;
}
ObjectWriter writer = createObjectWriter ( mapper , elementType , mimeType , null , hints ) ;
ByteArrayBuilder byteBuilder = new ByteArrayBuilder ( writer . getFactory ( ) . _getBufferRecycler ( ) ) ;
JsonEncoding encoding = getJsonEncoding ( mimeType ) ;
JsonGenerator generator = mapper . getFactory ( ) . createGenerator ( byteBuilder , encoding ) ;
SequenceWriter sequenceWriter = writer . writeValues ( generator ) ;
byte [ ] separator = getStreamingMediaTypeSeparator ( mimeType ) ;
Flux < DataBuffer > dataBufferFlux ;
Map < String , Object > hintsToUse = contextView . isEmpty ( ) ? hints :
Hints . merge ( hints , ContextView . class . getName ( ) , contextView ) ;
if ( separator ! = null ) {
dataBufferFlux = Flux . from ( inputStream ) . map ( value - > encodeStreamingValue (
value , bufferFactory , hints , sequenceWriter , byteBuilder , EMPTY_BYTES , separator ) ) ;
if ( inputStream instanceof Mono ) {
return Mono . from ( inputStream )
. map ( value - > encodeValue ( value , bufferFactory , elementType , mimeType , hintsToUse ) )
. flux ( ) ;
}
else {
JsonArrayJoinHelper helper = new JsonArrayJoinHelper ( ) ;
// Do not prepend JSON array prefix until first signal is known, onNext vs onError
// Keeps response not committed for error handling
dataBufferFlux = Flux . from ( inputStream )
. map ( value - > {
byte [ ] prefix = helper . getPrefix ( ) ;
byte [ ] delimiter = helper . getDelimiter ( ) ;
DataBuffer dataBuffer = encodeStreamingValue (
value , bufferFactory , hints , sequenceWriter , byteBuilder , delimiter , EMPTY_BYTES ) ;
return ( prefix . length > 0 ?
bufferFactory . join ( Arrays . asList ( bufferFactory . wrap ( prefix ) , dataBuffer ) ) :
dataBuffer ) ;
} )
. concatWith ( Mono . fromCallable ( ( ) - > bufferFactory . wrap ( helper . getSuffix ( ) ) ) ) ;
try {
ObjectMapper mapper = selectObjectMapper ( elementType , mimeType ) ;
if ( mapper = = null ) {
throw new IllegalStateException ( "No ObjectMapper for " + elementType ) ;
}
ObjectWriter writer = createObjectWriter ( mapper , elementType , mimeType , null , hintsToUse ) ;
ByteArrayBuilder byteBuilder = new ByteArrayBuilder ( writer . getFactory ( ) . _getBufferRecycler ( ) ) ;
JsonEncoding encoding = getJsonEncoding ( mimeType ) ;
JsonGenerator generator = mapper . getFactory ( ) . createGenerator ( byteBuilder , encoding ) ;
SequenceWriter sequenceWriter = writer . writeValues ( generator ) ;
byte [ ] separator = getStreamingMediaTypeSeparator ( mimeType ) ;
Flux < DataBuffer > dataBufferFlux ;
if ( separator ! = null ) {
dataBufferFlux = Flux . from ( inputStream ) . map ( value - > encodeStreamingValue (
value , bufferFactory , hintsToUse , sequenceWriter , byteBuilder , EMPTY_BYTES , separator ) ) ;
}
else {
JsonArrayJoinHelper helper = new JsonArrayJoinHelper ( ) ;
// Do not prepend JSON array prefix until first signal is known, onNext vs onError
// Keeps response not committed for error handling
dataBufferFlux = Flux . from ( inputStream )
. map ( value - > {
byte [ ] prefix = helper . getPrefix ( ) ;
byte [ ] delimiter = helper . getDelimiter ( ) ;
DataBuffer dataBuffer = encodeStreamingValue (
value , bufferFactory , hintsToUse , sequenceWriter , byteBuilder ,
delimiter , EMPTY_BYTES ) ;
return ( prefix . length > 0 ?
bufferFactory . join ( Arrays . asList ( bufferFactory . wrap ( prefix ) , dataBuffer ) ) :
dataBuffer ) ;
} )
. concatWith ( Mono . fromCallable ( ( ) - > bufferFactory . wrap ( helper . getSuffix ( ) ) ) ) ;
}
return dataBufferFlux
. doOnNext ( dataBuffer - > Hints . touchDataBuffer ( dataBuffer , hintsToUse , logger ) )
. doAfterTerminate ( ( ) - > {
try {
byteBuilder . release ( ) ;
generator . close ( ) ;
}
catch ( IOException ex ) {
logger . error ( "Could not close Encoder resources" , ex ) ;
}
} ) ;
}
return dataBufferFlux
. doOnNext ( dataBuffer - > Hints . touchDataBuffer ( dataBuffer , hints , logger ) )
. doAfterTerminate ( ( ) - > {
try {
byteBuilder . release ( ) ;
generator . close ( ) ;
}
catch ( IOException ex ) {
logger . error ( "Could not close Encoder resources" , ex ) ;
}
} ) ;
}
catch ( IOException ex ) {
return Flux . error ( ex ) ;
}
catch ( IOException ex ) {
return Flux . error ( ex ) ;
}
} ) ;
}
@Override
public DataBuffer encodeValue ( Object value , DataBufferFactory bufferFactory ,
ResolvableType valueType , @Nullable MimeType mimeType , @Nullable Map < String , Object > hints ) {
ObjectEncodingTools encodingTools = createEncodingTools ( value , valueType , mimeType , hints ) ;
ObjectWriter writer = encodingTools . writer ( ) ;
writer = customizeWriter ( writer , mimeType , valueType , hints ) ;
return encodeValue ( value , encodingTools . mapper ( ) , writer , bufferFactory , mimeType , hints ) ;
}
private DataBuffer encodeValue ( Object value , ObjectMapper mapper , ObjectWriter writer ,
DataBufferFactory bufferFactory , @Nullable MimeType mimeType , @Nullable Map < String , Object > hints ) {
Class < ? > jsonView = null ;
FilterProvider filters = null ;
if ( value instanceof MappingJacksonValue mappingJacksonValue ) {
value = mappingJacksonValue . getValue ( ) ;
valueType = ResolvableType . forInstance ( value ) ;
jsonView = mappingJacksonValue . getSerializationView ( ) ;
filters = mappingJacksonValue . getFilters ( ) ;
}
ObjectMapper mapper = selectObjectMapper ( valueType , mimeType ) ;
if ( mapper = = null ) {
throw new IllegalStateException ( "No ObjectMapper for " + valueType ) ;
}
ObjectWriter writer = createObjectWriter ( mapper , valueType , mimeType , jsonView , hints ) ;
if ( filters ! = null ) {
writer = writer . with ( filters ) ;
}
@ -324,35 +331,6 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
@@ -324,35 +331,6 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
}
}
private Mono < ObjectEncodingTools > createEncodingToolsForStream ( Object value , ResolvableType valueType ,
@Nullable MimeType mimeType , @Nullable Map < String , Object > hints ) {
try {
ObjectEncodingTools encodingTools = createEncodingTools ( value , valueType , mimeType , hints ) ;
ObjectWriter objectWriter = encodingTools . writer ( ) ;
return customizeWriterFromStream ( objectWriter , mimeType , valueType , hints )
. map ( customizedWriter - > new ObjectEncodingTools ( encodingTools . mapper ( ) , customizedWriter ) ) ;
}
catch ( IllegalStateException ex ) {
return Mono . error ( ex ) ;
}
}
private ObjectEncodingTools createEncodingTools ( Object value , ResolvableType valueType ,
@Nullable MimeType mimeType , @Nullable Map < String , Object > hints ) {
Class < ? > jsonView = null ;
if ( value instanceof MappingJacksonValue mappingJacksonValue ) {
valueType = ResolvableType . forInstance ( mappingJacksonValue . getValue ( ) ) ;
jsonView = mappingJacksonValue . getSerializationView ( ) ;
}
ObjectMapper mapper = selectObjectMapper ( valueType , mimeType ) ;
if ( mapper = = null ) {
throw new IllegalStateException ( "No ObjectMapper for " + valueType ) ;
}
ObjectWriter writer = createObjectWriter ( mapper , valueType , mimeType , jsonView , hints ) ;
return new ObjectEncodingTools ( mapper , writer ) ;
}
private ObjectWriter createObjectWriter (
ObjectMapper mapper , ResolvableType valueType , @Nullable MimeType mimeType ,
@Nullable Class < ? > jsonView , @Nullable Map < String , Object > hints ) {
@ -365,32 +343,23 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
@@ -365,32 +343,23 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
if ( javaType . isContainerType ( ) ) {
writer = writer . forType ( javaType ) ;
}
return writer ;
return customizeWriter ( writer , mimeType , valueType , hints ) ;
}
/ * *
* Provides the ability for subclasses to customize the { @link ObjectWriter } for serialization from a stream .
* @param writer the { @link ObjectWriter } available for customization
* @param mimeType the MIME type associated with the input stream
* @param elementType the expected type of elements in the output stream
* @param hints additional information about how to do encode
* @return the customized { @link ObjectWriter }
* /
protected Mono < ObjectWriter > customizeWriterFromStream ( @NonNull ObjectWriter writer , @Nullable MimeType mimeType ,
ResolvableType elementType , @Nullable Map < String , Object > hints ) {
return Mono . just ( customizeWriter ( writer , mimeType , elementType , hints ) ) ;
}
/ * *
* Provides the ability for subclasses to customize the { @link ObjectWriter } for serialization .
* @param writer the { @link ObjectWriter } available for customization
* @param mimeType the MIME type associated with the input stream
* @param elementType the expected type of elements in the output stream
* @param hints additional information about how to do encode
* @return the customized { @link ObjectWriter }
* Subclasses can use this method to customize { @link ObjectWriter } used
* for writing values .
* @param writer the writer instance to customize
* @param mimeType the selected MIME type
* @param elementType the type of element values to write
* @param hints a map with serialization hints ;
* the Reactor Context , when available , may be accessed under the key
* { @code ContextView . class . getName ( ) }
* @return the customized { @code ObjectWriter } to use
* /
protected ObjectWriter customizeWriter ( ObjectWriter writer , @Nullable MimeType mimeType ,
ResolvableType elementType , @Nullable Map < String , Object > hints ) {
return writer ;
}
@ -489,8 +458,4 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
@@ -489,8 +458,4 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
}
}
private record ObjectEncodingTools ( ObjectMapper mapper , ObjectWriter writer ) {
}
}