|
|
|
|
@ -58,7 +58,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
@@ -58,7 +58,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|
|
|
|
|
|
|
|
|
private final NettyDataBufferFactory bufferFactory; |
|
|
|
|
|
|
|
|
|
// 0 - not subscribed, 1 - subscribed, 2 - cancelled, 3 - cancelled via connector (before subscribe)
|
|
|
|
|
// 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe)
|
|
|
|
|
private final AtomicInteger state = new AtomicInteger(0); |
|
|
|
|
|
|
|
|
|
private final String logPrefix; |
|
|
|
|
@ -100,20 +100,11 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
@@ -100,20 +100,11 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|
|
|
|
if (this.state.compareAndSet(0, 1)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// https://github.com/reactor/reactor-netty/issues/503
|
|
|
|
|
// FluxReceive rejects multiple subscribers, but not after a cancel().
|
|
|
|
|
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
|
|
|
|
|
// So we need to reject once in cancelled state.
|
|
|
|
|
if (this.state.get() == 2) { |
|
|
|
|
throw new IllegalStateException( |
|
|
|
|
"The client response body can only be consumed once."); |
|
|
|
|
} |
|
|
|
|
else if (this.state.get() == 3) { |
|
|
|
|
throw new IllegalStateException( |
|
|
|
|
"The client response body has been released already due to cancellation."); |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.doOnCancel(() -> this.state.compareAndSet(1, 2)) |
|
|
|
|
.map(byteBuf -> { |
|
|
|
|
byteBuf.retain(); |
|
|
|
|
return this.bufferFactory.wrap(byteBuf); |
|
|
|
|
@ -159,7 +150,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
@@ -159,7 +150,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|
|
|
|
* reading was delayed for some reason. |
|
|
|
|
*/ |
|
|
|
|
void releaseAfterCancel(HttpMethod method) { |
|
|
|
|
if (mayHaveBody(method) && this.state.compareAndSet(0, 3)) { |
|
|
|
|
if (mayHaveBody(method) && this.state.compareAndSet(0, 2)) { |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug(this.logPrefix + "Releasing body, not yet subscribed."); |
|
|
|
|
} |
|
|
|
|
|