diff --git a/build.gradle b/build.gradle
index 15d4bf9cbec..433531dcef9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -58,6 +58,7 @@ configure(allprojects) { project ->
dependency "io.reactivex:rxjava:1.3.8"
dependency "io.reactivex:rxjava-reactive-streams:1.2.1"
dependency "io.reactivex.rxjava2:rxjava:2.2.19"
+ dependency "io.reactivex.rxjava3:rxjava:3.0.3"
dependency "io.projectreactor.tools:blockhound:1.0.2.RELEASE"
dependency "com.caucho:hessian:4.0.62"
diff --git a/spring-core/spring-core.gradle b/spring-core/spring-core.gradle
index 5fb5767fd57..d656bb3e83f 100644
--- a/spring-core/spring-core.gradle
+++ b/spring-core/spring-core.gradle
@@ -52,6 +52,7 @@ dependencies {
optional("io.reactivex:rxjava")
optional("io.reactivex:rxjava-reactive-streams")
optional("io.reactivex.rxjava2:rxjava")
+ optional("io.reactivex.rxjava3:rxjava")
optional("io.netty:netty-buffer")
testCompile("io.projectreactor:reactor-test")
testCompile("javax.annotation:javax.annotation-api")
diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java
index ab205a8f085..079e5e51c9f 100644
--- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java
+++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java
@@ -24,8 +24,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
-import io.reactivex.BackpressureStrategy;
-import io.reactivex.Flowable;
import kotlinx.coroutines.CompletableDeferredKt;
import kotlinx.coroutines.Deferred;
import org.reactivestreams.Publisher;
@@ -46,8 +44,9 @@ import org.springframework.util.ReflectionUtils;
* {@code Observable}, and others.
*
*
By default, depending on classpath availability, adapters are registered
- * for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, Java 9+
- * {@code Flow.Publisher} and Kotlin Coroutines {@code Deferred} and {@code Flow}.
+ * for Reactor, RxJava 2/3, or RxJava 1 (+ RxJava Reactive Streams bridge),
+ * {@link CompletableFuture}, Java 9+ {@code Flow.Publisher}, and Kotlin
+ * Coroutines' {@code Deferred} and {@code Flow}.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
@@ -89,6 +88,11 @@ public class ReactiveAdapterRegistry {
new RxJava2Registrar().registerAdapters(this);
}
+ // RxJava3
+ if (ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader)) {
+ new RxJava3Registrar().registerAdapters(this);
+ }
+
// Java 9+ Flow.Publisher
if (ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader)) {
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
@@ -104,9 +108,7 @@ public class ReactiveAdapterRegistry {
/**
- * Whether the registry has any adapters which would be the case if any of
- * Reactor, RxJava 2, or RxJava 1 (+ RxJava Reactive Streams bridge) are
- * present on the classpath.
+ * Whether the registry has any adapters.
*/
public boolean hasAdapters() {
return !this.adapters.isEmpty();
@@ -254,31 +256,78 @@ public class ReactiveAdapterRegistry {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
source -> (io.reactivex.Flowable>) source,
- Flowable::fromPublisher
+ io.reactivex.Flowable::fromPublisher
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
- source -> ((io.reactivex.Observable>) source).toFlowable(BackpressureStrategy.BUFFER),
- source -> io.reactivex.Flowable.fromPublisher(source).toObservable()
+ source -> ((io.reactivex.Observable>) source).toFlowable(io.reactivex.BackpressureStrategy.BUFFER),
+ source -> io.reactivex.Flowable.fromPublisher(source)
+ .toObservable()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
source -> ((io.reactivex.Single>) source).toFlowable(),
- source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle()
+ source -> io.reactivex.Flowable.fromPublisher(source)
+ .toObservable().singleElement().toSingle()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
source -> ((io.reactivex.Maybe>) source).toFlowable(),
- source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement()
+ source -> io.reactivex.Flowable.fromPublisher(source)
+ .toObservable().singleElement()
);
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
source -> ((io.reactivex.Completable) source).toFlowable(),
- source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements()
+ source -> io.reactivex.Flowable.fromPublisher(source)
+ .toObservable().ignoreElements()
);
}
}
+ private static class RxJava3Registrar {
+
+ void registerAdapters(ReactiveAdapterRegistry registry) {
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.multiValue(
+ io.reactivex.rxjava3.core.Flowable.class,
+ io.reactivex.rxjava3.core.Flowable::empty),
+ source -> (io.reactivex.rxjava3.core.Flowable>) source,
+ io.reactivex.rxjava3.core.Flowable::fromPublisher
+ );
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.multiValue(
+ io.reactivex.rxjava3.core.Observable.class,
+ io.reactivex.rxjava3.core.Observable::empty),
+ source -> ((io.reactivex.rxjava3.core.Observable>) source).toFlowable(
+ io.reactivex.rxjava3.core.BackpressureStrategy.BUFFER),
+ source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
+ .toObservable()
+ );
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.rxjava3.core.Single.class),
+ source -> ((io.reactivex.rxjava3.core.Single>) source).toFlowable(),
+ source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
+ .toObservable().singleElement().toSingle()
+ );
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.singleOptionalValue(
+ io.reactivex.rxjava3.core.Maybe.class,
+ io.reactivex.rxjava3.core.Maybe::empty),
+ source -> ((io.reactivex.rxjava3.core.Maybe>) source).toFlowable(),
+ source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
+ .toObservable().singleElement()
+ );
+ registry.registerReactiveType(
+ ReactiveTypeDescriptor.noValue(
+ io.reactivex.rxjava3.core.Completable.class,
+ io.reactivex.rxjava3.core.Completable::complete),
+ source -> ((io.reactivex.rxjava3.core.Completable) source).toFlowable(),
+ source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
+ .toObservable().ignoreElements()
+ );
+ }
+ }
private static class ReactorJdkFlowAdapterRegistrar {
diff --git a/spring-core/src/test/java/org/springframework/core/ConventionsTests.java b/spring-core/src/test/java/org/springframework/core/ConventionsTests.java
index 3017ef3a55d..ada7e89b1ce 100644
--- a/spring-core/src/test/java/org/springframework/core/ConventionsTests.java
+++ b/spring-core/src/test/java/org/springframework/core/ConventionsTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,8 +23,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
-import io.reactivex.Observable;
-import io.reactivex.Single;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
diff --git a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java
index 8e8374fc583..1992cfae6c8 100644
--- a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java
+++ b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java
@@ -21,17 +21,13 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import io.reactivex.Flowable;
-import io.reactivex.Maybe;
import kotlinx.coroutines.Deferred;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
-import rx.Completable;
-import rx.Observable;
-import rx.Single;
import static org.assertj.core.api.Assertions.assertThat;
@@ -45,35 +41,6 @@ class ReactiveAdapterRegistryTests {
private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
- @Test
- void defaultAdapterRegistrations() {
-
- // Reactor
- assertThat(getAdapter(Mono.class)).isNotNull();
- assertThat(getAdapter(Flux.class)).isNotNull();
-
- // Publisher
- assertThat(getAdapter(Publisher.class)).isNotNull();
-
- // Completable
- assertThat(getAdapter(CompletableFuture.class)).isNotNull();
-
- // RxJava 1
- assertThat(getAdapter(Observable.class)).isNotNull();
- assertThat(getAdapter(Single.class)).isNotNull();
- assertThat(getAdapter(Completable.class)).isNotNull();
-
- // RxJava 2
- assertThat(getAdapter(Flowable.class)).isNotNull();
- assertThat(getAdapter(io.reactivex.Observable.class)).isNotNull();
- assertThat(getAdapter(io.reactivex.Single.class)).isNotNull();
- assertThat(getAdapter(Maybe.class)).isNotNull();
- assertThat(getAdapter(io.reactivex.Completable.class)).isNotNull();
-
- // Coroutines
- assertThat(getAdapter(Deferred.class)).isNotNull();
- }
-
@Test
void getAdapterForReactiveSubType() {
@@ -93,192 +60,301 @@ class ReactiveAdapterRegistryTests {
assertThat(adapter3).isNotSameAs(adapter1);
}
- @Test
- void publisherToFlux() {
- List sequence = Arrays.asList(1, 2, 3);
- Publisher source = Flowable.fromIterable(sequence);
- Object target = getAdapter(Flux.class).fromPublisher(source);
- boolean condition = target instanceof Flux;
- assertThat(condition).isTrue();
- assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
+ @Nested
+ class Reactor {
+
+ @Test
+ void defaultAdapterRegistrations() {
+
+ // Reactor
+ assertThat(getAdapter(Mono.class)).isNotNull();
+ assertThat(getAdapter(Flux.class)).isNotNull();
+
+ // Publisher
+ assertThat(getAdapter(Publisher.class)).isNotNull();
+
+ // Completable
+ assertThat(getAdapter(CompletableFuture.class)).isNotNull();
+ }
+
+ @Test
+ void toFlux() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence);
+ Object target = getAdapter(Flux.class).fromPublisher(source);
+ assertThat(target instanceof Flux).isTrue();
+ assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
+ }
+
+ @Test
+ void toMono() {
+ Publisher source = io.reactivex.rxjava3.core.Flowable.fromArray(1, 2, 3);
+ Object target = getAdapter(Mono.class).fromPublisher(source);
+ assertThat(target instanceof Mono).isTrue();
+ assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
+ }
+
+ @Test
+ void toCompletableFuture() throws Exception {
+ Publisher source = Flux.fromArray(new Integer[] {1, 2, 3});
+ Object target = getAdapter(CompletableFuture.class).fromPublisher(source);
+ assertThat(target instanceof CompletableFuture).isTrue();
+ assertThat(((CompletableFuture) target).get()).isEqualTo(Integer.valueOf(1));
+ }
+
+ @Test
+ void fromCompletableFuture() {
+ CompletableFuture future = new CompletableFuture<>();
+ future.complete(1);
+ Object target = getAdapter(CompletableFuture.class).toPublisher(future);
+ assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
+ assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
+ }
}
- // TODO: publisherToMono/CompletableFuture vs Single (ISE on multiple elements)?
-
- @Test
- void publisherToMono() {
- Publisher source = Flowable.fromArray(1, 2, 3);
- Object target = getAdapter(Mono.class).fromPublisher(source);
- boolean condition = target instanceof Mono;
- assertThat(condition).isTrue();
- assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
+ @Nested
+ class RxJava1 {
+
+ @Test
+ void defaultAdapterRegistrations() {
+ assertThat(getAdapter(rx.Observable.class)).isNotNull();
+ assertThat(getAdapter(rx.Single.class)).isNotNull();
+ assertThat(getAdapter(rx.Completable.class)).isNotNull();
+ }
+
+ @Test
+ void toObservable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = Flux.fromIterable(sequence);
+ Object target = getAdapter(rx.Observable.class).fromPublisher(source);
+ assertThat(target instanceof rx.Observable).isTrue();
+ assertThat(((rx.Observable>) target).toList().toBlocking().first()).isEqualTo(sequence);
+ }
+
+ @Test
+ void toSingle() {
+ Publisher source = Flux.fromArray(new Integer[] {1});
+ Object target = getAdapter(rx.Single.class).fromPublisher(source);
+ assertThat(target instanceof rx.Single).isTrue();
+ assertThat(((rx.Single) target).toBlocking().value()).isEqualTo(Integer.valueOf(1));
+ }
+
+ @Test
+ void toCompletable() {
+ Publisher source = Flux.fromArray(new Integer[] {1, 2, 3});
+ Object target = getAdapter(rx.Completable.class).fromPublisher(source);
+ assertThat(target instanceof rx.Completable).isTrue();
+ assertThat(((rx.Completable) target).get()).isNull();
+ }
+
+ @Test
+ void fromObservable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Object source = rx.Observable.from(sequence);
+ Object target = getAdapter(rx.Observable.class).toPublisher(source);
+ assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
+ assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
+ }
+
+ @Test
+ void fromSingle() {
+ Object source = rx.Single.just(1);
+ Object target = getAdapter(rx.Single.class).toPublisher(source);
+ assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
+ assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
+ }
+
+ @Test
+ void fromCompletable() {
+ Object source = rx.Completable.complete();
+ Object target = getAdapter(rx.Completable.class).toPublisher(source);
+ assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
+ ((Mono) target).block(Duration.ofMillis(1000));
+ }
}
- @Test
- void publisherToCompletableFuture() throws Exception {
- Publisher source = Flowable.fromArray(1, 2, 3);
- Object target = getAdapter(CompletableFuture.class).fromPublisher(source);
- boolean condition = target instanceof CompletableFuture;
- assertThat(condition).isTrue();
- assertThat(((CompletableFuture) target).get()).isEqualTo(Integer.valueOf(1));
+ @Nested
+ class RxJava2 {
+
+ @Test
+ void defaultAdapterRegistrations() {
+
+ // RxJava 2
+ assertThat(getAdapter(io.reactivex.Flowable.class)).isNotNull();
+ assertThat(getAdapter(io.reactivex.Observable.class)).isNotNull();
+ assertThat(getAdapter(io.reactivex.Single.class)).isNotNull();
+ assertThat(getAdapter(io.reactivex.Maybe.class)).isNotNull();
+ assertThat(getAdapter(io.reactivex.Completable.class)).isNotNull();
+ }
+
+ @Test
+ void toFlowable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = Flux.fromIterable(sequence);
+ Object target = getAdapter(io.reactivex.Flowable.class).fromPublisher(source);
+ assertThat(target instanceof io.reactivex.Flowable).isTrue();
+ assertThat(((io.reactivex.Flowable>) target).toList().blockingGet()).isEqualTo(sequence);
+ }
+
+ @Test
+ void toObservable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = Flux.fromIterable(sequence);
+ Object target = getAdapter(io.reactivex.Observable.class).fromPublisher(source);
+ assertThat(target instanceof io.reactivex.Observable).isTrue();
+ assertThat(((io.reactivex.Observable>) target).toList().blockingGet()).isEqualTo(sequence);
+ }
+
+ @Test
+ void toSingle() {
+ Publisher source = Flux.fromArray(new Integer[] {1});
+ Object target = getAdapter(io.reactivex.Single.class).fromPublisher(source);
+ assertThat(target instanceof io.reactivex.Single).isTrue();
+ assertThat(((io.reactivex.Single) target).blockingGet()).isEqualTo(Integer.valueOf(1));
+ }
+
+ @Test
+ void toCompletable() {
+ Publisher source = Flux.fromArray(new Integer[] {1, 2, 3});
+ Object target = getAdapter(io.reactivex.Completable.class).fromPublisher(source);
+ assertThat(target instanceof io.reactivex.Completable).isTrue();
+ ((io.reactivex.Completable) target).blockingAwait();
+ }
+
+ @Test
+ void fromFlowable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Object source = io.reactivex.Flowable.fromIterable(sequence);
+ Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source);
+ assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
+ assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
+ }
+
+ @Test
+ void fromObservable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Object source = io.reactivex.Observable.fromIterable(sequence);
+ Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source);
+ assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
+ assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
+ }
+
+ @Test
+ void fromSingle() {
+ Object source = io.reactivex.Single.just(1);
+ Object target = getAdapter(io.reactivex.Single.class).toPublisher(source);
+ assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
+ assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
+ }
+
+ @Test
+ void fromCompletable() {
+ Object source = io.reactivex.Completable.complete();
+ Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source);
+ assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
+ ((Mono) target).block(Duration.ofMillis(1000));
+ }
}
- @Test
- void publisherToRxObservable() {
- List sequence = Arrays.asList(1, 2, 3);
- Publisher source = Flowable.fromIterable(sequence);
- Object target = getAdapter(rx.Observable.class).fromPublisher(source);
- boolean condition = target instanceof Observable;
- assertThat(condition).isTrue();
- assertThat(((Observable>) target).toList().toBlocking().first()).isEqualTo(sequence);
- }
-
- @Test
- void publisherToRxSingle() {
- Publisher source = Flowable.fromArray(1);
- Object target = getAdapter(rx.Single.class).fromPublisher(source);
- boolean condition = target instanceof Single;
- assertThat(condition).isTrue();
- assertThat(((Single) target).toBlocking().value()).isEqualTo(Integer.valueOf(1));
- }
-
- @Test
- void publisherToRxCompletable() {
- Publisher source = Flowable.fromArray(1, 2, 3);
- Object target = getAdapter(rx.Completable.class).fromPublisher(source);
- boolean condition = target instanceof Completable;
- assertThat(condition).isTrue();
- assertThat(((Completable) target).get()).isNull();
- }
-
- @Test
- void publisherToReactivexFlowable() {
- List sequence = Arrays.asList(1, 2, 3);
- Publisher source = Flux.fromIterable(sequence);
- Object target = getAdapter(io.reactivex.Flowable.class).fromPublisher(source);
- boolean condition = target instanceof Flowable;
- assertThat(condition).isTrue();
- assertThat(((Flowable>) target).toList().blockingGet()).isEqualTo(sequence);
- }
-
- @Test
- void publisherToReactivexObservable() {
- List sequence = Arrays.asList(1, 2, 3);
- Publisher source = Flowable.fromIterable(sequence);
- Object target = getAdapter(io.reactivex.Observable.class).fromPublisher(source);
- boolean condition = target instanceof io.reactivex.Observable;
- assertThat(condition).isTrue();
- assertThat(((io.reactivex.Observable>) target).toList().blockingGet()).isEqualTo(sequence);
- }
-
- @Test
- void publisherToReactivexSingle() {
- Publisher source = Flowable.fromArray(1);
- Object target = getAdapter(io.reactivex.Single.class).fromPublisher(source);
- boolean condition = target instanceof io.reactivex.Single;
- assertThat(condition).isTrue();
- assertThat(((io.reactivex.Single) target).blockingGet()).isEqualTo(Integer.valueOf(1));
+ @Nested
+ class RxJava3 {
+
+ @Test
+ void defaultAdapterRegistrations() {
+
+ // RxJava 3
+ assertThat(getAdapter(io.reactivex.rxjava3.core.Flowable.class)).isNotNull();
+ assertThat(getAdapter(io.reactivex.rxjava3.core.Observable.class)).isNotNull();
+ assertThat(getAdapter(io.reactivex.rxjava3.core.Single.class)).isNotNull();
+ assertThat(getAdapter(io.reactivex.rxjava3.core.Maybe.class)).isNotNull();
+ assertThat(getAdapter(io.reactivex.rxjava3.core.Completable.class)).isNotNull();
+ }
+
+ @Test
+ void toFlowable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = Flux.fromIterable(sequence);
+ Object target = getAdapter(io.reactivex.rxjava3.core.Flowable.class).fromPublisher(source);
+ assertThat(target instanceof io.reactivex.rxjava3.core.Flowable).isTrue();
+ assertThat(((io.reactivex.rxjava3.core.Flowable>) target).toList().blockingGet()).isEqualTo(sequence);
+ }
+
+ @Test
+ void toObservable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Publisher source = Flux.fromIterable(sequence);
+ Object target = getAdapter(io.reactivex.rxjava3.core.Observable.class).fromPublisher(source);
+ assertThat(target instanceof io.reactivex.rxjava3.core.Observable).isTrue();
+ assertThat(((io.reactivex.rxjava3.core.Observable>) target).toList().blockingGet()).isEqualTo(sequence);
+ }
+
+ @Test
+ void toSingle() {
+ Publisher source = Flux.fromArray(new Integer[] {1});
+ Object target = getAdapter(io.reactivex.rxjava3.core.Single.class).fromPublisher(source);
+ assertThat(target instanceof io.reactivex.rxjava3.core.Single).isTrue();
+ assertThat(((io.reactivex.rxjava3.core.Single) target).blockingGet()).isEqualTo(Integer.valueOf(1));
+ }
+
+ @Test
+ void toCompletable() {
+ Publisher source = Flux.fromArray(new Integer[] {1, 2, 3});
+ Object target = getAdapter(io.reactivex.rxjava3.core.Completable.class).fromPublisher(source);
+ assertThat(target instanceof io.reactivex.rxjava3.core.Completable).isTrue();
+ ((io.reactivex.rxjava3.core.Completable) target).blockingAwait();
+ }
+
+ @Test
+ void fromFlowable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Object source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence);
+ Object target = getAdapter(io.reactivex.rxjava3.core.Flowable.class).toPublisher(source);
+ assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
+ assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
+ }
+
+ @Test
+ void fromObservable() {
+ List sequence = Arrays.asList(1, 2, 3);
+ Object source = io.reactivex.rxjava3.core.Observable.fromIterable(sequence);
+ Object target = getAdapter(io.reactivex.rxjava3.core.Observable.class).toPublisher(source);
+ assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
+ assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
+ }
+
+ @Test
+ void fromSingle() {
+ Object source = io.reactivex.rxjava3.core.Single.just(1);
+ Object target = getAdapter(io.reactivex.rxjava3.core.Single.class).toPublisher(source);
+ assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
+ assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
+ }
+
+ @Test
+ void fromCompletable() {
+ Object source = io.reactivex.rxjava3.core.Completable.complete();
+ Object target = getAdapter(io.reactivex.rxjava3.core.Completable.class).toPublisher(source);
+ assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
+ ((Mono) target).block(Duration.ofMillis(1000));
+ }
}
- @Test
- void publisherToReactivexCompletable() {
- Publisher source = Flowable.fromArray(1, 2, 3);
- Object target = getAdapter(io.reactivex.Completable.class).fromPublisher(source);
- boolean condition = target instanceof io.reactivex.Completable;
- assertThat(condition).isTrue();
- ((io.reactivex.Completable) target).blockingAwait();
- }
-
- @Test
- void rxObservableToPublisher() {
- List sequence = Arrays.asList(1, 2, 3);
- Object source = rx.Observable.from(sequence);
- Object target = getAdapter(rx.Observable.class).toPublisher(source);
- boolean condition = target instanceof Flux;
- assertThat(condition).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
- assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
- }
-
- @Test
- void rxSingleToPublisher() {
- Object source = rx.Single.just(1);
- Object target = getAdapter(rx.Single.class).toPublisher(source);
- boolean condition = target instanceof Mono;
- assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
- assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
- }
-
- @Test
- void rxCompletableToPublisher() {
- Object source = rx.Completable.complete();
- Object target = getAdapter(rx.Completable.class).toPublisher(source);
- boolean condition = target instanceof Mono;
- assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
- ((Mono) target).block(Duration.ofMillis(1000));
- }
-
- @Test
- void reactivexFlowableToPublisher() {
- List sequence = Arrays.asList(1, 2, 3);
- Object source = io.reactivex.Flowable.fromIterable(sequence);
- Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source);
- boolean condition = target instanceof Flux;
- assertThat(condition).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
- assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
- }
-
- @Test
- void reactivexObservableToPublisher() {
- List sequence = Arrays.asList(1, 2, 3);
- Object source = io.reactivex.Observable.fromIterable(sequence);
- Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source);
- boolean condition = target instanceof Flux;
- assertThat(condition).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
- assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
- }
-
- @Test
- void reactivexSingleToPublisher() {
- Object source = io.reactivex.Single.just(1);
- Object target = getAdapter(io.reactivex.Single.class).toPublisher(source);
- boolean condition = target instanceof Mono;
- assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
- assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
- }
-
- @Test
- void reactivexCompletableToPublisher() {
- Object source = io.reactivex.Completable.complete();
- Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source);
- boolean condition = target instanceof Mono;
- assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
- ((Mono) target).block(Duration.ofMillis(1000));
- }
-
- @Test
- void completableFutureToPublisher() {
- CompletableFuture future = new CompletableFuture<>();
- future.complete(1);
- Object target = getAdapter(CompletableFuture.class).toPublisher(future);
- boolean condition = target instanceof Mono;
- assertThat(condition).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
- assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
- }
-
- @Test
- void deferred() {
- assertThat(getAdapter(CompletableFuture.class).getDescriptor().isDeferred()).isEqualTo(false);
+ @Nested
+ class Kotlin {
- assertThat(getAdapter(Mono.class).getDescriptor().isDeferred()).isEqualTo(true);
- assertThat(getAdapter(Flux.class).getDescriptor().isDeferred()).isEqualTo(true);
+ @Test
+ void defaultAdapterRegistrations() {
- assertThat(getAdapter(io.reactivex.Completable.class).getDescriptor().isDeferred()).isEqualTo(true);
- assertThat(getAdapter(io.reactivex.Single.class).getDescriptor().isDeferred()).isEqualTo(true);
- assertThat(getAdapter(io.reactivex.Flowable.class).getDescriptor().isDeferred()).isEqualTo(true);
- assertThat(getAdapter(io.reactivex.Observable.class).getDescriptor().isDeferred()).isEqualTo(true);
+ // Coroutines
+ assertThat(getAdapter(Deferred.class)).isNotNull();
+ }
- assertThat(getAdapter(Deferred.class).getDescriptor().isDeferred()).isEqualTo(true);
- assertThat(getAdapter(kotlinx.coroutines.flow.Flow.class).getDescriptor().isDeferred()).isEqualTo(true);
+ @Test
+ void deferred() {
+ assertThat(getAdapter(CompletableFuture.class).getDescriptor().isDeferred()).isEqualTo(false);
+ assertThat(getAdapter(Deferred.class).getDescriptor().isDeferred()).isEqualTo(true);
+ assertThat(getAdapter(kotlinx.coroutines.flow.Flow.class).getDescriptor().isDeferred()).isEqualTo(true);
+ }
}
private ReactiveAdapter getAdapter(Class> reactiveType) {
diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle
index a2851000418..b1e9bae989e 100644
--- a/spring-messaging/spring-messaging.gradle
+++ b/spring-messaging/spring-messaging.gradle
@@ -26,7 +26,7 @@ dependencies {
testCompile("org.apache.activemq:activemq-kahadb-store")
testCompile("org.apache.activemq:activemq-stomp")
testCompile("io.projectreactor:reactor-test")
- testCompile "io.reactivex.rxjava2:rxjava"
+ testCompile "io.reactivex.rxjava3:rxjava"
testCompile("org.jetbrains.kotlin:kotlin-reflect")
testCompile("org.jetbrains.kotlin:kotlin-stdlib")
testCompile("org.xmlunit:xmlunit-assertj")
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java
index c5e2e806ca8..4c6b122280d 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@ package org.springframework.messaging.handler.invocation.reactive;
import java.util.Collections;
-import io.reactivex.Completable;
+import io.reactivex.rxjava3.core.Completable;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java
index 878f3776c76..54ee6c56ae1 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,9 +24,9 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
-import io.reactivex.Completable;
-import io.reactivex.Observable;
-import io.reactivex.Single;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.metadata.WellKnownMimeType;
diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle
index c16ccc80b7d..0ddcc57f9a1 100644
--- a/spring-web/spring-web.gradle
+++ b/spring-web/spring-web.gradle
@@ -18,9 +18,7 @@ dependencies {
optional("javax.xml.bind:jaxb-api")
optional("javax.xml.ws:jaxws-api")
optional("org.glassfish.main:javax.jws")
- optional("io.reactivex:rxjava")
- optional("io.reactivex:rxjava-reactive-streams")
- optional("io.reactivex.rxjava2:rxjava")
+ optional("io.reactivex.rxjava3:rxjava")
optional("io.netty:netty-buffer")
optional("io.netty:netty-handler")
optional("io.netty:netty-codec-http") // Until Netty4ClientHttpRequest is removed
diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle
index 16f8fa93740..45c9eac256c 100644
--- a/spring-webflux/spring-webflux.gradle
+++ b/spring-webflux/spring-webflux.gradle
@@ -16,8 +16,6 @@ dependencies {
optional("org.freemarker:freemarker")
optional("com.fasterxml.jackson.core:jackson-databind")
optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile")
- optional("io.reactivex:rxjava")
- optional("io.reactivex:rxjava-reactive-streams")
optional("io.projectreactor.netty:reactor-netty")
optional("org.apache.tomcat:tomcat-websocket")
optional("org.eclipse.jetty.websocket:websocket-server") {
@@ -38,7 +36,7 @@ dependencies {
testCompile("com.fasterxml:aalto-xml")
testCompile("org.hibernate:hibernate-validator")
testCompile("javax.validation:validation-api")
- testCompile "io.reactivex.rxjava2:rxjava"
+ testCompile("io.reactivex.rxjava3:rxjava")
testCompile("io.projectreactor:reactor-test")
testCompile("io.undertow:undertow-core")
testCompile("org.apache.tomcat.embed:tomcat-embed-core")
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java
index d730fb31994..d0b8c1502ce 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java
@@ -30,7 +30,7 @@ import java.util.Map;
import java.util.Optional;
import com.fasterxml.jackson.annotation.JsonView;
-import io.reactivex.Single;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/DefaultEntityResponseBuilderTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/DefaultEntityResponseBuilderTests.java
index 96ad5cc5cdd..b6bb38ebd54 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/DefaultEntityResponseBuilderTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/DefaultEntityResponseBuilderTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Set;
-import io.reactivex.Single;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/HttpEntityMethodArgumentResolverTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/HttpEntityMethodArgumentResolverTests.java
index f907bcb27f9..31f81a19062 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/HttpEntityMethodArgumentResolverTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/HttpEntityMethodArgumentResolverTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,16 +21,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import io.reactivex.BackpressureStrategy;
-import io.reactivex.Flowable;
-import io.reactivex.Maybe;
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import rx.Observable;
-import rx.RxReactiveStreams;
-import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@@ -80,12 +79,10 @@ public class HttpEntityMethodArgumentResolverTests {
testSupports(this.testMethod.arg(httpEntityType(String.class)));
testSupports(this.testMethod.arg(httpEntityType(Mono.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Single.class, String.class)));
- testSupports(this.testMethod.arg(httpEntityType(io.reactivex.Single.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Maybe.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(CompletableFuture.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Flux.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Observable.class, String.class)));
- testSupports(this.testMethod.arg(httpEntityType(io.reactivex.Observable.class, String.class)));
testSupports(this.testMethod.arg(httpEntityType(Flowable.class, String.class)));
testSupports(this.testMethod.arg(forClassWithGenerics(RequestEntity.class, String.class)));
}
@@ -132,17 +129,6 @@ public class HttpEntityMethodArgumentResolverTests {
ResolvableType type = httpEntityType(Single.class, String.class);
HttpEntity> entity = resolveValueWithEmptyBody(type);
- StepVerifier.create(RxReactiveStreams.toPublisher(entity.getBody()))
- .expectNextCount(0)
- .expectError(ServerWebInputException.class)
- .verify();
- }
-
- @Test
- public void emptyBodyWithRxJava2Single() {
- ResolvableType type = httpEntityType(io.reactivex.Single.class, String.class);
- HttpEntity> entity = resolveValueWithEmptyBody(type);
-
StepVerifier.create(entity.getBody().toFlowable())
.expectNextCount(0)
.expectError(ServerWebInputException.class)
@@ -150,7 +136,7 @@ public class HttpEntityMethodArgumentResolverTests {
}
@Test
- public void emptyBodyWithRxJava2Maybe() {
+ public void emptyBodyWithMaybe() {
ResolvableType type = httpEntityType(Maybe.class, String.class);
HttpEntity> entity = resolveValueWithEmptyBody(type);
@@ -165,17 +151,6 @@ public class HttpEntityMethodArgumentResolverTests {
ResolvableType type = httpEntityType(Observable.class, String.class);
HttpEntity> entity = resolveValueWithEmptyBody(type);
- StepVerifier.create(RxReactiveStreams.toPublisher(entity.getBody()))
- .expectNextCount(0)
- .expectComplete()
- .verify();
- }
-
- @Test
- public void emptyBodyWithRxJava2Observable() {
- ResolvableType type = httpEntityType(io.reactivex.Observable.class, String.class);
- HttpEntity> entity = resolveValueWithEmptyBody(type);
-
StepVerifier.create(entity.getBody().toFlowable(BackpressureStrategy.BUFFER))
.expectNextCount(0)
.expectComplete()
@@ -230,22 +205,12 @@ public class HttpEntityMethodArgumentResolverTests {
ResolvableType type = httpEntityType(Single.class, String.class);
HttpEntity> httpEntity = resolveValue(exchange, type);
- assertThat(httpEntity.getHeaders()).isEqualTo(exchange.getRequest().getHeaders());
- assertThat(httpEntity.getBody().toBlocking().value()).isEqualTo("line1");
- }
-
- @Test
- public void httpEntityWithRxJava2SingleBody() {
- ServerWebExchange exchange = postExchange("line1");
- ResolvableType type = httpEntityType(io.reactivex.Single.class, String.class);
- HttpEntity> httpEntity = resolveValue(exchange, type);
-
assertThat(httpEntity.getHeaders()).isEqualTo(exchange.getRequest().getHeaders());
assertThat(httpEntity.getBody().blockingGet()).isEqualTo("line1");
}
@Test
- public void httpEntityWithRxJava2MaybeBody() {
+ public void httpEntityWithMaybeBody() {
ServerWebExchange exchange = postExchange("line1");
ResolvableType type = httpEntityType(Maybe.class, String.class);
HttpEntity> httpEntity = resolveValue(exchange, type);
@@ -335,10 +300,8 @@ public class HttpEntityMethodArgumentResolverTests {
HttpEntity> monoBody,
HttpEntity> fluxBody,
HttpEntity> singleBody,
- HttpEntity> rxJava2SingleBody,
- HttpEntity> rxJava2MaybeBody,
+ HttpEntity> maybeBody,
HttpEntity> observableBody,
- HttpEntity> rxJava2ObservableBody,
HttpEntity> flowableBody,
HttpEntity> completableFutureBody,
RequestEntity requestEntity,
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageReaderArgumentResolverTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageReaderArgumentResolverTests.java
index df9d4cb27e5..7d06e637b6b 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageReaderArgumentResolverTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageReaderArgumentResolverTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,15 +29,15 @@ import java.util.concurrent.CompletableFuture;
import javax.xml.bind.annotation.XmlRootElement;
-import io.reactivex.Flowable;
-import io.reactivex.Maybe;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import rx.Observable;
-import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
@@ -140,21 +140,11 @@ public class MessageReaderArgumentResolverTests {
MethodParameter param = this.testMethod.arg(type);
Single single = resolveValue(param, body);
- assertThat(single.toBlocking().value()).isEqualTo(new TestBean("f1", "b1"));
- }
-
- @Test
- public void rxJava2SingleTestBean() throws Exception {
- String body = "{\"bar\":\"b1\",\"foo\":\"f1\"}";
- ResolvableType type = forClassWithGenerics(io.reactivex.Single.class, TestBean.class);
- MethodParameter param = this.testMethod.arg(type);
- io.reactivex.Single single = resolveValue(param, body);
-
assertThat(single.blockingGet()).isEqualTo(new TestBean("f1", "b1"));
}
@Test
- public void rxJava2MaybeTestBean() throws Exception {
+ public void maybeTestBean() throws Exception {
String body = "{\"bar\":\"b1\",\"foo\":\"f1\"}";
ResolvableType type = forClassWithGenerics(Maybe.class, TestBean.class);
MethodParameter param = this.testMethod.arg(type);
@@ -170,16 +160,6 @@ public class MessageReaderArgumentResolverTests {
MethodParameter param = this.testMethod.arg(type);
Observable> observable = resolveValue(param, body);
- assertThat(observable.toList().toBlocking().first()).isEqualTo(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")));
- }
-
- @Test
- public void rxJava2ObservableTestBean() throws Exception {
- String body = "[{\"bar\":\"b1\",\"foo\":\"f1\"},{\"bar\":\"b2\",\"foo\":\"f2\"}]";
- ResolvableType type = forClassWithGenerics(io.reactivex.Observable.class, TestBean.class);
- MethodParameter param = this.testMethod.arg(type);
- io.reactivex.Observable> observable = resolveValue(param, body);
-
assertThat(observable.toList().blockingGet()).isEqualTo(Arrays.asList(new TestBean("f1", "b1"), new TestBean("f2", "b2")));
}
@@ -324,10 +304,8 @@ public class MessageReaderArgumentResolverTests {
@Validated Mono monoTestBean,
@Validated Flux fluxTestBean,
Single singleTestBean,
- io.reactivex.Single rxJava2SingleTestBean,
- Maybe rxJava2MaybeTestBean,
+ Maybe maybeTestBean,
Observable observableTestBean,
- io.reactivex.Observable rxJava2ObservableTestBean,
Flowable flowableTestBean,
CompletableFuture futureTestBean,
TestBean testBean,
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageWriterResultHandlerTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageWriterResultHandlerTests.java
index 4940a3009e0..49d12463154 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageWriterResultHandlerTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/MessageWriterResultHandlerTests.java
@@ -27,13 +27,13 @@ import java.util.List;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import io.reactivex.Flowable;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Observable;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import rx.Completable;
-import rx.Observable;
import org.springframework.core.MethodParameter;
import org.springframework.core.codec.ByteBufferEncoder;
@@ -117,11 +117,11 @@ public class MessageWriterResultHandlerTests {
testVoid(Completable.complete(), on(TestController.class).resolveReturnType(Completable.class));
testVoid(Observable.empty(), on(TestController.class).resolveReturnType(Observable.class, Void.class));
- MethodParameter type = on(TestController.class).resolveReturnType(io.reactivex.Completable.class);
- testVoid(io.reactivex.Completable.complete(), type);
+ MethodParameter type = on(TestController.class).resolveReturnType(Completable.class);
+ testVoid(Completable.complete(), type);
- type = on(TestController.class).resolveReturnType(io.reactivex.Observable.class, Void.class);
- testVoid(io.reactivex.Observable.empty(), type);
+ type = on(TestController.class).resolveReturnType(Observable.class, Void.class);
+ testVoid(Observable.empty(), type);
type = on(TestController.class).resolveReturnType(Flowable.class, Void.class);
testVoid(Flowable.empty(), type);
@@ -274,14 +274,10 @@ public class MessageWriterResultHandlerTests {
Completable completable() { return null; }
- io.reactivex.Completable rxJava2Completable() { return null; }
-
Flux fluxVoid() { return null; }
Observable observableVoid() { return null; }
- io.reactivex.Observable rxJava2ObservableVoid() { return null; }
-
Flowable flowableVoid() { return null; }
OutputStream outputStream() { return null; }
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolverTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolverTests.java
index 22160e1f27e..cb8052d751d 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolverTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolverTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,12 +21,11 @@ import java.time.Duration;
import java.util.Map;
import java.util.function.Function;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import rx.RxReactiveStreams;
-import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@@ -134,7 +133,7 @@ public class ModelAttributeMethodArgumentResolverTests {
testBindFoo("fooSingle", parameter, single -> {
boolean condition = single instanceof Single;
assertThat(condition).as(single.getClass().getName()).isTrue();
- Object value = ((Single>) single).toBlocking().value();
+ Object value = ((Single>) single).blockingGet();
assertThat(value.getClass()).isEqualTo(Foo.class);
return (Foo) value;
});
@@ -257,7 +256,7 @@ public class ModelAttributeMethodArgumentResolverTests {
assertThat(value).isNotNull();
boolean condition = value instanceof Single;
assertThat(condition).isTrue();
- return Mono.from(RxReactiveStreams.toPublisher((Single>) value));
+ return Mono.from(((Single>) value).toFlowable());
});
}
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ModelInitializerTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ModelInitializerTests.java
index 90e07285be0..da47913c173 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ModelInitializerTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ModelInitializerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,10 +23,10 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
-import rx.Single;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.core.MethodIntrospector;
@@ -122,7 +122,7 @@ public class ModelInitializerTests {
assertThat(((Mono) value).block(TIMEOUT).getName()).isEqualTo("Mono Bean");
value = model.get("singleBean");
- assertThat(((Single) value).toBlocking().value().getName()).isEqualTo("Single Bean");
+ assertThat(((Single) value).blockingGet().getName()).isEqualTo("Single Bean");
value = model.get("voidMethodBean");
assertThat(((TestBean) value).getName()).isEqualTo("Void Method Bean");
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/PrincipalMethodArgumentResolverTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/PrincipalMethodArgumentResolverTests.java
index a923b89e014..3f7fb53b9ee 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/PrincipalMethodArgumentResolverTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/PrincipalMethodArgumentResolverTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package org.springframework.web.reactive.result.method.annotation;
import java.security.Principal;
-import io.reactivex.Single;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestAttributeMethodArgumentResolverTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestAttributeMethodArgumentResolverTests.java
index f66db9ab11d..27ba786d6e1 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestAttributeMethodArgumentResolverTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestAttributeMethodArgumentResolverTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package org.springframework.web.reactive.result.method.annotation;
import java.time.Duration;
import java.util.Optional;
-import io.reactivex.Single;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestBodyMethodArgumentResolverTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestBodyMethodArgumentResolverTests.java
index 28d4755a29d..49f78fa70e5 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestBodyMethodArgumentResolverTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestBodyMethodArgumentResolverTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,15 +22,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import io.reactivex.Maybe;
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import rx.Observable;
-import rx.RxReactiveStreams;
-import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@@ -150,14 +150,14 @@ public class RequestBodyMethodArgumentResolverTests {
public void emptyBodyWithSingle() {
MethodParameter param = this.testMethod.annot(requestBody()).arg(Single.class, String.class);
Single single = resolveValueWithEmptyBody(param);
- StepVerifier.create(RxReactiveStreams.toPublisher(single))
+ StepVerifier.create(single.toFlowable())
.expectNextCount(0)
.expectError(ServerWebInputException.class)
.verify();
param = this.testMethod.annot(requestBody().notRequired()).arg(Single.class, String.class);
single = resolveValueWithEmptyBody(param);
- StepVerifier.create(RxReactiveStreams.toPublisher(single))
+ StepVerifier.create(single.toFlowable())
.expectNextCount(0)
.expectError(ServerWebInputException.class)
.verify();
@@ -184,14 +184,14 @@ public class RequestBodyMethodArgumentResolverTests {
public void emptyBodyWithObservable() {
MethodParameter param = this.testMethod.annot(requestBody()).arg(Observable.class, String.class);
Observable observable = resolveValueWithEmptyBody(param);
- StepVerifier.create(RxReactiveStreams.toPublisher(observable))
+ StepVerifier.create(observable.toFlowable(BackpressureStrategy.BUFFER))
.expectNextCount(0)
.expectError(ServerWebInputException.class)
.verify();
param = this.testMethod.annot(requestBody().notRequired()).arg(Observable.class, String.class);
observable = resolveValueWithEmptyBody(param);
- StepVerifier.create(RxReactiveStreams.toPublisher(observable))
+ StepVerifier.create(observable.toFlowable(BackpressureStrategy.BUFFER))
.expectNextCount(0)
.expectComplete()
.verify();
@@ -248,19 +248,15 @@ public class RequestBodyMethodArgumentResolverTests {
@RequestBody Mono mono,
@RequestBody Flux flux,
@RequestBody Single single,
- @RequestBody io.reactivex.Single rxJava2Single,
- @RequestBody Maybe rxJava2Maybe,
- @RequestBody Observable obs,
- @RequestBody io.reactivex.Observable rxjava2Obs,
+ @RequestBody Maybe maybe,
+ @RequestBody Observable observable,
@RequestBody CompletableFuture future,
@RequestBody(required = false) String stringNotRequired,
@RequestBody(required = false) Mono monoNotRequired,
@RequestBody(required = false) Flux fluxNotRequired,
@RequestBody(required = false) Single singleNotRequired,
- @RequestBody(required = false) io.reactivex.Single rxJava2SingleNotRequired,
- @RequestBody(required = false) Maybe rxJava2MaybeNotRequired,
- @RequestBody(required = false) Observable obsNotRequired,
- @RequestBody(required = false) io.reactivex.Observable rxjava2ObsNotRequired,
+ @RequestBody(required = false) Maybe maybeNotRequired,
+ @RequestBody(required = false) Observable observableNotRequired,
@RequestBody(required = false) CompletableFuture futureNotRequired,
@RequestBody(required = false) Map, ?> mapNotRequired,
String notAnnotated) {}
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java
index d967b9560a1..5faed552a50 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java
@@ -26,14 +26,14 @@ import java.util.concurrent.CompletableFuture;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
-import io.reactivex.Flowable;
-import io.reactivex.Maybe;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import rx.Completable;
-import rx.Observable;
-import rx.Single;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@@ -118,15 +118,7 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
startServer(httpServer);
String expected = "Hello!";
- assertThat(performGet("/raw-response/observable", new HttpHeaders(), String.class).getBody()).isEqualTo(expected);
- }
-
- @ParameterizedHttpServerTest
- public void byteBufferResponseBodyWithRxJava2Observable(HttpServer httpServer) throws Exception {
- startServer(httpServer);
-
- String expected = "Hello!";
- assertThat(performGet("/raw-response/rxjava2-observable",
+ assertThat(performGet("/raw-response/observable",
new HttpHeaders(), String.class).getBody()).isEqualTo(expected);
}
@@ -317,18 +309,10 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
}
@ParameterizedHttpServerTest
- public void personTransformWithRxJava2Single(HttpServer httpServer) throws Exception {
+ public void personTransformWithMaybe(HttpServer httpServer) throws Exception {
startServer(httpServer);
- assertThat(performPost("/person-transform/rxjava2-single", JSON, new Person("Robert"),
- JSON, Person.class).getBody()).isEqualTo(new Person("ROBERT"));
- }
-
- @ParameterizedHttpServerTest
- public void personTransformWithRxJava2Maybe(HttpServer httpServer) throws Exception {
- startServer(httpServer);
-
- assertThat(performPost("/person-transform/rxjava2-maybe", JSON, new Person("Robert"),
+ assertThat(performPost("/person-transform/maybe", JSON, new Person("Robert"),
JSON, Person.class).getBody()).isEqualTo(new Person("ROBERT"));
}
@@ -359,15 +343,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
assertThat(performPost("/person-transform/observable", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res);
}
- @ParameterizedHttpServerTest
- public void personTransformWithRxJava2Observable(HttpServer httpServer) throws Exception {
- startServer(httpServer);
-
- List> req = asList(new Person("Robert"), new Person("Marie"));
- List> res = asList(new Person("ROBERT"), new Person("MARIE"));
- assertThat(performPost("/person-transform/rxjava2-observable", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res);
- }
-
@ParameterizedHttpServerTest
public void personTransformWithFlowable(HttpServer httpServer) throws Exception {
startServer(httpServer);
@@ -421,17 +396,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(1);
}
- @ParameterizedHttpServerTest
- public void personCreateWithRxJava2Single(HttpServer httpServer) throws Exception {
- startServer(httpServer);
-
- ResponseEntity entity = performPost(
- "/person-create/rxjava2-single", JSON, new Person("Robert"), null, Void.class);
-
- assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
- assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(1);
- }
-
@ParameterizedHttpServerTest
public void personCreateWithFluxJson(HttpServer httpServer) throws Exception {
startServer(httpServer);
@@ -465,17 +429,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(2);
}
- @ParameterizedHttpServerTest
- public void personCreateWithRxJava2ObservableJson(HttpServer httpServer) throws Exception {
- startServer(httpServer);
-
- ResponseEntity entity = performPost("/person-create/rxjava2-observable", JSON,
- asList(new Person("Robert"), new Person("Marie")), null, Void.class);
-
- assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.OK);
- assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(2);
- }
-
@ParameterizedHttpServerTest
public void personCreateWithObservableXml(HttpServer httpServer) throws Exception {
startServer(httpServer);
@@ -487,18 +440,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(2);
}
- @ParameterizedHttpServerTest
- public void personCreateWithRxJava2ObservableXml(HttpServer httpServer) throws Exception {
- startServer(httpServer);
-
- People people = new People(new Person("Robert"), new Person("Marie"));
- String url = "/person-create/rxjava2-observable";
- ResponseEntity response = performPost(url, APPLICATION_XML, people, null, Void.class);
-
- assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
- assertThat(getApplicationContext().getBean(PersonCreateController.class).persons.size()).isEqualTo(2);
- }
-
@ParameterizedHttpServerTest
public void personCreateWithFlowableJson(HttpServer httpServer) throws Exception {
startServer(httpServer);
@@ -567,11 +508,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
return Observable.just(ByteBuffer.wrap("Hello!".getBytes()));
}
- @GetMapping("/rxjava2-observable")
- public io.reactivex.Observable getRxJava2Observable() {
- return io.reactivex.Observable.just(ByteBuffer.wrap("Hello!".getBytes()));
- }
-
@GetMapping("/flowable")
public Flowable getFlowable() {
return Flowable.just(ByteBuffer.wrap("Hello!".getBytes()));
@@ -670,9 +606,8 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
}
@PostMapping("/completable-future")
- public CompletableFuture transformCompletableFuture(
- @RequestBody CompletableFuture personFuture) {
- return personFuture.thenApply(person -> new Person(person.getName().toUpperCase()));
+ public CompletableFuture transformCompletableFuture(@RequestBody CompletableFuture future) {
+ return future.thenApply(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/mono")
@@ -685,21 +620,14 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
}
- @PostMapping("/rxjava2-single")
- public io.reactivex.Single transformRxJava2Single(@RequestBody io.reactivex.Single personFuture) {
- return personFuture.map(person -> new Person(person.getName().toUpperCase()));
- }
-
- @PostMapping("/rxjava2-maybe")
- public Maybe transformRxJava2Maybe(@RequestBody Maybe personFuture) {
+ @PostMapping("/maybe")
+ public Maybe transformMaybe(@RequestBody Maybe personFuture) {
return personFuture.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/publisher")
public Publisher transformPublisher(@RequestBody Publisher persons) {
- return Flux
- .from(persons)
- .map(person -> new Person(person.getName().toUpperCase()));
+ return Flux.from(persons).map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/flux")
@@ -712,11 +640,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
return persons.map(person -> new Person(person.getName().toUpperCase()));
}
- @PostMapping("/rxjava2-observable")
- public io.reactivex.Observable transformObservable(@RequestBody io.reactivex.Observable persons) {
- return persons.map(person -> new Person(person.getName().toUpperCase()));
- }
-
@PostMapping("/flowable")
public Flowable transformFlowable(@RequestBody Flowable persons) {
return persons.map(person -> new Person(person.getName().toUpperCase()));
@@ -743,11 +666,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
@PostMapping("/single")
public Completable createWithSingle(@RequestBody Single single) {
- return single.map(persons::add).toCompletable();
- }
-
- @PostMapping("/rxjava2-single")
- public io.reactivex.Completable createWithRxJava2Single(@RequestBody io.reactivex.Single single) {
return single.map(persons::add).ignoreElement();
}
@@ -757,19 +675,12 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
}
@PostMapping("/observable")
- public Observable createWithObservable(@RequestBody Observable observable) {
- return observable.toList().doOnNext(persons::addAll).flatMap(document -> Observable.empty());
- }
-
- @PostMapping("/rxjava2-observable")
- public io.reactivex.Completable createWithRxJava2Observable(
- @RequestBody io.reactivex.Observable observable) {
-
+ public Completable createWithObservable(@RequestBody Observable observable) {
return observable.toList().doOnSuccess(persons::addAll).ignoreElement();
}
@PostMapping("/flowable")
- public io.reactivex.Completable createWithFlowable(@RequestBody Flowable flowable) {
+ public Completable createWithFlowable(@RequestBody Flowable flowable) {
return flowable.toList().doOnSuccess(persons::addAll).ignoreElement();
}
}
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ResponseBodyResultHandlerTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ResponseBodyResultHandlerTests.java
index 96fd0a3c076..ad5e8f50cd3 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ResponseBodyResultHandlerTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ResponseBodyResultHandlerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,11 +20,11 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
-import rx.Completable;
-import rx.Single;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ResponseEntityResultHandlerTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ResponseEntityResultHandlerTests.java
index 7d27cc19fe4..c215ee7a77a 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ResponseEntityResultHandlerTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ResponseEntityResultHandlerTests.java
@@ -28,13 +28,13 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import rx.Completable;
-import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/WebSessionMethodArgumentResolverTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/WebSessionMethodArgumentResolverTests.java
index 96796f93947..c304f5c36e3 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/WebSessionMethodArgumentResolverTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/WebSessionMethodArgumentResolverTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
*/
package org.springframework.web.reactive.result.method.annotation;
-import io.reactivex.Single;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/AbstractViewTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/AbstractViewTests.java
index 0d494966867..94896dd84ae 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/AbstractViewTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/AbstractViewTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +21,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import io.reactivex.Observable;
-import io.reactivex.Single;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandlerTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandlerTests.java
index 015c6b2b666..3c8e368ac45 100644
--- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandlerTests.java
+++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ViewResolutionResultHandlerTests.java
@@ -26,11 +26,11 @@ import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
+import io.reactivex.rxjava3.core.Completable;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-import rx.Completable;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.core.MethodParameter;
diff --git a/spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/server/ServerResponseExtensionsTests.kt b/spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/server/ServerResponseExtensionsTests.kt
index 0d6dead2155..a4455cf1c11 100644
--- a/spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/server/ServerResponseExtensionsTests.kt
+++ b/spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/server/ServerResponseExtensionsTests.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package org.springframework.web.reactive.function.server
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
-import io.reactivex.Flowable
+import io.reactivex.rxjava3.core.Flowable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
diff --git a/spring-webmvc/spring-webmvc.gradle b/spring-webmvc/spring-webmvc.gradle
index 2d9f18fe2a5..61b1d4dbbdb 100644
--- a/spring-webmvc/spring-webmvc.gradle
+++ b/spring-webmvc/spring-webmvc.gradle
@@ -59,9 +59,7 @@ dependencies {
testCompile("org.hibernate:hibernate-validator")
testCompile("javax.validation:validation-api")
testCompile("io.projectreactor:reactor-core")
- testCompile("io.reactivex:rxjava")
- testCompile("io.reactivex:rxjava-reactive-streams")
- testCompile("io.reactivex.rxjava2:rxjava")
+ testCompile("io.reactivex.rxjava3:rxjava")
testCompile("org.jetbrains.kotlin:kotlin-script-runtime")
testRuntime("org.jetbrains.kotlin:kotlin-scripting-jsr223-embeddable")
testRuntime("org.jruby:jruby")
diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java
index e4f230e95eb..6b663551da7 100644
--- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java
+++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,14 +26,14 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.core.SingleEmitter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
-import rx.Single;
-import rx.SingleEmitter;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
@@ -98,7 +98,6 @@ public class ReactiveTypeHandlerTests {
public void supportsType() throws Exception {
assertThat(this.handler.isReactiveType(Mono.class)).isTrue();
assertThat(this.handler.isReactiveType(Single.class)).isTrue();
- assertThat(this.handler.isReactiveType(io.reactivex.Single.class)).isTrue();
}
@Test
@@ -117,16 +116,10 @@ public class ReactiveTypeHandlerTests {
MonoProcessor monoEmpty = MonoProcessor.create();
testDeferredResultSubscriber(monoEmpty, Mono.class, forClass(String.class), monoEmpty::onComplete, null);
- // RxJava 1 Single
- AtomicReference> ref = new AtomicReference<>();
- Single single = Single.fromEmitter(ref::set);
- testDeferredResultSubscriber(single, Single.class, forClass(String.class),
- () -> ref.get().onSuccess("foo"), "foo");
-
- // RxJava 2 Single
- AtomicReference> ref2 = new AtomicReference<>();
- io.reactivex.Single single2 = io.reactivex.Single.create(ref2::set);
- testDeferredResultSubscriber(single2, io.reactivex.Single.class, forClass(String.class),
+ // RxJava Single
+ AtomicReference> ref2 = new AtomicReference<>();
+ Single single2 = Single.create(ref2::set);
+ testDeferredResultSubscriber(single2, Single.class, forClass(String.class),
() -> ref2.get().onSuccess("foo"), "foo");
}
@@ -162,15 +155,10 @@ public class ReactiveTypeHandlerTests {
MonoProcessor mono = MonoProcessor.create();
testDeferredResultSubscriber(mono, Mono.class, forClass(String.class), () -> mono.onError(ex), ex);
- // RxJava 1 Single
- AtomicReference> ref = new AtomicReference<>();
- Single single = Single.fromEmitter(ref::set);
- testDeferredResultSubscriber(single, Single.class, forClass(String.class), () -> ref.get().onError(ex), ex);
-
- // RxJava 2 Single
- AtomicReference> ref2 = new AtomicReference<>();
- io.reactivex.Single single2 = io.reactivex.Single.create(ref2::set);
- testDeferredResultSubscriber(single2, io.reactivex.Single.class, forClass(String.class),
+ // RxJava Single
+ AtomicReference> ref2 = new AtomicReference<>();
+ Single single2 = Single.create(ref2::set);
+ testDeferredResultSubscriber(single2, Single.class, forClass(String.class),
() -> ref2.get().onError(ex), ex);
}
@@ -343,8 +331,6 @@ public class ReactiveTypeHandlerTests {
Single handleSingle() { return null; }
- io.reactivex.Single handleSingleRxJava2() { return null; }
-
Flux handleFlux() { return null; }
Flux handleFluxString() { return null; }