Browse Source

Backport fixes for discarding data buffers

Closes gh-26232
pull/26273/head
Rossen Stoyanchev 5 years ago
parent
commit
33476a2eae
  1. 3
      spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java
  2. 26
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java
  3. 28
      spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java

3
spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java

@ -128,7 +128,8 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> { @@ -128,7 +128,8 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
message.getHeaders().setContentLength(buffer.readableByteCount());
return message.writeWith(Mono.just(buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
});
})
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
if (isStreamingMediaType(contentType)) {

26
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

@ -203,10 +203,28 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { @@ -203,10 +203,28 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
// We must resolve value first however, for a chance to handle potential error.
if (body instanceof Mono) {
return ((Mono<? extends DataBuffer>) body)
.flatMap(buffer -> doCommit(() ->
writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release))))
.doOnError(t -> getHeaders().clearContentHeaders());
.flatMap(buffer -> {
AtomicReference<Boolean> subscribed = new AtomicReference<>(false);
return doCommit(
() -> {
try {
return writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnSubscribe(s -> subscribed.set(true))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
}
catch (Throwable ex) {
return Mono.error(ex);
}
})
.doOnError(ex -> DataBufferUtils.release(buffer))
.doOnCancel(() -> {
if (!subscribed.get()) {
DataBufferUtils.release(buffer);
}
});
})
.doOnError(t -> getHeaders().clearContentHeaders())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
else {
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))

28
spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java

@ -19,7 +19,9 @@ package org.springframework.http.server.reactive; @@ -19,7 +19,9 @@ package org.springframework.http.server.reactive;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -27,14 +29,22 @@ import org.junit.jupiter.api.Test; @@ -27,14 +29,22 @@ import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.channel.AbortedException;
import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.testfixture.io.buffer.LeakAwareDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
import org.springframework.http.codec.EncoderHttpMessageWriter;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.testfixture.http.server.reactive.MockServerHttpRequest;
import org.springframework.web.testfixture.http.server.reactive.MockServerHttpResponse;
import static org.assertj.core.api.Assertions.assertThat;
@ -176,6 +186,24 @@ public class ServerHttpResponseTests { @@ -176,6 +186,24 @@ public class ServerHttpResponseTests {
});
}
@Test // gh-26232
void monoResponseShouldNotLeakIfCancelled() {
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
MockServerHttpRequest request = MockServerHttpRequest.get("/").build();
MockServerHttpResponse response = new MockServerHttpResponse(bufferFactory);
response.setWriteHandler(flux -> {
throw AbortedException.beforeSend();
});
HttpMessageWriter<Object> messageWriter = new EncoderHttpMessageWriter<>(new Jackson2JsonEncoder());
Mono<Void> result = messageWriter.write(Mono.just(Collections.singletonMap("foo", "bar")),
ResolvableType.forClass(Mono.class), ResolvableType.forClass(Map.class), null,
request, response, Collections.emptyMap());
StepVerifier.create(result).expectError(AbortedException.class).verify();
bufferFactory.checkForLeaks();
}
private DefaultDataBuffer wrap(String a) {
return new DefaultDataBufferFactory().wrap(ByteBuffer.wrap(a.getBytes(StandardCharsets.UTF_8)));

Loading…
Cancel
Save