diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonDecoder.java index 4a383997382..63e0cf85eff 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonDecoder.java @@ -82,8 +82,10 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes public Flux decode(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { + Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(nonBlockingParser(), true); Flux tokens = Flux.from(input) - .flatMap(new Jackson2Tokenizer(nonBlockingParser(), true)); + .flatMap(tokenizer) + .doFinally(t -> tokenizer.endOfInput()); return decodeInternal(tokens, elementType, mimeType, hints); } @@ -92,8 +94,10 @@ public class Jackson2JsonDecoder extends Jackson2CodecSupport implements HttpMes public Mono decodeToMono(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { + Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(nonBlockingParser(), false); Flux tokens = Flux.from(input) - .flatMap(new Jackson2Tokenizer(nonBlockingParser(), false)); + .flatMap(tokenizer) + .doFinally(t -> tokenizer.endOfInput()); return decodeInternal(tokens, elementType, mimeType, hints).singleOrEmpty(); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java index d164bde6ffd..2c6657b738b 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java @@ -106,6 +106,10 @@ class Jackson2Tokenizer implements Function> { } } + public void endOfInput() { + this.inputFeeder.endOfInput(); + } + private void calculateDepth(JsonToken token) { switch (token) { case START_OBJECT: diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java index e40314e3873..a2640b7d3e1 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java @@ -97,7 +97,9 @@ public class XmlEventDecoder extends AbstractDecoder { Flux flux = Flux.from(inputStream); if (useAalto) { - return flux.flatMap(new AaltoDataBufferToXmlEvent()); + AaltoDataBufferToXmlEvent aaltoMapper = new AaltoDataBufferToXmlEvent(); + return flux.flatMap(aaltoMapper) + .doFinally(signalType -> aaltoMapper.endOfInput()); } else { Mono singleBuffer = flux.reduce(DataBuffer::write); @@ -158,6 +160,10 @@ public class XmlEventDecoder extends AbstractDecoder { DataBufferUtils.release(dataBuffer); } } + + public void endOfInput() { + this.streamReader.getInputFeeder().endOfInput(); + } } }