|
|
|
|
@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
|
|
|
|
|
/* |
|
|
|
|
* Copyright 2002-2020 the original author or authors. |
|
|
|
|
* Copyright 2002-2021 the original author or authors. |
|
|
|
|
* |
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
|
@ -47,7 +47,7 @@ import org.springframework.util.ReflectionUtils;
@@ -47,7 +47,7 @@ import org.springframework.util.ReflectionUtils;
|
|
|
|
|
* |
|
|
|
|
* <p>By default, depending on classpath availability, adapters are registered |
|
|
|
|
* for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, Java 9+ |
|
|
|
|
* {@code Flow.Publisher} and Kotlin Coroutines {@code Deferred} and {@code Flow}. |
|
|
|
|
* {@code Flow.Publisher}, and Kotlin Coroutines {@code Deferred} and {@code Flow}. |
|
|
|
|
* |
|
|
|
|
* @author Rossen Stoyanchev |
|
|
|
|
* @author Sebastien Deleuze |
|
|
|
|
@ -74,6 +74,10 @@ public class ReactiveAdapterRegistry {
@@ -74,6 +74,10 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
boolean reactorRegistered = false; |
|
|
|
|
if (ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader)) { |
|
|
|
|
new ReactorRegistrar().registerAdapters(this); |
|
|
|
|
if (ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader)) { |
|
|
|
|
// Java 9+ Flow.Publisher
|
|
|
|
|
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this); |
|
|
|
|
} |
|
|
|
|
reactorRegistered = true; |
|
|
|
|
} |
|
|
|
|
this.reactorPresent = reactorRegistered; |
|
|
|
|
@ -83,20 +87,12 @@ public class ReactiveAdapterRegistry {
@@ -83,20 +87,12 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
ClassUtils.isPresent("rx.RxReactiveStreams", classLoader)) { |
|
|
|
|
new RxJava1Registrar().registerAdapters(this); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RxJava2
|
|
|
|
|
if (ClassUtils.isPresent("io.reactivex.Flowable", classLoader)) { |
|
|
|
|
new RxJava2Registrar().registerAdapters(this); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Java 9+ Flow.Publisher
|
|
|
|
|
if (ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader)) { |
|
|
|
|
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this); |
|
|
|
|
} |
|
|
|
|
// If not present, do nothing for the time being...
|
|
|
|
|
// We can fall back on "reactive-streams-flow-bridge" (once released)
|
|
|
|
|
|
|
|
|
|
// Coroutines
|
|
|
|
|
// Kotlin Coroutines
|
|
|
|
|
if (this.reactorPresent && ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader)) { |
|
|
|
|
new CoroutinesRegistrar().registerAdapters(this); |
|
|
|
|
} |
|
|
|
|
@ -226,6 +222,35 @@ public class ReactiveAdapterRegistry {
@@ -226,6 +222,35 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class ReactorJdkFlowAdapterRegistrar { |
|
|
|
|
|
|
|
|
|
void registerAdapter(ReactiveAdapterRegistry registry) { |
|
|
|
|
// Reflectively access optional JDK 9+ API (for runtime compatibility with JDK 8)
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
String publisherName = "java.util.concurrent.Flow.Publisher"; |
|
|
|
|
Class<?> publisherClass = ClassUtils.forName(publisherName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
String adapterName = "reactor.adapter.JdkFlowAdapter"; |
|
|
|
|
Class<?> flowAdapterClass = ClassUtils.forName(adapterName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
Method toFluxMethod = flowAdapterClass.getMethod("flowPublisherToFlux", publisherClass); |
|
|
|
|
Method toFlowMethod = flowAdapterClass.getMethod("publisherToFlowPublisher", Publisher.class); |
|
|
|
|
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty()); |
|
|
|
|
|
|
|
|
|
registry.registerReactiveType( |
|
|
|
|
ReactiveTypeDescriptor.multiValue(publisherClass, () -> emptyFlow), |
|
|
|
|
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source), |
|
|
|
|
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
catch (Throwable ex) { |
|
|
|
|
// Ignore
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class RxJava1Registrar { |
|
|
|
|
|
|
|
|
|
void registerAdapters(ReactiveAdapterRegistry registry) { |
|
|
|
|
@ -280,35 +305,6 @@ public class ReactiveAdapterRegistry {
@@ -280,35 +305,6 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class ReactorJdkFlowAdapterRegistrar { |
|
|
|
|
|
|
|
|
|
void registerAdapter(ReactiveAdapterRegistry registry) { |
|
|
|
|
// TODO: remove reflection when build requires JDK 9+
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
String publisherName = "java.util.concurrent.Flow.Publisher"; |
|
|
|
|
Class<?> publisherClass = ClassUtils.forName(publisherName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
String adapterName = "reactor.adapter.JdkFlowAdapter"; |
|
|
|
|
Class<?> flowAdapterClass = ClassUtils.forName(adapterName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
Method toFluxMethod = flowAdapterClass.getMethod("flowPublisherToFlux", publisherClass); |
|
|
|
|
Method toFlowMethod = flowAdapterClass.getMethod("publisherToFlowPublisher", Publisher.class); |
|
|
|
|
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty()); |
|
|
|
|
|
|
|
|
|
registry.registerReactiveType( |
|
|
|
|
ReactiveTypeDescriptor.multiValue(publisherClass, () -> emptyFlow), |
|
|
|
|
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source), |
|
|
|
|
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
catch (Throwable ex) { |
|
|
|
|
// Ignore
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or |
|
|
|
|
* {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}. |
|
|
|
|
|