|
|
|
|
@ -81,6 +81,8 @@ class ReactiveTypeHandler {
@@ -81,6 +81,8 @@ class ReactiveTypeHandler {
|
|
|
|
|
|
|
|
|
|
private static final MediaType WILDCARD_SUBTYPE_SUFFIXED_BY_NDJSON = MediaType.valueOf("application/*+x-ndjson"); |
|
|
|
|
|
|
|
|
|
private static final MediaType APPLICATION_GRPC = MediaType.valueOf("application/grpc"); |
|
|
|
|
|
|
|
|
|
private static final boolean isContextPropagationPresent = ClassUtils.isPresent( |
|
|
|
|
"io.micrometer.context.ContextSnapshot", ReactiveTypeHandler.class.getClassLoader()); |
|
|
|
|
|
|
|
|
|
@ -165,9 +167,14 @@ class ReactiveTypeHandler {
@@ -165,9 +167,14 @@ class ReactiveTypeHandler {
|
|
|
|
|
new SseEmitterSubscriber(emitter, this.taskExecutor, taskDecorator).connect(adapter, returnValue); |
|
|
|
|
return emitter; |
|
|
|
|
} |
|
|
|
|
if (mediaTypes.stream().anyMatch(APPLICATION_GRPC::includes)) { |
|
|
|
|
ResponseBodyEmitter emitter = getEmitter(mediaType.orElse(APPLICATION_GRPC)); |
|
|
|
|
new BasicEmitterSubscriber(emitter, APPLICATION_GRPC, this.taskExecutor).connect(adapter, returnValue); |
|
|
|
|
return emitter; |
|
|
|
|
} |
|
|
|
|
if (CharSequence.class.isAssignableFrom(elementClass)) { |
|
|
|
|
ResponseBodyEmitter emitter = getEmitter(mediaType.orElse(MediaType.TEXT_PLAIN)); |
|
|
|
|
new TextEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue); |
|
|
|
|
new BasicEmitterSubscriber(emitter, MediaType.TEXT_PLAIN, this.taskExecutor).connect(adapter, returnValue); |
|
|
|
|
return emitter; |
|
|
|
|
} |
|
|
|
|
MediaType streamingResponseType = findConcreteJsonStreamMediaType(mediaTypes); |
|
|
|
|
@ -475,15 +482,18 @@ class ReactiveTypeHandler {
@@ -475,15 +482,18 @@ class ReactiveTypeHandler {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class TextEmitterSubscriber extends AbstractEmitterSubscriber { |
|
|
|
|
private static class BasicEmitterSubscriber extends AbstractEmitterSubscriber { |
|
|
|
|
|
|
|
|
|
private final MediaType mediaType; |
|
|
|
|
|
|
|
|
|
TextEmitterSubscriber(ResponseBodyEmitter emitter, TaskExecutor executor) { |
|
|
|
|
BasicEmitterSubscriber(ResponseBodyEmitter emitter, MediaType mediaType, TaskExecutor executor) { |
|
|
|
|
super(emitter, executor, null); |
|
|
|
|
this.mediaType = mediaType; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
protected void send(Object element) throws IOException { |
|
|
|
|
getEmitter().send(element, MediaType.TEXT_PLAIN); |
|
|
|
|
getEmitter().send(element, this.mediaType); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|