From 5520e730f11e26c42e83eef2089690112e5b7417 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Fri, 19 Jan 2018 17:37:40 +0100 Subject: [PATCH] DataBufferUtils.read should not take input stream/channel as parameter Fixed by creating `Callable`-based variants, as explained in the JIRA issue. Issue: SPR-16403 --- .../core/io/buffer/DataBufferUtils.java | 204 +++++++++++++----- .../core/io/buffer/DataBufferUtilsTests.java | 43 ++-- .../SynchronossPartHttpMessageReader.java | 7 +- 3 files changed, 166 insertions(+), 88 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index f4e6dcb4773..dbf577ca512 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -28,9 +28,9 @@ import java.nio.channels.CompletionHandler; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.StandardOpenOption; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Consumer; @@ -69,69 +69,158 @@ public abstract class DataBufferUtils { //--------------------------------------------------------------------- /** - * Read the given {@code InputStream} into a {@code Flux} of + * Read the given {@code InputStream} into a read-once {@code Flux} of * {@code DataBuffer}s. Closes the input stream when the flux is terminated. + *

The resulting {@code Flux} can only be subscribed to once. See + * {@link #readInputStream(Callable, DataBufferFactory, int)} for a variant that supports + * multiple subscriptions. * @param inputStream the input stream to read from * @param dataBufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers * @return a flux of data buffers read from the given channel + * @deprecated as of Spring 5.0.3, in favor of + * {@link #readInputStream(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1 */ + @Deprecated public static Flux read(InputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) { + return readInputStream(() -> inputStream, dataBufferFactory, bufferSize); + } - Assert.notNull(inputStream, "InputStream must not be null"); + /** + * Obtain a {@link InputStream} from the given supplier, and read it into a {@code Flux} of + * {@code DataBuffer}s. Closes the input stream when the flux is terminated. + * @param inputStreamSupplier the supplier for the input stream to read from + * @param dataBufferFactory the factory to create data buffers with + * @param bufferSize the maximum size of the data buffers + * @return a flux of data buffers read from the given channel + */ + public static Flux readInputStream(Callable inputStreamSupplier, + DataBufferFactory dataBufferFactory, int bufferSize) { - ReadableByteChannel channel = Channels.newChannel(inputStream); - return read(channel, dataBufferFactory, bufferSize); + Assert.notNull(inputStreamSupplier, "'inputStreamSupplier' must not be null"); + + return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()), + dataBufferFactory, bufferSize); } /** - * Read the given {@code ReadableByteChannel} into a {@code Flux} of + * Read the given {@code ReadableByteChannel} into a read-once {@code Flux} of * {@code DataBuffer}s. Closes the channel when the flux is terminated. + *

The resulting {@code Flux} can only be subscribed to once. See + * {@link #readByteChannel(Callable, DataBufferFactory, int)} for a variant that supports + * multiple subscriptions. * @param channel the channel to read from * @param dataBufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers * @return a flux of data buffers read from the given channel + * @deprecated as of Spring 5.0.3, in favor of + * {@link #readByteChannel(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1 */ + @Deprecated public static Flux read(ReadableByteChannel channel, DataBufferFactory dataBufferFactory, int bufferSize) { + return readByteChannel(() -> channel, dataBufferFactory, bufferSize); + } - Assert.notNull(channel, "ReadableByteChannel must not be null"); - Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null"); + /** + * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a + * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated. + * @param channelSupplier the supplier for the channel to read from + * @param dataBufferFactory the factory to create data buffers with + * @param bufferSize the maximum size of the data buffers + * @return a flux of data buffers read from the given channel + */ + public static Flux readByteChannel(Callable channelSupplier, + DataBufferFactory dataBufferFactory, int bufferSize) { + + Assert.notNull(channelSupplier, "'channelSupplier' must not be null"); + Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null"); Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); - return Flux.generate(() -> channel, - new ReadableByteChannelGenerator(dataBufferFactory, bufferSize), - DataBufferUtils::closeChannel); + return Flux.using(channelSupplier, + channel -> { + ReadableByteChannelGenerator generator = + new ReadableByteChannelGenerator(channel, dataBufferFactory, + bufferSize); + return Flux.generate(generator); + }, + DataBufferUtils::closeChannel + ); } /** - * Read the given {@code AsynchronousFileChannel} into a {@code Flux} of - * {@code DataBuffer}s. Closes the channel when the flux is terminated. + * Read the given {@code AsynchronousFileChannel} into a read-once {@code Flux} + * of {@code DataBuffer}s. Closes the channel when the flux is terminated. + *

The resulting {@code Flux} can only be subscribed to once. See + * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} for a variant that + * supports multiple subscriptions. * @param channel the channel to read from * @param dataBufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers * @return a flux of data buffers read from the given channel + * @deprecated as of Spring 5.0.3, in favor of + * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)}, to be removed in + * Spring 5.1 */ + @Deprecated public static Flux read(AsynchronousFileChannel channel, DataBufferFactory dataBufferFactory, int bufferSize) { - return read(channel, 0, dataBufferFactory, bufferSize); + return readAsynchronousFileChannel(() -> channel, dataBufferFactory, bufferSize); } /** - * Read the given {@code AsynchronousFileChannel} into a {@code Flux} of - * {@code DataBuffer}s, starting at the given position. Closes the channel when the flux is + * Read the given {@code AsynchronousFileChannel} into a read-once {@code Flux} + * of {@code DataBuffer}s, starting at the given position. Closes the channel when the flux is * terminated. + *

The resulting {@code Flux} can only be subscribed to once. See + * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)} for a variant + * that supports multiple subscriptions. * @param channel the channel to read from * @param position the position to start reading from * @param dataBufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers * @return a flux of data buffers read from the given channel + * @deprecated as of Spring 5.0.3, in favor of + * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)}, to be removed + * in Spring 5.1 */ + @Deprecated public static Flux read(AsynchronousFileChannel channel, long position, DataBufferFactory dataBufferFactory, int bufferSize) { + return readAsynchronousFileChannel(() -> channel, position, dataBufferFactory, bufferSize); + } - Assert.notNull(channel, "'channel' must not be null"); + /** + * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a + * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated. + * @param channelSupplier the supplier for the channel to read from + * @param dataBufferFactory the factory to create data buffers with + * @param bufferSize the maximum size of the data buffers + * @return a flux of data buffers read from the given channel + */ + public static Flux readAsynchronousFileChannel( + Callable channelSupplier, + DataBufferFactory dataBufferFactory, int bufferSize) { + + return readAsynchronousFileChannel(channelSupplier, 0, dataBufferFactory, bufferSize); + } + + /** + * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a + * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the channel when + * the flux is terminated. + * @param channelSupplier the supplier for the channel to read from + * @param position the position to start reading from + * @param dataBufferFactory the factory to create data buffers with + * @param bufferSize the maximum size of the data buffers + * @return a flux of data buffers read from the given channel + */ + public static Flux readAsynchronousFileChannel( + Callable channelSupplier, + long position, DataBufferFactory dataBufferFactory, int bufferSize) { + + Assert.notNull(channelSupplier, "'channelSupplier' must not be null"); Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null"); Assert.isTrue(position >= 0, "'position' must be >= 0"); Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); @@ -139,22 +228,23 @@ public abstract class DataBufferUtils { DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); - return Flux.create(sink -> { - sink.onDispose(() -> closeChannel(channel)); - CompletionHandler completionHandler = - new AsynchronousFileChannelReadCompletionHandler(channel, sink, position, - dataBufferFactory, bufferSize); - channel.read(byteBuffer, position, dataBuffer, completionHandler); - }); + return Flux.using(channelSupplier, + channel -> Flux.create(sink -> { + CompletionHandler completionHandler = + new AsynchronousFileChannelReadCompletionHandler(channel, + sink, position, dataBufferFactory, bufferSize); + channel.read(byteBuffer, position, dataBuffer, completionHandler); + }), + DataBufferUtils::closeChannel); } /** * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s. *

If the resource is a file, it is read into an * {@code AsynchronousFileChannel} and turned to {@code Flux} via - * {@link #read(AsynchronousFileChannel, DataBufferFactory, int)} or else - * fall back on {@link #read(InputStream, DataBufferFactory, int)} closes - * the channel when the flux is terminated. + * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else + * fall back to {@link #readByteChannel(Callable, DataBufferFactory, int)}. + * Closes the channel when the flux is terminated. * @param resource the resource to read from * @param dataBufferFactory the factory to create data buffers with * @param bufferSize the maximum size of the data buffers @@ -171,9 +261,9 @@ public abstract class DataBufferUtils { * starting at the given position. *

If the resource is a file, it is read into an * {@code AsynchronousFileChannel} and turned to {@code Flux} via - * {@link #read(AsynchronousFileChannel, DataBufferFactory, int)} or else - * fall back on {@link #read(InputStream, DataBufferFactory, int)}. Closes - * the channel when the flux is terminated. + * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else + * fall back on {@link #readByteChannel(Callable, DataBufferFactory, int)}. + * Closes the channel when the flux is terminated. * @param resource the resource to read from * @param position the position to start reading from * @param dataBufferFactory the factory to create data buffers with @@ -186,26 +276,23 @@ public abstract class DataBufferUtils { try { if (resource.isFile()) { File file = resource.getFile(); - AsynchronousFileChannel channel = - AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ); - return DataBufferUtils.read(channel, position, dataBufferFactory, bufferSize); + + return readAsynchronousFileChannel( + () -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ), + position, dataBufferFactory, bufferSize); } } catch (IOException ignore) { // fallback to resource.readableChannel(), below } - try { - ReadableByteChannel channel = resource.readableChannel(); - Flux in = DataBufferUtils.read(channel, dataBufferFactory, bufferSize); - return DataBufferUtils.skipUntilByteCount(in, position); - } - catch (IOException ex) { - return Flux.error(ex); - } + Flux result = readByteChannel(resource::readableChannel, dataBufferFactory, bufferSize); + return position == 0 ? result : skipUntilByteCount(result, position); } + + //--------------------------------------------------------------------- // Writing //--------------------------------------------------------------------- @@ -304,7 +391,7 @@ public abstract class DataBufferUtils { private static void closeChannel(@Nullable Channel channel) { try { - if (channel != null) { + if (channel != null && channel.isOpen()) { channel.close(); } } @@ -442,44 +529,49 @@ public abstract class DataBufferUtils { private static class ReadableByteChannelGenerator - implements BiFunction, ReadableByteChannel> { + implements Consumer> { + + private final ReadableByteChannel channel; private final DataBufferFactory dataBufferFactory; private final int bufferSize; - public ReadableByteChannelGenerator(DataBufferFactory dataBufferFactory, int bufferSize) { + + public ReadableByteChannelGenerator(ReadableByteChannel channel, + DataBufferFactory dataBufferFactory, int bufferSize) { + + this.channel = channel; this.dataBufferFactory = dataBufferFactory; this.bufferSize = bufferSize; } @Override - public ReadableByteChannel apply(ReadableByteChannel channel, - SynchronousSink sub) { + public void accept(SynchronousSink sink) { boolean release = true; DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); try { int read; ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity()); - if ((read = channel.read(byteBuffer)) >= 0) { + if ((read = this.channel.read(byteBuffer)) >= 0) { dataBuffer.writePosition(read); release = false; - sub.next(dataBuffer); + sink.next(dataBuffer); } else { - sub.complete(); + sink.complete(); } } catch (IOException ex) { - sub.error(ex); + sink.error(ex); } finally { if (release) { release(dataBuffer); } } - return channel; } + } @@ -494,7 +586,10 @@ public abstract class DataBufferUtils { private final int bufferSize; - private AtomicLong position; + private final AtomicLong position; + + private final AtomicBoolean disposed = new AtomicBoolean(); + private AsynchronousFileChannelReadCompletionHandler( AsynchronousFileChannel channel, FluxSink sink, @@ -513,7 +608,7 @@ public abstract class DataBufferUtils { dataBuffer.writePosition(read); this.sink.next(dataBuffer); - if (!this.sink.isCancelled()) { + if (!this.disposed.get()) { DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize); @@ -522,7 +617,6 @@ public abstract class DataBufferUtils { } else { release(dataBuffer); - closeChannel(this.channel); this.sink.complete(); } } @@ -530,9 +624,9 @@ public abstract class DataBufferUtils { @Override public void failed(Throwable exc, DataBuffer dataBuffer) { release(dataBuffer); - closeChannel(this.channel); this.sink.error(exc); } + } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 3c7dab56cd5..e62b72ba561 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -52,10 +52,11 @@ import static org.mockito.Mockito.*; public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { @Test - public void readReadableByteChannel() throws Exception { + public void readByteChannel() throws Exception { URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI(); - FileChannel channel = FileChannel.open(Paths.get(uri), StandardOpenOption.READ); - Flux flux = DataBufferUtils.read(channel, this.bufferFactory, 3); + Flux flux = + DataBufferUtils.readByteChannel(() -> FileChannel.open(Paths.get(uri), StandardOpenOption.READ), + this.bufferFactory, 3); StepVerifier.create(flux) .consumeNextWith(stringConsumer("foo")) @@ -64,16 +65,14 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { .consumeNextWith(stringConsumer("qux")) .expectComplete() .verify(Duration.ofSeconds(5)); - - assertFalse(channel.isOpen()); } @Test public void readAsynchronousFileChannel() throws Exception { URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI(); - AsynchronousFileChannel - channel = AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ); - Flux flux = DataBufferUtils.read(channel, this.bufferFactory, 3); + Flux flux = DataBufferUtils.readAsynchronousFileChannel( + () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), + this.bufferFactory, 3); StepVerifier.create(flux) .consumeNextWith(stringConsumer("foo")) @@ -87,9 +86,9 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { @Test public void readAsynchronousFileChannelPosition() throws Exception { URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI(); - AsynchronousFileChannel - channel = AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ); - Flux flux = DataBufferUtils.read(channel, 3, this.bufferFactory, 3); + Flux flux = DataBufferUtils.readAsynchronousFileChannel( + () -> AsynchronousFileChannel.open(Paths.get(uri), StandardOpenOption.READ), + 3, this.bufferFactory, 3); StepVerifier.create(flux) .consumeNextWith(stringConsumer("bar")) @@ -99,26 +98,12 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { .verify(Duration.ofSeconds(5)); } - @Test - public void readUnalignedChannel() throws Exception { - URI uri = DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI(); - FileChannel channel = FileChannel.open(Paths.get(uri), StandardOpenOption.READ); - Flux flux = DataBufferUtils.read(channel, this.bufferFactory, 5); - - StepVerifier.create(flux) - .consumeNextWith(stringConsumer("fooba")) - .consumeNextWith(stringConsumer("rbazq")) - .consumeNextWith(stringConsumer("ux")) - .expectComplete() - .verify(Duration.ofSeconds(5)); - - assertFalse(channel.isOpen()); - } - @Test public void readInputStream() throws Exception { InputStream is = DataBufferUtilsTests.class.getResourceAsStream("DataBufferUtilsTests.txt"); - Flux flux = DataBufferUtils.read(is, this.bufferFactory, 3); + Flux flux = DataBufferUtils.readInputStream( + () -> DataBufferUtilsTests.class.getResourceAsStream("DataBufferUtilsTests.txt"), + this.bufferFactory, 3); StepVerifier.create(flux) .consumeNextWith(stringConsumer("foo")) @@ -317,7 +302,7 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { .thenAnswer(putByte('c')) .thenReturn(-1); - Flux read = DataBufferUtils.read(channel, this.bufferFactory, 1); + Flux read = DataBufferUtils.readByteChannel(() -> channel, this.bufferFactory, 1); StepVerifier.create(read) .consumeNextWith(stringConsumer("a")) diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java index 7085da1d8c3..65d02c8f624 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReader.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ package org.springframework.http.codec.multipart; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; @@ -268,8 +267,8 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader @Override public Flux content() { - InputStream inputStream = this.storage.getInputStream(); - return DataBufferUtils.read(inputStream, getBufferFactory(), 4096); + return DataBufferUtils.readInputStream(this.storage::getInputStream, getBufferFactory(), + 4096); } protected StreamStorage getStorage() {