From b71f143dfd622e9e3b7742e032ce982ec00a94b2 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Tue, 3 May 2016 16:33:56 +0200 Subject: [PATCH] Polishing --- .../io/buffer/support/DataBufferUtils.java | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java index ad50c7b3efa..28ce6781c09 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java @@ -16,7 +16,6 @@ package org.springframework.core.io.buffer.support; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -44,10 +43,10 @@ import org.springframework.util.Assert; */ public abstract class DataBufferUtils { - private static final Consumer CLOSE_CONSUMER = closeable -> { + private static final Consumer CLOSE_CONSUMER = channel -> { try { - if (closeable != null) { - closeable.close(); + if (channel != null) { + channel.close(); } } catch (IOException ignored) { @@ -55,42 +54,37 @@ public abstract class DataBufferUtils { }; /** - * Reads the given {@code ReadableByteChannel} into a {@code Flux} of - * {@code DataBuffer}s. Closes the channel when the flux is terminated. - * @param channel the channel to read from + * Reads the given {@code InputStream} into a {@code Flux} of + * {@code DataBuffer}s. Closes the stream when the flux inputStream terminated. + * @param inputStream the input stream to read from * @param allocator the allocator to create data buffers with * @param bufferSize the maximum size of the data buffers * @return a flux of data buffers read from the given channel */ - public static Flux read(ReadableByteChannel channel, + public static Flux read(InputStream inputStream, DataBufferAllocator allocator, int bufferSize) { - Assert.notNull(channel, "'channel' must not be null"); + Assert.notNull(inputStream, "'inputStream' must not be null"); Assert.notNull(allocator, "'allocator' must not be null"); - return Flux.create(new ReadableByteChannelConsumer(allocator, bufferSize), - subscriber -> channel, closeConsumer()); + ReadableByteChannel channel = Channels.newChannel(inputStream); + return read(channel, allocator, bufferSize); } /** - * Reads the given {@code InputStream} into a {@code Flux} of - * {@code DataBuffer}s. Closes the stream when the flux inputStream terminated. - * @param inputStream the input stream to read from + * Reads the given {@code ReadableByteChannel} into a {@code Flux} of + * {@code DataBuffer}s. Closes the channel when the flux is terminated. + * @param channel the channel to read from * @param allocator the allocator to create data buffers with * @param bufferSize the maximum size of the data buffers * @return a flux of data buffers read from the given channel */ - public static Flux read(InputStream inputStream, + public static Flux read(ReadableByteChannel channel, DataBufferAllocator allocator, int bufferSize) { - Assert.notNull(inputStream, "'inputStream' must not be null"); + Assert.notNull(channel, "'channel' must not be null"); Assert.notNull(allocator, "'allocator' must not be null"); - ReadableByteChannel channel = Channels.newChannel(inputStream); - return read(channel, allocator, bufferSize); - } - - @SuppressWarnings("unchecked") - private static Consumer closeConsumer() { - return (Consumer) CLOSE_CONSUMER; + return Flux.create(new ReadableByteChannelConsumer(allocator, bufferSize), + subscriber -> channel, CLOSE_CONSUMER); } /**