Browse Source

Refine JSON encoding of non-streaming Flux

Closes gh-28398
pull/28515/head
rstoyanchev 4 years ago
parent
commit
ce568468ae
  1. 129
      spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java
  2. 38
      spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java

129
spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java

@ -35,6 +35,7 @@ import com.fasterxml.jackson.databind.ObjectWriter; @@ -35,6 +35,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
import com.fasterxml.jackson.databind.ser.FilterProvider;
import org.apache.commons.logging.Log;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -70,6 +71,8 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple @@ -70,6 +71,8 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
private static final byte[] NEWLINE_SEPARATOR = {'\n'};
private static final byte[] EMPTY_BYTES = new byte[0];
private static final Map<String, JsonEncoding> ENCODINGS;
static {
@ -150,45 +153,47 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple @@ -150,45 +153,47 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
.map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
.flux();
}
else {
try {
ObjectMapper mapper = selectObjectMapper(elementType, mimeType);
if (mapper == null) {
throw new IllegalStateException("No ObjectMapper for " + elementType);
}
ObjectWriter writer = createObjectWriter(mapper, elementType, mimeType, null, hints);
ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler());
JsonEncoding encoding = getJsonEncoding(mimeType);
JsonGenerator generator = mapper.getFactory().createGenerator(byteBuilder, encoding);
SequenceWriter sequenceWriter = writer.writeValues(generator);
byte[] separator = getStreamingMediaTypeSeparator(mimeType);
if (separator != null) { // streaming
try {
ObjectMapper mapper = selectObjectMapper(elementType, mimeType);
if (mapper == null) {
throw new IllegalStateException("No ObjectMapper for " + elementType);
}
ObjectWriter writer = createObjectWriter(mapper, elementType, mimeType, null, hints);
ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler());
JsonEncoding encoding = getJsonEncoding(mimeType);
JsonGenerator generator = mapper.getFactory().createGenerator(byteBuilder, encoding);
SequenceWriter sequenceWriter = writer.writeValues(generator);
return Flux.from(inputStream)
.map(value -> encodeStreamingValue(value, bufferFactory, hints, sequenceWriter, byteBuilder,
separator))
.doAfterTerminate(() -> {
try {
byteBuilder.release();
generator.close();
}
catch (IOException ex) {
logger.error("Could not close Encoder resources", ex);
}
});
}
catch (IOException ex) {
return Flux.error(ex);
}
Flux<DataBuffer> dataBufferFlux;
if (separator != null) {
dataBufferFlux = Flux.from(inputStream).map(value -> encodeStreamingValue(
value, bufferFactory, hints, sequenceWriter, byteBuilder, EMPTY_BYTES, separator));
}
else { // non-streaming
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
return Flux.from(inputStream)
.collectList()
.map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints))
.flux();
else {
JsonArrayJoinHelper helper = new JsonArrayJoinHelper();
return Flux.concat(
helper.getPrefix(bufferFactory, hints, logger),
Flux.from(inputStream).map(value -> encodeStreamingValue(
value, bufferFactory, hints, sequenceWriter, byteBuilder, helper.getDelimiter(), EMPTY_BYTES)),
helper.getSuffix(bufferFactory, hints, logger));
}
return dataBufferFlux
.doAfterTerminate(() -> {
try {
byteBuilder.release();
generator.close();
}
catch (IOException ex) {
logger.error("Could not close Encoder resources", ex);
}
});
}
catch (IOException ex) {
return Flux.error(ex);
}
}
@ -247,8 +252,10 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple @@ -247,8 +252,10 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
}
}
private DataBuffer encodeStreamingValue(Object value, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints,
SequenceWriter sequenceWriter, ByteArrayBuilder byteArrayBuilder, byte[] separator) {
private DataBuffer encodeStreamingValue(
Object value, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints,
SequenceWriter sequenceWriter, ByteArrayBuilder byteArrayBuilder,
byte[] prefix, byte[] suffix) {
logValue(hints, value);
@ -280,9 +287,14 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple @@ -280,9 +287,14 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
offset = 0;
length = bytes.length;
}
DataBuffer buffer = bufferFactory.allocateBuffer(length + separator.length);
DataBuffer buffer = bufferFactory.allocateBuffer(length + prefix.length + suffix.length);
if (prefix.length != 0) {
buffer.write(prefix);
}
buffer.write(bytes, offset, length);
buffer.write(separator);
if (suffix.length != 0) {
buffer.write(suffix);
}
Hints.touchDataBuffer(buffer, hints, logger);
return buffer;
@ -385,4 +397,43 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple @@ -385,4 +397,43 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
return parameter.getMethodAnnotation(annotType);
}
private static class JsonArrayJoinHelper {
private static final byte[] COMMA_SEPARATOR = {','};
private static final byte[] OPEN_BRACKET = {'['};
private static final byte[] CLOSE_BRACKET = {']'};
private boolean afterFirstItem = false;
public byte[] getDelimiter() {
if (this.afterFirstItem) {
return COMMA_SEPARATOR;
}
this.afterFirstItem = true;
return EMPTY_BYTES;
}
public Mono<DataBuffer> getPrefix(DataBufferFactory factory, @Nullable Map<String, Object> hints, Log logger) {
return wrapBytes(OPEN_BRACKET, factory, hints, logger);
}
public Mono<DataBuffer> getSuffix(DataBufferFactory factory, @Nullable Map<String, Object> hints, Log logger) {
return wrapBytes(CLOSE_BRACKET, factory, hints, logger);
}
private Mono<DataBuffer> wrapBytes(
byte[] bytes, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints, Log logger) {
return Mono.fromCallable(() -> {
DataBuffer buffer = bufferFactory.wrap(bytes);
Hints.touchDataBuffer(buffer, hints, logger);
return buffer;
});
}
}
}

