|
|
|
@ -21,6 +21,7 @@ import java.time.Duration; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.List; |
|
|
|
import java.util.List; |
|
|
|
|
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
import java.util.concurrent.Executors; |
|
|
|
import java.util.concurrent.Executors; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
|
|
|
|
|
|
|
@ -216,6 +217,44 @@ class ChannelSendOperatorTests { |
|
|
|
.verify(Duration.ofMillis(5000)); |
|
|
|
.verify(Duration.ofMillis(5000)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
|
|
|
void writeFunctionIsNotInvokedUnderMonitorLock() { |
|
|
|
|
|
|
|
ChannelSendOperator<String> operator = new ChannelSendOperator<>( |
|
|
|
|
|
|
|
Mono.just("one"), |
|
|
|
|
|
|
|
publisher -> { |
|
|
|
|
|
|
|
CountDownLatch acquired = new CountDownLatch(1); |
|
|
|
|
|
|
|
Thread t = new Thread(() -> { |
|
|
|
|
|
|
|
synchronized (publisher) { |
|
|
|
|
|
|
|
acquired.countDown(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
t.start(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
if (!acquired.await(1, TimeUnit.SECONDS)) { |
|
|
|
|
|
|
|
throw new IllegalStateException("writeFunction appears to be invoked under monitor lock"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
catch (InterruptedException ex) { |
|
|
|
|
|
|
|
Thread.currentThread().interrupt(); |
|
|
|
|
|
|
|
throw new IllegalStateException(ex); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
finally { |
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
t.join(1_000); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
catch (InterruptedException ex) { |
|
|
|
|
|
|
|
Thread.currentThread().interrupt(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Mono.empty(); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StepVerifier.create(operator) |
|
|
|
|
|
|
|
.expectComplete() |
|
|
|
|
|
|
|
.verify(Duration.ofSeconds(2)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private Mono<Void> sendOperator(Publisher<String> source){ |
|
|
|
private Mono<Void> sendOperator(Publisher<String> source){ |
|
|
|
return new ChannelSendOperator<>(source, writer::send); |
|
|
|
return new ChannelSendOperator<>(source, writer::send); |
|
|
|
|