Browse Source

Fix Jetty DataBufferFactory memory leak

Prior to this commit, gh-32097 added native support for Jetty for both
client and server integrations. The `JettyDataBufferFactory` was
promoted as a first class citizen, extracted from a private class in the
client support. To accomodate with server-side requirements, an extra
`buffer.retain()` call was performed.
While this is useful for server-side support, this introduced a bug in
the data buffer factory, as wrapping an existing chunk means that this
chunk is already retained.

This commit fixes the buffer factory implementation and moved existing
tests from mocks to actual pooled buffer implementations from Jetty.
The extra `buffer.retain()` is now done from the server support, right
before wrapping the buffer.

Fixes gh-35319
pull/35405/head
Brian Clozel 4 months ago
parent
commit
764336f0f2
  1. 1
      spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBuffer.java
  2. 31
      spring-core/src/test/java/org/springframework/core/io/buffer/JettyDataBufferTests.java
  3. 5
      spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java

1
spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBuffer.java

@ -55,7 +55,6 @@ public final class JettyDataBuffer implements PooledDataBuffer { @@ -55,7 +55,6 @@ public final class JettyDataBuffer implements PooledDataBuffer {
this.bufferFactory = bufferFactory;
this.delegate = delegate;
this.chunk = chunk;
this.chunk.retain();
}
JettyDataBuffer(JettyDataBufferFactory bufferFactory, DefaultDataBuffer delegate) {

31
spring-core/src/test/java/org/springframework/core/io/buffer/JettyDataBufferTests.java

@ -18,33 +18,34 @@ package org.springframework.core.io.buffer; @@ -18,33 +18,34 @@ package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
/**
* Tests for {@link JettyDataBuffer}
* @author Arjen Poutsma
* @author Brian Clozel
*/
public class JettyDataBufferTests {
private final JettyDataBufferFactory dataBufferFactory = new JettyDataBufferFactory();
private ArrayByteBufferPool.Tracking byteBufferPool = new ArrayByteBufferPool.Tracking();
@Test
void releaseRetainChunk() {
ByteBuffer buffer = ByteBuffer.allocate(3);
Content.Chunk mockChunk = mock();
given(mockChunk.getByteBuffer()).willReturn(buffer);
given(mockChunk.release()).willReturn(false, false, true);
RetainableByteBuffer retainableBuffer = byteBufferPool.acquire(3, false);
ByteBuffer buffer = retainableBuffer.getByteBuffer();
buffer.position(0).limit(1);
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, retainableBuffer);
JettyDataBuffer dataBuffer = this.dataBufferFactory.wrap(mockChunk);
JettyDataBuffer dataBuffer = this.dataBufferFactory.wrap(chunk);
dataBuffer.retain();
dataBuffer.retain();
assertThat(dataBuffer.release()).isFalse();
@ -52,8 +53,12 @@ public class JettyDataBufferTests { @@ -52,8 +53,12 @@ public class JettyDataBufferTests {
assertThat(dataBuffer.release()).isTrue();
assertThatIllegalStateException().isThrownBy(dataBuffer::release);
assertThat(retainableBuffer.isRetained()).isFalse();
assertThat(byteBufferPool.getLeaks()).isEmpty();
}
then(mockChunk).should(times(3)).retain();
then(mockChunk).should(times(3)).release();
@AfterEach
public void tearDown() throws Exception {
this.byteBufferPool.clear();
}
}

5
spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java

@ -115,7 +115,10 @@ class JettyCoreServerHttpRequest extends AbstractServerHttpRequest { @@ -115,7 +115,10 @@ class JettyCoreServerHttpRequest extends AbstractServerHttpRequest {
// We access the request body as a Flow.Publisher, which is wrapped as an org.reactivestreams.Publisher and
// then wrapped as a Flux.
return Flux.from(FlowAdapters.toPublisher(Content.Source.asPublisher(this.request)))
.map(this.dataBufferFactory::wrap);
.map(chunk -> {
chunk.retain();
return this.dataBufferFactory.wrap(chunk);
});
}
}

Loading…
Cancel
Save