Browse Source

Merge branch '5.1.x'

pull/22792/head
Rossen Stoyanchev 7 years ago
parent
commit
bb28477587
  1. 14
      spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java
  2. 15
      spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java
  3. 6
      spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java

14
spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

@ -532,13 +532,9 @@ public abstract class DataBufferUtils {
long pos = this.position.addAndGet(read); long pos = this.position.addAndGet(read);
dataBuffer.writePosition(read); dataBuffer.writePosition(read);
this.sink.next(dataBuffer); this.sink.next(dataBuffer);
// It's possible for cancellation to happen right before the push into the sink // onNext may have led to onCancel (e.g. downstream takeUntil)
if (this.disposed.get()) { if (this.disposed.get()) {
// TODO: complete();
// This is not ideal since we already passed the buffer into the sink and
// releasing may cause something reading to fail. Maybe we won't have to
// do this after https://github.com/reactor/reactor-core/issues/1634
complete(dataBuffer);
} }
else { else {
DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
@ -547,12 +543,12 @@ public abstract class DataBufferUtils {
} }
} }
else { else {
complete(dataBuffer); release(dataBuffer);
complete();
} }
} }
private void complete(DataBuffer dataBuffer) { private void complete() {
release(dataBuffer);
this.sink.complete(); this.sink.complete();
closeChannel(this.channel); closeChannel(this.channel);
} }

15
spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java

@ -37,8 +37,6 @@ class LeakAwareDataBuffer implements PooledDataBuffer {
private final LeakAwareDataBufferFactory dataBufferFactory; private final LeakAwareDataBufferFactory dataBufferFactory;
private int refCount = 1;
LeakAwareDataBuffer(DataBuffer delegate, LeakAwareDataBufferFactory dataBufferFactory) { LeakAwareDataBuffer(DataBuffer delegate, LeakAwareDataBufferFactory dataBufferFactory) {
Assert.notNull(delegate, "Delegate must not be null"); Assert.notNull(delegate, "Delegate must not be null");
@ -67,19 +65,24 @@ class LeakAwareDataBuffer implements PooledDataBuffer {
@Override @Override
public boolean isAllocated() { public boolean isAllocated() {
return this.refCount > 0; return this.delegate instanceof PooledDataBuffer &&
((PooledDataBuffer) this.delegate).isAllocated();
} }
@Override @Override
public PooledDataBuffer retain() { public PooledDataBuffer retain() {
this.refCount++; if (this.delegate instanceof PooledDataBuffer) {
((PooledDataBuffer) this.delegate).retain();
}
return this; return this;
} }
@Override @Override
public boolean release() { public boolean release() {
this.refCount--; if (this.delegate instanceof PooledDataBuffer) {
return this.refCount == 0; ((PooledDataBuffer) this.delegate).release();
}
return isAllocated();
} }
// delegation // delegation

6
spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java

@ -99,16 +99,16 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory {
@Override @Override
public DataBuffer allocateBuffer() { public DataBuffer allocateBuffer() {
return allocateBufferInternal(this.delegate.allocateBuffer()); return createLeakAwareDataBuffer(this.delegate.allocateBuffer());
} }
@Override @Override
public DataBuffer allocateBuffer(int initialCapacity) { public DataBuffer allocateBuffer(int initialCapacity) {
return allocateBufferInternal(this.delegate.allocateBuffer(initialCapacity)); return createLeakAwareDataBuffer(this.delegate.allocateBuffer(initialCapacity));
} }
@NotNull @NotNull
private DataBuffer allocateBufferInternal(DataBuffer delegateBuffer) { private DataBuffer createLeakAwareDataBuffer(DataBuffer delegateBuffer) {
LeakAwareDataBuffer dataBuffer = new LeakAwareDataBuffer(delegateBuffer, this); LeakAwareDataBuffer dataBuffer = new LeakAwareDataBuffer(delegateBuffer, this);
this.created.add(dataBuffer); this.created.add(dataBuffer);
return dataBuffer; return dataBuffer;

Loading…
Cancel
Save