Browse Source

Polishing

pull/1111/head
Arjen Poutsma 10 years ago
parent
commit
b71f143dfd
  1. 40
      spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java

40
spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java

@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
package org.springframework.core.io.buffer.support;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -44,10 +43,10 @@ import org.springframework.util.Assert; @@ -44,10 +43,10 @@ import org.springframework.util.Assert;
*/
public abstract class DataBufferUtils {
private static final Consumer<? extends Closeable> CLOSE_CONSUMER = closeable -> {
private static final Consumer<ReadableByteChannel> CLOSE_CONSUMER = channel -> {
try {
if (closeable != null) {
closeable.close();
if (channel != null) {
channel.close();
}
}
catch (IOException ignored) {
@ -55,42 +54,37 @@ public abstract class DataBufferUtils { @@ -55,42 +54,37 @@ public abstract class DataBufferUtils {
};
/**
* Reads the given {@code ReadableByteChannel} into a {@code Flux} of
* {@code DataBuffer}s. Closes the channel when the flux is terminated.
* @param channel the channel to read from
* Reads the given {@code InputStream} into a {@code Flux} of
* {@code DataBuffer}s. Closes the stream when the flux inputStream terminated.
* @param inputStream the input stream to read from
* @param allocator the allocator to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
*/
public static Flux<DataBuffer> read(ReadableByteChannel channel,
public static Flux<DataBuffer> read(InputStream inputStream,
DataBufferAllocator allocator, int bufferSize) {
Assert.notNull(channel, "'channel' must not be null");
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(allocator, "'allocator' must not be null");
return Flux.create(new ReadableByteChannelConsumer(allocator, bufferSize),
subscriber -> channel, closeConsumer());
ReadableByteChannel channel = Channels.newChannel(inputStream);
return read(channel, allocator, bufferSize);
}
/**
* Reads the given {@code InputStream} into a {@code Flux} of
* {@code DataBuffer}s. Closes the stream when the flux inputStream terminated.
* @param inputStream the input stream to read from
* Reads the given {@code ReadableByteChannel} into a {@code Flux} of
* {@code DataBuffer}s. Closes the channel when the flux is terminated.
* @param channel the channel to read from
* @param allocator the allocator to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
*/
public static Flux<DataBuffer> read(InputStream inputStream,
public static Flux<DataBuffer> read(ReadableByteChannel channel,
DataBufferAllocator allocator, int bufferSize) {
Assert.notNull(inputStream, "'inputStream' must not be null");
Assert.notNull(channel, "'channel' must not be null");
Assert.notNull(allocator, "'allocator' must not be null");
ReadableByteChannel channel = Channels.newChannel(inputStream);
return read(channel, allocator, bufferSize);
}
@SuppressWarnings("unchecked")
private static <T extends Closeable> Consumer<T> closeConsumer() {
return (Consumer<T>) CLOSE_CONSUMER;
return Flux.create(new ReadableByteChannelConsumer(allocator, bufferSize),
subscriber -> channel, CLOSE_CONSUMER);
}
/**

Loading…
Cancel
Save