@ -16,32 +16,32 @@
@@ -16,32 +16,32 @@
package org.springframework.core ;
import java.util.LinkedHashMap ;
import java.util.Map ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Optional ;
import java.util.concurrent.CompletableFuture ;
import java.util.function.Function ;
import java.util.function.Predicate ;
import io.reactivex.BackpressureStrategy ;
import io.reactivex.Completable ;
import io.reactivex.Flowable ;
import io.reactivex.Maybe ;
import io.reactivex.Observable ;
import org.reactivestreams.Publisher ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
import rx.Completable ;
import rx.Observable ;
import rx.RxReactiveStreams ;
import rx.Single ;
import org.springframework.util.ClassUtils ;
/ * *
* A registry of adapters to adapt to { @link Flux } and { @link Mono } .
* A registry of adapters to adapt a Reactive Streams { @link Publisher } to / from
* various async / reactive types such as { @code CompletableFuture } , RxJava
* { @code Observable } , and others .
*
* < p > By default there are adapters for { @link CompletableFuture } , RxJava 1 , and
* also for a any Reactive Streams { @link Publisher } . Additional adapters can be
* registered via { @link # registerFluxAdapter } and { @link # registerMonoAdapter } .
* < p > By default , depending on classpath availability , adapters are registered
* for Reactor , RxJava 1 , RxJava 2 types , and { @link CompletableFuture } .
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
@ -49,6 +49,9 @@ import org.springframework.util.ClassUtils;
@@ -49,6 +49,9 @@ import org.springframework.util.ClassUtils;
* /
public class ReactiveAdapterRegistry {
private static final boolean reactorPresent =
ClassUtils . isPresent ( "reactor.core.publisher.Flux" , ReactiveAdapterRegistry . class . getClassLoader ( ) ) ;
private static final boolean rxJava1Present =
ClassUtils . isPresent ( "rx.Observable" , ReactiveAdapterRegistry . class . getClassLoader ( ) ) ;
@ -58,257 +61,180 @@ public class ReactiveAdapterRegistry {
@@ -58,257 +61,180 @@ public class ReactiveAdapterRegistry {
private static final boolean rxJava2Present =
ClassUtils . isPresent ( "io.reactivex.Flowable" , ReactiveAdapterRegistry . class . getClassLoader ( ) ) ;
private final Map < Class < ? > , ReactiveAdapter > adapterMap = new LinkedHashMap < > ( 4 ) ;
private final List < ReactiveAdapter > adapters = new ArrayList < > ( 32 ) ;
/ * *
* Create a registry and auto - register default adapters .
* /
public ReactiveAdapterRegistry ( ) {
// Flux and Mono ahead of Publisher...
registerMonoAdapter ( Mono . class ,
source - > ( Mono < ? > ) source , source - > source ,
new ReactiveAdapter . Descriptor ( false , true , false ) ) ;
registerFluxAdapter (
Flux . class , source - > ( Flux < ? > ) source , source - > source ) ;
registerFluxAdapter (
Publisher . class , source - > Flux . from ( ( Publisher < ? > ) source ) , source - > source ) ;
registerMonoAdapter ( CompletableFuture . class ,
source - > Mono . fromFuture ( ( CompletableFuture < ? > ) source ) , Mono : : toFuture ,
new ReactiveAdapter . Descriptor ( false , true , false )
) ;
if ( reactorPresent ) {
new ReactorRegistrar ( ) . registerAdapters ( this ) ;
}
if ( rxJava1Present & & rxJava1Adapter ) {
new RxJava1Adapter Registrar ( ) . register ( this ) ;
new RxJava1Registrar ( ) . registerAdapters ( this ) ;
}
if ( rxJava2Present ) {
new RxJava2Adapter Registrar ( ) . register ( this ) ;
new RxJava2Registrar ( ) . registerAdapters ( this ) ;
}
}
/ * *
* Register an adapter for adapting to and from a { @link Mono } .
* < p > The provided functions can assume that input will never be { @code null }
* and also that any { @link Optional } wrapper is unwrapped .
* /
public void registerMonoAdapter ( Class < ? > adapteeType , Function < Object , Mono < ? > > toAdapter ,
Function < Mono < ? > , Object > fromAdapter , ReactiveAdapter . Descriptor descriptor ) {
this . adapterMap . put ( adapteeType , new MonoReactiveAdapter ( toAdapter , fromAdapter , descriptor ) ) ;
}
/ * *
* Register an adapter for adapting to and from a { @link Flux } .
* < p > The provided functions can assume that input will never be { @code null }
* and also that any { @link Optional } wrapper is unwrapped .
* Register a reactive type along with functions to adapt to and from a
* Reactive Streams { @link Publisher } . The functions can assume their
* input is never be { @code null } nor { @link Optional } .
* /
public void registerFluxAdapter ( Class < ? > adapteeType , Function < Object , Flux < ? > > toAdapte r ,
Function < Flux < ? > , Object > fromAdapter ) {
public void registerReactiveType ( ReactiveTypeDescriptor descriptor ,
Function < Object , Publisher < ? > > toAdapter , Function < Publisher < ? > , Object > fromAdapter ) {
this . adapterMap . put ( adapteeType , new FluxReactiveAdapter ( toAdapter , fromAdapter ) ) ;
}
/ * *
* Get the adapter for the given adaptee type to adapt from .
* /
public ReactiveAdapter getAdapterFrom ( Class < ? > adapteeType ) {
return getAdapterFrom ( adapteeType , null ) ;
if ( reactorPresent ) {
this . adapters . add ( new ReactorAdapter ( descriptor , toAdapter , fromAdapter ) ) ;
}
else {
this . adapters . add ( new ReactiveAdapter ( descriptor , toAdapter , fromAdapter ) ) ;
}
}
/ * *
* Get the adapter for the given adaptee type to adapt from .
* If the instance is not { @code null } its actual type is used to check .
* Get the adapter to use to adapt from the given reactive type .
* /
public ReactiveAdapter getAdapterFrom ( Class < ? > adapteeType , Object adaptee ) {
Class < ? > actualType = getActualType ( adapteeType , adaptee ) ;
return getAdapterInternal ( supportedType - > supportedType . isAssignableFrom ( actualType ) ) ;
public ReactiveAdapter getAdapterFrom ( Class < ? > reactiveType ) {
return getAdapterFrom ( reactiveType , null ) ;
}
/ * *
* Get the adapter for the given adaptee type to adapt to .
* Get the adapter to use to adapt from the given reactive type . Or if the
* "source" object is not { @code null } its actual type is used instead .
* /
public ReactiveAdapter getAdapterTo ( Class < ? > adapteeType ) {
return getAdapterTo ( adapteeType , null ) ;
public ReactiveAdapter getAdapterFrom ( Class < ? > reactiveType , Object source ) {
source = ( source instanceof Optional ? ( ( Optional < ? > ) source ) . orElse ( null ) : source ) ;
Class < ? > clazz = ( source ! = null ? source . getClass ( ) : reactiveType ) ;
return getAdapter ( type - > type . isAssignableFrom ( clazz ) ) ;
}
/ * *
* Get the adapter for the given adaptee type to adapt to .
* If the instance is not { @code null } its actual type is used to check .
* Get the adapter for the given reactive type to adapt to .
* /
public ReactiveAdapter getAdapterTo ( Class < ? > adapteeType , Object adaptee ) {
Class < ? > actualType = getActualType ( adapteeType , adaptee ) ;
return getAdapterInternal ( supportedType - > supportedType . equals ( actualType ) ) ;
public ReactiveAdapter getAdapterTo ( Class < ? > reactiveType ) {
return getAdapter ( reactiveType : : equals ) ;
}
private ReactiveAdapter getAdapterInternal ( Predicate < Class < ? > > adapteeTypePredicate ) {
return this . adapterMap . keySet ( ) . stream ( )
. filter ( adapteeTypePredicate )
. map ( this . adapterMap : : get )
private ReactiveAdapter getAdapter ( Predicate < Class < ? > > predicate ) {
return this . adapters . stream ( )
. filter ( adapter - > predicate . test ( adapter . getDescriptor ( ) . getReactiveType ( ) ) )
. findFirst ( )
. orElse ( null ) ;
}
private static Class < ? > getActualType ( Class < ? > adapteeType , Object adaptee ) {
adaptee = unwrapOptional ( adaptee ) ;
return ( adaptee ! = null ? adaptee . getClass ( ) : adapteeType ) ;
}
private static Object unwrapOptional ( Object value ) {
return ( value instanceof Optional ? ( ( Optional < ? > ) value ) . orElse ( null ) : value ) ;
}
private static class ReactorRegistrar {
void registerAdapters ( ReactiveAdapterRegistry registry ) {
@SuppressWarnings ( "unchecked" )
private static class MonoReactiveAdapter implements ReactiveAdapter {
private final Function < Object , Mono < ? > > toAdapter ;
private final Function < Mono < ? > , Object > fromAdapter ;
private final Descriptor descriptor ;
MonoReactiveAdapter ( Function < Object , Mono < ? > > to , Function < Mono < ? > , Object > from , Descriptor descriptor ) {
this . toAdapter = to ;
this . fromAdapter = from ;
this . descriptor = descriptor ;
}
@Override
public Descriptor getDescriptor ( ) {
return this . descriptor ;
}
@Override
public < T > Mono < T > toMono ( Object source ) {
source = unwrapOptional ( source ) ;
if ( source = = null ) {
return Mono . empty ( ) ;
}
return ( Mono < T > ) this . toAdapter . apply ( source ) ;
}
@Override
public < T > Flux < T > toFlux ( Object source ) {
source = unwrapOptional ( source ) ;
if ( source = = null ) {
return Flux . empty ( ) ;
}
return ( Flux < T > ) this . toMono ( source ) . flux ( ) ;
}
@Override
public < T > Publisher < T > toPublisher ( Object source ) {
return toMono ( source ) ;
}
// Flux and Mono ahead of Publisher...
@Override
public Object fromPublisher ( Publisher < ? > source ) {
return ( source ! = null ? this . fromAdapter . apply ( ( Mono < ? > ) source ) : null ) ;
}
}
@SuppressWarnings ( "unchecked" )
private static class FluxReactiveAdapter implements ReactiveAdapter {
private final Function < Object , Flux < ? > > toAdapter ;
private final Function < Flux < ? > , Object > fromAdapter ;
private final Descriptor descriptor = new Descriptor ( true , true , false ) ;
FluxReactiveAdapter ( Function < Object , Flux < ? > > to , Function < Flux < ? > , Object > from ) {
this . toAdapter = to ;
this . fromAdapter = from ;
}
@Override
public Descriptor getDescriptor ( ) {
return this . descriptor ;
}
@Override
public < T > Mono < T > toMono ( Object source ) {
source = unwrapOptional ( source ) ;
if ( source = = null ) {
return Mono . empty ( ) ;
}
return ( Mono < T > ) this . toAdapter . apply ( source ) . next ( ) ;
}
@Override
public < T > Flux < T > toFlux ( Object source ) {
source = unwrapOptional ( source ) ;
if ( source = = null ) {
return Flux . empty ( ) ;
}
return ( Flux < T > ) this . toAdapter . apply ( source ) ;
}
@Override
public < T > Publisher < T > toPublisher ( Object source ) {
return toFlux ( source ) ;
}
registry . registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( Mono . class , Mono : : empty ) ,
source - > ( Mono < ? > ) source ,
Mono : : from
) ;
@Override
public Object fromPublisher ( Publisher < ? > source ) {
return ( source ! = null ? this . fromAdapter . apply ( ( Flux < ? > ) source ) : null ) ;
registry . registerReactiveType ( ReactiveTypeDescriptor . multiValue ( Flux . class , Flux : : empty ) ,
source - > ( Flux < ? > ) source ,
Flux : : from ) ;
registry . registerReactiveType ( ReactiveTypeDescriptor . multiValue ( Publisher . class , Flux : : empty ) ,
source - > ( Publisher < ? > ) source ,
source - > source ) ;
registry . registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( CompletableFuture . class , ( ) - > {
CompletableFuture < ? > empty = new CompletableFuture < > ( ) ;
empty . complete ( null ) ;
return empty ;
} ) ,
source - > Mono . fromFuture ( ( CompletableFuture < ? > ) source ) ,
source - > Mono . from ( source ) . toFuture ( )
) ;
}
}
private static class RxJava1Registrar {
private static class RxJava1AdapterRegistrar {
public void register ( ReactiveAdapterRegistry registry ) {
registry . registerFluxAdapter ( Observable . class ,
source - > Flux . from ( RxReactiveStreams . toPublisher ( ( Observable < ? > ) source ) ) ,
void registerAdapters ( ReactiveAdapterRegistry registry ) {
registry . registerReactiveType (
ReactiveTypeDescriptor . multiValue ( rx . Observable . class , rx . Observable : : empty ) ,
source - > RxReactiveStreams . toPublisher ( ( rx . Observable < ? > ) source ) ,
RxReactiveStreams : : toObservable
) ;
registry . registerMonoAdapter ( Single . class ,
source - > Mono . from ( RxReactiveStreams . toPublisher ( ( Single < ? > ) source ) ) ,
RxReactiveStreams : : toSingle ,
new ReactiveAdapter . Descriptor ( false , false , false )
registry . registerReactiveType (
ReactiveTypeDescriptor . singleRequiredValue ( rx . Single . class ) ,
source - > RxReactiveStreams . toPublisher ( ( rx . Single < ? > ) source ) ,
RxReactiveStreams : : toSingle
) ;
registry . registerMonoAdapter ( Completable . class ,
source - > Mono . from ( RxReactiveStreams . toPublisher ( ( Completable ) source ) ) ,
RxReactiveStreams : : toCompletable ,
new ReactiveAdapter . Descriptor ( false , true , true )
registry . registerReactiveType (
ReactiveTypeDescriptor . noValue ( rx . Completable . class , Completable : : complete ) ,
source - > RxReactiveStreams . toPublisher ( ( rx . Completable ) source ) ,
RxReactiveStreams : : toCompletable
) ;
}
}
private static class RxJava2Adapter Registrar {
private static class RxJava2Registrar {
public void register ( ReactiveAdapterRegistry registry ) {
registry . registerFluxAdapter ( Flowable . class ,
source - > Flux . from ( ( Flowable < ? > ) source ) ,
void registerAdapters ( ReactiveAdapterRegistry registry ) {
registry . registerReactiveType (
ReactiveTypeDescriptor . multiValue ( Flowable . class , Flowable : : empty ) ,
source - > ( Flowable < ? > ) source ,
source - > Flowable . fromPublisher ( source )
) ;
registry . registerFluxAdapter ( io . reactivex . Observable . class ,
source - > Flux . from ( ( ( io . reactivex . Observable < ? > ) source ) . toFlowable ( BackpressureStrategy . BUFFER ) ) ,
registry . registerReactiveType (
ReactiveTypeDescriptor . multiValue ( Observable . class , Observable : : empty ) ,
source - > ( ( Observable < ? > ) source ) . toFlowable ( BackpressureStrategy . BUFFER ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( )
) ;
registry . registerMonoAdapter ( io . reactivex . Single . class ,
source - > Mono . from ( ( ( io . reactivex . Single < ? > ) source ) . toFlowable ( ) ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( ) . singleElement ( ) . toSing le( ) ,
new ReactiveAdapter . Descriptor ( false , false , false )
registry . registerReactiveType (
ReactiveTypeDescriptor . singleRequiredValue ( io . reactivex . Single . class ) ,
source - > ( ( io . reactivex . Single < ? > ) source ) . toFlowab le( ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( ) . singleElement ( ) . toSingle ( )
) ;
registry . registerMonoAdapter ( Maybe . class ,
source - > Mono . from ( ( ( Maybe < ? > ) source ) . toFlowable ( ) ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( ) . singleElement ( ) ,
new ReactiveAdapter . Descriptor ( false , true , false )
registry . registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( Maybe . class , Maybe : : empty ) ,
source - > ( ( Maybe < ? > ) source ) . toFlowable ( ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( ) . singleElement ( )
) ;
registry . registerMonoAdapter ( io . reactivex . Completable . class ,
source - > Mono . from ( ( ( io . reactivex . Completable ) source ) . toFlowable ( ) ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( ) . ignoreElements ( ) ,
new ReactiveAdapter . Descriptor ( false , true , true )
registry . registerReactiveType (
ReactiveTypeDescriptor . noValue ( Completable . class , Completable : : complete ) ,
source - > ( ( Completable ) source ) . toFlowable ( ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( ) . ignoreElements ( )
) ;
}
}
/ * *
* Extension of ReactiveAdapter that wraps adapted ( raw ) Publisher ' s as
* { @link Flux } or { @link Mono } depending on the underlying reactive type ' s
* stream semantics .
* /
private static class ReactorAdapter extends ReactiveAdapter {
ReactorAdapter ( ReactiveTypeDescriptor descriptor ,
Function < Object , Publisher < ? > > toPublisherFunction ,
Function < Publisher < ? > , Object > fromPublisherFunction ) {
super ( descriptor , toPublisherFunction , fromPublisherFunction ) ;
}
@Override
public < T > Publisher < T > toPublisher ( Object source ) {
Publisher < T > publisher = super . toPublisher ( source ) ;
return ( getDescriptor ( ) . isMultiValue ( ) ? Flux . from ( publisher ) : Mono . from ( publisher ) ) ;
}
}
}