From d2f2918bf0aa8ce01b581968308e869f807865c4 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 8 Dec 2020 22:16:54 +0000 Subject: [PATCH] Backport fixes for discarding data buffers Closes gh-26232 --- .../http/codec/EncoderHttpMessageWriter.java | 17 ++++++----- .../reactive/AbstractServerHttpResponse.java | 28 ++++++++++++++++--- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java index ce8049a1789..12adac50997 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,8 +29,8 @@ import org.springframework.core.codec.AbstractEncoder; import org.springframework.core.codec.Encoder; import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.PooledDataBuffer; -import org.springframework.http.HttpHeaders; import org.springframework.http.HttpLogging; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; @@ -108,7 +108,6 @@ public class EncoderHttpMessageWriter implements HttpMessageWriter { return this.encoder.canEncode(elementType, mediaType); } - @SuppressWarnings("unchecked") @Override public Mono write(Publisher inputStream, ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map hints) { @@ -119,23 +118,23 @@ public class EncoderHttpMessageWriter implements HttpMessageWriter { inputStream, message.bufferFactory(), elementType, contentType, hints); if (inputStream instanceof Mono) { - HttpHeaders headers = message.getHeaders(); return body .singleOrEmpty() .switchIfEmpty(Mono.defer(() -> { - headers.setContentLength(0); + message.getHeaders().setContentLength(0); return message.setComplete().then(Mono.empty()); })) .flatMap(buffer -> { - headers.setContentLength(buffer.readableByteCount()); + message.getHeaders().setContentLength(buffer.readableByteCount()); return message.writeWith(Mono.just(buffer) - .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)); - }); + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + }) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } if (isStreamingMediaType(contentType)) { return message.writeAndFlushWith(body.map(buffer -> - Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release))); + Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release))); } return message.writeWith(body); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index 4f2f51214d4..cd07408b956 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -180,9 +180,29 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { // Write as Mono if possible as an optimization hint to Reactor Netty // ChannelSendOperator not necessary for Mono if (body instanceof Mono) { - return ((Mono) body).flatMap(buffer -> - doCommit(() -> writeWithInternal(Mono.just(buffer))) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + return ((Mono) body) + .flatMap(buffer -> { + AtomicReference subscribed = new AtomicReference<>(false); + return doCommit( + () -> { + try { + return writeWithInternal(Mono.fromCallable(() -> buffer) + .doOnSubscribe(s -> subscribed.set(true)) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }) + .doOnError(ex -> DataBufferUtils.release(buffer)) + .doOnCancel(() -> { + if (!subscribed.get()) { + DataBufferUtils.release(buffer); + } + }); + }) + .doOnError(t -> removeContentLength()) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner))) .doOnError(t -> removeContentLength());