diff --git a/spring-web/src/main/java/org/springframework/http/MediaType.java b/spring-web/src/main/java/org/springframework/http/MediaType.java index c44140483d6..528e3990be0 100644 --- a/spring-web/src/main/java/org/springframework/http/MediaType.java +++ b/spring-web/src/main/java/org/springframework/http/MediaType.java @@ -137,6 +137,18 @@ public class MediaType extends MimeType implements Serializable { */ public final static String APPLICATION_RSS_XML_VALUE = "application/rss+xml"; + /** + * Public constant media type for {@code application/stream+json}. + * @since 5.0 + */ + public final static MediaType APPLICATION_STREAM_JSON; + + /** + * A String equivalent of {@link MediaType#APPLICATION_STREAM_JSON}. + * @since 5.0 + */ + public final static String APPLICATION_STREAM_JSON_VALUE = "application/stream+json"; + /** * Public constant media type for {@code application/xhtml+xml}. */ @@ -292,6 +304,7 @@ public class MediaType extends MimeType implements Serializable { APPLICATION_PROBLEM_JSON = valueOf(APPLICATION_PROBLEM_JSON_VALUE); APPLICATION_PROBLEM_XML = valueOf(APPLICATION_PROBLEM_XML_VALUE); APPLICATION_RSS_XML = valueOf(APPLICATION_RSS_XML_VALUE); + APPLICATION_STREAM_JSON = valueOf(APPLICATION_STREAM_JSON_VALUE); APPLICATION_XHTML_XML = valueOf(APPLICATION_XHTML_XML_VALUE); APPLICATION_XML = valueOf(APPLICATION_XML_VALUE); IMAGE_GIF = valueOf(IMAGE_GIF_VALUE); diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonEncoder.java index 2c1eb1c30dd..2acede40f13 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonEncoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ package org.springframework.http.codec.json; import java.io.IOException; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -32,6 +31,7 @@ import com.fasterxml.jackson.databind.SerializationConfig; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.type.TypeFactory; import org.reactivestreams.Publisher; +import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -56,13 +56,6 @@ import org.springframework.util.MimeType; */ public class Jackson2JsonEncoder extends AbstractJackson2Codec implements Encoder { - private static final ByteBuffer START_ARRAY_BUFFER = ByteBuffer.wrap(new byte[]{'['}); - - private static final ByteBuffer SEPARATOR_BUFFER = ByteBuffer.wrap(new byte[]{','}); - - private static final ByteBuffer END_ARRAY_BUFFER = ByteBuffer.wrap(new byte[]{']'}); - - private final PrettyPrinter ssePrettyPrinter; @@ -100,17 +93,15 @@ public class Jackson2JsonEncoder extends AbstractJackson2Codec implements Encode if (inputStream instanceof Mono) { return Flux.from(inputStream).map(value -> encodeValue(value, bufferFactory, elementType, hints)); } - - Mono startArray = Mono.just(bufferFactory.wrap(START_ARRAY_BUFFER)); - Mono endArray = Mono.just(bufferFactory.wrap(END_ARRAY_BUFFER)); - - Flux array = Flux.from(inputStream) - .concatMap(value -> { - DataBuffer arraySeparator = bufferFactory.wrap(SEPARATOR_BUFFER); - return Flux.just(encodeValue(value, bufferFactory, elementType, hints), arraySeparator); - }); - - return Flux.concat(startArray, array.skipLast(1), endArray); + else if (APPLICATION_STREAM_JSON.isCompatibleWith(mimeType)) { + return Flux.from(inputStream).map(value -> { + DataBuffer buffer = encodeValue(value, bufferFactory, elementType, hints); + buffer.write(new byte[]{'\n'}); + return buffer; + }); + } + ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); + return Flux.from(inputStream).collectList().map(list -> encodeValue(list, bufferFactory, listType, hints)).flux(); } private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, diff --git a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java index abe6c3c12b4..972f92e9ea9 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2JsonEncoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonView; import org.junit.Test; +import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -66,13 +67,7 @@ public class Jackson2JsonEncoderTests extends AbstractDataBufferAllocatingTestCa Flux output = this.encoder.encode(source, this.bufferFactory, type, null, Collections.emptyMap()); StepVerifier.create(output) - .consumeNextWith(stringConsumer("[")) - .consumeNextWith(stringConsumer("{\"foo\":\"foo\",\"bar\":\"bar\"}")) - .consumeNextWith(stringConsumer(",")) - .consumeNextWith(stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}")) - .consumeNextWith(stringConsumer(",")) - .consumeNextWith(stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}")) - .consumeNextWith(stringConsumer("]")) + .consumeNextWith(stringConsumer("[{\"foo\":\"foo\",\"bar\":\"bar\"},{\"foo\":\"foofoo\",\"bar\":\"barbar\"},{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}]")) .expectComplete() .verify(); } @@ -84,11 +79,25 @@ public class Jackson2JsonEncoderTests extends AbstractDataBufferAllocatingTestCa Flux output = this.encoder.encode(source, this.bufferFactory, type, null, Collections.emptyMap()); StepVerifier.create(output) - .consumeNextWith(stringConsumer("[")) - .consumeNextWith(stringConsumer("{\"type\":\"foo\"}")) - .consumeNextWith(stringConsumer(",")) - .consumeNextWith(stringConsumer("{\"type\":\"bar\"}")) - .consumeNextWith(stringConsumer("]")) + .consumeNextWith(stringConsumer("[{\"type\":\"foo\"},{\"type\":\"bar\"}]")) + .expectComplete() + .verify(); + } + + @Test + public void encodeAsStream() throws Exception { + Flux source = Flux.just( + new Pojo("foo", "bar"), + new Pojo("foofoo", "barbar"), + new Pojo("foofoofoo", "barbarbar") + ); + ResolvableType type = ResolvableType.forClass(Pojo.class); + Flux output = this.encoder.encode(source, this.bufferFactory, type, APPLICATION_STREAM_JSON, Collections.emptyMap()); + + StepVerifier.create(output) + .consumeNextWith(stringConsumer("{\"foo\":\"foo\",\"bar\":\"bar\"}\n")) + .consumeNextWith(stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n")) + .consumeNextWith(stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n")) .expectComplete() .verify(); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JsonStreamingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JsonStreamingIntegrationTests.java new file mode 100644 index 00000000000..f518ac4d2a2 --- /dev/null +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JsonStreamingIntegrationTests.java @@ -0,0 +1,148 @@ +/* + * Copyright 2002-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.result.method.annotation; + +import java.time.Duration; + +import org.junit.Before; +import org.junit.Test; +import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON; +import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON_VALUE; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.DispatcherHandler; +import org.springframework.web.reactive.config.EnableWebFlux; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.server.adapter.WebHttpHandlerBuilder; + +/** + * @author Sebastien Deleuze + */ +public class JsonStreamingIntegrationTests extends AbstractHttpHandlerIntegrationTests { + + private AnnotationConfigApplicationContext wac; + + private WebClient webClient; + + + @Override + @Before + public void setup() throws Exception { + super.setup(); + this.webClient = WebClient.create("http://localhost:" + this.port); + } + + + @Override + protected HttpHandler createHttpHandler() { + this.wac = new AnnotationConfigApplicationContext(); + this.wac.register(TestConfiguration.class); + this.wac.refresh(); + + return WebHttpHandlerBuilder.webHandler(new DispatcherHandler(this.wac)).build(); + } + + @Test + public void jsonStreaming() throws Exception { + Flux result = this.webClient.get() + .uri("/stream") + .accept(APPLICATION_STREAM_JSON) + .exchange() + .flatMap(response -> response.bodyToFlux(Person.class)); + + StepVerifier.create(result) + .expectNext(new Person("foo 0")) + .expectNext(new Person("foo 1")) + .verifyComplete(); + } + + @RestController + @SuppressWarnings("unused") + static class JsonStreamingController { + + @RequestMapping(value = "/stream", produces = APPLICATION_STREAM_JSON_VALUE) + Flux person() { + return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l)).take(2); + } + + } + + @Configuration + @EnableWebFlux + @SuppressWarnings("unused") + static class TestConfiguration { + + @Bean + public JsonStreamingController jsonStreamingController() { + return new JsonStreamingController(); + } + } + + private static class Person { + + private String name; + + @SuppressWarnings("unused") + public Person() { + } + + public Person(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Person person = (Person) o; + return !(this.name != null ? !this.name.equals(person.name) : person.name != null); + } + + @Override + public int hashCode() { + return this.name != null ? this.name.hashCode() : 0; + } + + @Override + public String toString() { + return "Person{" + + "name='" + name + '\'' + + '}'; + } + } + +}