38
spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java

@ -33,7 +33,6 @@ import reactor.test.StepVerifier; @@ -33,7 +33,6 @@ import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.testfixture.codec.AbstractEncoderTests;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
@ -138,11 +137,11 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE @@ -138,11 +137,11 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE
);
testEncode(input, Pojo.class, step -> step
.consumeNextWith(expectString("[" +
"{\"foo\":\"foo\",\"bar\":\"bar\"}," +
"{\"foo\":\"foofoo\",\"bar\":\"barbar\"}," +
"{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}]")
.andThen(DataBufferUtils::release))
.consumeNextWith(expectString("["))
.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}"))
.consumeNextWith(expectString(",{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"))
.consumeNextWith(expectString(",{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"))
.consumeNextWith(expectString("]"))
.verifyComplete());
}
@ -151,8 +150,10 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE @@ -151,8 +150,10 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE
Flux<ParentClass> input = Flux.just(new Foo(), new Bar());
testEncode(input, ParentClass.class, step -> step
.consumeNextWith(expectString("[{\"type\":\"foo\"},{\"type\":\"bar\"}]")
.andThen(DataBufferUtils::release))
.consumeNextWith(expectString("["))
.consumeNextWith(expectString("{\"type\":\"foo\"}"))
.consumeNextWith(expectString(",{\"type\":\"bar\"}"))
.consumeNextWith(expectString("]"))
.verifyComplete());
}
@ -169,12 +170,9 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE @@ -169,12 +170,9 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE
);
testEncode(input, ResolvableType.forClass(Pojo.class), barMediaType, null, step -> step
.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}\n")
.andThen(DataBufferUtils::release))
.consumeNextWith(expectString("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n")
.andThen(DataBufferUtils::release))
.consumeNextWith(expectString("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n")
.andThen(DataBufferUtils::release))
.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}\n"))
.consumeNextWith(expectString("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n"))
.consumeNextWith(expectString("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n"))
.verifyComplete()
);
}
@ -191,7 +189,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE @@ -191,7 +189,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE
Map<String, Object> hints = singletonMap(JSON_VIEW_HINT, MyJacksonView1.class);
testEncode(input, type, null, hints, step -> step
.consumeNextWith(expectString("{\"withView1\":\"with\"}").andThen(DataBufferUtils::release))
.consumeNextWith(expectString("{\"withView1\":\"with\"}"))
.verifyComplete()
);
}
@ -208,7 +206,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE @@ -208,7 +206,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE
Map<String, Object> hints = singletonMap(JSON_VIEW_HINT, MyJacksonView3.class);
testEncode(input, type, null, hints, step -> step
.consumeNextWith(expectString("{\"withoutView\":\"without\"}").andThen(DataBufferUtils::release))
.consumeNextWith(expectString("{\"withoutView\":\"without\"}"))
.verifyComplete()
);
}
@ -226,7 +224,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE @@ -226,7 +224,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE
ResolvableType type = ResolvableType.forClass(MappingJacksonValue.class);
testEncode(Mono.just(jacksonValue), type, null, Collections.emptyMap(), step -> step
.consumeNextWith(expectString("{\"withView1\":\"with\"}").andThen(DataBufferUtils::release))
.consumeNextWith(expectString("{\"withView1\":\"with\"}"))
.verifyComplete()
);
}
@ -250,7 +248,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE @@ -250,7 +248,7 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE
String ls = System.lineSeparator(); // output below is different between Unix and Windows
testEncode(Mono.just(jacksonValue), type, halMediaType, Collections.emptyMap(), step -> step
.consumeNextWith(expectString("{" + ls + " \"withView1\" : \"with\"" + ls + "}")
.andThen(DataBufferUtils::release))
)
.verifyComplete()
);
}
@ -265,7 +263,9 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE @@ -265,7 +263,9 @@ public class Jackson2JsonEncoderTests extends AbstractEncoderTests<Jackson2JsonE
ResolvableType.forClass(Pojo.class), MimeTypeUtils.APPLICATION_JSON, Collections.emptyMap());
StepVerifier.create(result)
.consumeNextWith(expectString("[{\"foo\":\"foo\",\"bar\":\"bar\"}]"))
.consumeNextWith(expectString("["))
.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}"))
.consumeNextWith(expectString("]"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}

Loading…
Cancel
Save