|
|
|
|
@ -338,6 +338,9 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@@ -338,6 +338,9 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
|
|
|
|
|
|
|
|
|
|
private final WriteBarrier writeBarrier; |
|
|
|
|
|
|
|
|
|
@Nullable |
|
|
|
|
private Subscription subscription; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public WriteCompletionBarrier(CoreSubscriber<? super Void> subscriber, WriteBarrier writeBarrier) { |
|
|
|
|
this.completionSubscriber = subscriber; |
|
|
|
|
@ -357,6 +360,7 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@@ -357,6 +360,7 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void onSubscribe(Subscription subscription) { |
|
|
|
|
this.subscription = subscription; |
|
|
|
|
subscription.request(Long.MAX_VALUE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -388,6 +392,10 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
@@ -388,6 +392,10 @@ public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
|
|
|
|
|
@Override |
|
|
|
|
public void cancel() { |
|
|
|
|
this.writeBarrier.cancel(); |
|
|
|
|
Subscription subscription = this.subscription; |
|
|
|
|
if (subscription != null) { |
|
|
|
|
subscription.cancel(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|