|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
/* |
|
|
|
* Copyright 2002-2017 the original author or authors. |
|
|
|
* Copyright 2002-2018 the original author or authors. |
|
|
|
* |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -86,9 +86,7 @@ class ReactiveTypeHandler { |
|
|
|
this(ReactiveAdapterRegistry.getSharedInstance(), new SyncTaskExecutor(), new ContentNegotiationManager()); |
|
|
|
this(ReactiveAdapterRegistry.getSharedInstance(), new SyncTaskExecutor(), new ContentNegotiationManager()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
ReactiveTypeHandler(ReactiveAdapterRegistry registry, TaskExecutor executor, |
|
|
|
ReactiveTypeHandler(ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager) { |
|
|
|
ContentNegotiationManager manager) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Assert.notNull(registry, "ReactiveAdapterRegistry is required"); |
|
|
|
Assert.notNull(registry, "ReactiveAdapterRegistry is required"); |
|
|
|
Assert.notNull(executor, "TaskExecutor is required"); |
|
|
|
Assert.notNull(executor, "TaskExecutor is required"); |
|
|
|
Assert.notNull(manager, "ContentNegotiationManager is required"); |
|
|
|
Assert.notNull(manager, "ContentNegotiationManager is required"); |
|
|
|
@ -120,7 +118,7 @@ class ReactiveTypeHandler { |
|
|
|
ReactiveAdapter adapter = this.reactiveRegistry.getAdapter(returnValue.getClass()); |
|
|
|
ReactiveAdapter adapter = this.reactiveRegistry.getAdapter(returnValue.getClass()); |
|
|
|
Assert.state(adapter != null, "Unexpected return value: " + returnValue); |
|
|
|
Assert.state(adapter != null, "Unexpected return value: " + returnValue); |
|
|
|
|
|
|
|
|
|
|
|
ResolvableType elementType = ResolvableType.forMethodParameter(returnType).getGeneric(0); |
|
|
|
ResolvableType elementType = ResolvableType.forMethodParameter(returnType).getGeneric(); |
|
|
|
Class<?> elementClass = elementType.resolve(Object.class); |
|
|
|
Class<?> elementClass = elementType.resolve(Object.class); |
|
|
|
|
|
|
|
|
|
|
|
Collection<MediaType> mediaTypes = getMediaTypes(request); |
|
|
|
Collection<MediaType> mediaTypes = getMediaTypes(request); |
|
|
|
@ -249,7 +247,7 @@ class ReactiveTypeHandler { |
|
|
|
schedule(); |
|
|
|
schedule(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void schedule() { |
|
|
|
private void schedule() { |
|
|
|
try { |
|
|
|
try { |
|
|
|
this.taskExecutor.execute(this); |
|
|
|
this.taskExecutor.execute(this); |
|
|
|
@ -264,7 +262,7 @@ class ReactiveTypeHandler { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
public void run() { |
|
|
|
if (this.done) { |
|
|
|
if (this.done) { |
|
|
|
@ -310,7 +308,7 @@ class ReactiveTypeHandler { |
|
|
|
} |
|
|
|
} |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (this.executing.decrementAndGet() != 0) { |
|
|
|
if (this.executing.decrementAndGet() != 0) { |
|
|
|
schedule(); |
|
|
|
schedule(); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -324,7 +322,6 @@ class ReactiveTypeHandler { |
|
|
|
this.subscription.cancel(); |
|
|
|
this.subscription.cancel(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -407,16 +404,12 @@ class ReactiveTypeHandler { |
|
|
|
|
|
|
|
|
|
|
|
private final CollectedValuesList values; |
|
|
|
private final CollectedValuesList values; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DeferredResultSubscriber(DeferredResult<Object> result, ReactiveAdapter adapter, ResolvableType elementType) { |
|
|
|
DeferredResultSubscriber(DeferredResult<Object> result, ReactiveAdapter adapter, |
|
|
|
|
|
|
|
ResolvableType elementType) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
this.result = result; |
|
|
|
this.result = result; |
|
|
|
this.multiValueSource = adapter.isMultiValue(); |
|
|
|
this.multiValueSource = adapter.isMultiValue(); |
|
|
|
this.values = new CollectedValuesList(elementType); |
|
|
|
this.values = new CollectedValuesList(elementType); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void connect(ReactiveAdapter adapter, Object returnValue) { |
|
|
|
public void connect(ReactiveAdapter adapter, Object returnValue) { |
|
|
|
Publisher<Object> publisher = adapter.toPublisher(returnValue); |
|
|
|
Publisher<Object> publisher = adapter.toPublisher(returnValue); |
|
|
|
publisher.subscribe(this); |
|
|
|
publisher.subscribe(this); |
|
|
|
@ -452,6 +445,10 @@ class ReactiveTypeHandler { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* List of collect values where all elements are a specified type. |
|
|
|
|
|
|
|
*/ |
|
|
|
@SuppressWarnings("serial") |
|
|
|
@SuppressWarnings("serial") |
|
|
|
static class CollectedValuesList extends ArrayList<Object> { |
|
|
|
static class CollectedValuesList extends ArrayList<Object> { |
|
|
|
|
|
|
|
|
|
|
|
@ -466,4 +463,4 @@ class ReactiveTypeHandler { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|