|
|
|
|
@ -133,21 +133,24 @@ class SubscriberInputStreamTests {
@@ -133,21 +133,24 @@ class SubscriberInputStreamTests {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
void cancel() throws InterruptedException, IOException { |
|
|
|
|
CountDownLatch latch = new CountDownLatch(1); |
|
|
|
|
void cancel() throws Exception { |
|
|
|
|
CountDownLatch latch1 = new CountDownLatch(1); |
|
|
|
|
CountDownLatch latch2 = new CountDownLatch(1); |
|
|
|
|
|
|
|
|
|
Flow.Publisher<byte[]> publisher = new OutputStreamPublisher<>( |
|
|
|
|
out -> { |
|
|
|
|
assertThatIOException().isThrownBy(() -> { |
|
|
|
|
out.write(FOO); |
|
|
|
|
out.flush(); |
|
|
|
|
out.write(BAR); |
|
|
|
|
out.flush(); |
|
|
|
|
out.write(BAZ); |
|
|
|
|
out.flush(); |
|
|
|
|
}).withMessage("Subscription has been terminated"); |
|
|
|
|
latch.countDown(); |
|
|
|
|
|
|
|
|
|
assertThatIOException() |
|
|
|
|
.isThrownBy(() -> { |
|
|
|
|
out.write(FOO); |
|
|
|
|
out.flush(); |
|
|
|
|
out.write(BAR); |
|
|
|
|
out.flush(); |
|
|
|
|
latch1.countDown(); |
|
|
|
|
out.write(BAZ); |
|
|
|
|
out.flush(); |
|
|
|
|
}) |
|
|
|
|
.withMessage("Subscription has been terminated"); |
|
|
|
|
latch2.countDown(); |
|
|
|
|
}, this.byteMapper, this.executor, null); |
|
|
|
|
|
|
|
|
|
List<byte[]> discarded = new ArrayList<>(); |
|
|
|
|
@ -158,10 +161,11 @@ class SubscriberInputStreamTests {
@@ -158,10 +161,11 @@ class SubscriberInputStreamTests {
|
|
|
|
|
|
|
|
|
|
assertThat(is.read(chunk)).isEqualTo(3); |
|
|
|
|
assertThat(chunk).containsExactly(FOO); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
latch.await(); |
|
|
|
|
latch1.await(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
latch2.await(); |
|
|
|
|
assertThat(discarded).containsExactly("bar".getBytes(UTF_8)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|