@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@ -26,13 +26,13 @@ import com.fasterxml.jackson.core.JsonParser;
@@ -26,13 +26,13 @@ import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException ;
import com.fasterxml.jackson.core.JsonToken ;
import com.fasterxml.jackson.core.async.ByteArrayFeeder ;
import com.fasterxml.jackson.databind.DeserializationContext ;
import com.fasterxml.jackson.databind.util.TokenBuffer ;
import reactor.core.publisher.Flux ;
import org.springframework.core.codec.DecodingException ;
import org.springframework.core.io.buffer.DataBuffer ;
import org.springframework.core.io.buffer.DataBufferUtils ;
import org.springframework.util.Assert ;
/**
 * {@link Function} to transform a JSON stream of arbitrary size, byte array
@ -40,6 +40,8 @@ import org.springframework.util.Assert;
@@ -40,6 +40,8 @@ import org.springframework.util.Assert;
 * well-formed JSON object.
 *
 * @author Arjen Poutsma
 * @author Rossen Stoyanchev
 * @author Juergen Hoeller
 * @since 5.0
 */
final class Jackson2Tokenizer {
@ -59,36 +61,15 @@ final class Jackson2Tokenizer {
@@ -59,36 +61,15 @@ final class Jackson2Tokenizer {
private final ByteArrayFeeder inputFeeder ;
/**
 * Create a tokenizer around the given non-blocking parser.
 * @param parser the non-blocking JSON parser; its input feeder is used to
 * push byte[] chunks into the parser as data buffers arrive
 * @param deserializationContext the context to associate with the initial
 * {@link TokenBuffer} (NOTE(review): presumably so deserializers can resolve
 * ids/types against it later — confirm against Jackson TokenBuffer docs)
 * @param tokenizeArrayElements whether elements of a top-level JSON array
 * are emitted individually as they complete
 */
private Jackson2Tokenizer(
		JsonParser parser, DeserializationContext deserializationContext, boolean tokenizeArrayElements) {

	Assert.notNull(parser, "'parser' must not be null");
	this.parser = parser;
	this.tokenizeArrayElements = tokenizeArrayElements;
	this.tokenBuffer = new TokenBuffer(parser, deserializationContext);
	// The non-blocking parser exposes a ByteArrayFeeder to accept input chunks.
	this.inputFeeder = (ByteArrayFeeder) this.parser.getNonBlockingInputFeeder();
}
/ * *
* Tokenize the given { @code Flux < DataBuffer > } into { @code Flux < TokenBuffer > } .
* @param dataBuffers the source data buffers
* @param jsonFactory the factory to use
* @param tokenizeArrayElements if { @code true } and the "top level" JSON
* object is an array , each element is returned individually , immediately
* after it is received .
* @return the result token buffers
* /
public static Flux < TokenBuffer > tokenize ( Flux < DataBuffer > dataBuffers , JsonFactory jsonFactory ,
boolean tokenizeArrayElements ) {
try {
JsonParser parser = jsonFactory . createNonBlockingByteArrayParser ( ) ;
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer ( parser , tokenizeArrayElements ) ;
return dataBuffers . flatMap ( tokenizer : : tokenize , Flux : : error , tokenizer : : endOfInput ) ;
}
catch ( IOException ex ) {
return Flux . error ( ex ) ;
}
}
private Flux < TokenBuffer > tokenize ( DataBuffer dataBuffer ) {
byte [ ] bytes = new byte [ dataBuffer . readableByteCount ( ) ] ;
@ -100,8 +81,7 @@ final class Jackson2Tokenizer {
@@ -100,8 +81,7 @@ final class Jackson2Tokenizer {
return parseTokenBufferFlux ( ) ;
}
catch ( JsonProcessingException ex ) {
return Flux . error ( new DecodingException (
"JSON decoding error: " + ex . getOriginalMessage ( ) , ex ) ) ;
return Flux . error ( new DecodingException ( "JSON decoding error: " + ex . getOriginalMessage ( ) , ex ) ) ;
}
catch ( IOException ex ) {
return Flux . error ( ex ) ;
@ -114,8 +94,7 @@ final class Jackson2Tokenizer {
@@ -114,8 +94,7 @@ final class Jackson2Tokenizer {
return parseTokenBufferFlux ( ) ;
}
catch ( JsonProcessingException ex ) {
return Flux . error ( new DecodingException (
"JSON decoding error: " + ex . getOriginalMessage ( ) , ex ) ) ;
return Flux . error ( new DecodingException ( "JSON decoding error: " + ex . getOriginalMessage ( ) , ex ) ) ;
}
catch ( IOException ex ) {
return Flux . error ( ex ) ;
@ -128,12 +107,11 @@ final class Jackson2Tokenizer {
@@ -128,12 +107,11 @@ final class Jackson2Tokenizer {
while ( true ) {
JsonToken token = this . parser . nextToken ( ) ;
// SPR-16151: Smile data format uses null to separate documents
if ( ( token = = JsonToken . NOT_AVAILABLE ) | |
if ( token = = JsonToken . NOT_AVAILABLE | |
( token = = null & & ( token = this . parser . nextToken ( ) ) = = null ) ) {
break ;
}
updateDepth ( token ) ;
if ( ! this . tokenizeArrayElements ) {
processTokenNormal ( token , result ) ;
}
@ -164,8 +142,7 @@ final class Jackson2Tokenizer {
@@ -164,8 +142,7 @@ final class Jackson2Tokenizer {
private void processTokenNormal ( JsonToken token , List < TokenBuffer > result ) throws IOException {
this . tokenBuffer . copyCurrentEvent ( this . parser ) ;
if ( ( token . isStructEnd ( ) | | token . isScalarValue ( ) ) & &
this . objectDepth = = 0 & & this . arrayDepth = = 0 ) {
if ( ( token . isStructEnd ( ) | | token . isScalarValue ( ) ) & & this . objectDepth = = 0 & & this . arrayDepth = = 0 ) {
result . add ( this . tokenBuffer ) ;
this . tokenBuffer = new TokenBuffer ( this . parser ) ;
}
@ -177,8 +154,7 @@ final class Jackson2Tokenizer {
@@ -177,8 +154,7 @@ final class Jackson2Tokenizer {
this . tokenBuffer . copyCurrentEvent ( this . parser ) ;
}
if ( this . objectDepth = = 0 & &
( this . arrayDepth = = 0 | | this . arrayDepth = = 1 ) & &
if ( this . objectDepth = = 0 & & ( this . arrayDepth = = 0 | | this . arrayDepth = = 1 ) & &
( token = = JsonToken . END_OBJECT | | token . isScalarValue ( ) ) ) {
result . add ( this . tokenBuffer ) ;
this . tokenBuffer = new TokenBuffer ( this . parser ) ;
@ -190,4 +166,26 @@ final class Jackson2Tokenizer {
@@ -190,4 +166,26 @@ final class Jackson2Tokenizer {
( token = = JsonToken . END_ARRAY & & this . arrayDepth = = 0 ) ) ;
}
/ * *
* Tokenize the given { @code Flux < DataBuffer > } into { @code Flux < TokenBuffer > } .
* @param dataBuffers the source data buffers
* @param jsonFactory the factory to use
* @param tokenizeArrayElements if { @code true } and the "top level" JSON object is
* an array , each element is returned individually immediately after it is received
* @return the resulting token buffers
* /
public static Flux < TokenBuffer > tokenize ( Flux < DataBuffer > dataBuffers , JsonFactory jsonFactory ,
DeserializationContext deserializationContext , boolean tokenizeArrayElements ) {
try {
JsonParser parser = jsonFactory . createNonBlockingByteArrayParser ( ) ;
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer ( parser , deserializationContext , tokenizeArrayElements ) ;
return dataBuffers . flatMap ( tokenizer : : tokenize , Flux : : error , tokenizer : : endOfInput ) ;
}
catch ( IOException ex ) {
return Flux . error ( ex ) ;
}
}
}