@ -36,12 +36,12 @@ import rx.RxReactiveStreams;
@@ -36,12 +36,12 @@ import rx.RxReactiveStreams;
import org.springframework.util.ClassUtils ;
/ * *
* A registry of adapters to adapt to { @link Flux } and { @link Mono } .
* A registry of adapters to adapt a Reactive Streams { @link Publisher } to / from
* various async / reactive types such as { @code CompletableFuture } , RxJava
* { @code Observable } , and others .
*
* < p > By default , depending on classpath availability , adapters are registered
* for RxJava 1 , RxJava 2 types , and { @link CompletableFuture } . In addition the
* registry contains adapters for Reactor ' s own Flux and Mono types ( no - op )
* along with adaption for any other Reactive Streams { @link Publisher } .
* for Reactor , RxJava 1 , RxJava 2 types , and { @link CompletableFuture } .
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
@ -49,6 +49,9 @@ import org.springframework.util.ClassUtils;
@@ -49,6 +49,9 @@ import org.springframework.util.ClassUtils;
* /
public class ReactiveAdapterRegistry {
private static final boolean reactorPresent =
ClassUtils . isPresent ( "reactor.core.publisher.Flux" , ReactiveAdapterRegistry . class . getClassLoader ( ) ) ;
private static final boolean rxJava1Present =
ClassUtils . isPresent ( "rx.Observable" , ReactiveAdapterRegistry . class . getClassLoader ( ) ) ;
@ -67,35 +70,14 @@ public class ReactiveAdapterRegistry {
@@ -67,35 +70,14 @@ public class ReactiveAdapterRegistry {
* /
public ReactiveAdapterRegistry ( ) {
// Flux and Mono ahead of Publisher...
registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( Mono . class , Mono : : empty ) ,
source - > ( Mono < ? > ) source ,
Mono : : from
) ;
registerReactiveType ( ReactiveTypeDescriptor . multiValue ( Flux . class , Flux : : empty ) ,
source - > ( Flux < ? > ) source ,
Flux : : from ) ;
registerReactiveType ( ReactiveTypeDescriptor . multiValue ( Publisher . class , Flux : : empty ) ,
source - > ( Publisher < ? > ) source ,
source - > source ) ;
registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( CompletableFuture . class , ( ) - > {
CompletableFuture < ? > empty = new CompletableFuture < > ( ) ;
empty . complete ( null ) ;
return empty ;
} ) ,
source - > Mono . fromFuture ( ( CompletableFuture < ? > ) source ) ,
source - > Mono . from ( source ) . toFuture ( )
) ;
if ( reactorPresent ) {
new ReactorRegistrar ( ) . registerAdapters ( this ) ;
}
if ( rxJava1Present & & rxJava1Adapter ) {
new RxJava1Registrar ( ) . registerAdapters ( this ) ;
}
if ( rxJava2Present ) {
new RxJava2Registrar ( ) . registerAdapters ( this ) ;
}
@ -110,7 +92,12 @@ public class ReactiveAdapterRegistry {
@@ -110,7 +92,12 @@ public class ReactiveAdapterRegistry {
public void registerReactiveType ( ReactiveTypeDescriptor descriptor ,
Function < Object , Publisher < ? > > toAdapter , Function < Publisher < ? > , Object > fromAdapter ) {
this . adapters . add ( new ReactorAdapter ( descriptor , toAdapter , fromAdapter ) ) ;
if ( reactorPresent ) {
this . adapters . add ( new ReactorAdapter ( descriptor , toAdapter , fromAdapter ) ) ;
}
else {
this . adapters . add ( new ReactiveAdapter ( descriptor , toAdapter , fromAdapter ) ) ;
}
}
/ * *
@ -145,9 +132,41 @@ public class ReactiveAdapterRegistry {
@@ -145,9 +132,41 @@ public class ReactiveAdapterRegistry {
}
private static class ReactorRegistrar {
void registerAdapters ( ReactiveAdapterRegistry registry ) {
// Flux and Mono ahead of Publisher...
registry . registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( Mono . class , Mono : : empty ) ,
source - > ( Mono < ? > ) source ,
Mono : : from
) ;
registry . registerReactiveType ( ReactiveTypeDescriptor . multiValue ( Flux . class , Flux : : empty ) ,
source - > ( Flux < ? > ) source ,
Flux : : from ) ;
registry . registerReactiveType ( ReactiveTypeDescriptor . multiValue ( Publisher . class , Flux : : empty ) ,
source - > ( Publisher < ? > ) source ,
source - > source ) ;
registry . registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( CompletableFuture . class , ( ) - > {
CompletableFuture < ? > empty = new CompletableFuture < > ( ) ;
empty . complete ( null ) ;
return empty ;
} ) ,
source - > Mono . fromFuture ( ( CompletableFuture < ? > ) source ) ,
source - > Mono . from ( source ) . toFuture ( )
) ;
}
}
private static class RxJava1Registrar {
public void registerAdapters ( ReactiveAdapterRegistry registry ) {
void registerAdapters ( ReactiveAdapterRegistry registry ) {
registry . registerReactiveType (
ReactiveTypeDescriptor . multiValue ( rx . Observable . class , rx . Observable : : empty ) ,
source - > RxReactiveStreams . toPublisher ( ( rx . Observable < ? > ) source ) ,
@ -168,7 +187,7 @@ public class ReactiveAdapterRegistry {
@@ -168,7 +187,7 @@ public class ReactiveAdapterRegistry {
private static class RxJava2Registrar {
public void registerAdapters ( ReactiveAdapterRegistry registry ) {
void registerAdapters ( ReactiveAdapterRegistry registry ) {
registry . registerReactiveType (
ReactiveTypeDescriptor . multiValue ( Flowable . class , Flowable : : empty ) ,
source - > ( Flowable < ? > ) source ,
@ -204,7 +223,7 @@ public class ReactiveAdapterRegistry {
@@ -204,7 +223,7 @@ public class ReactiveAdapterRegistry {
* /
private static class ReactorAdapter extends ReactiveAdapter {
public ReactorAdapter ( ReactiveTypeDescriptor descriptor ,
ReactorAdapter ( ReactiveTypeDescriptor descriptor ,
Function < Object , Publisher < ? > > toPublisherFunction ,
Function < Publisher < ? > , Object > fromPublisherFunction ) {