From d5da823482134cc447a7023ad223a661cc50e348 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Tue, 25 Jul 2017 16:37:11 +0200 Subject: [PATCH] Fix race condition for AsynchronousFileChannel This commit fixes an issue in the DataBufferUtils.write variant that takes a AsynchronousFileChannel. Issue: SPR-15798 --- .../core/io/buffer/DataBufferUtils.java | 18 ++++++++++++------ .../core/io/buffer/DataBufferUtilsTests.java | 1 - 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 0c0fa719150..7ad441168a7 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -517,7 +517,7 @@ public abstract class DataBufferUtils { private final AtomicBoolean completed = new AtomicBoolean(); - private long position; + private final AtomicLong position; @Nullable private DataBuffer dataBuffer; @@ -526,7 +526,7 @@ public abstract class DataBufferUtils { FluxSink sink, AsynchronousFileChannel channel, long position) { this.sink = sink; this.channel = channel; - this.position = position; + this.position = new AtomicLong(position); } @Override @@ -539,7 +539,7 @@ public abstract class DataBufferUtils { this.dataBuffer = value; ByteBuffer byteBuffer = value.asByteBuffer(); - this.channel.write(byteBuffer, this.position, byteBuffer, this); + this.channel.write(byteBuffer, this.position.get(), byteBuffer, this); } @Override @@ -550,17 +550,23 @@ public abstract class DataBufferUtils { @Override protected void hookOnComplete() { this.completed.set(true); + + if (this.dataBuffer == null) { + this.sink.complete(); + } } @Override public void completed(Integer written, ByteBuffer byteBuffer) { - this.position += written; + this.position.addAndGet(written); if (byteBuffer.hasRemaining()) { - this.channel.write(byteBuffer, this.position, byteBuffer, this); + this.channel.write(byteBuffer, this.position.get(), byteBuffer, this); return; } - else if (this.dataBuffer != null) { + + if (this.dataBuffer != null) { this.sink.next(this.dataBuffer); + this.dataBuffer = null; } if (this.completed.get()) { this.sink.complete(); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 10f4435d348..be66e39e47d 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -207,7 +207,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { } @Test - @Ignore // SPR-15798 public void writeAsynchronousFileChannel() throws Exception { DataBuffer foo = stringBuffer("foo"); DataBuffer bar = stringBuffer("bar");