From f748b1e68d996665a458563c5bb0e643c8137c1b Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Mon, 2 Sep 2019 15:13:22 +0200 Subject: [PATCH] Fix timing bug in DataBufferUtils::readAsynchronousFileChannel This commit makes sure that reading is enabled after the current signal has been processed, not while is is being processed. The bug was only apparent while using the JettyClientHttpConnector, which requests new elements continuously, even after the end of the stream has been signalled. --- .../core/io/buffer/DataBufferUtils.java | 26 ++++++++++++------- .../client/WebClientIntegrationTests.java | 18 ++++++++----- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index bc97362c989..9ac2d80b77d 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -506,7 +506,7 @@ public abstract class DataBufferUtils { private final AtomicBoolean reading = new AtomicBoolean(); - private final AtomicBoolean canceled = new AtomicBoolean(); + private final AtomicBoolean disposed = new AtomicBoolean(); public ReadCompletionHandler(AsynchronousFileChannel channel, FluxSink sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) { @@ -519,7 +519,9 @@ public abstract class DataBufferUtils { } public void read() { - if (this.sink.requestedFromDownstream() > 0 && this.reading.compareAndSet(false, true)) { + if (this.sink.requestedFromDownstream() > 0 && + isNotDisposed() && + this.reading.compareAndSet(false, true)) { DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); @@ -528,34 +530,38 @@ public abstract class DataBufferUtils { @Override public void completed(Integer read, DataBuffer dataBuffer) { - this.reading.set(false); - if (!isCanceled()) { + if (isNotDisposed()) { if (read != -1) { this.position.addAndGet(read); dataBuffer.writePosition(read); this.sink.next(dataBuffer); + this.reading.set(false); read(); } else { release(dataBuffer); closeChannel(this.channel); - this.sink.complete(); + if (this.disposed.compareAndSet(false, true)) { + this.sink.complete(); + } + this.reading.set(false); } } else { release(dataBuffer); closeChannel(this.channel); + this.reading.set(false); } } @Override public void failed(Throwable exc, DataBuffer dataBuffer) { - this.reading.set(false); release(dataBuffer); closeChannel(this.channel); - if (!isCanceled()) { + if (this.disposed.compareAndSet(false, true)) { this.sink.error(exc); } + this.reading.set(false); } public void request(long n) { @@ -563,15 +569,15 @@ public abstract class DataBufferUtils { } public void cancel() { - if (this.canceled.compareAndSet(false, true)) { + if (this.disposed.compareAndSet(false, true)) { if (!this.reading.get()) { closeChannel(this.channel); } } } - private boolean isCanceled() { - return this.canceled.get(); + private boolean isNotDisposed() { + return !this.disposed.get(); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 5e069276874..2720fbb6eaf 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -18,7 +18,9 @@ package org.springframework.web.reactive.function.client; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.time.Duration; import java.util.Arrays; @@ -406,7 +408,6 @@ public class WebClientIntegrationTests { prepareResponse(response -> {}); Resource resource = new ClassPathResource("largeTextFile.txt", getClass()); - byte[] expected = Files.readAllBytes(resource.getFile().toPath()); Flux body = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 4096); Mono result = this.webClient.post() @@ -415,18 +416,21 @@ public class WebClientIntegrationTests { .retrieve() .bodyToMono(Void.class); - StepVerifier.create(result).verifyComplete(); + StepVerifier.create(result) + .expectComplete() + .verify(Duration.ofSeconds(5)); expectRequest(request -> { - ByteArrayOutputStream actual = new ByteArrayOutputStream(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { - request.getBody().copyTo(actual); + request.getBody().copyTo(bos); + String actual = bos.toString("UTF-8"); + String expected = new String(Files.readAllBytes(resource.getFile().toPath()), StandardCharsets.UTF_8); + assertEquals(expected, actual); } catch (IOException ex) { - throw new IllegalStateException(ex); + throw new UncheckedIOException(ex); } - assertEquals(expected.length, actual.size()); - assertEquals(hash(expected), hash(actual.toByteArray())); }); }