From 63118c1ea76a7cb5e893ae24495dd1e49d157db5 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Thu, 9 Mar 2017 13:58:04 +0100 Subject: [PATCH] Add DataBufferUtils.read w/ AsynchFileChannel This commit adds an overloaded DataBufferUtils.read method that operates on a AsynchronousFileChannel (as opposed to a ReadableByteChannel, which already existed). This commit also uses said method in the Resource encoders, if the Resource is a file. --- .../core/codec/ResourceEncoder.java | 17 ++- .../core/codec/ResourceRegionEncoder.java | 34 ++++- .../core/io/buffer/DataBufferUtils.java | 137 +++++++++++++++--- .../codec/ResourceRegionEncoderTests.java | 47 ++++-- .../core/io/buffer/DataBufferUtilsTests.java | 36 ++++- .../core/codec/ResourceRegionEncoderTests.txt | 1 + 6 files changed, 228 insertions(+), 44 deletions(-) create mode 100644 spring-core/src/test/resources/org/springframework/core/codec/ResourceRegionEncoderTests.txt diff --git a/spring-core/src/main/java/org/springframework/core/codec/ResourceEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/ResourceEncoder.java index 89adf12d39a..0f22c8a0f0f 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ResourceEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ResourceEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,11 @@ package org.springframework.core.codec; +import java.io.File; import java.io.IOException; +import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.ReadableByteChannel; +import java.nio.file.StandardOpenOption; import java.util.Map; import reactor.core.publisher.Flux; @@ -66,6 +69,18 @@ public class ResourceEncoder extends AbstractSingleValueEncoder { protected Flux encode(Resource resource, DataBufferFactory dataBufferFactory, ResolvableType type, MimeType mimeType, Map hints) { + try { + if (resource.isFile()) { + File file = resource.getFile(); + AsynchronousFileChannel channel = + AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); + return DataBufferUtils.read(channel, dataBufferFactory, this.bufferSize); + } + } + catch (IOException ignore) { + // fallback to resource.readableChannel(), below + } + try { ReadableByteChannel channel = resource.readableChannel(); return DataBufferUtils.read(channel, dataBufferFactory, this.bufferSize); diff --git a/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java index 2b3753dd528..d3b37f00d29 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,13 @@ package org.springframework.core.codec; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.ReadableByteChannel; import java.nio.charset.StandardCharsets; +import java.nio.file.StandardOpenOption; import java.util.Map; import java.util.OptionalLong; @@ -113,14 +116,31 @@ public class ResourceRegionEncoder extends AbstractEncoder { } private Flux writeResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) { + Flux in = readResourceRegion(region, bufferFactory); + return DataBufferUtils.takeUntilByteCount(in, region.getCount()); + } + + private Flux readResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) { + Resource resource = region.getResource(); + try { + if (resource.isFile()) { + File file = region.getResource().getFile(); + AsynchronousFileChannel channel = + AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); + return DataBufferUtils.read(channel, region.getPosition(), + bufferFactory, this.bufferSize); + } + } + catch (IOException ignore) { + // fallback to resource.readableChannel(), below + } try { - ReadableByteChannel resourceChannel = region.getResource().readableChannel(); - Flux in = DataBufferUtils.read(resourceChannel, bufferFactory, this.bufferSize); - Flux skipped = DataBufferUtils.skipUntilByteCount(in, region.getPosition()); - return DataBufferUtils.takeUntilByteCount(skipped, region.getCount()); + ReadableByteChannel channel = resource.readableChannel(); + Flux in = DataBufferUtils.read(channel, bufferFactory, this.bufferSize); + return DataBufferUtils.skipUntilByteCount(in, region.getPosition()); } - catch (IOException exc) { - return Flux.error(exc); + catch (IOException ex) { + return Flux.error(ex); } } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index b9172bbf883..5e12bed9e6c 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,14 +19,17 @@ package org.springframework.core.io.buffer; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.Channel; import java.nio.channels.Channels; +import java.nio.channels.CompletionHandler; import java.nio.channels.ReadableByteChannel; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; -import java.util.function.Consumer; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.SynchronousSink; import org.springframework.util.Assert; @@ -40,17 +43,6 @@ import org.springframework.util.Assert; */ public abstract class DataBufferUtils { - private static final Consumer CLOSE_CONSUMER = channel -> { - try { - if (channel != null) { - channel.close(); - } - } - catch (IOException ex) { - } - }; - - /** * Read the given {@code InputStream} into a {@code Flux} of * {@code DataBuffer}s. Closes the input stream when the flux is terminated. @@ -85,7 +77,58 @@ public abstract class DataBufferUtils { return Flux.generate(() -> channel, new ReadableByteChannelGenerator(dataBufferFactory, bufferSize), - CLOSE_CONSUMER); + DataBufferUtils::closeChannel); + } + + /** + * Read the given {@code AsynchronousFileChannel} into a {@code Flux} of + * {@code DataBuffer}s. Closes the channel when the flux is terminated. + * @param channel the channel to read from + * @param dataBufferFactory the factory to create data buffers with + * @param bufferSize the maximum size of the data buffers + * @return a flux of data buffers read from the given channel + */ + public static Flux read(AsynchronousFileChannel channel, + DataBufferFactory dataBufferFactory, int bufferSize) { + return read(channel, 0, dataBufferFactory, bufferSize); + } + + /** + * Read the given {@code AsynchronousFileChannel} into a {@code Flux} of + * {@code DataBuffer}s, starting at the given position. Closes the channel when the flux is + * terminated. + * @param channel the channel to read from + * @param position the position to start reading from + * @param dataBufferFactory the factory to create data buffers with + * @param bufferSize the maximum size of the data buffers + * @return a flux of data buffers read from the given channel + */ + public static Flux read(AsynchronousFileChannel channel, + long position, DataBufferFactory dataBufferFactory, int bufferSize) { + + Assert.notNull(channel, "'channel' must not be null"); + Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null"); + Assert.isTrue(position >= 0, "'position' must be >= 0"); + + ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize); + + return Flux.create(emitter -> { + emitter.setCancellation(() -> closeChannel(channel)); + AsynchronousFileChannelCompletionHandler completionHandler = + new AsynchronousFileChannelCompletionHandler(emitter, position, + dataBufferFactory, byteBuffer); + channel.read(byteBuffer, position, channel, completionHandler); + }); + } + + private static void closeChannel(Channel channel) { + try { + if (channel != null) { + channel.close(); + } + } + catch (IOException ignored) { + } } /** @@ -189,24 +232,23 @@ public abstract class DataBufferUtils { private final DataBufferFactory dataBufferFactory; - private final int chunkSize; + private final ByteBuffer byteBuffer; public ReadableByteChannelGenerator(DataBufferFactory dataBufferFactory, int chunkSize) { this.dataBufferFactory = dataBufferFactory; - this.chunkSize = chunkSize; + this.byteBuffer = ByteBuffer.allocate(chunkSize); } @Override public ReadableByteChannel apply(ReadableByteChannel channel, SynchronousSink sub) { try { - ByteBuffer byteBuffer = ByteBuffer.allocate(chunkSize); int read; - if ((read = channel.read(byteBuffer)) >= 0) { - byteBuffer.flip(); + if ((read = channel.read(this.byteBuffer)) >= 0) { + this.byteBuffer.flip(); boolean release = true; DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(read); try { - dataBuffer.write(byteBuffer); + dataBuffer.write(this.byteBuffer); release = false; sub.next(dataBuffer); } @@ -215,6 +257,7 @@ public abstract class DataBufferUtils { release(dataBuffer); } } + this.byteBuffer.clear(); } else { sub.complete(); @@ -227,4 +270,58 @@ public abstract class DataBufferUtils { } } + private static class AsynchronousFileChannelCompletionHandler + implements CompletionHandler { + + private final FluxSink emitter; + + private final ByteBuffer byteBuffer; + + private final DataBufferFactory dataBufferFactory; + + private long position; + + private AsynchronousFileChannelCompletionHandler(FluxSink emitter, + long position, DataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) { + this.emitter = emitter; + this.position = position; + this.dataBufferFactory = dataBufferFactory; + this.byteBuffer = byteBuffer; + } + + @Override + public void completed(Integer read, AsynchronousFileChannel channel) { + if (read != -1) { + this.position += read; + this.byteBuffer.flip(); + boolean release = true; + DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(read); + try { + dataBuffer.write(this.byteBuffer); + release = false; + this.emitter.next(dataBuffer); + } + finally { + if (release) { + release(dataBuffer); + } + } + this.byteBuffer.clear(); + + if (!this.emitter.isCancelled()) { + channel.read(this.byteBuffer, this.position, channel, this); + } + } + else { + this.emitter.complete(); + closeChannel(channel); + } + } + + @Override + public void failed(Throwable exc, AsynchronousFileChannel channel) { + this.emitter.error(exc); + closeChannel(channel); + } + } } diff --git a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java index 5fcfbf0c471..d6540e0d287 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import reactor.test.StepVerifier; import org.springframework.core.ResolvableType; import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.DataBuffer; @@ -51,13 +52,9 @@ public class ResourceRegionEncoderTests extends AbstractDataBufferAllocatingTest private ResourceRegionEncoder encoder; - private Resource resource; - @Before public void setUp() { this.encoder = new ResourceRegionEncoder(); - String content = "Spring Framework test resource content."; - this.resource = new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8)); this.bufferFactory = new DefaultDataBufferFactory(); } @@ -74,12 +71,23 @@ public class ResourceRegionEncoderTests extends AbstractDataBufferAllocatingTest } @Test - public void shouldEncodeResourceRegion() throws Exception { + public void shouldEncodeResourceRegionFileResource() throws Exception { + shouldEncodeResourceRegion( + new ClassPathResource("ResourceRegionEncoderTests.txt", getClass())); + } + + @Test + public void shouldEncodeResourceRegionByteArrayResource() throws Exception { + String content = "Spring Framework test resource content."; + shouldEncodeResourceRegion(new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8))); + } - ResourceRegion region = new ResourceRegion(this.resource, 0, 6); + private void shouldEncodeResourceRegion(Resource resource) { + ResourceRegion region = new ResourceRegion(resource, 0, 6); Flux result = this.encoder.encode(Mono.just(region), this.bufferFactory, - ResolvableType.forClass(ResourceRegion.class), MimeTypeUtils.APPLICATION_OCTET_STREAM - , Collections.emptyMap()); + ResolvableType.forClass(ResourceRegion.class), + MimeTypeUtils.APPLICATION_OCTET_STREAM, + Collections.emptyMap()); StepVerifier.create(result) .consumeNextWith(stringConsumer("Spring")) @@ -88,13 +96,24 @@ public class ResourceRegionEncoderTests extends AbstractDataBufferAllocatingTest } @Test - public void shouldEncodeMultipleResourceRegions() throws Exception { + public void shouldEncodeMultipleResourceRegionsFileResource() throws Exception { + shouldEncodeMultipleResourceRegions( + new ClassPathResource("ResourceRegionEncoderTests.txt", getClass())); + } + + @Test + public void shouldEncodeMultipleResourceRegionsByteArrayResource() throws Exception { + String content = "Spring Framework test resource content."; + shouldEncodeMultipleResourceRegions( + new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8))); + } + private void shouldEncodeMultipleResourceRegions(Resource resource) { Flux regions = Flux.just( - new ResourceRegion(this.resource, 0, 6), - new ResourceRegion(this.resource, 7, 9), - new ResourceRegion(this.resource, 17, 4), - new ResourceRegion(this.resource, 22, 17) + new ResourceRegion(resource, 0, 6), + new ResourceRegion(resource, 7, 9), + new ResourceRegion(resource, 17, 4), + new ResourceRegion(resource, 22, 17) ); String boundary = MimeTypeUtils.generateMultipartBoundaryString(); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 3c5bc823857..b96867dd95c 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.core.io.buffer; import java.io.InputStream; import java.net.URI; +import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.FileChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; @@ -34,7 +35,7 @@ import static org.junit.Assert.assertFalse; public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { @Test - public void readChannel() throws Exception { + public void readReadableByteChannel() throws Exception { URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI(); FileChannel channel = FileChannel.open(Paths.get(uri), StandardOpenOption.READ); Flux flux = DataBufferUtils.read(channel, this.bufferFactory, 3); @@ -50,6 +51,37 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { assertFalse(channel.isOpen()); } + @Test + public void readAsynchronousFileChannel() throws Exception { + URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI(); + AsynchronousFileChannel + channel = AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ); + Flux flux = DataBufferUtils.read(channel, this.bufferFactory, 3); + + StepVerifier.create(flux) + .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .consumeNextWith(stringConsumer("qux")) + .expectComplete() + .verify(); + } + + @Test + public void readAsynchronousFileChannelPosition() throws Exception { + URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI(); + AsynchronousFileChannel + channel = AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ); + Flux flux = DataBufferUtils.read(channel, 3, this.bufferFactory, 3); + + StepVerifier.create(flux) + .consumeNextWith(stringConsumer("bar")) + .consumeNextWith(stringConsumer("baz")) + .consumeNextWith(stringConsumer("qux")) + .expectComplete() + .verify(); + } + @Test public void readUnalignedChannel() throws Exception { URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI(); diff --git a/spring-core/src/test/resources/org/springframework/core/codec/ResourceRegionEncoderTests.txt b/spring-core/src/test/resources/org/springframework/core/codec/ResourceRegionEncoderTests.txt new file mode 100644 index 00000000000..84bbb9ddaf7 --- /dev/null +++ b/spring-core/src/test/resources/org/springframework/core/codec/ResourceRegionEncoderTests.txt @@ -0,0 +1 @@ +Spring Framework test resource content. \ No newline at end of file