Browse Source

Adapt to MonoProcessor deprecations in Reactor

This commit updates our usage of `MonoProcessor` after the deprecations
introduced in reactor/reactor-core#1053
pull/22834/head
Brian Clozel 6 years ago
parent
commit
7e159c346e
  1. 15
      spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java

15
spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java

@ -47,6 +47,7 @@ import org.junit.jupiter.api.AfterEach; @@ -47,6 +47,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.netty.NettyPipeline;
import reactor.netty.http.client.HttpClient;
import reactor.test.StepVerifier;
@ -507,7 +508,7 @@ public abstract class AbstractReactiveWebServerFactoryTests { @@ -507,7 +508,7 @@ public abstract class AbstractReactiveWebServerFactoryTests {
protected static class BlockingHandler implements HttpHandler {
private final BlockingQueue<MonoProcessor<Void>> monoProcessors = new ArrayBlockingQueue<>(10);
private final BlockingQueue<MonoProcessor<Void>> processors = new ArrayBlockingQueue<>(10);
private volatile boolean blocking = true;
@ -518,16 +519,16 @@ public abstract class AbstractReactiveWebServerFactoryTests { @@ -518,16 +519,16 @@ public abstract class AbstractReactiveWebServerFactoryTests {
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.blocking) {
MonoProcessor<Void> completion = MonoProcessor.create();
this.monoProcessors.add(completion);
return completion.then(Mono.empty());
Sinks.One<Void> completion = Sinks.one();
this.processors.add(MonoProcessor.fromSink(completion));
return completion.asMono().then(Mono.empty());
}
return Mono.empty();
}
public void completeOne() {
try {
MonoProcessor<Void> processor = this.monoProcessors.take();
MonoProcessor<Void> processor = this.processors.take();
processor.onComplete();
}
catch (InterruptedException ex) {
@ -536,14 +537,14 @@ public abstract class AbstractReactiveWebServerFactoryTests { @@ -536,14 +537,14 @@ public abstract class AbstractReactiveWebServerFactoryTests {
}
public void awaitQueue() throws InterruptedException {
while (this.monoProcessors.isEmpty()) {
while (this.processors.isEmpty()) {
Thread.sleep(100);
}
}
public void stopBlocking() {
this.blocking = false;
this.monoProcessors.forEach(MonoProcessor::onComplete);
this.processors.forEach(MonoProcessor::onComplete);
}
}

Loading…
Cancel
Save