diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapter.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapter.java
index a829562f2d2..d856aa16021 100644
--- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapter.java
+++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapter.java
@@ -16,99 +16,81 @@
package org.springframework.core;
+import java.util.Optional;
+import java.util.function.Function;
+
import org.reactivestreams.Publisher;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+
+import org.springframework.util.Assert;
/**
- * Contract for adapting to and from {@link Flux} and {@link Mono}.
+ * Adapt a Reactive Streams {@link Publisher} to and from an async/reactive type
+ * such as {@code CompletableFuture}, an RxJava {@code Observable}, etc.
*
- *
An adapter supports a specific adaptee type whose stream semantics
- * can be checked via {@link #getDescriptor()}.
- *
- *
Use the {@link ReactiveAdapterRegistry} to obtain an adapter for a
- * supported adaptee type or to register additional adapters.
+ *
Use the {@link ReactiveAdapterRegistry} to register reactive types and
+ * obtain adapters from.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
-public interface ReactiveAdapter {
-
- /**
- * Return a descriptor with further information about the adaptee.
- */
- Descriptor getDescriptor();
-
- /**
- * Adapt the given Object to a {@link Mono}
- * @param source the source object to adapt
- * @return the resulting {@link Mono} possibly empty
- */
- Mono toMono(Object source);
+public class ReactiveAdapter {
- /**
- * Adapt the given Object to a {@link Flux}.
- * @param source the source object to adapt
- * @return the resulting {@link Flux} possibly empty
- */
- Flux toFlux(Object source);
+ private final ReactiveTypeDescriptor descriptor;
- /**
- * Adapt the given Object to a Publisher.
- * @param source the source object to adapt
- * @return the resulting {@link Mono} or {@link Flux} possibly empty
- */
- Publisher toPublisher(Object source);
+ private final Function> toPublisherFunction;
- /**
- * Adapt the given Publisher to the target adaptee.
- * @param publisher the publisher to adapt
- * @return the resulting adaptee
- */
- Object fromPublisher(Publisher> publisher);
+ private final Function, Object> fromPublisherFunction;
/**
- * A descriptor with information about the adaptee stream semantics.
+ * Constructor for an adapter with functions to convert the target reactive
+ * or async type to and from a Reactive Streams Publisher.
+ * @param descriptor the reactive type descriptor
+ * @param toPublisherFunction adapter to a Publisher
+ * @param fromPublisherFunction adapter from a Publisher
*/
- class Descriptor {
-
- private final boolean isMultiValue;
+ public ReactiveAdapter(ReactiveTypeDescriptor descriptor,
+ Function> toPublisherFunction,
+ Function, Object> fromPublisherFunction) {
- private final boolean supportsEmpty;
+ Assert.notNull(descriptor, "'descriptor' is required");
+ Assert.notNull(toPublisherFunction, "'toPublisherFunction' is required");
+ Assert.notNull(fromPublisherFunction, "'fromPublisherFunction' is required");
- private final boolean isNoValue;
+ this.descriptor = descriptor;
+ this.toPublisherFunction = toPublisherFunction;
+ this.fromPublisherFunction = fromPublisherFunction;
+ }
- public Descriptor(boolean isMultiValue, boolean canBeEmpty, boolean isNoValue) {
- this.isMultiValue = isMultiValue;
- this.supportsEmpty = canBeEmpty;
- this.isNoValue = isNoValue;
- }
- /**
- * Return {@code true} if the adaptee implies 0..N values can be produced
- * and is therefore a good fit to adapt to {@link Flux}. A {@code false}
- * return value implies the adaptee will produce 1 value at most and is
- * therefore a good fit for {@link Mono}.
- */
- public boolean isMultiValue() {
- return this.isMultiValue;
- }
+ /**
+ * Return the descriptor of the reactive type for the adapter.
+ */
+ public ReactiveTypeDescriptor getDescriptor() {
+ return this.descriptor;
+ }
- /**
- * Return {@code true} if the adaptee can complete without values.
- */
- public boolean supportsEmpty() {
- return this.supportsEmpty;
+ /**
+ * Adapt the given instance to a Reactive Streams Publisher.
+ * @param source the source object to adapt from
+ * @return the Publisher repesenting the adaptation
+ */
+ @SuppressWarnings("unchecked")
+ public Publisher toPublisher(Object source) {
+ source = (source instanceof Optional ? ((Optional>) source).orElse(null) : source);
+ if (source == null) {
+ source = getDescriptor().getEmptyValue();
}
+ return (Publisher) this.toPublisherFunction.apply(source);
+ }
- /**
- * Return {@code true} if the adaptee implies no values will be produced,
- * i.e. providing only completion or error signal.
- */
- public boolean isNoValue() {
- return this.isNoValue;
- }
+ /**
+ * Adapt from the given Reactive Streams Publisher.
+ * @param publisher the publisher to adapt from
+ * @return the reactive type instance representing the adapted publisher
+ */
+ public Object fromPublisher(Publisher> publisher) {
+ return (publisher != null ? this.fromPublisherFunction.apply(publisher) : null);
}
}
diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java
index 6be60256376..5cf10f1618f 100644
--- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java
+++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java
@@ -16,32 +16,32 @@
package org.springframework.core;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import io.reactivex.BackpressureStrategy;
+import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
+import io.reactivex.Observable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import rx.Completable;
-import rx.Observable;
import rx.RxReactiveStreams;
-import rx.Single;
import org.springframework.util.ClassUtils;
/**
- * A registry of adapters to adapt to {@link Flux} and {@link Mono}.
+ * A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from
+ * various async/reactive types such as {@code CompletableFuture}, RxJava
+ * {@code Observable}, and others.
*
- * By default there are adapters for {@link CompletableFuture}, RxJava 1, and
- * also for a any Reactive Streams {@link Publisher}. Additional adapters can be
- * registered via {@link #registerFluxAdapter} and {@link #registerMonoAdapter}.
+ *
By default, depending on classpath availability, adapters are registered
+ * for Reactor, RxJava 1, RxJava 2 types, and {@link CompletableFuture}.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
@@ -49,6 +49,9 @@ import org.springframework.util.ClassUtils;
*/
public class ReactiveAdapterRegistry {
+ private static final boolean reactorPresent =
+ ClassUtils.isPresent("reactor.core.publisher.Flux", ReactiveAdapterRegistry.class.getClassLoader());
+
private static final boolean rxJava1Present =
ClassUtils.isPresent("rx.Observable", ReactiveAdapterRegistry.class.getClassLoader());
@@ -58,257 +61,180 @@ public class ReactiveAdapterRegistry {
private static final boolean rxJava2Present =
ClassUtils.isPresent("io.reactivex.Flowable", ReactiveAdapterRegistry.class.getClassLoader());
- private final Map, ReactiveAdapter> adapterMap = new LinkedHashMap<>(4);
+
+ private final List adapters = new ArrayList<>(32);
/**
* Create a registry and auto-register default adapters.
*/
public ReactiveAdapterRegistry() {
- // Flux and Mono ahead of Publisher...
- registerMonoAdapter(Mono.class,
- source -> (Mono>) source, source -> source,
- new ReactiveAdapter.Descriptor(false, true, false));
- registerFluxAdapter(
- Flux.class, source -> (Flux>) source, source -> source);
- registerFluxAdapter(
- Publisher.class, source -> Flux.from((Publisher>) source), source -> source);
-
- registerMonoAdapter(CompletableFuture.class,
- source -> Mono.fromFuture((CompletableFuture>) source), Mono::toFuture,
- new ReactiveAdapter.Descriptor(false, true, false)
- );
+
+ if (reactorPresent) {
+ new ReactorRegistrar().registerAdapters(this);
+ }
if (rxJava1Present && rxJava1Adapter) {
- new RxJava1AdapterRegistrar().register(this);
+ new RxJava1Registrar().registerAdapters(this);
}
+
if (rxJava2Present) {
- new RxJava2AdapterRegistrar().register(this);
+ new RxJava2Registrar().registerAdapters(this);
}
}
/**
- * Register an adapter for adapting to and from a {@link Mono}.
- * The provided functions can assume that input will never be {@code null}
- * and also that any {@link Optional} wrapper is unwrapped.
- */
- public void registerMonoAdapter(Class> adapteeType, Function> toAdapter,
- Function, Object> fromAdapter, ReactiveAdapter.Descriptor descriptor) {
-
- this.adapterMap.put(adapteeType, new MonoReactiveAdapter(toAdapter, fromAdapter, descriptor));
- }
-
- /**
- * Register an adapter for adapting to and from a {@link Flux}.
- * The provided functions can assume that input will never be {@code null}
- * and also that any {@link Optional} wrapper is unwrapped.
+ * Register a reactive type along with functions to adapt to and from a
+ * Reactive Streams {@link Publisher}. The functions can assume their
+ * input is never be {@code null} nor {@link Optional}.
*/
- public void registerFluxAdapter(Class> adapteeType, Function> toAdapter,
- Function, Object> fromAdapter) {
+ public void registerReactiveType(ReactiveTypeDescriptor descriptor,
+ Function> toAdapter, Function, Object> fromAdapter) {
- this.adapterMap.put(adapteeType, new FluxReactiveAdapter(toAdapter, fromAdapter));
- }
-
-
- /**
- * Get the adapter for the given adaptee type to adapt from.
- */
- public ReactiveAdapter getAdapterFrom(Class> adapteeType) {
- return getAdapterFrom(adapteeType, null);
+ if (reactorPresent) {
+ this.adapters.add(new ReactorAdapter(descriptor, toAdapter, fromAdapter));
+ }
+ else {
+ this.adapters.add(new ReactiveAdapter(descriptor, toAdapter, fromAdapter));
+ }
}
/**
- * Get the adapter for the given adaptee type to adapt from.
- * If the instance is not {@code null} its actual type is used to check.
+ * Get the adapter to use to adapt from the given reactive type.
*/
- public ReactiveAdapter getAdapterFrom(Class> adapteeType, Object adaptee) {
- Class> actualType = getActualType(adapteeType, adaptee);
- return getAdapterInternal(supportedType -> supportedType.isAssignableFrom(actualType));
+ public ReactiveAdapter getAdapterFrom(Class> reactiveType) {
+ return getAdapterFrom(reactiveType, null);
}
/**
- * Get the adapter for the given adaptee type to adapt to.
+ * Get the adapter to use to adapt from the given reactive type. Or if the
+ * "source" object is not {@code null} its actual type is used instead.
*/
- public ReactiveAdapter getAdapterTo(Class> adapteeType) {
- return getAdapterTo(adapteeType, null);
+ public ReactiveAdapter getAdapterFrom(Class> reactiveType, Object source) {
+ source = (source instanceof Optional ? ((Optional>) source).orElse(null) : source);
+ Class> clazz = (source != null ? source.getClass() : reactiveType);
+ return getAdapter(type -> type.isAssignableFrom(clazz));
}
/**
- * Get the adapter for the given adaptee type to adapt to.
- * If the instance is not {@code null} its actual type is used to check.
+ * Get the adapter for the given reactive type to adapt to.
*/
- public ReactiveAdapter getAdapterTo(Class> adapteeType, Object adaptee) {
- Class> actualType = getActualType(adapteeType, adaptee);
- return getAdapterInternal(supportedType -> supportedType.equals(actualType));
+ public ReactiveAdapter getAdapterTo(Class> reactiveType) {
+ return getAdapter(reactiveType::equals);
}
- private ReactiveAdapter getAdapterInternal(Predicate> adapteeTypePredicate) {
- return this.adapterMap.keySet().stream()
- .filter(adapteeTypePredicate)
- .map(this.adapterMap::get)
+ private ReactiveAdapter getAdapter(Predicate> predicate) {
+ return this.adapters.stream()
+ .filter(adapter -> predicate.test(adapter.getDescriptor().getReactiveType()))
.findFirst()
.orElse(null);
}
- private static Class> getActualType(Class> adapteeType, Object adaptee) {
- adaptee = unwrapOptional(adaptee);
- return (adaptee != null ? adaptee.getClass() : adapteeType);
- }
-
- private static Object unwrapOptional(Object value) {
- return (value instanceof Optional ? ((Optional>) value).orElse(null) : value);
- }
+ private static class ReactorRegistrar {
+ void registerAdapters(ReactiveAdapterRegistry registry) {
- @SuppressWarnings("unchecked")
- private static class MonoReactiveAdapter implements ReactiveAdapter {
-
- private final Function> toAdapter;
-
- private final Function, Object> fromAdapter;
-
- private final Descriptor descriptor;
-
-
- MonoReactiveAdapter(Function> to, Function, Object> from, Descriptor descriptor) {
- this.toAdapter = to;
- this.fromAdapter = from;
- this.descriptor = descriptor;
- }
-
- @Override
- public Descriptor getDescriptor() {
- return this.descriptor;
- }
-
- @Override
- public Mono toMono(Object source) {
- source = unwrapOptional(source);
- if (source == null) {
- return Mono.empty();
- }
- return (Mono) this.toAdapter.apply(source);
- }
-
- @Override
- public Flux toFlux(Object source) {
- source = unwrapOptional(source);
- if (source == null) {
- return Flux.empty();
- }
- return (Flux) this.toMono(source).flux();
- }
-
- @Override
- public Publisher toPublisher(Object source) {
- return toMono(source);
- }
+ // Flux and Mono ahead of Publisher...
- @Override
- public Object fromPublisher(Publisher> source) {
- return (source != null ? this.fromAdapter.apply((Mono>) source) : null);
- }
- }
-
- @SuppressWarnings("unchecked")
- private static class FluxReactiveAdapter implements ReactiveAdapter {
-
- private final Function> toAdapter;
-
- private final Function, Object> fromAdapter;
-
- private final Descriptor descriptor = new Descriptor(true, true, false);
-
-
- FluxReactiveAdapter(Function> to, Function, Object> from) {
- this.toAdapter = to;
- this.fromAdapter = from;
- }
-
- @Override
- public Descriptor getDescriptor() {
- return this.descriptor;
- }
-
- @Override
- public Mono toMono(Object source) {
- source = unwrapOptional(source);
- if (source == null) {
- return Mono.empty();
- }
- return (Mono) this.toAdapter.apply(source).next();
- }
-
- @Override
- public Flux toFlux(Object source) {
- source = unwrapOptional(source);
- if (source == null) {
- return Flux.empty();
- }
- return (Flux) this.toAdapter.apply(source);
- }
-
- @Override
- public Publisher toPublisher(Object source) {
- return toFlux(source);
- }
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
+ source -> (Mono>) source,
+ Mono::from
+ );
- @Override
- public Object fromPublisher(Publisher> source) {
- return (source != null ? this.fromAdapter.apply((Flux>) source) : null);
+ registry.registerReactiveType(ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
+ source -> (Flux>) source,
+ Flux::from);
+
+ registry.registerReactiveType(ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
+ source -> (Publisher>) source,
+ source -> source);
+
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> {
+ CompletableFuture> empty = new CompletableFuture<>();
+ empty.complete(null);
+ return empty;
+ }),
+ source -> Mono.fromFuture((CompletableFuture>) source),
+ source -> Mono.from(source).toFuture()
+ );
}
}
+ private static class RxJava1Registrar {
- private static class RxJava1AdapterRegistrar {
-
- public void register(ReactiveAdapterRegistry registry) {
- registry.registerFluxAdapter(Observable.class,
- source -> Flux.from(RxReactiveStreams.toPublisher((Observable>) source)),
+ void registerAdapters(ReactiveAdapterRegistry registry) {
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty),
+ source -> RxReactiveStreams.toPublisher((rx.Observable>) source),
RxReactiveStreams::toObservable
);
- registry.registerMonoAdapter(Single.class,
- source -> Mono.from(RxReactiveStreams.toPublisher((Single>) source)),
- RxReactiveStreams::toSingle,
- new ReactiveAdapter.Descriptor(false, false, false)
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class),
+ source -> RxReactiveStreams.toPublisher((rx.Single>) source),
+ RxReactiveStreams::toSingle
);
- registry.registerMonoAdapter(Completable.class,
- source -> Mono.from(RxReactiveStreams.toPublisher((Completable) source)),
- RxReactiveStreams::toCompletable,
- new ReactiveAdapter.Descriptor(false, true, true)
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.noValue(rx.Completable.class, Completable::complete),
+ source -> RxReactiveStreams.toPublisher((rx.Completable) source),
+ RxReactiveStreams::toCompletable
);
}
}
- private static class RxJava2AdapterRegistrar {
+ private static class RxJava2Registrar {
- public void register(ReactiveAdapterRegistry registry) {
- registry.registerFluxAdapter(Flowable.class,
- source -> Flux.from((Flowable>) source),
+ void registerAdapters(ReactiveAdapterRegistry registry) {
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.multiValue(Flowable.class, Flowable::empty),
+ source -> (Flowable>) source,
source-> Flowable.fromPublisher(source)
);
- registry.registerFluxAdapter(io.reactivex.Observable.class,
- source -> Flux.from(((io.reactivex.Observable>) source).toFlowable(BackpressureStrategy.BUFFER)),
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.multiValue(Observable.class, Observable::empty),
+ source -> ((Observable>) source).toFlowable(BackpressureStrategy.BUFFER),
source -> Flowable.fromPublisher(source).toObservable()
);
- registry.registerMonoAdapter(io.reactivex.Single.class,
- source -> Mono.from(((io.reactivex.Single>) source).toFlowable()),
- source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle(),
- new ReactiveAdapter.Descriptor(false, false, false)
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
+ source -> ((io.reactivex.Single>) source).toFlowable(),
+ source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle()
);
- registry.registerMonoAdapter(Maybe.class,
- source -> Mono.from(((Maybe>) source).toFlowable()),
- source -> Flowable.fromPublisher(source).toObservable().singleElement(),
- new ReactiveAdapter.Descriptor(false, true, false)
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.singleOptionalValue(Maybe.class, Maybe::empty),
+ source -> ((Maybe>) source).toFlowable(),
+ source -> Flowable.fromPublisher(source).toObservable().singleElement()
);
- registry.registerMonoAdapter(io.reactivex.Completable.class,
- source -> Mono.from(((io.reactivex.Completable) source).toFlowable()),
- source -> Flowable.fromPublisher(source).toObservable().ignoreElements(),
- new ReactiveAdapter.Descriptor(false, true, true)
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.noValue(Completable.class, Completable::complete),
+ source -> ((Completable) source).toFlowable(),
+ source -> Flowable.fromPublisher(source).toObservable().ignoreElements()
);
}
}
+ /**
+ * Extension of ReactiveAdapter that wraps adapted (raw) Publisher's as
+ * {@link Flux} or {@link Mono} depending on the underlying reactive type's
+ * stream semantics.
+ */
+ private static class ReactorAdapter extends ReactiveAdapter {
+
+ ReactorAdapter(ReactiveTypeDescriptor descriptor,
+ Function> toPublisherFunction,
+ Function, Object> fromPublisherFunction) {
+
+ super(descriptor, toPublisherFunction, fromPublisherFunction);
+ }
+
+ @Override
+ public Publisher toPublisher(Object source) {
+ Publisher publisher = super.toPublisher(source);
+ return (getDescriptor().isMultiValue() ? Flux.from(publisher) : Mono.from(publisher));
+ }
+ }
+
}
diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java
new file mode 100644
index 00000000000..587201e06e2
--- /dev/null
+++ b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.core;
+
+import java.util.function.Supplier;
+
+import org.springframework.util.Assert;
+
+/**
+ * Descriptor for a reactive type with information its stream semantics, i.e.
+ * how many values it can produce.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public class ReactiveTypeDescriptor {
+
+ private final Class> reactiveType;
+
+ private final Supplier> emptyValueSupplier;
+
+ private final boolean multiValue;
+
+ private final boolean supportsEmpty;
+
+ private final boolean noValue;
+
+
+ /**
+ * Private constructor. See static factory methods.
+ */
+ private ReactiveTypeDescriptor(Class> reactiveType, Supplier> emptySupplier,
+ boolean multiValue, boolean canBeEmpty, boolean noValue) {
+
+ Assert.notNull(reactiveType, "'reactiveType' must not be null");
+ Assert.isTrue(!canBeEmpty || emptySupplier != null, "Empty value supplier is required.");
+ this.reactiveType = reactiveType;
+ this.emptyValueSupplier = emptySupplier;
+ this.multiValue = multiValue;
+ this.supportsEmpty = canBeEmpty;
+ this.noValue = noValue;
+ }
+
+
+ /**
+ * Return the reactive type the descriptor was created for.
+ */
+ public Class> getReactiveType() {
+ return this.reactiveType;
+ }
+
+ /**
+ * Return an empty-value instance for the underlying reactive or async type.
+ * Use of this type implies {@link #supportsEmpty()} is true.
+ */
+ public Object getEmptyValue() {
+ Assert.isTrue(supportsEmpty(), "Empty values not supported.");
+ return this.emptyValueSupplier.get();
+ }
+
+ /**
+ * Return {@code true} if the reactive type can produce more than 1 value
+ * can be produced and is therefore a good fit to adapt to {@code Flux}.
+ * A {@code false} return value implies the reactive type can produce 1
+ * value at most and is therefore a good fit to adapt to {@code Mono}.
+ */
+ public boolean isMultiValue() {
+ return this.multiValue;
+ }
+
+ /**
+ * Return {@code true} if the reactive type can complete with no values.
+ */
+ public boolean supportsEmpty() {
+ return this.supportsEmpty;
+ }
+
+ /**
+ * Return {@code true} if the reactive type does not produce any values and
+ * only provides completion and error signals.
+ */
+ public boolean isNoValue() {
+ return this.noValue;
+ }
+
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ return this.reactiveType.equals(((ReactiveTypeDescriptor) other).reactiveType);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.reactiveType.hashCode();
+ }
+
+
+ /**
+ * Descriptor for a reactive type that can produce 0..N values.
+ * @param type the reactive type
+ * @param emptySupplier a supplier of an empty-value instance of the reactive type
+ */
+ public static ReactiveTypeDescriptor multiValue(Class> type, Supplier> emptySupplier) {
+ return new ReactiveTypeDescriptor(type, emptySupplier, true, true, false);
+ }
+
+ /**
+ * Descriptor for a reactive type that can produce 0..1 values.
+ * @param type the reactive type
+ * @param emptySupplier a supplier of an empty-value instance of the reactive type
+ */
+ public static ReactiveTypeDescriptor singleOptionalValue(Class> type, Supplier> emptySupplier) {
+ return new ReactiveTypeDescriptor(type, emptySupplier, false, true, false);
+ }
+
+ /**
+ * Descriptor for a reactive type that must produce 1 value to complete.
+ * @param type the reactive type
+ */
+ public static ReactiveTypeDescriptor singleRequiredValue(Class> type) {
+ return new ReactiveTypeDescriptor(type, null, false, false, false);
+ }
+
+ /**
+ * Descriptor for a reactive type that does not produce any values.
+ * @param type the reactive type
+ * @param emptySupplier a supplier of an empty-value instance of the reactive type
+ */
+ public static ReactiveTypeDescriptor noValue(Class> type, Supplier> emptySupplier) {
+ return new ReactiveTypeDescriptor(type, emptySupplier, false, true, true);
+ }
+
+}
diff --git a/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java b/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java
index 44c44f9d65f..6be82a63a17 100644
--- a/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java
+++ b/spring-core/src/test/java/org/springframework/core/convert/support/ReactiveAdapterRegistryTests.java
@@ -15,6 +15,8 @@
*/
package org.springframework.core.convert.support;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import io.reactivex.Flowable;
@@ -31,57 +33,211 @@ import rx.Single;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link ReactiveAdapterRegistry}.
* @author Rossen Stoyanchev
*/
+@SuppressWarnings("unchecked")
public class ReactiveAdapterRegistryTests {
- private ReactiveAdapterRegistry adapterRegistry;
+ private ReactiveAdapterRegistry registry;
+
@Before
public void setUp() throws Exception {
- this.adapterRegistry = new ReactiveAdapterRegistry();
+ this.registry = new ReactiveAdapterRegistry();
}
+
@Test
public void getDefaultAdapters() throws Exception {
- testMonoAdapter(Mono.class);
- testFluxAdapter(Flux.class);
- testFluxAdapter(Publisher.class);
- testMonoAdapter(CompletableFuture.class);
- testFluxAdapter(Observable.class);
- testMonoAdapter(Single.class);
- testMonoAdapter(Completable.class);
- testFluxAdapter(Flowable.class);
- testFluxAdapter(io.reactivex.Observable.class);
- testMonoAdapter(io.reactivex.Single.class);
- testMonoAdapter(Maybe.class);
- testMonoAdapter(io.reactivex.Completable.class);
- }
-
- private void testFluxAdapter(Class> adapteeType) {
- ReactiveAdapter adapter = this.adapterRegistry.getAdapterFrom(adapteeType);
- assertNotNull(adapter);
- assertTrue(adapter.getDescriptor().isMultiValue());
-
- adapter = this.adapterRegistry.getAdapterTo(adapteeType);
- assertNotNull(adapter);
- assertTrue(adapter.getDescriptor().isMultiValue());
- }
-
- private void testMonoAdapter(Class> adapteeType) {
- ReactiveAdapter adapter = this.adapterRegistry.getAdapterFrom(adapteeType);
- assertNotNull(adapter);
- assertFalse(adapter.getDescriptor().isMultiValue());
-
- adapter = this.adapterRegistry.getAdapterTo(adapteeType);
- assertNotNull(adapter);
- assertFalse(adapter.getDescriptor().isMultiValue());
+
+ // Reactor
+ assertNotNull(getAdapterTo(Mono.class));
+ assertNotNull(getAdapterTo(Flux.class));
+
+ assertNotNull(getAdapterTo(Publisher.class));
+ assertNotNull(getAdapterTo(CompletableFuture.class));
+
+ // RxJava 1
+ assertNotNull(getAdapterTo(Observable.class));
+ assertNotNull(getAdapterTo(Single.class));
+ assertNotNull(getAdapterTo(Completable.class));
+
+ // RxJava 2
+ assertNotNull(getAdapterTo(Flowable.class));
+ assertNotNull(getAdapterTo(io.reactivex.Observable.class));
+ assertNotNull(getAdapterTo(io.reactivex.Single.class));
+ assertNotNull(getAdapterTo(Maybe.class));
+ assertNotNull(getAdapterTo(io.reactivex.Completable.class));
+ }
+
+ @Test
+ public void publisherToFlux() throws Exception {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = Flowable.fromIterable(sequence);
+ Object target = getAdapterTo(Flux.class).fromPublisher(source);
+ assertTrue(target instanceof Flux);
+ assertEquals(sequence, ((Flux) target).collectList().blockMillis(1000));
+ }
+
+ // TODO: publisherToMono/CompletableFuture vs Single (ISE on multiple elements)?
+
+ @Test
+ public void publisherToMono() throws Exception {
+ Publisher source = Flowable.fromArray(1, 2, 3);
+ Object target = getAdapterTo(Mono.class).fromPublisher(source);
+ assertTrue(target instanceof Mono);
+ assertEquals(new Integer(1), ((Mono) target).blockMillis(1000));
+ }
+
+ @Test
+ public void publisherToCompletableFuture() throws Exception {
+ Publisher source = Flowable.fromArray(1, 2, 3);
+ Object target = getAdapterTo(CompletableFuture.class).fromPublisher(source);
+ assertTrue(target instanceof CompletableFuture);
+ assertEquals(new Integer(1), ((CompletableFuture) target).get());
+ }
+
+ @Test
+ public void publisherToRxObservable() throws Exception {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = Flowable.fromIterable(sequence);
+ Object target = getAdapterTo(rx.Observable.class).fromPublisher(source);
+ assertTrue(target instanceof rx.Observable);
+ assertEquals(sequence, ((rx.Observable) target).toList().toBlocking().first());
+ }
+
+ @Test
+ public void publisherToRxSingle() throws Exception {
+ Publisher source = Flowable.fromArray(1);
+ Object target = getAdapterTo(rx.Single.class).fromPublisher(source);
+ assertTrue(target instanceof rx.Single);
+ assertEquals(new Integer(1), ((rx.Single) target).toBlocking().value());
+ }
+
+ @Test
+ public void publisherToRxCompletable() throws Exception {
+ Publisher source = Flowable.fromArray(1, 2, 3);
+ Object target = getAdapterTo(rx.Completable.class).fromPublisher(source);
+ assertTrue(target instanceof rx.Completable);
+ assertNull(((rx.Completable) target).get());
+ }
+
+ @Test
+ public void publisherToReactivexFlowable() throws Exception {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = Flux.fromIterable(sequence);
+ Object target = getAdapterTo(io.reactivex.Flowable.class).fromPublisher(source);
+ assertTrue(target instanceof io.reactivex.Flowable);
+ assertEquals(sequence, ((io.reactivex.Flowable) target).toList().blockingGet());
+ }
+
+ @Test
+ public void publisherToReactivexObservable() throws Exception {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = Flowable.fromIterable(sequence);
+ Object target = getAdapterTo(io.reactivex.Observable.class).fromPublisher(source);
+ assertTrue(target instanceof io.reactivex.Observable);
+ assertEquals(sequence, ((io.reactivex.Observable) target).toList().blockingGet());
+ }
+
+ @Test
+ public void publisherToReactivexSingle() throws Exception {
+ Publisher source = Flowable.fromArray(1);
+ Object target = getAdapterTo(io.reactivex.Single.class).fromPublisher(source);
+ assertTrue(target instanceof io.reactivex.Single);
+ assertEquals(new Integer(1), ((io.reactivex.Single) target).blockingGet());
+ }
+
+ @Test
+ public void publisherToReactivexCompletable() throws Exception {
+ Publisher source = Flowable.fromArray(1, 2, 3);
+ Object target = getAdapterTo(io.reactivex.Completable.class).fromPublisher(source);
+ assertTrue(target instanceof io.reactivex.Completable);
+ assertNull(((io.reactivex.Completable) target).blockingGet());
+ }
+
+ @Test
+ public void rxObservableToPublisher() throws Exception {
+ List sequence = Arrays.asList(1, 2, 3);
+ Object source = rx.Observable.from(sequence);
+ Object target = getAdapterFrom(rx.Observable.class).toPublisher(source);
+ assertTrue("Expected Flux Publisher: " + target.getClass().getName(), target instanceof Flux);
+ assertEquals(sequence, ((Flux) target).collectList().blockMillis(1000));
+ }
+
+ @Test
+ public void rxSingleToPublisher() throws Exception {
+ Object source = rx.Single.just(1);
+ Object target = getAdapterFrom(rx.Single.class).toPublisher(source);
+ assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
+ assertEquals(new Integer(1), ((Mono) target).blockMillis(1000));
+ }
+
+ @Test
+ public void rxCompletableToPublisher() throws Exception {
+ Object source = rx.Completable.complete();
+ Object target = getAdapterFrom(rx.Completable.class).toPublisher(source);
+ assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
+ ((Mono) target).blockMillis(1000);
+ }
+
+ @Test
+ public void reactivexFlowableToPublisher() throws Exception {
+ List sequence = Arrays.asList(1, 2, 3);
+ Object source = io.reactivex.Flowable.fromIterable(sequence);
+ Object target = getAdapterFrom(io.reactivex.Flowable.class).toPublisher(source);
+ assertTrue("Expected Flux Publisher: " + target.getClass().getName(), target instanceof Flux);
+ assertEquals(sequence, ((Flux) target).collectList().blockMillis(1000));
+ }
+
+ @Test
+ public void reactivexObservableToPublisher() throws Exception {
+ List sequence = Arrays.asList(1, 2, 3);
+ Object source = io.reactivex.Observable.fromIterable(sequence);
+ Object target = getAdapterFrom(io.reactivex.Observable.class).toPublisher(source);
+ assertTrue("Expected Flux Publisher: " + target.getClass().getName(), target instanceof Flux);
+ assertEquals(sequence, ((Flux) target).collectList().blockMillis(1000));
+ }
+
+ @Test
+ public void reactivexSingleToPublisher() throws Exception {
+ Object source = io.reactivex.Single.just(1);
+ Object target = getAdapterFrom(io.reactivex.Single.class).toPublisher(source);
+ assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
+ assertEquals(new Integer(1), ((Mono) target).blockMillis(1000));
+ }
+
+ @Test
+ public void reactivexCompletableToPublisher() throws Exception {
+ Object source = io.reactivex.Completable.complete();
+ Object target = getAdapterFrom(io.reactivex.Completable.class).toPublisher(source);
+ assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
+ ((Mono) target).blockMillis(1000);
+ }
+
+ @Test
+ public void CompletableFutureToPublisher() throws Exception {
+ CompletableFuture future = new CompletableFuture();
+ future.complete(1);
+ Object target = getAdapterFrom(CompletableFuture.class).toPublisher(future);
+ assertTrue("Expected Mono Publisher: " + target.getClass().getName(), target instanceof Mono);
+ assertEquals(new Integer(1), ((Mono) target).blockMillis(1000));
+ }
+
+
+ private ReactiveAdapter getAdapterTo(Class> reactiveType) {
+ return this.registry.getAdapterTo(reactiveType);
+ }
+
+ private ReactiveAdapter getAdapterFrom(Class> reactiveType) {
+ return this.registry.getAdapterFrom(reactiveType);
}
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/BindingContextFactory.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/BindingContextFactory.java
index 2ecbed21bc9..93609de3510 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/BindingContextFactory.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/BindingContextFactory.java
@@ -149,7 +149,7 @@ class BindingContextFactory {
Class> valueType = (adapter != null ? type.resolveGeneric(0) : type.resolve());
if (Void.class.equals(valueType) || void.class.equals(valueType)) {
- return (adapter != null ? adapter.toMono(value) : Mono.empty());
+ return (adapter != null ? Mono.from(adapter.toPublisher(value)) : Mono.empty());
}
String name = getAttributeName(valueType, result.getReturnTypeSource());
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java
index afff4e87fb1..ddd3229bccd 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java
@@ -25,8 +25,8 @@ import reactor.core.publisher.MonoProcessor;
import org.springframework.beans.BeanUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
-import org.springframework.core.ReactiveAdapter.Descriptor;
import org.springframework.core.ReactiveAdapterRegistry;
+import org.springframework.core.ReactiveTypeDescriptor;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.Assert;
@@ -107,7 +107,7 @@ public class ModelAttributeMethodArgumentResolver implements HandlerMethodArgume
Class> clazz = parameter.getParameterType();
ReactiveAdapter adapter = getAdapterRegistry().getAdapterFrom(clazz);
if (adapter != null) {
- Descriptor descriptor = adapter.getDescriptor();
+ ReactiveTypeDescriptor descriptor = adapter.getDescriptor();
if (descriptor.isNoValue() || descriptor.isMultiValue()) {
return false;
}
@@ -179,12 +179,15 @@ public class ModelAttributeMethodArgumentResolver implements HandlerMethodArgume
if (attribute != null) {
ReactiveAdapter adapterFrom = getAdapterRegistry().getAdapterFrom(null, attribute);
if (adapterFrom != null) {
- return adapterFrom.toMono(attribute);
+ ReactiveTypeDescriptor descriptor = adapterFrom.getDescriptor();
+ Assert.isTrue(!descriptor.isMultiValue(), "Data binding supports single-value async types.");
+ return Mono.from(adapterFrom.toPublisher(attribute));
}
}
return Mono.justOrEmpty(attribute);
}
+
protected Object createAttribute(String attributeName, Class> attributeType,
MethodParameter parameter, BindingContext context, ServerWebExchange exchange) {
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/ResponseEntityResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/ResponseEntityResultHandler.java
index 5e291231118..b62ffea09ec 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/ResponseEntityResultHandler.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/method/annotation/ResponseEntityResultHandler.java
@@ -25,6 +25,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
+import org.springframework.core.ReactiveTypeDescriptor;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
@@ -120,7 +121,9 @@ public class ResponseEntityResultHandler extends AbstractMessageWriterResultHand
ReactiveAdapter adapter = getAdapterRegistry().getAdapterFrom(rawClass, optionalValue);
if (adapter != null) {
- returnValueMono = adapter.toMono(optionalValue);
+ ReactiveTypeDescriptor descriptor = adapter.getDescriptor();
+ Assert.isTrue(!descriptor.isMultiValue(), "Only a single ResponseEntity supported.");
+ returnValueMono = Mono.from(adapter.toPublisher(optionalValue));
bodyType = new MethodParameter(result.getReturnTypeSource());
bodyType.increaseNestingLevel();
bodyType.increaseNestingLevel();
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandler.java
index 71098648d3d..b0b53a355f6 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandler.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandler.java
@@ -33,10 +33,12 @@ import org.springframework.core.MethodParameter;
import org.springframework.core.Ordered;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
+import org.springframework.core.ReactiveTypeDescriptor;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.http.MediaType;
import org.springframework.ui.Model;
+import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
import org.springframework.validation.BindingResult;
@@ -191,8 +193,10 @@ public class ViewResolutionResultHandler extends AbstractHandlerResultHandler
ReactiveAdapter adapter = getAdapterRegistry().getAdapterFrom(parameterType.getRawClass(), optional);
if (adapter != null) {
+ ReactiveTypeDescriptor descriptor = adapter.getDescriptor();
+ Assert.isTrue(!descriptor.isMultiValue(), "Only single-value async return type supported.");
returnValueMono = optional
- .map(value -> adapter.toMono(value).cast(Object.class))
+ .map(value -> Mono.from(adapter.toPublisher(value)))
.orElse(Mono.empty());
elementType = !adapter.getDescriptor().isNoValue() ?
parameterType.getGeneric(0) : ResolvableType.forClass(Void.class);
@@ -301,11 +305,11 @@ public class ViewResolutionResultHandler extends AbstractHandlerResultHandler
if (adapter != null) {
names.add(entry.getKey());
if (adapter.getDescriptor().isMultiValue()) {
- Flux value = adapter.toFlux(entry.getValue());
+ Flux value = Flux.from(adapter.toPublisher(entry.getValue()));
valueMonos.add(value.collectList().defaultIfEmpty(Collections.emptyList()));
}
else {
- Mono value = adapter.toMono(entry.getValue());
+ Mono value = Mono.from(adapter.toPublisher(entry.getValue()));
valueMonos.add(value.defaultIfEmpty(NO_VALUE));
}
}