|
|
|
|
@ -34,8 +34,9 @@ import org.springframework.core.io.buffer.DataBufferUtils;
@@ -34,8 +34,9 @@ import org.springframework.core.io.buffer.DataBufferUtils;
|
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Function that transforms an arbitrary split byte stream representing JSON objects into a |
|
|
|
|
* {@code Flux<TokenBuffer>}, where each token buffer is a well-formed JSON object. |
|
|
|
|
* {@link Function} to transform a JSON stream of arbitrary size, byte array |
|
|
|
|
* chunks into a {@code Flux<TokenBuffer>} where each token buffer is a |
|
|
|
|
* well-formed JSON object. |
|
|
|
|
* |
|
|
|
|
* @author Arjen Poutsma |
|
|
|
|
* @since 5.0 |
|
|
|
|
@ -53,14 +54,16 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
@@ -53,14 +54,16 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
|
|
|
|
|
private int arrayDepth; |
|
|
|
|
|
|
|
|
|
// TODO: change to ByteBufferFeeder when supported by Jackson
|
|
|
|
|
private ByteArrayFeeder inputFeeder; |
|
|
|
|
private final ByteArrayFeeder inputFeeder; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Create a new instance of the {@code Jackson2Tokenizer}. |
|
|
|
|
* @param parser the non-blocking parser, obtained via |
|
|
|
|
* {@link com.fasterxml.jackson.core.JsonFactory#createNonBlockingByteArrayParser} |
|
|
|
|
* @param tokenizeArrayElements if {@code true} and the "top level" JSON object is an array, |
|
|
|
|
* each of its elements is returned individually and immediately after it was fully received |
|
|
|
|
* @param tokenizeArrayElements if {@code true} and the "top level" JSON |
|
|
|
|
* object is an array, each element is returned individually, immediately |
|
|
|
|
* after it is received. |
|
|
|
|
*/ |
|
|
|
|
public Jackson2Tokenizer(JsonParser parser, boolean tokenizeArrayElements) { |
|
|
|
|
Assert.notNull(parser, "'parser' must not be null"); |
|
|
|
|
@ -71,6 +74,7 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
@@ -71,6 +74,7 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
|
|
|
|
|
this.inputFeeder = (ByteArrayFeeder) this.parser.getNonBlockingInputFeeder(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public Flux<TokenBuffer> apply(DataBuffer dataBuffer) { |
|
|
|
|
byte[] bytes = new byte[dataBuffer.readableByteCount()]; |
|
|
|
|
@ -86,7 +90,7 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
@@ -86,7 +90,7 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
|
|
|
|
|
if (token == JsonToken.NOT_AVAILABLE) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
calculateDepth(token); |
|
|
|
|
updateDepth(token); |
|
|
|
|
|
|
|
|
|
if (!this.tokenizeArrayElements) { |
|
|
|
|
processTokenNormal(token, result); |
|
|
|
|
@ -106,11 +110,7 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
@@ -106,11 +110,7 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void endOfInput() { |
|
|
|
|
this.inputFeeder.endOfInput(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void calculateDepth(JsonToken token) { |
|
|
|
|
private void updateDepth(JsonToken token) { |
|
|
|
|
switch (token) { |
|
|
|
|
case START_OBJECT: |
|
|
|
|
this.objectDepth++; |
|
|
|
|
@ -149,7 +149,10 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
@@ -149,7 +149,10 @@ class Jackson2Tokenizer implements Function<DataBuffer, Flux<TokenBuffer>> {
|
|
|
|
|
result.add(this.tokenBuffer); |
|
|
|
|
this.tokenBuffer = new TokenBuffer(this.parser); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void endOfInput() { |
|
|
|
|
this.inputFeeder.endOfInput(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|