From ac86acde53437c0758949a8ec927fc928cdf3db5 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sun, 27 Nov 2016 21:36:14 -0500 Subject: [PATCH] ReactiveAdapterRegistry detects Reactor The ReactiveAdapterRegistry now detects the presence of Reactor. In practice Reactor is required for the Spring Framework reactive support and it is expected to be present. The registry however is now capable of being neutral if Reactor is not present on the classpath for example where other Spring projects may not have the same assumptions about Reactor's presence. Issue: SPR-14902 --- .../core/ReactiveAdapterRegistry.java | 85 ++++++++++++------- .../core/ReactiveTypeDescriptor.java | 7 +- 2 files changed, 54 insertions(+), 38 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index 0b816fd9371..5cf10f1618f 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -36,12 +36,12 @@ import rx.RxReactiveStreams; import org.springframework.util.ClassUtils; /** - * A registry of adapters to adapt to {@link Flux} and {@link Mono}. + * A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from + * various async/reactive types such as {@code CompletableFuture}, RxJava + * {@code Observable}, and others. * *

By default, depending on classpath availability, adapters are registered - * for RxJava 1, RxJava 2 types, and {@link CompletableFuture}. In addition the - * registry contains adapters for Reactor's own Flux and Mono types (no-op) - * along with adaption for any other Reactive Streams {@link Publisher}. + * for Reactor, RxJava 1, RxJava 2 types, and {@link CompletableFuture}. * * @author Rossen Stoyanchev * @author Sebastien Deleuze @@ -49,6 +49,9 @@ import org.springframework.util.ClassUtils; */ public class ReactiveAdapterRegistry { + private static final boolean reactorPresent = + ClassUtils.isPresent("reactor.core.publisher.Flux", ReactiveAdapterRegistry.class.getClassLoader()); + private static final boolean rxJava1Present = ClassUtils.isPresent("rx.Observable", ReactiveAdapterRegistry.class.getClassLoader()); @@ -67,35 +70,14 @@ public class ReactiveAdapterRegistry { */ public ReactiveAdapterRegistry() { - // Flux and Mono ahead of Publisher... - - registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty), - source -> (Mono) source, - Mono::from - ); - - registerReactiveType(ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty), - source -> (Flux) source, - Flux::from); - - registerReactiveType(ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty), - source -> (Publisher) source, - source -> source); - - registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> { - CompletableFuture empty = new CompletableFuture<>(); - empty.complete(null); - return empty; - }), - source -> Mono.fromFuture((CompletableFuture) source), - source -> Mono.from(source).toFuture() - ); + if (reactorPresent) { + new ReactorRegistrar().registerAdapters(this); + } if (rxJava1Present && rxJava1Adapter) { new RxJava1Registrar().registerAdapters(this); } + if (rxJava2Present) { new RxJava2Registrar().registerAdapters(this); } @@ -110,7 +92,12 @@ public class ReactiveAdapterRegistry { public void registerReactiveType(ReactiveTypeDescriptor descriptor, Function> toAdapter, Function, Object> fromAdapter) { - this.adapters.add(new ReactorAdapter(descriptor, toAdapter, fromAdapter)); + if (reactorPresent) { + this.adapters.add(new ReactorAdapter(descriptor, toAdapter, fromAdapter)); + } + else { + this.adapters.add(new ReactiveAdapter(descriptor, toAdapter, fromAdapter)); + } } /** @@ -145,9 +132,41 @@ public class ReactiveAdapterRegistry { } + private static class ReactorRegistrar { + + void registerAdapters(ReactiveAdapterRegistry registry) { + + // Flux and Mono ahead of Publisher... + + registry.registerReactiveType( + ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty), + source -> (Mono) source, + Mono::from + ); + + registry.registerReactiveType(ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty), + source -> (Flux) source, + Flux::from); + + registry.registerReactiveType(ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty), + source -> (Publisher) source, + source -> source); + + registry.registerReactiveType( + ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> { + CompletableFuture empty = new CompletableFuture<>(); + empty.complete(null); + return empty; + }), + source -> Mono.fromFuture((CompletableFuture) source), + source -> Mono.from(source).toFuture() + ); + } + } + private static class RxJava1Registrar { - public void registerAdapters(ReactiveAdapterRegistry registry) { + void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty), source -> RxReactiveStreams.toPublisher((rx.Observable) source), @@ -168,7 +187,7 @@ public class ReactiveAdapterRegistry { private static class RxJava2Registrar { - public void registerAdapters(ReactiveAdapterRegistry registry) { + void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( ReactiveTypeDescriptor.multiValue(Flowable.class, Flowable::empty), source -> (Flowable) source, @@ -204,7 +223,7 @@ public class ReactiveAdapterRegistry { */ private static class ReactorAdapter extends ReactiveAdapter { - public ReactorAdapter(ReactiveTypeDescriptor descriptor, + ReactorAdapter(ReactiveTypeDescriptor descriptor, Function> toPublisherFunction, Function, Object> fromPublisherFunction) { diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java index fd4e84eb9be..587201e06e2 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java @@ -17,9 +17,6 @@ package org.springframework.core; import java.util.function.Supplier; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - import org.springframework.util.Assert; /** @@ -76,9 +73,9 @@ public class ReactiveTypeDescriptor { /** * Return {@code true} if the reactive type can produce more than 1 value - * can be produced and is therefore a good fit to adapt to {@link Flux}. + * can be produced and is therefore a good fit to adapt to {@code Flux}. * A {@code false} return value implies the reactive type can produce 1 - * value at most and is therefore a good fit to adapt to {@link Mono}. + * value at most and is therefore a good fit to adapt to {@code Mono}. */ public boolean isMultiValue() { return this.multiValue;