|
|
|
|
@ -82,11 +82,7 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
@@ -82,11 +82,7 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
|
|
|
|
|
public Flux<Object> decode(Publisher<DataBuffer> input, ResolvableType elementType, |
|
|
|
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
|
|
|
|
|
|
|
|
|
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(nonBlockingParser(), true); |
|
|
|
|
Flux<TokenBuffer> tokens = Flux.from(input) |
|
|
|
|
.flatMap(tokenizer) |
|
|
|
|
.doFinally(t -> tokenizer.endOfInput()); |
|
|
|
|
|
|
|
|
|
Flux<TokenBuffer> tokens = tokenize(input, true); |
|
|
|
|
return decodeInternal(tokens, elementType, mimeType, hints); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -94,14 +90,26 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
@@ -94,14 +90,26 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
|
|
|
|
|
public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType, |
|
|
|
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
|
|
|
|
|
|
|
|
|
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(nonBlockingParser(), false); |
|
|
|
|
Flux<TokenBuffer> tokens = Flux.from(input) |
|
|
|
|
.flatMap(tokenizer) |
|
|
|
|
.doFinally(t -> tokenizer.endOfInput()); |
|
|
|
|
|
|
|
|
|
Flux<TokenBuffer> tokens = tokenize(input, false); |
|
|
|
|
return decodeInternal(tokens, elementType, mimeType, hints).singleOrEmpty(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private Flux<TokenBuffer> tokenize(Publisher<DataBuffer> input, boolean tokenizeArrayElements) { |
|
|
|
|
try { |
|
|
|
|
JsonFactory factory = objectMapper().getFactory(); |
|
|
|
|
JsonParser nonBlockingParser = factory.createNonBlockingByteArrayParser(); |
|
|
|
|
Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(nonBlockingParser, |
|
|
|
|
tokenizeArrayElements); |
|
|
|
|
return Flux.from(input) |
|
|
|
|
.flatMap(tokenizer) |
|
|
|
|
.doFinally(t -> tokenizer.endOfInput()); |
|
|
|
|
} |
|
|
|
|
catch (IOException ex) { |
|
|
|
|
return Flux.error(new UncheckedIOException(ex)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private Flux<Object> decodeInternal(Flux<TokenBuffer> tokens, |
|
|
|
|
ResolvableType elementType, @Nullable MimeType mimeType, |
|
|
|
|
@Nullable Map<String, Object> hints) { |
|
|
|
|
@ -118,19 +126,18 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
@@ -118,19 +126,18 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
|
|
|
|
|
objectMapper().readerWithView(jsonView).forType(javaType) : |
|
|
|
|
objectMapper().readerFor(javaType)); |
|
|
|
|
|
|
|
|
|
return tokens.flatMap(tokenBuffer -> { |
|
|
|
|
return tokens.map(tokenBuffer -> { |
|
|
|
|
try { |
|
|
|
|
Object value = reader.readValue(tokenBuffer.asParser()); |
|
|
|
|
return Mono.just(value); |
|
|
|
|
return reader.readValue(tokenBuffer.asParser()); |
|
|
|
|
} |
|
|
|
|
catch (InvalidDefinitionException ex) { |
|
|
|
|
return Mono.error(new CodecException("Type definition error: " + ex.getType(), ex)); |
|
|
|
|
throw new CodecException("Type definition error: " + ex.getType(), ex); |
|
|
|
|
} |
|
|
|
|
catch (JsonProcessingException ex) { |
|
|
|
|
return Mono.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex)); |
|
|
|
|
throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex); |
|
|
|
|
} |
|
|
|
|
catch (IOException ex) { |
|
|
|
|
return Mono.error(new DecodingException("I/O error while parsing input stream", ex)); |
|
|
|
|
throw new DecodingException("I/O error while parsing input stream", ex); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
@ -150,13 +157,4 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
@@ -150,13 +157,4 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes
|
|
|
|
|
return parameter.getParameterAnnotation(annotType); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private JsonParser nonBlockingParser() { |
|
|
|
|
try { |
|
|
|
|
JsonFactory factory = this.objectMapper().getFactory(); |
|
|
|
|
return factory.createNonBlockingByteArrayParser(); |
|
|
|
|
} |
|
|
|
|
catch (IOException ex) { |
|
|
|
|
throw new UncheckedIOException(ex); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|