diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java
index 39a5c94a689..43e9dd9f433 100644
--- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java
+++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2022 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -47,7 +47,7 @@ import org.springframework.util.MimeType;
*
*
To generate {@code Message} Java classes, you need to install the {@code protoc} binary.
*
- *
This encoder requires Protobuf 3 or higher, and supports
+ *
This encoder requires Protobuf 3.29 or higher, and supports
* {@code "application/x-protobuf"} and {@code "application/octet-stream"} with the official
* {@code "com.google.protobuf:protobuf-java"} library.
*
diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufHttpMessageWriter.java
index 8a9bddcd332..6f9e4c30816 100644
--- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufHttpMessageWriter.java
+++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufHttpMessageWriter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,8 +28,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
-import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Encoder;
+import org.springframework.core.codec.EncodingException;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.EncoderHttpMessageWriter;
@@ -97,7 +97,7 @@ public class ProtobufHttpMessageWriter extends EncoderHttpMessageWriter
return super.write(inputStream, elementType, mediaType, message, hints);
}
catch (Exception ex) {
- return Mono.error(new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex));
+ return Mono.error(new EncodingException("Could not write Protobuf message: " + ex.getMessage(), ex));
}
}
diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoder.java
new file mode 100644
index 00000000000..d9bdb0b61ed
--- /dev/null
+++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoder.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2002-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.codec.protobuf;
+
+import java.io.InputStreamReader;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.util.JsonFormat;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.core.codec.Decoder;
+import org.springframework.core.codec.DecodingException;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferLimitException;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.http.MediaType;
+import org.springframework.lang.Nullable;
+import org.springframework.util.ConcurrentReferenceHashMap;
+import org.springframework.util.MimeType;
+
+/**
+ * A {@code Decoder} that reads a JSON byte stream and converts it to
+ * Google Protocol Buffers
+ * {@link com.google.protobuf.Message}s.
+ *
+ * Flux deserialized via
+ * {@link #decode(Publisher, ResolvableType, MimeType, Map)} are not supported because
+ * the Protobuf Java Util library does not provide a non-blocking parser
+ * that splits a JSON stream into tokens.
+ * Applications should consider decoding to {@code Mono} or
+ * {@code Mono>}, which will use the supported
+ * {@link #decodeToMono(Publisher, ResolvableType, MimeType, Map)}.
+ *
+ * To generate {@code Message} Java classes, you need to install the
+ * {@code protoc} binary.
+ *
+ *
This decoder requires Protobuf 3.29 or higher, and supports
+ * {@code "application/json"} and {@code "application/*+json"} with
+ * the official {@code "com.google.protobuf:protobuf-java-util"} library.
+ *
+ * @author Brian Clozel
+ * @since 6.2
+ * @see ProtobufJsonEncoder
+ */
+public class ProtobufJsonDecoder implements Decoder {
+
+ /** The default max size for aggregating messages. */
+ protected static final int DEFAULT_MESSAGE_MAX_SIZE = 256 * 1024;
+
+ private static final List defaultMimeTypes = List.of(MediaType.APPLICATION_JSON,
+ new MediaType("application", "*+json"));
+
+ private static final ConcurrentMap, Method> methodCache = new ConcurrentReferenceHashMap<>();
+
+ private final JsonFormat.Parser parser;
+
+ private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE;
+
+ /**
+ * Construct a new {@link ProtobufJsonDecoder} using a default {@link JsonFormat.Parser} instance.
+ */
+ public ProtobufJsonDecoder() {
+ this(JsonFormat.parser());
+ }
+
+ /**
+ * Construct a new {@link ProtobufJsonDecoder} using the given {@link JsonFormat.Parser} instance.
+ */
+ public ProtobufJsonDecoder(JsonFormat.Parser parser) {
+ this.parser = parser;
+ }
+
+ /**
+ * Return the {@link #setMaxMessageSize configured} message size limit.
+ */
+ public int getMaxMessageSize() {
+ return this.maxMessageSize;
+ }
+
+ /**
+ * The max size allowed per message.
+ * By default, this is set to 256K.
+ * @param maxMessageSize the max size per message, or -1 for unlimited
+ */
+ public void setMaxMessageSize(int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
+
+ @Override
+ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
+ return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
+ }
+
+ private static boolean supportsMimeType(@Nullable MimeType mimeType) {
+ if (mimeType == null) {
+ return false;
+ }
+ for (MimeType m : defaultMimeTypes) {
+ if (m.isCompatibleWith(mimeType)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ @Override
+ public List getDecodableMimeTypes() {
+ return defaultMimeTypes;
+ }
+
+ @Override
+ public Flux decode(Publisher inputStream, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map hints) {
+ return Flux.error(new UnsupportedOperationException("Protobuf decoder does not support Flux, use Mono> instead."));
+ }
+
+ @Override
+ public Message decode(DataBuffer dataBuffer, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map hints) throws DecodingException {
+ try {
+ Message.Builder builder = getMessageBuilder(targetType.toClass());
+ this.parser.merge(new InputStreamReader(dataBuffer.asInputStream()), builder);
+ return builder.build();
+ }
+ catch (Exception ex) {
+ throw new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex);
+ }
+ finally {
+ DataBufferUtils.release(dataBuffer);
+ }
+ }
+
+ /**
+ * Create a new {@code Message.Builder} instance for the given class.
+ * This method uses a ConcurrentHashMap for caching method lookups.
+ */
+ private static Message.Builder getMessageBuilder(Class> clazz) throws Exception {
+ Method method = methodCache.get(clazz);
+ if (method == null) {
+ method = clazz.getMethod("newBuilder");
+ methodCache.put(clazz, method);
+ }
+ return (Message.Builder) method.invoke(clazz);
+ }
+
+ @Override
+ public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) {
+ return DataBufferUtils.join(inputStream, this.maxMessageSize)
+ .map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints))
+ .onErrorMap(DataBufferLimitException.class, exc -> new DecodingException("Could not decode JSON as Protobuf message", exc));
+ }
+
+}
diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoder.java
new file mode 100644
index 00000000000..2b836ea60aa
--- /dev/null
+++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoder.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2002-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.codec.protobuf;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.util.JsonFormat;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.HttpMessageEncoder;
+import org.springframework.lang.Nullable;
+import org.springframework.util.FastByteArrayOutputStream;
+import org.springframework.util.MimeType;
+
+/**
+ * A {@code Encoder} that writes {@link com.google.protobuf.Message}s as JSON.
+ *
+ * To generate {@code Message} Java classes, you need to install the
+ * {@code protoc} binary.
+ *
+ *
This encoder requires Protobuf 3.29 or higher, and supports
+ * {@code "application/json"} and {@code "application/*+json"} with
+ * the official {@code "com.google.protobuf:protobuf-java-util"} library.
+ *
+ * @author Brian Clozel
+ * @since 6.2
+ * @see ProtobufJsonDecoder
+ */
+public class ProtobufJsonEncoder implements HttpMessageEncoder {
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+
+ private static final ResolvableType MESSAGE_TYPE = ResolvableType.forClass(Message.class);
+
+ private static final List defaultMimeTypes = List.of(
+ MediaType.APPLICATION_JSON,
+ new MediaType("application", "*+json"));
+
+ private final JsonFormat.Printer printer;
+
+
+ /**
+ * Construct a new {@link ProtobufJsonEncoder} using a default {@link JsonFormat.Printer} instance.
+ */
+ public ProtobufJsonEncoder() {
+ this(JsonFormat.printer());
+ }
+
+ /**
+ * Construct a new {@link ProtobufJsonEncoder} using the given {@link JsonFormat.Printer} instance.
+ */
+ public ProtobufJsonEncoder(JsonFormat.Printer printer) {
+ this.printer = printer;
+ }
+
+ @Override
+ public List getStreamingMediaTypes() {
+ return List.of(MediaType.APPLICATION_NDJSON);
+ }
+
+ @Override
+ public List getEncodableMimeTypes() {
+ return defaultMimeTypes;
+ }
+
+ @Override
+ public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType) {
+ return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
+ }
+
+ private static boolean supportsMimeType(@Nullable MimeType mimeType) {
+ if (mimeType == null) {
+ return false;
+ }
+ for (MimeType m : defaultMimeTypes) {
+ if (m.isCompatibleWith(mimeType)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Flux encode(Publisher extends Message> inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) {
+ if (inputStream instanceof Mono) {
+ return Mono.from(inputStream)
+ .map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
+ .flux();
+ }
+ JsonArrayJoinHelper helper = new JsonArrayJoinHelper();
+
+ // Do not prepend JSON array prefix until first signal is known, onNext vs onError
+ // Keeps response not committed for error handling
+ return Flux.from(inputStream)
+ .map(value -> {
+ byte[] prefix = helper.getPrefix();
+ byte[] delimiter = helper.getDelimiter();
+ DataBuffer dataBuffer = encodeValue(value, bufferFactory, MESSAGE_TYPE, mimeType, hints);
+ return (prefix.length > 0 ?
+ bufferFactory.join(List.of(bufferFactory.wrap(prefix), bufferFactory.wrap(delimiter), dataBuffer)) :
+ bufferFactory.join(List.of(bufferFactory.wrap(delimiter), dataBuffer)));
+ })
+ .switchIfEmpty(Mono.fromCallable(() -> bufferFactory.wrap(helper.getPrefix())))
+ .concatWith(Mono.fromCallable(() -> bufferFactory.wrap(helper.getSuffix())));
+ }
+
+ @Override
+ public DataBuffer encodeValue(Message message, DataBufferFactory bufferFactory, ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) {
+ FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
+ OutputStreamWriter writer = new OutputStreamWriter(bos, StandardCharsets.UTF_8);
+ try {
+ this.printer.appendTo(message, writer);
+ writer.flush();
+ byte[] bytes = bos.toByteArrayUnsafe();
+ return bufferFactory.wrap(bytes);
+ }
+ catch (IOException ex) {
+ throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
+ }
+ }
+
+ private static class JsonArrayJoinHelper {
+
+ private static final byte[] COMMA_SEPARATOR = {','};
+
+ private static final byte[] OPEN_BRACKET = {'['};
+
+ private static final byte[] CLOSE_BRACKET = {']'};
+
+ private boolean firstItemEmitted;
+
+ public byte[] getDelimiter() {
+ if (this.firstItemEmitted) {
+ return COMMA_SEPARATOR;
+ }
+ this.firstItemEmitted = true;
+ return EMPTY_BYTES;
+ }
+
+ public byte[] getPrefix() {
+ return (this.firstItemEmitted ? EMPTY_BYTES : OPEN_BRACKET);
+ }
+
+ public byte[] getSuffix() {
+ return CLOSE_BRACKET;
+ }
+ }
+}
diff --git a/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoderTests.java
new file mode 100644
index 00000000000..07eb52a786c
--- /dev/null
+++ b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonDecoderTests.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2002-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.codec.protobuf;
+
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.core.codec.DecodingException;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.testfixture.codec.AbstractDecoderTests;
+import org.springframework.http.MediaType;
+import org.springframework.protobuf.Msg;
+import org.springframework.protobuf.SecondMsg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link ProtobufJsonDecoder}.
+ * @author Brian Clozel
+ */
+public class ProtobufJsonDecoderTests extends AbstractDecoderTests {
+
+ private Msg msg1 = Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(123).build()).build();
+
+ public ProtobufJsonDecoderTests() {
+ super(new ProtobufJsonDecoder());
+ }
+
+ @Test
+ @Override
+ protected void canDecode() throws Exception {
+ ResolvableType msgType = ResolvableType.forClass(Msg.class);
+ assertThat(this.decoder.canDecode(msgType, null)).isFalse();
+ assertThat(this.decoder.canDecode(msgType, MediaType.APPLICATION_JSON)).isTrue();
+ assertThat(this.decoder.canDecode(msgType, MediaType.APPLICATION_PROTOBUF)).isFalse();
+ assertThat(this.decoder.canDecode(ResolvableType.forClass(Object.class), MediaType.APPLICATION_JSON)).isFalse();
+ }
+
+ @Test
+ @Override
+ protected void decode() throws Exception {
+ ResolvableType msgType = ResolvableType.forClass(Msg.class);
+ Flux input = Flux.just(dataBuffer("[{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}"),
+ dataBuffer(",{\"foo\":\"Bar\",\"blah\":{\"blah\":456}}"),
+ dataBuffer("]"));
+
+ testDecode(input, msgType, step -> step.consumeErrorWith(error -> assertThat(error).isInstanceOf(UnsupportedOperationException.class)),
+ MediaType.APPLICATION_JSON, null);
+ }
+
+ @Test
+ @Override
+ protected void decodeToMono() throws Exception {
+ DataBuffer dataBuffer = dataBuffer("{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}");
+ testDecodeToMonoAll(Mono.just(dataBuffer), Msg.class, step -> step
+ .expectNext(this.msg1)
+ .verifyComplete());
+ }
+
+ @Test
+ void exceedMaxSize() {
+ this.decoder.setMaxMessageSize(1);
+ DataBuffer first = dataBuffer("{\"foo\":\"Foo\",");
+ DataBuffer second = dataBuffer("\"blah\":{\"blah\":123}}");
+
+ testDecodeToMono(Flux.just(first, second), Msg.class, step -> step.verifyError(DecodingException.class));
+ }
+
+ private DataBuffer dataBuffer(String json) {
+ return this.bufferFactory.wrap(json.getBytes());
+ }
+
+}
diff --git a/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoderTests.java
new file mode 100644
index 00000000000..3a6fce099dd
--- /dev/null
+++ b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufJsonEncoderTests.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2002-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.http.codec.protobuf;
+
+import java.nio.charset.StandardCharsets;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.util.JsonFormat;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import org.springframework.core.ResolvableType;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.testfixture.codec.AbstractEncoderTests;
+import org.springframework.core.testfixture.io.buffer.DataBufferTestUtils;
+import org.springframework.http.MediaType;
+import org.springframework.protobuf.Msg;
+import org.springframework.protobuf.SecondMsg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.springframework.core.ResolvableType.forClass;
+
+/**
+ * Tests for {@link ProtobufJsonEncoder}.
+ * @author Brian Clozel
+ */
+class ProtobufJsonEncoderTests extends AbstractEncoderTests {
+
+ private Msg msg1 =
+ Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(123).build()).build();
+
+ private Msg msg2 =
+ Msg.newBuilder().setFoo("Bar").setBlah(SecondMsg.newBuilder().setBlah(456).build()).build();
+
+ public ProtobufJsonEncoderTests() {
+ super(new ProtobufJsonEncoder(JsonFormat.printer().omittingInsignificantWhitespace()));
+ }
+
+ @Override
+ @Test
+ protected void canEncode() throws Exception {
+ assertThat(this.encoder.canEncode(forClass(Msg.class), null)).isFalse();
+ assertThat(this.encoder.canEncode(forClass(Msg.class), MediaType.APPLICATION_JSON)).isTrue();
+ assertThat(this.encoder.canEncode(forClass(Msg.class), MediaType.APPLICATION_NDJSON)).isFalse();
+ assertThat(this.encoder.canEncode(forClass(Object.class), MediaType.APPLICATION_JSON)).isFalse();
+ }
+
+ @Override
+ @Test
+ protected void encode() throws Exception {
+ Mono input = Mono.just(this.msg1);
+ ResolvableType inputType = forClass(Msg.class);
+
+ testEncode(input, inputType, MediaType.APPLICATION_JSON, null, step -> step
+ .assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, "{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}"))
+ .verifyComplete());
+ testEncodeError(input, inputType, MediaType.APPLICATION_JSON, null);
+ testEncodeCancel(input, inputType, MediaType.APPLICATION_JSON, null);
+ }
+
+ @Test
+ void encodeEmptyMono() {
+ Mono input = Mono.empty();
+ ResolvableType inputType = forClass(Msg.class);
+ Flux result = this.encoder.encode(input, this.bufferFactory, inputType,
+ MediaType.APPLICATION_JSON, null);
+ StepVerifier.create(result)
+ .verifyComplete();
+ }
+
+ @Test
+ void encodeStream() {
+ Flux input = Flux.just(this.msg1, this.msg2);
+ ResolvableType inputType = forClass(Msg.class);
+
+ testEncode(input, inputType, MediaType.APPLICATION_JSON, null, step -> step
+ .assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, "[{\"foo\":\"Foo\",\"blah\":{\"blah\":123}}"))
+ .assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, ",{\"foo\":\"Bar\",\"blah\":{\"blah\":456}}"))
+ .assertNext(dataBuffer -> assertBufferEqualsJson(dataBuffer, "]"))
+ .verifyComplete());
+ }
+
+ @Test
+ void encodeEmptyFlux() {
+ Flux input = Flux.empty();
+ ResolvableType inputType = forClass(Msg.class);
+ Flux result = this.encoder.encode(input, this.bufferFactory, inputType,
+ MediaType.APPLICATION_JSON, null);
+ StepVerifier.create(result)
+ .assertNext(buffer -> assertBufferEqualsJson(buffer, "["))
+ .assertNext(buffer -> assertBufferEqualsJson(buffer, "]"))
+ .verifyComplete();
+ }
+
+
+ private void assertBufferEqualsJson(DataBuffer actual, String expected) {
+ byte[] bytes = DataBufferTestUtils.dumpBytes(actual);
+ String json = new String(bytes, StandardCharsets.UTF_8);
+ assertThat(json).isEqualTo(expected);
+ DataBufferUtils.release(actual);
+ }
+
+}