diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java index 55276ce5eff..7a4bf8dff83 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java @@ -19,7 +19,6 @@ package org.springframework.web.servlet.mvc.method.annotation; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -78,10 +77,7 @@ class ReactiveTypeHandler { private static final long STREAMING_TIMEOUT_VALUE = -1; - @SuppressWarnings("deprecation") - private static final List JSON_STREAMING_MEDIA_TYPES = - Arrays.asList(MediaType.APPLICATION_NDJSON, MediaType.APPLICATION_STREAM_JSON, - MediaType.valueOf("application/*+x-ndjson")); + private static final MediaType WILDCARD_SUBTYPE_SUFFIXED_BY_NDJSON = MediaType.valueOf("application/*+x-ndjson"); private static final boolean isContextPropagationPresent = ClassUtils.isPresent( "io.micrometer.context.ContextSnapshot", ReactiveTypeHandler.class.getClassLoader()); @@ -162,15 +158,12 @@ class ReactiveTypeHandler { new TextEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue); return emitter; } - for (MediaType type : mediaTypes) { - for (MediaType streamingType : JSON_STREAMING_MEDIA_TYPES) { - if (streamingType.includes(type)) { - logExecutorWarning(returnType); - ResponseBodyEmitter emitter = getEmitter(type); - new JsonEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue); - return emitter; - } - } + MediaType streamingResponseType = findConcreteStreamingMediaType(mediaTypes); + if (streamingResponseType != null) { + logExecutorWarning(returnType); + ResponseBodyEmitter emitter = getEmitter(streamingResponseType); + new JsonEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue); + return emitter; } } @@ -182,6 +175,45 @@ class ReactiveTypeHandler { return null; } + /** + * Attempts to find a concrete {@code MediaType} that can be streamed (as json separated + * by newlines in the response body). This method considers two concrete types + * {@code APPLICATION_NDJSON} and {@code APPLICATION_STREAM_JSON}) as well as any + * subtype of application that has the {@code +x-ndjson} suffix. In the later case, + * the media type MUST be concrete for it to be considered. + * + *

For example {@code application/vnd.myapp+x-ndjson} is considered a streaming type + * while {@code application/*+x-ndjson} isn't. + * @param acceptedMediaTypes the collection of acceptable media types in the request + * @return the concrete streaming {@code MediaType} if one could be found or {@code null} + * if none could be found + */ + @SuppressWarnings("deprecation") + @Nullable + static MediaType findConcreteStreamingMediaType(Collection acceptedMediaTypes) { + for (MediaType acceptedType : acceptedMediaTypes) { + if (WILDCARD_SUBTYPE_SUFFIXED_BY_NDJSON.includes(acceptedType)) { + if (acceptedType.isConcrete()) { + return acceptedType; + } + else { + // if not concrete, it must be application/*+x-ndjson: we assume + // that the requester is only interested in the ndjson nature of + // the underlying representation and can parse any example of that + // underlying representation, so we use the ndjson media type. + return MediaType.APPLICATION_NDJSON; + } + } + else if (MediaType.APPLICATION_NDJSON.includes(acceptedType)) { + return MediaType.APPLICATION_NDJSON; + } + else if (MediaType.APPLICATION_STREAM_JSON.includes(acceptedType)) { + return MediaType.APPLICATION_STREAM_JSON; + } + } + return null; // not a concrete streaming type + } + @SuppressWarnings("unchecked") private Collection getMediaTypes(NativeWebRequest request) throws HttpMediaTypeNotAcceptableException { diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java index 28bccb9aa9d..301d175fe96 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java @@ -104,6 +104,54 @@ public class ReactiveTypeHandlerTests { assertThat(this.handler.isReactiveType(String.class)).isFalse(); } + @Test + void findsConcreteStreamingMediaType() { + final List accept = List.of( + MediaType.ALL, + MediaType.parseMediaType("application/*+x-ndjson"), + MediaType.parseMediaType("application/vnd.myapp.v1+x-ndjson")); + + assertThat(ReactiveTypeHandler.findConcreteStreamingMediaType(accept)) + .isEqualTo(MediaType.APPLICATION_NDJSON); + } + + @Test + void findsConcreteStreamingMediaType_vendorFirst() { + final List accept = List.of( + MediaType.ALL, + MediaType.parseMediaType("application/vnd.myapp.v1+x-ndjson"), + MediaType.parseMediaType("application/*+x-ndjson"), + MediaType.APPLICATION_NDJSON); + + assertThat(ReactiveTypeHandler.findConcreteStreamingMediaType(accept)) + .hasToString("application/vnd.myapp.v1+x-ndjson"); + } + + @Test + void findsConcreteStreamingMediaType_plainNdJsonFirst() { + final List accept = List.of( + MediaType.ALL, + MediaType.APPLICATION_NDJSON, + MediaType.parseMediaType("application/*+x-ndjson"), + MediaType.parseMediaType("application/vnd.myapp.v1+x-ndjson")); + + assertThat(ReactiveTypeHandler.findConcreteStreamingMediaType(accept)) + .isEqualTo(MediaType.APPLICATION_NDJSON); + } + + @SuppressWarnings("deprecation") + @Test + void findsConcreteStreamingMediaType_plainStreamingJsonFirst() { + final List accept = List.of( + MediaType.ALL, + MediaType.APPLICATION_STREAM_JSON, + MediaType.parseMediaType("application/*+x-ndjson"), + MediaType.parseMediaType("application/vnd.myapp.v1+x-ndjson")); + + assertThat(ReactiveTypeHandler.findConcreteStreamingMediaType(accept)) + .isEqualTo(MediaType.APPLICATION_STREAM_JSON); + } + @Test public void deferredResultSubscriberWithOneValue() throws Exception { @@ -251,7 +299,7 @@ public class ReactiveTypeHandlerTests { sink.tryEmitNext(bar2); sink.tryEmitComplete(); - assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/x-ndjson"); + assertThat(message.getHeaders().getContentType()).hasToString("application/x-ndjson"); assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n")); } @@ -277,7 +325,33 @@ public class ReactiveTypeHandlerTests { sink.tryEmitNext(bar2); sink.tryEmitComplete(); - assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/vnd.myapp.v1+x-ndjson"); + assertThat(message.getHeaders().getContentType()).hasToString("application/vnd.myapp.v1+x-ndjson"); + assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n")); + } + + @Test + public void writeStreamJsonWithWildcardSubtype() throws Exception { + this.servletRequest.addHeader("Accept", "application/*+x-ndjson"); + + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); + ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(Bar.class)); + + assertThat(emitter).as("emitter").isNotNull(); + + EmitterHandler emitterHandler = new EmitterHandler(); + emitter.initialize(emitterHandler); + + ServletServerHttpResponse message = new ServletServerHttpResponse(this.servletResponse); + emitter.extendResponse(message); + + Bar bar1 = new Bar("foo"); + Bar bar2 = new Bar("bar"); + + sink.tryEmitNext(bar1); + sink.tryEmitNext(bar2); + sink.tryEmitComplete(); + + assertThat(message.getHeaders().getContentType()).hasToString("application/x-ndjson"); assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n")); }