diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java index ebea1c16371..fc20f17a420 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java @@ -394,7 +394,12 @@ public class ChannelSendOperator extends Mono implements Scannable { @Override public void onError(Throwable ex) { - this.completionSubscriber.onError(ex); + try { + this.completionSubscriber.onError(ex); + } + finally { + this.writeBarrier.releaseCachedItem(); + } } @Override diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java index 7e2cd98cdc1..d1aa4fc9f94 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java @@ -17,6 +17,7 @@ package org.springframework.http.server.reactive; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -33,6 +34,7 @@ import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import reactor.test.StepVerifier; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.LeakAwareDataBufferFactory; @@ -156,7 +158,12 @@ public class ChannelSendOperatorTests { } @Test // gh-22720 - public void errorWhileItemCached() { + public void errorFromWriteSourceWhileItemCached() { + + // 1. First item received + // 2. writeFunction applied and writeCompletionBarrier subscribed to it + // 3. Write Publisher fails right after that and before request(n) from server + NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber(); @@ -186,6 +193,30 @@ public class ChannelSendOperatorTests { bufferFactory.checkForLeaks(); } + @Test // gh-22720 + public void errorFromWriteFunctionWhileItemCached() { + + // 1. First item received + // 2. writeFunction applied and writeCompletionBarrier subscribed to it + // 3. writeFunction fails, e.g. to flush status and headers, before request(n) from server + + NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); + + ChannelSendOperator operator = new ChannelSendOperator<>( + Flux.create(sink -> { + DataBuffer dataBuffer = bufferFactory.allocateBuffer(); + dataBuffer.write("foo", StandardCharsets.UTF_8); + sink.next(dataBuffer); + }), + publisher -> { + publisher.subscribe(new ZeroDemandSubscriber()); + return Mono.error(new IllegalStateException("err")); + }); + + StepVerifier.create(operator).expectErrorMessage("err").verify(Duration.ofSeconds(5)); + bufferFactory.checkForLeaks(); + } private Mono sendOperator(Publisher source){ return new ChannelSendOperator<>(source, writer::send);