Browse Source

Merge branch '5.1.x'

pull/22715/head
Rossen Stoyanchev 7 years ago
parent
commit
52b109ae68
  1. 39
      spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java
  2. 11
      spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java
  3. 1
      spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java
  4. 2
      spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java
  5. 1
      spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java

39
spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

@ -57,6 +57,12 @@ public abstract class DataBufferUtils {
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release; private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
/**
* Workaround to disable use of pooled buffers:
* https://github.com/reactor/reactor-core/issues/1634
*/
private static final DataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
//--------------------------------------------------------------------- //---------------------------------------------------------------------
// Reading // Reading
@ -132,16 +138,21 @@ public abstract class DataBufferUtils {
Assert.isTrue(position >= 0, "'position' must be >= 0"); Assert.isTrue(position >= 0, "'position' must be >= 0");
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
DataBufferFactory bufferFactoryToUse = defaultDataBufferFactory;
Flux<DataBuffer> flux = Flux.using(channelSupplier, Flux<DataBuffer> flux = Flux.using(channelSupplier,
channel -> Flux.create(sink -> { channel -> Flux.create(sink -> {
ReadCompletionHandler handler = ReadCompletionHandler handler =
new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize); new ReadCompletionHandler(channel, sink, position, bufferFactoryToUse, bufferSize);
DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize); DataBuffer dataBuffer = bufferFactoryToUse.allocateBuffer(bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
channel.read(byteBuffer, position, dataBuffer, handler); channel.read(byteBuffer, position, dataBuffer, handler);
sink.onDispose(handler::dispose); sink.onDispose(handler::dispose);
}), }),
DataBufferUtils::closeChannel); channel -> {
// Do not close channel from here, rather wait for the current read callback
// and then complete after releasing the DataBuffer.
});
return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
} }
@ -505,26 +516,40 @@ public abstract class DataBufferUtils {
@Override @Override
public void completed(Integer read, DataBuffer dataBuffer) { public void completed(Integer read, DataBuffer dataBuffer) {
if (read != -1) { if (read != -1 && !this.disposed.get()) {
long pos = this.position.addAndGet(read); long pos = this.position.addAndGet(read);
dataBuffer.writePosition(read); dataBuffer.writePosition(read);
this.sink.next(dataBuffer); this.sink.next(dataBuffer);
if (!this.disposed.get()) { // It's possible for cancellation to happen right before the push into the sink
if (this.disposed.get()) {
// TODO:
// This is not ideal since we already passed the buffer into the sink and
// releasing may cause something reading to fail. Maybe we won't have to
// do this after https://github.com/reactor/reactor-core/issues/1634
complete(dataBuffer);
}
else {
DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize); ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize);
this.channel.read(newByteBuffer, pos, newDataBuffer, this); this.channel.read(newByteBuffer, pos, newDataBuffer, this);
} }
} }
else { else {
release(dataBuffer); complete(dataBuffer);
this.sink.complete();
} }
} }
private void complete(DataBuffer dataBuffer) {
release(dataBuffer);
this.sink.complete();
closeChannel(this.channel);
}
@Override @Override
public void failed(Throwable exc, DataBuffer dataBuffer) { public void failed(Throwable exc, DataBuffer dataBuffer) {
release(dataBuffer); release(dataBuffer);
this.sink.error(exc); this.sink.error(exc);
closeChannel(this.channel);
} }
public void dispose() { public void dispose() {

11
spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java

@ -80,8 +80,7 @@ public class ResourceRegionEncoderTests {
.expectComplete() .expectComplete()
.verify(); .verify();
// TODO: https://github.com/reactor/reactor-core/issues/1634 this.bufferFactory.checkForLeaks();
// this.bufferFactory.checkForLeaks();
} }
@Test @Test
@ -122,11 +121,10 @@ public class ResourceRegionEncoderTests {
.expectComplete() .expectComplete()
.verify(); .verify();
// TODO: https://github.com/reactor/reactor-core/issues/1634 this.bufferFactory.checkForLeaks();
// this.bufferFactory.checkForLeaks();
} }
@Test // gh- @Test // gh-22107
public void cancelWithoutDemandForMultipleResourceRegions() { public void cancelWithoutDemandForMultipleResourceRegions() {
Resource resource = new ClassPathResource("ResourceRegionEncoderTests.txt", getClass()); Resource resource = new ClassPathResource("ResourceRegionEncoderTests.txt", getClass());
Flux<ResourceRegion> regions = Flux.just( Flux<ResourceRegion> regions = Flux.just(
@ -173,8 +171,7 @@ public class ResourceRegionEncoderTests {
.expectError(EncodingException.class) .expectError(EncodingException.class)
.verify(); .verify();
// TODO: https://github.com/reactor/reactor-core/issues/1634 this.bufferFactory.checkForLeaks();
// this.bufferFactory.checkForLeaks();
} }
protected Consumer<DataBuffer> stringConsumer(String expected) { protected Consumer<DataBuffer> stringConsumer(String expected) {

1
spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java

@ -137,6 +137,7 @@ public abstract class AbstractDataBufferAllocatingTestCase {
catch (InterruptedException ex) { catch (InterruptedException ex) {
// ignore // ignore
} }
continue;
} }
assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total); assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total);
} }

2
spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

@ -187,8 +187,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
.verify(); .verify();
} }
// TODO: Remove ignore after https://github.com/reactor/reactor-core/issues/1634
@Ignore
@Test // gh-22107 @Test // gh-22107
public void readAsynchronousFileChannelCancelWithoutDemand() throws Exception { public void readAsynchronousFileChannelCancelWithoutDemand() throws Exception {
URI uri = this.resource.getURI(); URI uri = this.resource.getURI();

1
spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java

@ -85,6 +85,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory {
catch (InterruptedException ex) { catch (InterruptedException ex) {
// ignore // ignore
} }
continue;
} }
List<AssertionError> errors = this.created.stream() List<AssertionError> errors = this.created.stream()
.filter(LeakAwareDataBuffer::isAllocated) .filter(LeakAwareDataBuffer::isAllocated)

Loading…
Cancel
Save