From ce568468aed09147e335b5d5a717e1b2dac581a8 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Wed, 18 May 2022 17:11:30 +0100 Subject: [PATCH] Refine JSON encoding of non-streaming Flux Closes gh-28398 --- .../codec/json/AbstractJackson2Encoder.java | 129 ++++++++++++------ .../codec/json/Jackson2JsonEncoderTests.java | 38 +++--- 2 files changed, 109 insertions(+), 58 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java index 4ae48550181..3f4b7558956 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java @@ -35,6 +35,7 @@ import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.SequenceWriter; import com.fasterxml.jackson.databind.exc.InvalidDefinitionException; import com.fasterxml.jackson.databind.ser.FilterProvider; +import org.apache.commons.logging.Log; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -70,6 +71,8 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple private static final byte[] NEWLINE_SEPARATOR = {'\n'}; + private static final byte[] EMPTY_BYTES = new byte[0]; + private static final Map ENCODINGS; static { @@ -150,45 +153,47 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple .map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints)) .flux(); } - else { + + try { + ObjectMapper mapper = selectObjectMapper(elementType, mimeType); + if (mapper == null) { + throw new IllegalStateException("No ObjectMapper for " + elementType); + } + ObjectWriter writer = createObjectWriter(mapper, elementType, mimeType, null, hints); + ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler()); + JsonEncoding encoding = getJsonEncoding(mimeType); + JsonGenerator generator = mapper.getFactory().createGenerator(byteBuilder, encoding); + SequenceWriter sequenceWriter = writer.writeValues(generator); + byte[] separator = getStreamingMediaTypeSeparator(mimeType); - if (separator != null) { // streaming - try { - ObjectMapper mapper = selectObjectMapper(elementType, mimeType); - if (mapper == null) { - throw new IllegalStateException("No ObjectMapper for " + elementType); - } - ObjectWriter writer = createObjectWriter(mapper, elementType, mimeType, null, hints); - ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler()); - JsonEncoding encoding = getJsonEncoding(mimeType); - JsonGenerator generator = mapper.getFactory().createGenerator(byteBuilder, encoding); - SequenceWriter sequenceWriter = writer.writeValues(generator); - - return Flux.from(inputStream) - .map(value -> encodeStreamingValue(value, bufferFactory, hints, sequenceWriter, byteBuilder, - separator)) - .doAfterTerminate(() -> { - try { - byteBuilder.release(); - generator.close(); - } - catch (IOException ex) { - logger.error("Could not close Encoder resources", ex); - } - }); - } - catch (IOException ex) { - return Flux.error(ex); - } + Flux dataBufferFlux; + + if (separator != null) { + dataBufferFlux = Flux.from(inputStream).map(value -> encodeStreamingValue( + value, bufferFactory, hints, sequenceWriter, byteBuilder, EMPTY_BYTES, separator)); } - else { // non-streaming - ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); - return Flux.from(inputStream) - .collectList() - .map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints)) - .flux(); + else { + JsonArrayJoinHelper helper = new JsonArrayJoinHelper(); + return Flux.concat( + helper.getPrefix(bufferFactory, hints, logger), + Flux.from(inputStream).map(value -> encodeStreamingValue( + value, bufferFactory, hints, sequenceWriter, byteBuilder, helper.getDelimiter(), EMPTY_BYTES)), + helper.getSuffix(bufferFactory, hints, logger)); } + return dataBufferFlux + .doAfterTerminate(() -> { + try { + byteBuilder.release(); + generator.close(); + } + catch (IOException ex) { + logger.error("Could not close Encoder resources", ex); + } + }); + } + catch (IOException ex) { + return Flux.error(ex); } } @@ -247,8 +252,10 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple } } - private DataBuffer encodeStreamingValue(Object value, DataBufferFactory bufferFactory, @Nullable Map hints, - SequenceWriter sequenceWriter, ByteArrayBuilder byteArrayBuilder, byte[] separator) { + private DataBuffer encodeStreamingValue( + Object value, DataBufferFactory bufferFactory, @Nullable Map hints, + SequenceWriter sequenceWriter, ByteArrayBuilder byteArrayBuilder, + byte[] prefix, byte[] suffix) { logValue(hints, value); @@ -280,9 +287,14 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple offset = 0; length = bytes.length; } - DataBuffer buffer = bufferFactory.allocateBuffer(length + separator.length); + DataBuffer buffer = bufferFactory.allocateBuffer(length + prefix.length + suffix.length); + if (prefix.length != 0) { + buffer.write(prefix); + } buffer.write(bytes, offset, length); - buffer.write(separator); + if (suffix.length != 0) { + buffer.write(suffix); + } Hints.touchDataBuffer(buffer, hints, logger); return buffer; @@ -385,4 +397,43 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple return parameter.getMethodAnnotation(annotType); } + + private static class JsonArrayJoinHelper { + + private static final byte[] COMMA_SEPARATOR = {','}; + + private static final byte[] OPEN_BRACKET = {'['}; + + private static final byte[] CLOSE_BRACKET = {']'}; + + + private boolean afterFirstItem = false; + + public byte[] getDelimiter() { + if (this.afterFirstItem) { + return COMMA_SEPARATOR; + } + this.afterFirstItem = true; + return EMPTY_BYTES; + } + + public Mono getPrefix(DataBufferFactory factory, @Nullable Map hints, Log logger) { + return wrapBytes(OPEN_BRACKET, factory, hints, logger); + } + + public Mono getSuffix(DataBufferFactory factory, @Nullable Map hints, Log logger) { + return wrapBytes(CLOSE_BRACKET, factory, hints, logger); + } + + private Mono wrapBytes( + byte[] bytes, DataBufferFactory bufferFactory, @Nullable Map hints, Log logger) { + + return Mono.fromCallable(() -> { + DataBuffer buffer = bufferFactory.wrap(bytes); + Hints.touchDataBuffer(buffer, hints, logger); + return buffer; + }); + } + } + } diff --git a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java index b66084e8299..75c949e4b08 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java @@ -33,7 +33,6 @@ import reactor.test.StepVerifier; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.testfixture.codec.AbstractEncoderTests; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; @@ -138,11 +137,11 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests step - .consumeNextWith(expectString("[" + - "{\"foo\":\"foo\",\"bar\":\"bar\"}," + - "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}," + - "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}]") - .andThen(DataBufferUtils::release)) + .consumeNextWith(expectString("[")) + .consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}")) + .consumeNextWith(expectString(",{\"foo\":\"foofoo\",\"bar\":\"barbar\"}")) + .consumeNextWith(expectString(",{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}")) + .consumeNextWith(expectString("]")) .verifyComplete()); } @@ -151,8 +150,10 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests input = Flux.just(new Foo(), new Bar()); testEncode(input, ParentClass.class, step -> step - .consumeNextWith(expectString("[{\"type\":\"foo\"},{\"type\":\"bar\"}]") - .andThen(DataBufferUtils::release)) + .consumeNextWith(expectString("[")) + .consumeNextWith(expectString("{\"type\":\"foo\"}")) + .consumeNextWith(expectString(",{\"type\":\"bar\"}")) + .consumeNextWith(expectString("]")) .verifyComplete()); } @@ -169,12 +170,9 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests step - .consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}\n") - .andThen(DataBufferUtils::release)) - .consumeNextWith(expectString("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n") - .andThen(DataBufferUtils::release)) - .consumeNextWith(expectString("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n") - .andThen(DataBufferUtils::release)) + .consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}\n")) + .consumeNextWith(expectString("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n")) + .consumeNextWith(expectString("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n")) .verifyComplete() ); } @@ -191,7 +189,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests hints = singletonMap(JSON_VIEW_HINT, MyJacksonView1.class); testEncode(input, type, null, hints, step -> step - .consumeNextWith(expectString("{\"withView1\":\"with\"}").andThen(DataBufferUtils::release)) + .consumeNextWith(expectString("{\"withView1\":\"with\"}")) .verifyComplete() ); } @@ -208,7 +206,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests hints = singletonMap(JSON_VIEW_HINT, MyJacksonView3.class); testEncode(input, type, null, hints, step -> step - .consumeNextWith(expectString("{\"withoutView\":\"without\"}").andThen(DataBufferUtils::release)) + .consumeNextWith(expectString("{\"withoutView\":\"without\"}")) .verifyComplete() ); } @@ -226,7 +224,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests step - .consumeNextWith(expectString("{\"withView1\":\"with\"}").andThen(DataBufferUtils::release)) + .consumeNextWith(expectString("{\"withView1\":\"with\"}")) .verifyComplete() ); } @@ -250,7 +248,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests step .consumeNextWith(expectString("{" + ls + " \"withView1\" : \"with\"" + ls + "}") - .andThen(DataBufferUtils::release)) + ) .verifyComplete() ); } @@ -265,7 +263,9 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests