|
|
|
|
@ -28,6 +28,7 @@ import com.fasterxml.jackson.core.JsonToken;
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.core.JsonToken;
|
|
|
|
|
import com.fasterxml.jackson.core.async.ByteArrayFeeder; |
|
|
|
|
import com.fasterxml.jackson.databind.DeserializationContext; |
|
|
|
|
import com.fasterxml.jackson.databind.util.TokenBuffer; |
|
|
|
|
import reactor.core.Exceptions; |
|
|
|
|
import reactor.core.publisher.Flux; |
|
|
|
|
|
|
|
|
|
import org.springframework.core.codec.DecodingException; |
|
|
|
|
@ -74,7 +75,7 @@ final class Jackson2Tokenizer {
@@ -74,7 +75,7 @@ final class Jackson2Tokenizer {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Flux<TokenBuffer> tokenize(DataBuffer dataBuffer) { |
|
|
|
|
private List<TokenBuffer> tokenize(DataBuffer dataBuffer) { |
|
|
|
|
byte[] bytes = new byte[dataBuffer.readableByteCount()]; |
|
|
|
|
dataBuffer.read(bytes); |
|
|
|
|
DataBufferUtils.release(dataBuffer); |
|
|
|
|
@ -84,27 +85,29 @@ final class Jackson2Tokenizer {
@@ -84,27 +85,29 @@ final class Jackson2Tokenizer {
|
|
|
|
|
return parseTokenBufferFlux(); |
|
|
|
|
} |
|
|
|
|
catch (JsonProcessingException ex) { |
|
|
|
|
return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex)); |
|
|
|
|
throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex); |
|
|
|
|
} |
|
|
|
|
catch (IOException ex) { |
|
|
|
|
return Flux.error(ex); |
|
|
|
|
throw Exceptions.propagate(ex); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private Flux<TokenBuffer> endOfInput() { |
|
|
|
|
this.inputFeeder.endOfInput(); |
|
|
|
|
try { |
|
|
|
|
return parseTokenBufferFlux(); |
|
|
|
|
} |
|
|
|
|
catch (JsonProcessingException ex) { |
|
|
|
|
return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex)); |
|
|
|
|
} |
|
|
|
|
catch (IOException ex) { |
|
|
|
|
return Flux.error(ex); |
|
|
|
|
} |
|
|
|
|
return Flux.defer(() -> { |
|
|
|
|
this.inputFeeder.endOfInput(); |
|
|
|
|
try { |
|
|
|
|
return Flux.fromIterable(parseTokenBufferFlux()); |
|
|
|
|
} |
|
|
|
|
catch (JsonProcessingException ex) { |
|
|
|
|
throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex); |
|
|
|
|
} |
|
|
|
|
catch (IOException ex) { |
|
|
|
|
throw Exceptions.propagate(ex); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private Flux<TokenBuffer> parseTokenBufferFlux() throws IOException { |
|
|
|
|
private List<TokenBuffer> parseTokenBufferFlux() throws IOException { |
|
|
|
|
List<TokenBuffer> result = new ArrayList<>(); |
|
|
|
|
|
|
|
|
|
while (true) { |
|
|
|
|
@ -122,7 +125,7 @@ final class Jackson2Tokenizer {
@@ -122,7 +125,7 @@ final class Jackson2Tokenizer {
|
|
|
|
|
processTokenArray(token, result); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return Flux.fromIterable(result); |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void updateDepth(JsonToken token) { |
|
|
|
|
@ -184,7 +187,7 @@ final class Jackson2Tokenizer {
@@ -184,7 +187,7 @@ final class Jackson2Tokenizer {
|
|
|
|
|
try { |
|
|
|
|
JsonParser parser = jsonFactory.createNonBlockingByteArrayParser(); |
|
|
|
|
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, deserializationContext, tokenizeArrayElements); |
|
|
|
|
return dataBuffers.flatMap(tokenizer::tokenize, Flux::error, tokenizer::endOfInput); |
|
|
|
|
return dataBuffers.concatMapIterable(tokenizer::tokenize).concatWith(tokenizer.endOfInput()); |
|
|
|
|
} |
|
|
|
|
catch (IOException ex) { |
|
|
|
|
return Flux.error(ex); |
|
|
|
|
|