Browse Source

Polishing in ResponseBodyEmitterReturnValueHandler

See gh-33194
pull/33336/head
rstoyanchev 2 years ago
parent
commit
184bb7c23c
  1. 4
      spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java
  2. 77
      spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java
  3. 8
      spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java
  4. 17
      spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java

4
spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java

@ -172,7 +172,7 @@ class ReactiveTypeHandler { @@ -172,7 +172,7 @@ class ReactiveTypeHandler {
new TextEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
return emitter;
}
MediaType streamingResponseType = findConcreteStreamingMediaType(mediaTypes);
MediaType streamingResponseType = findConcreteJsonStreamMediaType(mediaTypes);
if (streamingResponseType != null) {
ResponseBodyEmitter emitter = getEmitter(streamingResponseType);
new JsonEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
@ -203,7 +203,7 @@ class ReactiveTypeHandler { @@ -203,7 +203,7 @@ class ReactiveTypeHandler {
*/
@SuppressWarnings("deprecation")
@Nullable
static MediaType findConcreteStreamingMediaType(Collection<MediaType> acceptedMediaTypes) {
static MediaType findConcreteJsonStreamMediaType(Collection<MediaType> acceptedMediaTypes) {
for (MediaType acceptedType : acceptedMediaTypes) {
if (WILDCARD_SUBTYPE_SUFFIXED_BY_NDJSON.includes(acceptedType)) {
if (acceptedType.isConcrete()) {

77
spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java

@ -49,12 +49,25 @@ import org.springframework.web.method.support.HandlerMethodReturnValueHandler; @@ -49,12 +49,25 @@ import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
/**
* Handler for return values of type {@link ResponseBodyEmitter} and subclasses
* such as {@link SseEmitter} including the same types wrapped with
* {@link ResponseEntity}.
* Handler for return values of type:
* <ul>
* <li>{@link ResponseBodyEmitter} including sub-class {@link SseEmitter} and others.
* <li>Reactive return types known to {@link ReactiveAdapterRegistry}.
* <li>Any of the above wrapped with {@link ResponseEntity}.
* </ul>
*
* <p>As of 5.0 also supports reactive return value types for any reactive
* library with registered adapters in {@link ReactiveAdapterRegistry}.
* <p>Single-value reactive types are adapted to {@link DeferredResult}.
* Multi-value reactive types are adapted to {@link ResponseBodyEmitter} or
* {@link SseEmitter} as follows:
* <ul>
* <li>SSE stream, if the element type is
* {@link org.springframework.http.codec.ServerSentEvent} or if negotiated by
* content type.
* <li>Text stream for a {@link org.reactivestreams.Publisher} of
* {@link CharSequence}.
* <li>A JSON stream if negotiated by content type to
* {@link MediaType#APPLICATION_NDJSON}.
* </ul>
*
* @author Rossen Stoyanchev
* @since 4.2
@ -153,7 +166,7 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur @@ -153,7 +166,7 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
else {
emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest);
if (emitter == null) {
// Not streaming: write headers without committing response..
// We're not streaming; write headers without committing response
outputMessage.getHeaders().forEach((headerName, headerValues) -> {
for (String headerValue : headerValues) {
response.addHeader(headerName, headerValue);
@ -164,18 +177,17 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur @@ -164,18 +177,17 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
}
emitter.extendResponse(outputMessage);
// At this point we know we're streaming..
// We are streaming
ShallowEtagHeaderFilter.disableContentCaching(request);
// Wrap the response to ignore further header changes
// Headers will be flushed at the first write
// Ignore further header changes; response is committed after first event
outputMessage = new StreamingServletServerHttpResponse(outputMessage);
HttpMessageConvertingHandler handler;
try {
DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
DeferredResult<?> result = new DeferredResult<>(emitter.getTimeout());
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
handler = new HttpMessageConvertingHandler(outputMessage, result);
}
catch (Throwable ex) {
emitter.initializeWithError(ex);
@ -186,6 +198,26 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur @@ -186,6 +198,26 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
}
/**
* Wrap to silently ignore header changes HttpMessageConverter's that would
* otherwise cause HttpHeaders to raise exceptions.
*/
private static class StreamingServletServerHttpResponse extends DelegatingServerHttpResponse {
private final HttpHeaders mutableHeaders = new HttpHeaders();
public StreamingServletServerHttpResponse(ServerHttpResponse delegate) {
super(delegate);
this.mutableHeaders.putAll(delegate.getHeaders());
}
@Override
public HttpHeaders getHeaders() {
return this.mutableHeaders;
}
}
/**
* ResponseBodyEmitter.Handler that writes with HttpMessageConverter's.
*/
@ -257,25 +289,4 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur @@ -257,25 +289,4 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur
}
}
/**
* Wrap to silently ignore header changes HttpMessageConverter's that would
* otherwise cause HttpHeaders to raise exceptions.
*/
private static class StreamingServletServerHttpResponse extends DelegatingServerHttpResponse {
private final HttpHeaders mutableHeaders = new HttpHeaders();
public StreamingServletServerHttpResponse(ServerHttpResponse delegate) {
super(delegate);
this.mutableHeaders.putAll(delegate.getHeaders());
}
@Override
public HttpHeaders getHeaders() {
return this.mutableHeaders;
}
}
}

8
spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java

@ -130,7 +130,7 @@ class ReactiveTypeHandlerTests { @@ -130,7 +130,7 @@ class ReactiveTypeHandlerTests {
MediaType.parseMediaType("application/*+x-ndjson"),
MediaType.parseMediaType("application/vnd.myapp.v1+x-ndjson"));
assertThat(ReactiveTypeHandler.findConcreteStreamingMediaType(accept))
assertThat(ReactiveTypeHandler.findConcreteJsonStreamMediaType(accept))
.isEqualTo(MediaType.APPLICATION_NDJSON);
}
@ -142,7 +142,7 @@ class ReactiveTypeHandlerTests { @@ -142,7 +142,7 @@ class ReactiveTypeHandlerTests {
MediaType.parseMediaType("application/*+x-ndjson"),
MediaType.APPLICATION_NDJSON);
assertThat(ReactiveTypeHandler.findConcreteStreamingMediaType(accept))
assertThat(ReactiveTypeHandler.findConcreteJsonStreamMediaType(accept))
.hasToString("application/vnd.myapp.v1+x-ndjson");
}
@ -154,7 +154,7 @@ class ReactiveTypeHandlerTests { @@ -154,7 +154,7 @@ class ReactiveTypeHandlerTests {
MediaType.parseMediaType("application/*+x-ndjson"),
MediaType.parseMediaType("application/vnd.myapp.v1+x-ndjson"));
assertThat(ReactiveTypeHandler.findConcreteStreamingMediaType(accept))
assertThat(ReactiveTypeHandler.findConcreteJsonStreamMediaType(accept))
.isEqualTo(MediaType.APPLICATION_NDJSON);
}
@ -167,7 +167,7 @@ class ReactiveTypeHandlerTests { @@ -167,7 +167,7 @@ class ReactiveTypeHandlerTests {
MediaType.parseMediaType("application/*+x-ndjson"),
MediaType.parseMediaType("application/vnd.myapp.v1+x-ndjson"));
assertThat(ReactiveTypeHandler.findConcreteStreamingMediaType(accept))
assertThat(ReactiveTypeHandler.findConcreteJsonStreamMediaType(accept))
.isEqualTo(MediaType.APPLICATION_STREAM_JSON);
}

17
spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java

@ -62,14 +62,14 @@ import static org.springframework.web.testfixture.method.ResolvableMethod.on; @@ -62,14 +62,14 @@ import static org.springframework.web.testfixture.method.ResolvableMethod.on;
*/
class ResponseBodyEmitterReturnValueHandlerTests {
private ResponseBodyEmitterReturnValueHandler handler =
private final ResponseBodyEmitterReturnValueHandler handler =
new ResponseBodyEmitterReturnValueHandler(List.of(new MappingJackson2HttpMessageConverter()));
private MockHttpServletRequest request = new MockHttpServletRequest();
private final MockHttpServletRequest request = new MockHttpServletRequest();
private MockHttpServletResponse response = new MockHttpServletResponse();
private final MockHttpServletResponse response = new MockHttpServletResponse();
private NativeWebRequest webRequest = new ServletWebRequest(this.request, this.response);
private final NativeWebRequest webRequest = new ServletWebRequest(this.request, this.response);
private final ModelAndViewContainer mavContainer = new ModelAndViewContainer();
@ -93,12 +93,15 @@ class ResponseBodyEmitterReturnValueHandlerTests { @@ -93,12 +93,15 @@ class ResponseBodyEmitterReturnValueHandlerTests {
assertThat(this.handler.supportsReturnType(
on(TestController.class).resolveReturnType(ResponseEntity.class, ResponseBodyEmitter.class))).isTrue();
ResolvableType stringFlux = forClassWithGenerics(Flux.class, String.class);
assertThat(this.handler.supportsReturnType(
on(TestController.class).resolveReturnType(Flux.class, String.class))).isTrue();
on(TestController.class).resolveReturnType(stringFlux))).isTrue();
ResolvableType responseEntityStringFlux = forClassWithGenerics(ResponseEntity.class, stringFlux);
assertThat(this.handler.supportsReturnType(
on(TestController.class).resolveReturnType(forClassWithGenerics(ResponseEntity.class,
forClassWithGenerics(Flux.class, String.class))))).isTrue();
on(TestController.class).resolveReturnType(responseEntityStringFlux))).isTrue();
}
@Test

Loading…
Cancel
Save