Browse Source
This commit adds new `GsonEncoder` and `GsonDecoder` for serializing and deserializing JSON in a reactive fashion. Because `Gson` itslef does not support decoding JSON in a non-blocking way, the `GsonDecoder` does not support decoding to `Flux<*>` types. Closes gh-27131pull/35535/head
8 changed files with 574 additions and 0 deletions
@ -0,0 +1,101 @@
@@ -0,0 +1,101 @@
|
||||
/* |
||||
* Copyright 2002-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.codec.json; |
||||
|
||||
import java.io.InputStreamReader; |
||||
import java.util.Map; |
||||
|
||||
import com.google.gson.Gson; |
||||
import org.jspecify.annotations.Nullable; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.codec.AbstractDataBufferDecoder; |
||||
import org.springframework.core.codec.Decoder; |
||||
import org.springframework.core.codec.DecodingException; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferUtils; |
||||
import org.springframework.http.MediaType; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.MimeType; |
||||
|
||||
/** |
||||
* {@link Decoder} that reads a byte stream into JSON and converts it to Objects with |
||||
* <a href="https://google.github.io/gson/">Google Gson</a>. |
||||
* <p>{@code Flux<*>} target types are not available because non-blocking parsing is not supported, |
||||
* so this decoder targets only {@code Mono<*>} types. Attempting to decode to a {@code Flux<*>} will |
||||
* result in a {@link UnsupportedOperationException} being thrown at runtime. |
||||
* |
||||
* @author Brian Clozel |
||||
* @since 7.0 |
||||
*/ |
||||
public class GsonDecoder extends AbstractDataBufferDecoder<Object> { |
||||
|
||||
private static final MimeType[] DEFAULT_JSON_MIME_TYPES = new MimeType[] { |
||||
MediaType.APPLICATION_JSON, |
||||
new MediaType("application", "*+json"), |
||||
}; |
||||
|
||||
private final Gson gson; |
||||
|
||||
/** |
||||
* Construct a new decoder using a default {@link Gson} instance |
||||
* and the {@code "application/json"} and {@code "application/*+json"} |
||||
* MIME types. |
||||
*/ |
||||
public GsonDecoder() { |
||||
this(new Gson(), DEFAULT_JSON_MIME_TYPES); |
||||
} |
||||
|
||||
/** |
||||
* Construct a new decoder using the given {@link Gson} instance |
||||
* and the provided MIME types. |
||||
* @param gson the gson instance to use |
||||
* @param mimeTypes the mime types the decoder should support |
||||
*/ |
||||
public GsonDecoder(Gson gson, MimeType... mimeTypes) { |
||||
super(mimeTypes); |
||||
Assert.notNull(gson, "A Gson instance is required"); |
||||
this.gson = gson; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { |
||||
if (!super.canDecode(elementType, mimeType)) { |
||||
return false; |
||||
} |
||||
return !CharSequence.class.isAssignableFrom(elementType.toClass()); |
||||
} |
||||
|
||||
@Override |
||||
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
||||
throw new UnsupportedOperationException("Stream decoding is currently not supported"); |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable Object decode(DataBuffer buffer, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException { |
||||
try { |
||||
return this.gson.fromJson(new InputStreamReader(buffer.asInputStream()), targetType.getType()); |
||||
} |
||||
finally { |
||||
DataBufferUtils.release(buffer); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,205 @@
@@ -0,0 +1,205 @@
|
||||
/* |
||||
* Copyright 2002-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.codec.json; |
||||
|
||||
import java.io.IOException; |
||||
import java.io.OutputStreamWriter; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.ArrayList; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
import com.google.gson.Gson; |
||||
import org.jspecify.annotations.Nullable; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.codec.AbstractEncoder; |
||||
import org.springframework.core.codec.EncodingException; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
import org.springframework.http.MediaType; |
||||
import org.springframework.http.codec.HttpMessageEncoder; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.FastByteArrayOutputStream; |
||||
import org.springframework.util.MimeType; |
||||
|
||||
/** |
||||
* Encode from an {@code Object} stream to a byte stream of JSON objects using |
||||
* <a href="https://google.github.io/gson/">Google Gson</a>. |
||||
* |
||||
* @author Brian Clozel |
||||
* @since 7.0 |
||||
*/ |
||||
public class GsonEncoder extends AbstractEncoder<Object> implements HttpMessageEncoder<Object> { |
||||
|
||||
private static final byte[] NEWLINE_SEPARATOR = {'\n'}; |
||||
|
||||
private static final byte[] EMPTY_BYTES = new byte[0]; |
||||
|
||||
private static final MimeType[] DEFAULT_JSON_MIME_TYPES = new MimeType[] { |
||||
MediaType.APPLICATION_JSON, |
||||
new MediaType("application", "*+json"), |
||||
MediaType.APPLICATION_NDJSON |
||||
}; |
||||
|
||||
private final Gson gson; |
||||
|
||||
private final List<MediaType> streamingMediaTypes = new ArrayList<>(1); |
||||
|
||||
/** |
||||
* Construct a new encoder using a default {@link Gson} instance |
||||
* and the {@code "application/json"} and {@code "application/*+json"} |
||||
* MIME types. The {@code "application/x-ndjson"} is configured for streaming. |
||||
*/ |
||||
public GsonEncoder() { |
||||
this(new Gson(), DEFAULT_JSON_MIME_TYPES); |
||||
setStreamingMediaTypes(List.of(MediaType.APPLICATION_NDJSON)); |
||||
} |
||||
|
||||
/** |
||||
* Construct a new encoder using the given {@link Gson} instance |
||||
* and the provided MIME types. Use {@link #setStreamingMediaTypes(List)} |
||||
* for configuring streaming media types. |
||||
* @param gson the gson instance to use |
||||
* @param mimeTypes the mime types the decoder should support |
||||
*/ |
||||
public GsonEncoder(Gson gson, MimeType... mimeTypes) { |
||||
super(mimeTypes); |
||||
Assert.notNull(gson, "A Gson instance is required"); |
||||
this.gson = gson; |
||||
} |
||||
|
||||
/** |
||||
* Configure "streaming" media types for which flushing should be performed |
||||
* automatically vs at the end of the stream. |
||||
*/ |
||||
public void setStreamingMediaTypes(List<MediaType> mediaTypes) { |
||||
this.streamingMediaTypes.clear(); |
||||
this.streamingMediaTypes.addAll(mediaTypes); |
||||
} |
||||
|
||||
@Override |
||||
public List<MediaType> getStreamingMediaTypes() { |
||||
return Collections.unmodifiableList(this.streamingMediaTypes); |
||||
} |
||||
|
||||
@Override |
||||
public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType) { |
||||
if (!super.canEncode(elementType, mimeType)) { |
||||
return false; |
||||
} |
||||
Class<?> clazz = elementType.toClass(); |
||||
return !String.class.isAssignableFrom(elementType.resolve(clazz)); |
||||
} |
||||
|
||||
@Override |
||||
public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, |
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
||||
|
||||
boolean isStreaming = isStreamingMediaType(mimeType); |
||||
if (isStreaming) { |
||||
return Flux.from(inputStream).map(message -> encodeValue(message, bufferFactory, EMPTY_BYTES, NEWLINE_SEPARATOR)); |
||||
} |
||||
else { |
||||
JsonArrayJoinHelper helper = new JsonArrayJoinHelper(); |
||||
// Do not prepend JSON array prefix until first signal is known, onNext vs onError
|
||||
// Keeps response not committed for error handling
|
||||
return Flux.from(inputStream) |
||||
.map(value -> { |
||||
byte[] prefix = helper.getPrefix(); |
||||
byte[] delimiter = helper.getDelimiter(); |
||||
DataBuffer dataBuffer = encodeValue(value, bufferFactory, delimiter, EMPTY_BYTES); |
||||
return (prefix.length > 0 ? |
||||
bufferFactory.join(List.of(bufferFactory.wrap(prefix), dataBuffer)) : |
||||
dataBuffer); |
||||
}) |
||||
.switchIfEmpty(Mono.fromCallable(() -> bufferFactory.wrap(helper.getPrefix()))) |
||||
.concatWith(Mono.fromCallable(() -> bufferFactory.wrap(helper.getSuffix()))); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, ResolvableType valueType, |
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
||||
return encodeValue(value, bufferFactory, EMPTY_BYTES, EMPTY_BYTES); |
||||
} |
||||
|
||||
private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, |
||||
byte[] prefix, byte[] suffix) { |
||||
try { |
||||
FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); |
||||
OutputStreamWriter writer = new OutputStreamWriter(bos, StandardCharsets.UTF_8); |
||||
bos.write(prefix); |
||||
this.gson.toJson(value, writer); |
||||
writer.flush(); |
||||
bos.write(suffix); |
||||
byte[] bytes = bos.toByteArrayUnsafe(); |
||||
return bufferFactory.wrap(bytes); |
||||
} |
||||
catch (IOException ex) { |
||||
throw new EncodingException("JSON encoding error: " + ex.getMessage(), ex); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Return the separator to use for the given mime type. |
||||
* <p>By default, this method returns new line {@code "\n"} if the given |
||||
* mime type is one of the configured {@link #setStreamingMediaTypes(List) |
||||
* streaming} mime types. |
||||
*/ |
||||
protected boolean isStreamingMediaType(@Nullable MimeType mimeType) { |
||||
for (MediaType streamingMediaType : this.streamingMediaTypes) { |
||||
if (streamingMediaType.isCompatibleWith(mimeType)) { |
||||
return true; |
||||
} |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
|
||||
private static class JsonArrayJoinHelper { |
||||
|
||||
private static final byte[] COMMA_SEPARATOR = {','}; |
||||
|
||||
private static final byte[] OPEN_BRACKET = {'['}; |
||||
|
||||
private static final byte[] CLOSE_BRACKET = {']'}; |
||||
|
||||
private boolean firstItemEmitted; |
||||
|
||||
public byte[] getDelimiter() { |
||||
if (this.firstItemEmitted) { |
||||
return COMMA_SEPARATOR; |
||||
} |
||||
this.firstItemEmitted = true; |
||||
return EMPTY_BYTES; |
||||
} |
||||
|
||||
public byte[] getPrefix() { |
||||
return (this.firstItemEmitted ? EMPTY_BYTES : OPEN_BRACKET); |
||||
} |
||||
|
||||
public byte[] getSuffix() { |
||||
return CLOSE_BRACKET; |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,98 @@
@@ -0,0 +1,98 @@
|
||||
/* |
||||
* Copyright 2002-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.codec.json; |
||||
|
||||
import java.nio.charset.Charset; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.testfixture.codec.AbstractDecoderTests; |
||||
import org.springframework.web.testfixture.xml.Pojo; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy; |
||||
import static org.springframework.http.MediaType.APPLICATION_JSON; |
||||
import static org.springframework.http.MediaType.APPLICATION_NDJSON; |
||||
import static org.springframework.http.MediaType.APPLICATION_XML; |
||||
|
||||
/** |
||||
* Tests for {@link GsonDecoder}. |
||||
*/ |
||||
class GsonDecoderTests extends AbstractDecoderTests<GsonDecoder> { |
||||
|
||||
|
||||
public GsonDecoderTests() { |
||||
super(new GsonDecoder()); |
||||
} |
||||
|
||||
@Test |
||||
@Override |
||||
protected void canDecode() throws Exception { |
||||
assertThat(decoder.canDecode(ResolvableType.forClass(Pojo.class), APPLICATION_JSON)).isTrue(); |
||||
assertThat(decoder.canDecode(ResolvableType.forClass(Pojo.class), APPLICATION_NDJSON)).isFalse(); |
||||
assertThat(decoder.canDecode(ResolvableType.forClass(Pojo.class), null)).isTrue(); |
||||
|
||||
assertThat(decoder.canDecode(ResolvableType.forClass(String.class), null)).isFalse(); |
||||
assertThat(decoder.canDecode(ResolvableType.forClass(Pojo.class), APPLICATION_XML)).isFalse(); |
||||
} |
||||
|
||||
@Test |
||||
@Override |
||||
protected void decode() throws Exception { |
||||
Flux<DataBuffer> input = Flux.concat( |
||||
stringBuffer("[{\"bar\":\"b1\",\"foo\":\"f1\"},"), |
||||
stringBuffer("{\"bar\":\"b2\",\"foo\":\"f2\"}]")); |
||||
assertThatThrownBy(() -> decoder.decode(input, ResolvableType.forClass(Pojo.class), APPLICATION_JSON, null)) |
||||
.isInstanceOf(UnsupportedOperationException.class); |
||||
} |
||||
|
||||
@Test |
||||
@Override |
||||
protected void decodeToMono() throws Exception { |
||||
Flux<DataBuffer> input = Flux.concat( |
||||
stringBuffer("[{\"bar\":\"b1\",\"foo\":\"f1\"},"), |
||||
stringBuffer("{\"bar\":\"b2\",\"foo\":\"f2\"}]")); |
||||
|
||||
ResolvableType elementType = ResolvableType.forClassWithGenerics(List.class, Pojo.class); |
||||
|
||||
testDecodeToMonoAll(input, elementType, step -> step |
||||
.expectNext(Arrays.asList(new Pojo("f1", "b1"), new Pojo("f2", "b2"))) |
||||
.expectComplete() |
||||
.verify(), null, null); |
||||
} |
||||
|
||||
private Mono<DataBuffer> stringBuffer(String value) { |
||||
return stringBuffer(value, StandardCharsets.UTF_8); |
||||
} |
||||
|
||||
|
||||
private Mono<DataBuffer> stringBuffer(String value, Charset charset) { |
||||
return Mono.defer(() -> { |
||||
byte[] bytes = value.getBytes(charset); |
||||
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length); |
||||
buffer.write(bytes); |
||||
return Mono.just(buffer); |
||||
}); |
||||
} |
||||
} |
||||
@ -0,0 +1,97 @@
@@ -0,0 +1,97 @@
|
||||
/* |
||||
* Copyright 2002-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.codec.json; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.test.StepVerifier; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.testfixture.codec.AbstractEncoderTests; |
||||
import org.springframework.web.testfixture.xml.Pojo; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.springframework.http.MediaType.APPLICATION_JSON; |
||||
import static org.springframework.http.MediaType.APPLICATION_NDJSON; |
||||
|
||||
class GsonEncoderTests extends AbstractEncoderTests<GsonEncoder> { |
||||
|
||||
public GsonEncoderTests() { |
||||
super(new GsonEncoder()); |
||||
} |
||||
|
||||
@Test |
||||
@Override |
||||
protected void canEncode() throws Exception { |
||||
ResolvableType pojoType = ResolvableType.forClass(Pojo.class); |
||||
assertThat(this.encoder.canEncode(pojoType, APPLICATION_JSON)).isTrue(); |
||||
assertThat(this.encoder.canEncode(pojoType, APPLICATION_NDJSON)).isTrue(); |
||||
assertThat(this.encoder.canEncode(pojoType, null)).isTrue(); |
||||
|
||||
} |
||||
|
||||
@Test |
||||
@Override |
||||
protected void encode() throws Exception { |
||||
Flux<Object> input = Flux.just(new Pojo("foo", "bar"), |
||||
new Pojo("foofoo", "barbar"), |
||||
new Pojo("foofoofoo", "barbarbar")); |
||||
|
||||
testEncodeAll(input, ResolvableType.forClass(Pojo.class), APPLICATION_NDJSON, null, step -> step |
||||
.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}\n")) |
||||
.consumeNextWith(expectString("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n")) |
||||
.consumeNextWith(expectString("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n")) |
||||
.verifyComplete() |
||||
); |
||||
} |
||||
|
||||
@Test |
||||
void encodeNonStream() { |
||||
Flux<Pojo> input = Flux.just( |
||||
new Pojo("foo", "bar"), |
||||
new Pojo("foofoo", "barbar"), |
||||
new Pojo("foofoofoo", "barbarbar") |
||||
); |
||||
|
||||
testEncode(input, Pojo.class, step -> step |
||||
.consumeNextWith(expectString("[{\"foo\":\"foo\",\"bar\":\"bar\"}")) |
||||
.consumeNextWith(expectString(",{\"foo\":\"foofoo\",\"bar\":\"barbar\"}")) |
||||
.consumeNextWith(expectString(",{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}")) |
||||
.consumeNextWith(expectString("]")) |
||||
.verifyComplete()); |
||||
} |
||||
|
||||
@Test |
||||
void encodeNonStreamEmpty() { |
||||
testEncode(Flux.empty(), Pojo.class, step -> step |
||||
.consumeNextWith(expectString("[")) |
||||
.consumeNextWith(expectString("]")) |
||||
.verifyComplete()); |
||||
} |
||||
|
||||
@Test |
||||
void encodeNonStreamWithErrorAsFirstSignal() { |
||||
String message = "I'm a teapot"; |
||||
Flux<Object> input = Flux.error(new IllegalStateException(message)); |
||||
|
||||
Flux<DataBuffer> output = this.encoder.encode( |
||||
input, this.bufferFactory, ResolvableType.forClass(Pojo.class), null, null); |
||||
|
||||
StepVerifier.create(output).expectErrorMessage(message).verify(); |
||||
} |
||||
} |
||||
Loading…
Reference in new issue