From 9bd0ec33f808cd2aa55048e535a9e8775f524dc6 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 2 Apr 2019 16:42:52 +0300 Subject: [PATCH 1/2] Release cached item in ChannelSendOperator when server error Related to gh-22720 --- .../http/server/reactive/ChannelSendOperator.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java index ebea1c16371..fc20f17a420 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java @@ -394,7 +394,12 @@ public class ChannelSendOperator extends Mono implements Scannable { @Override public void onError(Throwable ex) { - this.completionSubscriber.onError(ex); + try { + this.completionSubscriber.onError(ex); + } + finally { + this.writeBarrier.releaseCachedItem(); + } } @Override From 4c08863776df9cf6da99b871ba1b5766a4772948 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 2 Apr 2019 11:00:58 -0400 Subject: [PATCH 2/2] Add test case for writeFunction error signal See gh-22720 --- .../reactive/ChannelSendOperatorTests.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java index 7e2cd98cdc1..d1aa4fc9f94 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java @@ -17,6 +17,7 @@ package org.springframework.http.server.reactive; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -33,6 +34,7 @@ import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import reactor.test.StepVerifier; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.LeakAwareDataBufferFactory; @@ -156,7 +158,12 @@ public class ChannelSendOperatorTests { } @Test // gh-22720 - public void errorWhileItemCached() { + public void errorFromWriteSourceWhileItemCached() { + + // 1. First item received + // 2. writeFunction applied and writeCompletionBarrier subscribed to it + // 3. Write Publisher fails right after that and before request(n) from server + NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber(); @@ -186,6 +193,30 @@ public class ChannelSendOperatorTests { bufferFactory.checkForLeaks(); } + @Test // gh-22720 + public void errorFromWriteFunctionWhileItemCached() { + + // 1. First item received + // 2. writeFunction applied and writeCompletionBarrier subscribed to it + // 3. writeFunction fails, e.g. to flush status and headers, before request(n) from server + + NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); + + ChannelSendOperator operator = new ChannelSendOperator<>( + Flux.create(sink -> { + DataBuffer dataBuffer = bufferFactory.allocateBuffer(); + dataBuffer.write("foo", StandardCharsets.UTF_8); + sink.next(dataBuffer); + }), + publisher -> { + publisher.subscribe(new ZeroDemandSubscriber()); + return Mono.error(new IllegalStateException("err")); + }); + + StepVerifier.create(operator).expectErrorMessage("err").verify(Duration.ofSeconds(5)); + bufferFactory.checkForLeaks(); + } private Mono sendOperator(Publisher source){ return new ChannelSendOperator<>(source, writer::send);