Browse Source

Use RxJava 2/3 fromPublisher() when possible in ReactiveAdapterRegistry

Closes gh-26051
pull/26058/head
Sébastien Deleuze 5 years ago
parent
commit
c73cff8bad
  1. 21
      spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

21
spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

@ -264,14 +264,12 @@ public class ReactiveAdapterRegistry { @@ -264,14 +264,12 @@ public class ReactiveAdapterRegistry {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
source -> ((io.reactivex.Observable<?>) source).toFlowable(io.reactivex.BackpressureStrategy.BUFFER),
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable()
io.reactivex.Observable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
source -> ((io.reactivex.Single<?>) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable().singleElement().toSingle()
io.reactivex.Single::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
@ -282,8 +280,7 @@ public class ReactiveAdapterRegistry { @@ -282,8 +280,7 @@ public class ReactiveAdapterRegistry {
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
source -> ((io.reactivex.Completable) source).toFlowable(),
source -> io.reactivex.Flowable.fromPublisher(source)
.toObservable().ignoreElements()
io.reactivex.Completable::fromPublisher
);
}
}
@ -304,30 +301,26 @@ public class ReactiveAdapterRegistry { @@ -304,30 +301,26 @@ public class ReactiveAdapterRegistry {
io.reactivex.rxjava3.core.Observable::empty),
source -> ((io.reactivex.rxjava3.core.Observable<?>) source).toFlowable(
io.reactivex.rxjava3.core.BackpressureStrategy.BUFFER),
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
.toObservable()
io.reactivex.rxjava3.core.Observable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.rxjava3.core.Single.class),
source -> ((io.reactivex.rxjava3.core.Single<?>) source).toFlowable(),
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
.toObservable().singleElement().toSingle()
io.reactivex.rxjava3.core.Single::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(
io.reactivex.rxjava3.core.Maybe.class,
io.reactivex.rxjava3.core.Maybe::empty),
source -> ((io.reactivex.rxjava3.core.Maybe<?>) source).toFlowable(),
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
.toObservable().singleElement()
io.reactivex.rxjava3.core.Maybe::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(
io.reactivex.rxjava3.core.Completable.class,
io.reactivex.rxjava3.core.Completable::complete),
source -> ((io.reactivex.rxjava3.core.Completable) source).toFlowable(),
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
.toObservable().ignoreElements()
io.reactivex.rxjava3.core.Completable::fromPublisher
);
}
}

Loading…
Cancel
Save