From 17abb4d25d3d5fc08d666d2797377040648fa739 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Tue, 18 Jun 2024 15:48:46 +0200 Subject: [PATCH] Add codecs for JSON support with Protobuf Prior to this commit, WebFlux had Protobuf codecs for managing the `Message`/`"application/x-protobuf"` encoding and decoding. The `com.google.protobuf:protobuf-java-util` library has additional support for JSON (de)serialization, but this is not supported by existing codecs. This commit adds the new `ProtobufJsonEncode` and `ProtobufJsonDecoder` classes that support this use case. Note, the `ProtobufJsonDecoder` has a significant limitation: it cannot decode JSON arrays as `Flux` because there is no available non-blocking parser able to tokenize JSON arrays into streams of `Databuffer`. Instead, applications should decode to `Mono>` which causes additional buffering but is properly supported. Closes gh-25457 --- .../http/codec/protobuf/ProtobufEncoder.java | 4 +- .../protobuf/ProtobufHttpMessageWriter.java | 6 +- .../codec/protobuf/ProtobufJsonDecoder.java | 173 ++++++++++++++++++ .../codec/protobuf/ProtobufJsonEncoder.java | 173 ++++++++++++++++++ .../protobuf/ProtobufJsonDecoderTests.java | 89 +++++++++ .../protobuf/ProtobufJsonEncoderTests.java | 120 ++++++++++++ 6 files changed, 560 insertions(+), 5 deletions(-) create mode 100644 spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoder.java create mode 100644 spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoder.java create mode 100644 spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoderTests.java create mode 100644 spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoderTests.java diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java index 39a5c94a689..43e9dd9f433 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ import org.springframework.util.MimeType; * *

To generate {@code Message} Java classes, you need to install the {@code protoc} binary. * - *

This encoder requires Protobuf 3 or higher, and supports + *

