From 7e159c346e188e6e546ea33e78eeaed73b758e51 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Fri, 7 Aug 2020 19:02:58 +0200 Subject: [PATCH] Adapt to MonoProcessor deprecations in Reactor This commit updates our usage of `MonoProcessor` after the deprecations introduced in reactor/reactor-core#1053 --- .../AbstractReactiveWebServerFactoryTests.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java b/spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java index 463350ebb5a..df41705a74a 100644 --- a/spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java +++ b/spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java @@ -47,6 +47,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import reactor.netty.NettyPipeline; import reactor.netty.http.client.HttpClient; import reactor.test.StepVerifier; @@ -507,7 +508,7 @@ public abstract class AbstractReactiveWebServerFactoryTests { protected static class BlockingHandler implements HttpHandler { - private final BlockingQueue> monoProcessors = new ArrayBlockingQueue<>(10); + private final BlockingQueue> processors = new ArrayBlockingQueue<>(10); private volatile boolean blocking = true; @@ -518,16 +519,16 @@ public abstract class AbstractReactiveWebServerFactoryTests { @Override public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { if (this.blocking) { - MonoProcessor completion = MonoProcessor.create(); - this.monoProcessors.add(completion); - return completion.then(Mono.empty()); + Sinks.One completion = Sinks.one(); + this.processors.add(MonoProcessor.fromSink(completion)); + return completion.asMono().then(Mono.empty()); } return Mono.empty(); } public void completeOne() { try { - MonoProcessor processor = this.monoProcessors.take(); + MonoProcessor processor = this.processors.take(); processor.onComplete(); } catch (InterruptedException ex) { @@ -536,14 +537,14 @@ public abstract class AbstractReactiveWebServerFactoryTests { } public void awaitQueue() throws InterruptedException { - while (this.monoProcessors.isEmpty()) { + while (this.processors.isEmpty()) { Thread.sleep(100); } } public void stopBlocking() { this.blocking = false; - this.monoProcessors.forEach(MonoProcessor::onComplete); + this.processors.forEach(MonoProcessor::onComplete); } }