|
|
|
|
@ -60,6 +60,8 @@ public class ReactiveAdapterRegistry {
@@ -60,6 +60,8 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
@Nullable |
|
|
|
|
private static volatile ReactiveAdapterRegistry sharedInstance; |
|
|
|
|
|
|
|
|
|
private static final boolean reactiveStreamsPresent; |
|
|
|
|
|
|
|
|
|
private static final boolean reactorPresent; |
|
|
|
|
|
|
|
|
|
private static final boolean rxjava3Present; |
|
|
|
|
@ -70,6 +72,7 @@ public class ReactiveAdapterRegistry {
@@ -70,6 +72,7 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
|
|
|
|
|
static { |
|
|
|
|
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader(); |
|
|
|
|
reactiveStreamsPresent = ClassUtils.isPresent("org.reactivestreams.Publisher", classLoader); |
|
|
|
|
reactorPresent = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader); |
|
|
|
|
rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader); |
|
|
|
|
kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader); |
|
|
|
|
@ -85,6 +88,11 @@ public class ReactiveAdapterRegistry {
@@ -85,6 +88,11 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
*/ |
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
public ReactiveAdapterRegistry() { |
|
|
|
|
// Defensive guard for the Reactive Streams API itself
|
|
|
|
|
if (!reactiveStreamsPresent) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Reactor
|
|
|
|
|
if (reactorPresent) { |
|
|
|
|
new ReactorRegistrar().registerAdapters(this); |
|
|
|
|
@ -107,21 +115,14 @@ public class ReactiveAdapterRegistry {
@@ -107,21 +115,14 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
|
|
|
|
|
// Simple Flow.Publisher bridge if Reactor is not present
|
|
|
|
|
if (!reactorPresent) { |
|
|
|
|
registerReactiveType( |
|
|
|
|
this.adapters.add(new ReactiveAdapter( |
|
|
|
|
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> PublisherToRS.EMPTY_FLOW), |
|
|
|
|
source -> new PublisherToRS<>((Flow.Publisher<Object>) source), |
|
|
|
|
source -> new PublisherToFlow<>((Publisher<Object>) source)); |
|
|
|
|
source -> new PublisherToFlow<>((Publisher<Object>) source))); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Whether the registry has any adapters. |
|
|
|
|
*/ |
|
|
|
|
public boolean hasAdapters() { |
|
|
|
|
return !this.adapters.isEmpty(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Register a reactive type along with functions to adapt to and from a |
|
|
|
|
* Reactive Streams {@link Publisher}. The function arguments assume that |
|
|
|
|
@ -138,6 +139,13 @@ public class ReactiveAdapterRegistry {
@@ -138,6 +139,13 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Return whether the registry has any adapters. |
|
|
|
|
*/ |
|
|
|
|
public boolean hasAdapters() { |
|
|
|
|
return !this.adapters.isEmpty(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Get the adapter for the given reactive type. |
|
|
|
|
* @return the corresponding adapter, or {@code null} if none available |
|
|
|
|
|