This encoder requires Protobuf 3.29 or higher, and supports * {@code "application/x-protobuf"} and {@code "application/octet-stream"} with the official * {@code "com.google.protobuf:protobuf-java"} library. * diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufHttpMessageWriter.java index 8a9bddcd332..6f9e4c30816 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,8 +28,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; -import org.springframework.core.codec.DecodingException; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.EncodingException; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.codec.EncoderHttpMessageWriter; @@ -97,7 +97,7 @@ public class ProtobufHttpMessageWriter extends EncoderHttpMessageWriter return super.write(inputStream, elementType, mediaType, message, hints); } catch (Exception ex) { - return Mono.error(new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex)); + return Mono.error(new EncodingException("Could not write Protobuf message: " + ex.getMessage(), ex)); } } diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoder.java new file mode 100644 index 00000000000..d9bdb0b61ed --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoder.java @@ -0,0 +1,173 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.codec.protobuf; + +import java.io.InputStreamReader; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.codec.Decoder; +import org.springframework.core.codec.DecodingException; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.http.MediaType; +import org.springframework.lang.Nullable; +import org.springframework.util.ConcurrentReferenceHashMap; +import org.springframework.util.MimeType; + +/** + * A {@code Decoder} that reads a JSON byte stream and converts it to + * Google Protocol Buffers + * {@link com.google.protobuf.Message}s. + * + *

Flux deserialized via + * {@link #decode(Publisher, ResolvableType, MimeType, Map)} are not supported because + * the Protobuf Java Util library does not provide a non-blocking parser + * that splits a JSON stream into tokens. + * Applications should consider decoding to {@code Mono} or + * {@code Mono>}, which will use the supported + * {@link #decodeToMono(Publisher, ResolvableType, MimeType, Map)}. + * + *

To generate {@code Message} Java classes, you need to install the + * {@code protoc} binary. + * + *

This decoder requires Protobuf 3.29 or higher, and supports + * {@code "application/json"} and {@code "application/*+json"} with + * the official {@code "com.google.protobuf:protobuf-java-util"} library. + * + * @author Brian Clozel + * @since 6.2 + * @see ProtobufJsonEncoder + */ +public class ProtobufJsonDecoder implements Decoder { + + /** The default max size for aggregating messages. */ + protected static final int DEFAULT_MESSAGE_MAX_SIZE = 256 * 1024; + + private static final List defaultMimeTypes = List.of(MediaType.APPLICATION_JSON, + new MediaType("application", "*+json")); + + private static final ConcurrentMap, Method> methodCache = new ConcurrentReferenceHashMap<>(); + + private final JsonFormat.Parser parser; + + private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE; + + /** + * Construct a new {@link ProtobufJsonDecoder} using a default {@link JsonFormat.Parser} instance. + */ + public ProtobufJsonDecoder() { + this(JsonFormat.parser()); + } + + /** + * Construct a new {@link ProtobufJsonDecoder} using the given {@link JsonFormat.Parser} instance. + */ + public ProtobufJsonDecoder(JsonFormat.Parser parser) { + this.parser = parser; + } + + /** + * Return the {@link #setMaxMessageSize configured} message size limit. + */ + public int getMaxMessageSize() { + return this.maxMessageSize; + } + + /** + * The max size allowed per message. + *

By default, this is set to 256K. + * @param maxMessageSize the max size per message, or -1 for unlimited + */ + public void setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + @Override + public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { + return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType); + } + + private static boolean supportsMimeType(@Nullable MimeType mimeType) { + if (mimeType == null) { + return false; + } + for (MimeType m : defaultMimeTypes) { + if (m.isCompatibleWith(mimeType)) { + return true; + } + } + return false; + } + + + @Override + public List getDecodableMimeTypes() { + return defaultMimeTypes; + } + + @Override + public Flux decode(Publisher inputStream, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map hints) { + return Flux.error(new UnsupportedOperationException("Protobuf decoder does not support Flux, use Mono> instead.")); + } + + @Override + public Message decode(DataBuffer dataBuffer, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map hints) throws DecodingException { + try { + Message.Builder builder = getMessageBuilder(targetType.toClass()); + this.parser.merge(new InputStreamReader(dataBuffer.asInputStream()), builder); + return builder.build(); + } + catch (Exception ex) { + throw new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex); + } + finally { + DataBufferUtils.release(dataBuffer); + } + } + + /** + * Create a new {@code Message.Builder} instance for the given class. + *

This method uses a ConcurrentHashMap for caching method lookups. + */ + private static Message.Builder getMessageBuilder(Class clazz) throws Exception { + Method method = methodCache.get(clazz); + if (method == null) { + method = clazz.getMethod("newBuilder"); + methodCache.put(clazz, method); + } + return (Message.Builder) method.invoke(clazz); + } + + @Override + public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { + return DataBufferUtils.join(inputStream, this.maxMessageSize) + .map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints)) + .onErrorMap(DataBufferLimitException.class, exc -> new DecodingException("Could not decode JSON as Protobuf message", exc)); + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoder.java new file mode 100644 index 00000000000..2b836ea60aa --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoder.java @@ -0,0 +1,173 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.codec.protobuf; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.MediaType; +import org.springframework.http.codec.HttpMessageEncoder; +import org.springframework.lang.Nullable; +import org.springframework.util.FastByteArrayOutputStream; +import org.springframework.util.MimeType; + +/** + * A {@code Encoder} that writes {@link com.google.protobuf.Message}s as JSON. + * + *

To generate {@code Message} Java classes, you need to install the + * {@code protoc} binary. + * + *

This encoder requires Protobuf 3.29 or higher, and supports + * {@code "application/json"} and {@code "application/*+json"} with + * the official {@code "com.google.protobuf:protobuf-java-util"} library. + * + * @author Brian Clozel + * @since 6.2 + * @see ProtobufJsonDecoder + */ +public class ProtobufJsonEncoder implements HttpMessageEncoder { + + private static final byte[] EMPTY_BYTES = new byte[0]; + + private static final ResolvableType MESSAGE_TYPE = ResolvableType.forClass(Message.class); + + private static final List defaultMimeTypes = List.of( + MediaType.APPLICATION_JSON, + new MediaType("application", "*+json")); + + private final JsonFormat.Printer printer; + + + /** + * Construct a new {@link ProtobufJsonEncoder} using a default {@link JsonFormat.Printer} instance. + */ + public ProtobufJsonEncoder() { + this(JsonFormat.printer()); + } + + /** + * Construct a new {@link ProtobufJsonEncoder} using the given {@link JsonFormat.Printer} instance. + */ + public ProtobufJsonEncoder(JsonFormat.Printer printer) { + this.printer = printer; + } + + @Override + public List getStreamingMediaTypes() { + return List.of(MediaType.APPLICATION_NDJSON); + } + + @Override + public List getEncodableMimeTypes() { + return defaultMimeTypes; + } + + @Override + public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType) { + return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType); + } + + private static boolean supportsMimeType(@Nullable MimeType mimeType) { + if (mimeType == null) { + return false; + } + for (MimeType m : defaultMimeTypes) { + if (m.isCompatibleWith(mimeType)) { + return true; + } + } + return false; + } + + @Override + public Flux encode(Publisher inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { + if (inputStream instanceof Mono) { + return Mono.from(inputStream) + .map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints)) + .flux(); + } + JsonArrayJoinHelper helper = new JsonArrayJoinHelper(); + + // Do not prepend JSON array prefix until first signal is known, onNext vs onError + // Keeps response not committed for error handling + return Flux.from(inputStream) + .map(value -> { + byte[] prefix = helper.getPrefix(); + byte[] delimiter = helper.getDelimiter(); + DataBuffer dataBuffer = encodeValue(value, bufferFactory, MESSAGE_TYPE, mimeType, hints); + return (prefix.length > 0 ? + bufferFactory.join(List.of(bufferFactory.wrap(prefix), bufferFactory.wrap(delimiter), dataBuffer)) : + bufferFactory.join(List.of(bufferFactory.wrap(delimiter), dataBuffer))); + }) + .switchIfEmpty(Mono.fromCallable(() -> bufferFactory.wrap(helper.getPrefix()))) + .concatWith(Mono.fromCallable(() -> bufferFactory.wrap(helper.getSuffix()))); + } + + @Override + public DataBuffer encodeValue(Message message, DataBufferFactory bufferFactory, ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); + OutputStreamWriter writer = new OutputStreamWriter(bos, StandardCharsets.UTF_8); + try { + this.printer.appendTo(message, writer); + writer.flush(); + byte[] bytes = bos.toByteArrayUnsafe(); + return bufferFactory.wrap(bytes); + } + catch (IOException ex) { + throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex); + } + } + + private static class JsonArrayJoinHelper { + + private static final byte[] COMMA_SEPARATOR = {','}; + + private static final byte[] OPEN_BRACKET = {'['}; + + private static final byte[] CLOSE_BRACKET = {']'}; + + private boolean firstItemEmitted; + + public byte[] getDelimiter() { + if (this.firstItemEmitted) { + return COMMA_SEPARATOR; + } + this.firstItemEmitted = true; + return EMPTY_BYTES; + } + + public byte[] getPrefix() { + return (this.firstItemEmitted ? EMPTY_BYTES : OPEN_BRACKET); + } + + public byte[] getSuffix() { + return CLOSE_BRACKET; + } + } +} diff --git a/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoderTests.java new file mode 100644 index 00000000000..07eb52a786c --- /dev/null +++ b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoderTests.java @@ -0,0 +1,89 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.codec.protobuf; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.codec.DecodingException; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.testfixture.codec.AbstractDecoderTests; +import org.springframework.http.MediaType; +import org.springframework.protobuf.Msg; +import org.springframework.protobuf.SecondMsg; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link ProtobufJsonDecoder}. + * @author Brian Clozel + */ +public class ProtobufJsonDecoderTests extends AbstractDecoderTests { + + private Msg msg1 = Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(123).build()).build(); + + public ProtobufJsonDecoderTests() { + super(new ProtobufJsonDecoder()); + } + + @Test + @Override + protected void canDecode() throws Exception { + ResolvableType msgType = ResolvableType.forClass(Msg.class); + assertThat(this.decoder.canDecode(msgType, null)).isFalse(); + assertThat(this.decoder.canDecode(msgType, MediaType.APPLICATION_JSON)).isTrue(); + assertThat(this.decoder.canDecode(msgType, MediaType.APPLICATION_PROTOBUF)).isFalse(); + assertThat(this.decoder.canDecode(ResolvableType.forClass(Object.class), MediaType.APPLICATION_JSON)).isFalse(); + } + + @Test + @Override + protected void decode() throws Exception { + ResolvableType msgType = ResolvableType.forClass(Msg.class); + Flux input = Flux.just(dataBuffer("[{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}"), + dataBuffer(",{\"foo\":\"Bar\",\"blah\":{\"blah\":456}}"), + dataBuffer("]")); + + testDecode(input, msgType, step -> step.consumeErrorWith(error -> assertThat(error).isInstanceOf(UnsupportedOperationException.class)), + MediaType.APPLICATION_JSON, null); + } + + @Test + @Override + protected void decodeToMono() throws Exception { + DataBuffer dataBuffer = dataBuffer("{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}"); + testDecodeToMonoAll(Mono.just(dataBuffer), Msg.class, step -> step + .expectNext(this.msg1) + .verifyComplete()); + } + + @Test + void exceedMaxSize() { + this.decoder.setMaxMessageSize(1); + DataBuffer first = dataBuffer("{\"foo\":\"Foo\","); + DataBuffer second = dataBuffer("\"blah\":{\"blah\":123}}"); + + testDecodeToMono(Flux.just(first, second), Msg.class, step -> step.verifyError(DecodingException.class)); + } + + private DataBuffer dataBuffer(String json) { + return this.bufferFactory.wrap(json.getBytes()); + } + +} diff --git a/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoderTests.java new file mode 100644 index 00000000000..3a6fce099dd --- /dev/null +++ b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoderTests.java @@ -0,0 +1,120 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.codec.protobuf; + +import java.nio.charset.StandardCharsets; + +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.testfixture.codec.AbstractEncoderTests; +import org.springframework.core.testfixture.io.buffer.DataBufferTestUtils; +import org.springframework.http.MediaType; +import org.springframework.protobuf.Msg; +import org.springframework.protobuf.SecondMsg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.core.ResolvableType.forClass; + +/** + * Tests for {@link ProtobufJsonEncoder}. + * @author Brian Clozel + */ +class ProtobufJsonEncoderTests extends AbstractEncoderTests { + + private Msg msg1 = + Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(123).build()).build(); + + private Msg msg2 = + Msg.newBuilder().setFoo("Bar").setBlah(SecondMsg.newBuilder().setBlah(456).build()).build(); + + public ProtobufJsonEncoderTests() { + super(new ProtobufJsonEncoder(JsonFormat.printer().omittingInsignificantWhitespace())); + } + + @Override + @Test + protected void canEncode() throws Exception { + assertThat(this.encoder.canEncode(forClass(Msg.class), null)).isFalse(); + assertThat(this.encoder.canEncode(forClass(Msg.class), MediaType.APPLICATION_JSON)).isTrue(); + assertThat(this.encoder.canEncode(forClass(Msg.class), MediaType.APPLICATION_NDJSON)).isFalse(); + assertThat(this.encoder.canEncode(forClass(Object.class), MediaType.APPLICATION_JSON)).isFalse(); + } + + @Override + @Test + protected void encode() throws Exception { + Mono input = Mono.just(this.msg1); + ResolvableType inputType = forClass(Msg.class); + + testEncode(input, inputType, MediaType.APPLICATION_JSON, null, step -> step + .assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, "{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}")) + .verifyComplete()); + testEncodeError(input, inputType, MediaType.APPLICATION_JSON, null); + testEncodeCancel(input, inputType, MediaType.APPLICATION_JSON, null); + } + + @Test + void encodeEmptyMono() { + Mono input = Mono.empty(); + ResolvableType inputType = forClass(Msg.class); + Flux result = this.encoder.encode(input, this.bufferFactory, inputType, + MediaType.APPLICATION_JSON, null); + StepVerifier.create(result) + .verifyComplete(); + } + + @Test + void encodeStream() { + Flux input = Flux.just(this.msg1, this.msg2); + ResolvableType inputType = forClass(Msg.class); + + testEncode(input, inputType, MediaType.APPLICATION_JSON, null, step -> step + .assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, "[{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}")) + .assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, ",{\"foo\":\"Bar\",\"blah\":{\"blah\":456}}")) + .assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, "]")) + .verifyComplete()); + } + + @Test + void encodeEmptyFlux() { + Flux input = Flux.empty(); + ResolvableType inputType = forClass(Msg.class); + Flux result = this.encoder.encode(input, this.bufferFactory, inputType, + MediaType.APPLICATION_JSON, null); + StepVerifier.create(result) + .assertNext(buffer -> assertBufferEqualsJson(buffer, "[")) + .assertNext(buffer -> assertBufferEqualsJson(buffer, "]")) + .verifyComplete(); + } + + + private void assertBufferEqualsJson(DataBuffer actual, String expected) { + byte[] bytes = DataBufferTestUtils.dumpBytes(actual); + String json = new String(bytes, StandardCharsets.UTF_8); + assertThat(json).isEqualTo(expected); + DataBufferUtils.release(actual); + } + +}