From ef550c43d61dfd506a15034756e68e0476fa4655 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Mon, 13 Feb 2017 13:58:39 +0100 Subject: [PATCH] Flush JSON stream after each element Issue: SPR-15104 --- .../core/codec/AbstractEncoder.java | 7 ++++ .../http/codec/EncoderHttpMessageWriter.java | 5 ++- .../Jackson2ServerHttpMessageWriter.java | 36 ++++++++++++++++--- .../DefaultHandlerStrategiesBuilder.java | 3 +- .../JsonStreamingIntegrationTests.java | 7 ++-- 5 files changed, 48 insertions(+), 10 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractEncoder.java index 5313c3a3540..d469db5d363 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/AbstractEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractEncoder.java @@ -31,6 +31,13 @@ import org.springframework.util.MimeType; */ public abstract class AbstractEncoder implements Encoder { + /** + * Hint key to use with a {@link FlushingStrategy} value. + */ + public static final String FLUSHING_STRATEGY_HINT = AbstractEncoder.class.getName() + ".flushingStrategy"; + + public enum FlushingStrategy { AUTO, AFTER_EACH_ELEMENT } + private final List encodableMimeTypes; diff --git a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java index 358944e2a51..06c688ad459 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Map; import org.reactivestreams.Publisher; +import static org.springframework.core.codec.AbstractEncoder.FLUSHING_STRATEGY_HINT; +import static org.springframework.core.codec.AbstractEncoder.FlushingStrategy.AFTER_EACH_ELEMENT; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -102,7 +104,8 @@ public class EncoderHttpMessageWriter implements HttpMessageWriter { DataBufferFactory bufferFactory = outputMessage.bufferFactory(); Flux body = this.encoder.encode(inputStream, bufferFactory, elementType, mediaType, hints); - return outputMessage.writeWith(body); + return (hints.get(FLUSHING_STRATEGY_HINT) == AFTER_EACH_ELEMENT ? + outputMessage.writeAndFlushWith(body.map(Flux::just)) : outputMessage.writeWith(body)); } /** diff --git a/spring-web/src/main/java/org/springframework/http/codec/Jackson2ServerHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/Jackson2ServerHttpMessageWriter.java index 7a8ee553724..a00d358a946 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/Jackson2ServerHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/Jackson2ServerHttpMessageWriter.java @@ -21,19 +21,24 @@ import java.util.HashMap; import java.util.Map; import com.fasterxml.jackson.annotation.JsonView; +import org.reactivestreams.Publisher; +import static org.springframework.core.codec.AbstractEncoder.FLUSHING_STRATEGY_HINT; +import static org.springframework.core.codec.AbstractEncoder.FlushingStrategy.AFTER_EACH_ELEMENT; +import reactor.core.publisher.Mono; import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; +import org.springframework.core.codec.AbstractEncoder; import org.springframework.core.codec.Encoder; import org.springframework.http.MediaType; +import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.codec.json.AbstractJackson2Codec; import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; /** - * {@link ServerHttpMessageWriter} that resolves those annotation or request based Jackson 2 hints: - *
    - *
  • {@code @JsonView} annotated handler method
  • - *
+ * Jackson {@link ServerHttpMessageWriter} that resolves {@code @JsonView} annotated handler + * method and deals with {@link AbstractEncoder#FLUSHING_STRATEGY_HINT}. * * @author Sebastien Deleuze * @since 5.0 @@ -72,4 +77,27 @@ public class Jackson2ServerHttpMessageWriter extends AbstractServerHttpMessageWr return hints; } + @Override + public Mono write(Publisher inputStream, ResolvableType elementType, MediaType mediaType, + ReactiveHttpOutputMessage outputMessage, Map hints) { + + if ((mediaType != null) && mediaType.isCompatibleWith(MediaType.APPLICATION_STREAM_JSON)) { + Map hintsWithFlush = new HashMap<>(hints); + hintsWithFlush.put(FLUSHING_STRATEGY_HINT, AFTER_EACH_ELEMENT); + return super.write(inputStream, elementType, mediaType, outputMessage, hintsWithFlush); + } + return super.write(inputStream, elementType, mediaType, outputMessage, hints); + } + + @Override + public Mono write(Publisher inputStream, ResolvableType streamType, ResolvableType elementType, + MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response, Map hints) { + + if ((mediaType != null) && mediaType.isCompatibleWith(MediaType.APPLICATION_STREAM_JSON)) { + Map hintsWithFlush = new HashMap<>(hints); + hintsWithFlush.put(FLUSHING_STRATEGY_HINT, AFTER_EACH_ELEMENT); + return super.write(inputStream, streamType, elementType, mediaType, request, response, hintsWithFlush); + } + return super.write(inputStream, streamType, elementType, mediaType, request, response, hints); + } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultHandlerStrategiesBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultHandlerStrategiesBuilder.java index e25329994ce..3e74d493b0e 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultHandlerStrategiesBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/server/DefaultHandlerStrategiesBuilder.java @@ -37,6 +37,7 @@ import org.springframework.http.codec.EncoderHttpMessageWriter; import org.springframework.http.codec.FormHttpMessageReader; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.codec.HttpMessageWriter; +import org.springframework.http.codec.Jackson2ServerHttpMessageWriter; import org.springframework.http.codec.ResourceHttpMessageWriter; import org.springframework.http.codec.ServerSentEventHttpMessageWriter; import org.springframework.http.codec.json.Jackson2JsonDecoder; @@ -97,7 +98,7 @@ class DefaultHandlerStrategiesBuilder implements HandlerStrategies.Builder { if (jackson2Present) { messageReader(new DecoderHttpMessageReader<>(new Jackson2JsonDecoder())); Jackson2JsonEncoder jsonEncoder = new Jackson2JsonEncoder(); - messageWriter(new EncoderHttpMessageWriter<>(jsonEncoder)); + messageWriter(new Jackson2ServerHttpMessageWriter(jsonEncoder)); messageWriter( new ServerSentEventHttpMessageWriter(Collections.singletonList(jsonEncoder))); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JsonStreamingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JsonStreamingIntegrationTests.java index 8e5aa4ad746..3f559496e61 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JsonStreamingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JsonStreamingIntegrationTests.java @@ -19,7 +19,6 @@ package org.springframework.web.reactive.result.method.annotation; import java.time.Duration; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; @@ -42,7 +41,6 @@ import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON_VALUE; /** * @author Sebastien Deleuze */ -@Ignore public class JsonStreamingIntegrationTests extends AbstractHttpHandlerIntegrationTests { private AnnotationConfigApplicationContext wac; @@ -78,7 +76,8 @@ public class JsonStreamingIntegrationTests extends AbstractHttpHandlerIntegratio StepVerifier.create(result) .expectNext(new Person("foo 0")) .expectNext(new Person("foo 1")) - .verifyComplete(); + .thenCancel() + .verify(); } @RestController @@ -87,7 +86,7 @@ public class JsonStreamingIntegrationTests extends AbstractHttpHandlerIntegratio @RequestMapping(value = "/stream", produces = APPLICATION_STREAM_JSON_VALUE) Flux person() { - return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l)).take(2); + return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l)); } }