@ -21,7 +21,10 @@ import java.io.OutputStream;
import java.nio.ByteBuffer ;
import java.nio.ByteBuffer ;
import java.nio.charset.StandardCharsets ;
import java.nio.charset.StandardCharsets ;
import com.fasterxml.jackson.databind.JavaType ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import com.fasterxml.jackson.databind.ObjectWriter ;
import com.fasterxml.jackson.databind.type.TypeFactory ;
import org.reactivestreams.Publisher ;
import org.reactivestreams.Publisher ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
import reactor.core.publisher.Mono ;
@ -66,10 +69,13 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> {
public Flux < DataBuffer > encode ( Publisher < ? > inputStream ,
public Flux < DataBuffer > encode ( Publisher < ? > inputStream ,
DataBufferFactory bufferFactory , ResolvableType elementType , MimeType mimeType ,
DataBufferFactory bufferFactory , ResolvableType elementType , MimeType mimeType ,
Object . . . hints ) {
Object . . . hints ) {
Assert . notNull ( inputStream , "'inputStream' must not be null" ) ;
Assert . notNull ( bufferFactory , "'bufferFactory' must not be null" ) ;
Assert . notNull ( elementType , "'elementType' must not be null" ) ;
if ( inputStream instanceof Mono ) {
if ( inputStream instanceof Mono ) {
// single object
// single object
return Flux . from ( inputStream )
return Flux . from ( inputStream )
. map ( value - > serialize ( value , bufferFactory ) ) ;
. map ( value - > serialize ( value , bufferFactory , elementType ) ) ;
}
}
else {
else {
// array
// array
@ -81,7 +87,7 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> {
Mono . just ( bufferFactory . wrap ( END_ARRAY_BUFFER ) ) ;
Mono . just ( bufferFactory . wrap ( END_ARRAY_BUFFER ) ) ;
Flux < DataBuffer > serializedObjects = Flux . from ( inputStream )
Flux < DataBuffer > serializedObjects = Flux . from ( inputStream )
. map ( value - > serialize ( value , bufferFactory ) ) ;
. map ( value - > serialize ( value , bufferFactory , elementType ) ) ;
Flux < DataBuffer > array = Flux . zip ( serializedObjects , arraySeparators )
Flux < DataBuffer > array = Flux . zip ( serializedObjects , arraySeparators )
. flatMap ( tuple - > Flux . just ( tuple . getT1 ( ) , tuple . getT2 ( ) ) ) ;
. flatMap ( tuple - > Flux . just ( tuple . getT1 ( ) , tuple . getT2 ( ) ) ) ;
@ -92,11 +98,15 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> {
}
}
}
}
private DataBuffer serialize ( Object value , DataBufferFactory dataBufferFactory ) {
private DataBuffer serialize ( Object value , DataBufferFactory dataBufferFactory ,
ResolvableType type ) {
TypeFactory typeFactory = this . mapper . getTypeFactory ( ) ;
JavaType javaType = typeFactory . constructType ( type . getType ( ) ) ;
ObjectWriter writer = this . mapper . writerFor ( javaType ) ;
DataBuffer buffer = dataBufferFactory . allocateBuffer ( ) ;
DataBuffer buffer = dataBufferFactory . allocateBuffer ( ) ;
OutputStream outputStream = buffer . asOutputStream ( ) ;
OutputStream outputStream = buffer . asOutputStream ( ) ;
try {
try {
this . mapper . writeValue ( outputStream , value ) ;
writ er. writeValue ( outputStream , value ) ;
}
}
catch ( IOException e ) {
catch ( IOException e ) {
throw new CodecException ( "Error while writing the data" , e ) ;
throw new CodecException ( "Error while writing the data" , e ) ;