diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 0c0fa719150..7ad441168a7 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -517,7 +517,7 @@ public abstract class DataBufferUtils { private final AtomicBoolean completed = new AtomicBoolean(); - private long position; + private final AtomicLong position; @Nullable private DataBuffer dataBuffer; @@ -526,7 +526,7 @@ public abstract class DataBufferUtils { FluxSink sink, AsynchronousFileChannel channel, long position) { this.sink = sink; this.channel = channel; - this.position = position; + this.position = new AtomicLong(position); } @Override @@ -539,7 +539,7 @@ public abstract class DataBufferUtils { this.dataBuffer = value; ByteBuffer byteBuffer = value.asByteBuffer(); - this.channel.write(byteBuffer, this.position, byteBuffer, this); + this.channel.write(byteBuffer, this.position.get(), byteBuffer, this); } @Override @@ -550,17 +550,23 @@ public abstract class DataBufferUtils { @Override protected void hookOnComplete() { this.completed.set(true); + + if (this.dataBuffer == null) { + this.sink.complete(); + } } @Override public void completed(Integer written, ByteBuffer byteBuffer) { - this.position += written; + this.position.addAndGet(written); if (byteBuffer.hasRemaining()) { - this.channel.write(byteBuffer, this.position, byteBuffer, this); + this.channel.write(byteBuffer, this.position.get(), byteBuffer, this); return; } - else if (this.dataBuffer != null) { + + if (this.dataBuffer != null) { this.sink.next(this.dataBuffer); + this.dataBuffer = null; } if (this.completed.get()) { this.sink.complete(); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 10f4435d348..be66e39e47d 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -207,7 +207,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { } @Test - @Ignore // SPR-15798 public void writeAsynchronousFileChannel() throws Exception { DataBuffer foo = stringBuffer("foo"); DataBuffer bar = stringBuffer("bar");