Browse Source

Backport fixes for discarding data buffers

Closes gh-26232
pull/23967/head
Rossen Stoyanchev 5 years ago
parent
commit
d2f2918bf0
  1. 17
      spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java
  2. 28
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

17
spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,8 +29,8 @@ import org.springframework.core.codec.AbstractEncoder; @@ -29,8 +29,8 @@ import org.springframework.core.codec.AbstractEncoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpLogging;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
@ -108,7 +108,6 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> { @@ -108,7 +108,6 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
return this.encoder.canEncode(elementType, mediaType);
}
@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType elementType,
@Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
@ -119,23 +118,23 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> { @@ -119,23 +118,23 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
inputStream, message.bufferFactory(), elementType, contentType, hints);
if (inputStream instanceof Mono) {
HttpHeaders headers = message.getHeaders();
return body
.singleOrEmpty()
.switchIfEmpty(Mono.defer(() -> {
headers.setContentLength(0);
message.getHeaders().setContentLength(0);
return message.setComplete().then(Mono.empty());
}))
.flatMap(buffer -> {
headers.setContentLength(buffer.readableByteCount());
message.getHeaders().setContentLength(buffer.readableByteCount());
return message.writeWith(Mono.just(buffer)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
});
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
})
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
if (isStreamingMediaType(contentType)) {
return message.writeAndFlushWith(body.map(buffer ->
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)));
}
return message.writeWith(body);

28
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -180,9 +180,29 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { @@ -180,9 +180,29 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
// Write as Mono if possible as an optimization hint to Reactor Netty
// ChannelSendOperator not necessary for Mono
if (body instanceof Mono) {
return ((Mono<? extends DataBuffer>) body).flatMap(buffer ->
doCommit(() -> writeWithInternal(Mono.just(buffer)))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
return ((Mono<? extends DataBuffer>) body)
.flatMap(buffer -> {
AtomicReference<Boolean> subscribed = new AtomicReference<>(false);
return doCommit(
() -> {
try {
return writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnSubscribe(s -> subscribed.set(true))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
}
catch (Throwable ex) {
return Mono.error(ex);
}
})
.doOnError(ex -> DataBufferUtils.release(buffer))
.doOnCancel(() -> {
if (!subscribed.get()) {
DataBufferUtils.release(buffer);
}
});
})
.doOnError(t -> removeContentLength())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))
.doOnError(t -> removeContentLength());

Loading…
Cancel
Save