Browse Source

Additional fixes for discarding data buffers

Closes gh-26232
pull/26254/head
Rossen Stoyanchev 5 years ago
parent
commit
25101fb034
  1. 3
      spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java
  2. 30
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

3
spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java

@ -132,7 +132,8 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
message.getHeaders().setContentLength(buffer.readableByteCount()); message.getHeaders().setContentLength(buffer.readableByteCount());
return message.writeWith(Mono.just(buffer) return message.writeWith(Mono.just(buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
}); })
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
} }
if (isStreamingMediaType(contentType)) { if (isStreamingMediaType(contentType)) {

30
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

@ -213,17 +213,27 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
return ((Mono<? extends DataBuffer>) body) return ((Mono<? extends DataBuffer>) body)
.flatMap(buffer -> { .flatMap(buffer -> {
touchDataBuffer(buffer); touchDataBuffer(buffer);
return doCommit(() -> { AtomicReference<Boolean> subscribed = new AtomicReference<>(false);
try { return doCommit(
return writeWithInternal(Mono.fromCallable(() -> buffer) () -> {
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); try {
} return writeWithInternal(Mono.fromCallable(() -> buffer)
catch (Throwable ex) { .doOnSubscribe(s -> subscribed.set(true))
return Mono.error(ex); .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
} }
}).doOnError(ex -> DataBufferUtils.release(buffer)); catch (Throwable ex) {
return Mono.error(ex);
}
})
.doOnError(ex -> DataBufferUtils.release(buffer))
.doOnCancel(() -> {
if (!subscribed.get()) {
DataBufferUtils.release(buffer);
}
});
}) })
.doOnError(t -> getHeaders().clearContentHeaders()); .doOnError(t -> getHeaders().clearContentHeaders())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
} }
else { else {
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner))) return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))

Loading…
Cancel
Save