Browse Source

Fix memory leak in WiretapConnector

Prior to this commit, we found in gh-35953 that using the `WebTestClient`
the following way leaks data buffers:

```
var body = client.get().uri("download")
  .exchange()
  .expectStatus().isOk()
  .returnResult()
  .getResponseBodyContent();
```

Here, the test performs expectations on the response status and headers,
but not on the response body. The WiretapConnector already supports this
case by subscribing to the Flux response body in those cases and
accumulating the entire content as a single byte[].

Here, the `DataBuffer` instances are not decoded by any `Decoder` and
are not released. This results in a memory leak.

This commit ensures that the automatic subscription in
`WiretapConnector` also releases the buffers automatically as the DSL
does not allow at that point to go back to performing body expectations.

Fixes gh-36050
pull/36056/head
Brian Clozel 1 month ago
parent
commit
7353ab41d2
  1. 21
      spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java
  2. 61
      spring-test/src/test/java/org/springframework/test/web/reactive/server/WiretapConnectorTests.java

21
spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java

@ -31,6 +31,7 @@ import reactor.core.publisher.Mono; @@ -31,6 +31,7 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ClientHttpConnector;
@ -196,11 +197,21 @@ class WiretapConnector implements ClientHttpConnector { @@ -196,11 +197,21 @@ class WiretapConnector implements ClientHttpConnector {
// 1. Mock server never consumed request body (for example, error before read)
// 2. FluxExchangeResult: getResponseBodyContent called before getResponseBody
//noinspection ConstantConditions
(this.publisher != null ? this.publisher : this.publisherNested)
.onErrorMap(ex -> new IllegalStateException(
"Content has not been consumed, and " +
"an error was raised while attempting to produce it.", ex))
.subscribe();
if (this.publisher != null) {
this.publisher.doOnNext(DataBufferUtils::release)
.onErrorMap(ex -> new IllegalStateException(
"Content has not been consumed, and " +
"an error was raised while attempting to produce it.", ex))
.subscribe();
}
else if (this.publisherNested != null) {
this.publisherNested
.map(pub -> Flux.from(pub).doOnNext(DataBufferUtils::release))
.onErrorMap(ex -> new IllegalStateException(
"Content has not been consumed, and " +
"an error was raised while attempting to produce it.", ex))
.subscribe();
}
}
return this.content.asMono();
});

61
spring-test/src/test/java/org/springframework/test/web/reactive/server/WiretapConnectorTests.java

@ -17,11 +17,18 @@ @@ -17,11 +17,18 @@
package org.springframework.test.web.reactive.server;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import io.netty.buffer.PooledByteBufAllocator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.testfixture.io.buffer.LeakAwareDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ClientHttpConnector;
@ -30,6 +37,7 @@ import org.springframework.http.client.reactive.ClientHttpResponse; @@ -30,6 +37,7 @@ import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.mock.http.client.reactive.MockClientHttpRequest;
import org.springframework.mock.http.client.reactive.MockClientHttpResponse;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFunctions;
@ -44,11 +52,19 @@ import static org.assertj.core.api.Assertions.assertThat; @@ -44,11 +52,19 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class WiretapConnectorTests {
private final LeakAwareDataBufferFactory bufferFactory =
new LeakAwareDataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT));
@AfterEach
void tearDown() {
this.bufferFactory.checkForLeaks();
}
@Test
public void captureAndClaim() {
ClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, "/test");
ClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
ClientHttpConnector connector = (method, uri, fn) -> fn.apply(request).then(Mono.just(response));
ClientHttpConnector connector = createConnector(request, response);
ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
.header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1").build();
@ -62,4 +78,47 @@ public class WiretapConnectorTests { @@ -62,4 +78,47 @@ public class WiretapConnectorTests {
assertThat(result.getUrl().toString()).isEqualTo("/test");
}
@Test
void shouldReleaseBuffers() {
MockClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, "/test");
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.setBody(Flux.just(toDataBuffer("Hello Spring")));
ClientHttpConnector connector = createConnector(request, response);
ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
.header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1").build();
WiretapConnector wiretapConnector = new WiretapConnector(connector, null);
ExchangeFunction function = ExchangeFunctions.create(wiretapConnector);
function.exchange(clientRequest).block(ofMillis(0));
ExchangeResult result = wiretapConnector.getExchangeResult("1", null, Duration.ZERO);
result.getResponseBodyContent();
}
@Test
void shouldReleaseBuffersOnlyOnce() {
MockClientHttpRequest request = new MockClientHttpRequest(HttpMethod.GET, "/test");
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
response.setBody(Flux.just(toDataBuffer("Hello Spring"), toDataBuffer("Hello Spring"), toDataBuffer("Hello Spring"), toDataBuffer("Hello Spring")));
ClientHttpConnector connector = createConnector(request, response);
ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create("/test"))
.header(WebTestClient.WEBTESTCLIENT_REQUEST_ID, "1").build();
WiretapConnector wiretapConnector = new WiretapConnector(connector, null);
ExchangeFunction function = ExchangeFunctions.create(wiretapConnector);
function.exchange(clientRequest).flatMap(ClientResponse::releaseBody).block(ofMillis(0));
ExchangeResult result = wiretapConnector.getExchangeResult("1", null, Duration.ZERO);
result.getResponseBodyContent();
}
private ClientHttpConnector createConnector(ClientHttpRequest request, ClientHttpResponse response) {
return (method, uri, fn) -> fn.apply(request).then(Mono.just(response));
}
private DataBuffer toDataBuffer(String s) {
DataBuffer buffer = this.bufferFactory.allocateBuffer(256);
buffer.write(s.getBytes(StandardCharsets.UTF_8));
return buffer;
}
}

Loading…
Cancel
Save