@ -56,6 +56,7 @@ public abstract class DataBufferUtils {
@@ -56,6 +56,7 @@ public abstract class DataBufferUtils {
private static final Consumer < DataBuffer > RELEASE_CONSUMER = DataBufferUtils : : release ;
//---------------------------------------------------------------------
// Reading
//---------------------------------------------------------------------
@ -74,31 +75,31 @@ public abstract class DataBufferUtils {
@@ -74,31 +75,31 @@ public abstract class DataBufferUtils {
* { @link # readInputStream ( Callable , DataBufferFactory , int ) } , to be removed in Spring 5 . 1
* /
@Deprecated
public static Flux < DataBuffer > read ( InputStream inputStream ,
DataBufferFactory dataBufferFactory , int bufferSize ) {
public static Flux < DataBuffer > read (
InputStream inputStream , DataBufferFactory dataBufferFactory , int bufferSize ) {
return readInputStream ( ( ) - > inputStream , dataBufferFactory , bufferSize ) ;
}
/ * *
* Obtain a { @link InputStream } from the given supplier , and read it into a { @code Flux } of
* { @code DataBuffer } s . Closes the input stream when the flux is terminated .
* Obtain a { @link InputStream } from the given supplier , and read it into a { @code Flux }
* of { @code DataBuffer } s . Closes the input stream when the flux is terminated .
* @param inputStreamSupplier the supplier for the input stream to read from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
* /
public static Flux < DataBuffer > readInputStream ( Callable < InputStream > inputStreamSupplier ,
DataBufferFactory dataBufferFactory , int bufferSize ) {
public static Flux < DataBuffer > readInputStream (
Callable < InputStream > inputStreamSupplier , DataBufferFactory dataBufferFactory , int bufferSize ) {
Assert . notNull ( inputStreamSupplier , "'inputStreamSupplier' must not be null" ) ;
return readByteChannel ( ( ) - > Channels . newChannel ( inputStreamSupplier . call ( ) ) ,
dataBufferFactory , bufferSize ) ;
return readByteChannel ( ( ) - > Channels . newChannel ( inputStreamSupplier . call ( ) ) , dataBufferFactory , bufferSize ) ;
}
/ * *
* Read the given { @code ReadableByteChannel } into a < strong > read - once < / strong > { @code Flux } of
* { @code DataBuffer } s . Closes the channel when the flux is terminated .
* Read the given { @code ReadableByteChannel } into a < strong > read - once < / strong > { @code Flux }
* of { @code DataBuffer } s . Closes the channel when the flux is terminated .
* < p > The resulting { @code Flux } can only be subscribed to once . See
* { @link # readByteChannel ( Callable , DataBufferFactory , int ) } for a variant that supports
* multiple subscriptions .
@ -110,8 +111,9 @@ public abstract class DataBufferUtils {
@@ -110,8 +111,9 @@ public abstract class DataBufferUtils {
* { @link # readByteChannel ( Callable , DataBufferFactory , int ) } , to be removed in Spring 5 . 1
* /
@Deprecated
public static Flux < DataBuffer > read ( ReadableByteChannel channel ,
DataBufferFactory dataBufferFactory , int bufferSize ) {
public static Flux < DataBuffer > read (
ReadableByteChannel channel , DataBufferFactory dataBufferFactory , int bufferSize ) {
return readByteChannel ( ( ) - > channel , dataBufferFactory , bufferSize ) ;
}
@ -123,8 +125,8 @@ public abstract class DataBufferUtils {
@@ -123,8 +125,8 @@ public abstract class DataBufferUtils {
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
* /
public static Flux < DataBuffer > readByteChannel ( Callable < ReadableByteChannel > channelSupplier ,
DataBufferFactory dataBufferFactory , int bufferSize ) {
public static Flux < DataBuffer > readByteChannel (
Callable < ReadableByteChannel > channelSupplier , DataBufferFactory dataBufferFactory , int bufferSize ) {
Assert . notNull ( channelSupplier , "'channelSupplier' must not be null" ) ;
Assert . notNull ( dataBufferFactory , "'dataBufferFactory' must not be null" ) ;
@ -156,8 +158,9 @@ public abstract class DataBufferUtils {
@@ -156,8 +158,9 @@ public abstract class DataBufferUtils {
* Spring 5 . 1
* /
@Deprecated
public static Flux < DataBuffer > read ( AsynchronousFileChannel channel ,
DataBufferFactory dataBufferFactory , int bufferSize ) {
public static Flux < DataBuffer > read (
AsynchronousFileChannel channel , DataBufferFactory dataBufferFactory , int bufferSize ) {
return readAsynchronousFileChannel ( ( ) - > channel , dataBufferFactory , bufferSize ) ;
}
@ -178,8 +181,9 @@ public abstract class DataBufferUtils {
@@ -178,8 +181,9 @@ public abstract class DataBufferUtils {
* in Spring 5 . 1
* /
@Deprecated
public static Flux < DataBuffer > read ( AsynchronousFileChannel channel ,
long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
public static Flux < DataBuffer > read (
AsynchronousFileChannel channel , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
return readAsynchronousFileChannel ( ( ) - > channel , position , dataBufferFactory , bufferSize ) ;
}
@ -192,24 +196,22 @@ public abstract class DataBufferUtils {
@@ -192,24 +196,22 @@ public abstract class DataBufferUtils {
* @return a flux of data buffers read from the given channel
* /
public static Flux < DataBuffer > readAsynchronousFileChannel (
Callable < AsynchronousFileChannel > channelSupplier ,
DataBufferFactory dataBufferFactory , int bufferSize ) {
Callable < AsynchronousFileChannel > channelSupplier , DataBufferFactory dataBufferFactory , int bufferSize ) {
return readAsynchronousFileChannel ( channelSupplier , 0 , dataBufferFactory , bufferSize ) ;
}
/ * *
* Obtain a { @code AsynchronousFileChannel } from the given supplier , and read it into a
* { @code Flux } of { @code DataBuffer } s , starting at the given position . Closes the channel when
* the flux is terminated .
* { @code Flux } of { @code DataBuffer } s , starting at the given position . Closes the
* channel when the flux is terminated .
* @param channelSupplier the supplier for the channel to read from
* @param position the position to start reading from
* @param dataBufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
* /
public static Flux < DataBuffer > readAsynchronousFileChannel (
Callable < AsynchronousFileChannel > channelSupplier ,
public static Flux < DataBuffer > readAsynchronousFileChannel ( Callable < AsynchronousFileChannel > channelSupplier ,
long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
Assert . notNull ( channelSupplier , "'channelSupplier' must not be null" ) ;
@ -242,8 +244,8 @@ public abstract class DataBufferUtils {
@@ -242,8 +244,8 @@ public abstract class DataBufferUtils {
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
* /
public static Flux < DataBuffer > read ( Resource resource ,
DataBufferFactory dataBufferFactory , int bufferSize ) {
public static Flux < DataBuffer > read (
Resource resource , DataBufferFactory dataBufferFactory , int bufferSize ) {
return read ( resource , 0 , dataBufferFactory , bufferSize ) ;
}
@ -262,13 +264,12 @@ public abstract class DataBufferUtils {
@@ -262,13 +264,12 @@ public abstract class DataBufferUtils {
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
* /
public static Flux < DataBuffer > read ( Resource resource , long position ,
DataBufferFactory dataBufferFactory , int bufferSize ) {
public static Flux < DataBuffer > read (
Resource resource , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
try {
if ( resource . isFile ( ) ) {
File file = resource . getFile ( ) ;
return readAsynchronousFileChannel (
( ) - > AsynchronousFileChannel . open ( file . toPath ( ) , StandardOpenOption . READ ) ,
position , dataBufferFactory , bufferSize ) ;
@ -283,8 +284,6 @@ public abstract class DataBufferUtils {
@@ -283,8 +284,6 @@ public abstract class DataBufferUtils {
}
//---------------------------------------------------------------------
// Writing
//---------------------------------------------------------------------
@ -295,16 +294,13 @@ public abstract class DataBufferUtils {
@@ -295,16 +294,13 @@ public abstract class DataBufferUtils {
* < strong > not < / strong > { @linkplain # release ( DataBuffer ) release } the data buffers in the
* source . If releasing is required , then subscribe to the returned { @code Flux } with a
* { @link # releaseConsumer ( ) } .
* < p > Note that the writing process does not start until the returned { @code Flux } is subscribed
* to .
* < p > Note that the writing process does not start until the returned { @code Flux } is subscribed to .
* @param source the stream of data buffers to be written
* @param outputStream the output stream to write to
* @return a flux containing the same buffers as in { @code source } , that starts the writing
* process when subscribed to , and that publishes any writing errors and the completion signal
* /
public static Flux < DataBuffer > write ( Publisher < DataBuffer > source ,
OutputStream outputStream ) {
public static Flux < DataBuffer > write ( Publisher < DataBuffer > source , OutputStream outputStream ) {
Assert . notNull ( source , "'source' must not be null" ) ;
Assert . notNull ( outputStream , "'outputStream' must not be null" ) ;
@ -318,21 +314,17 @@ public abstract class DataBufferUtils {
@@ -318,21 +314,17 @@ public abstract class DataBufferUtils {
* < strong > not < / strong > { @linkplain # release ( DataBuffer ) release } the data buffers in the
* source . If releasing is required , then subscribe to the returned { @code Flux } with a
* { @link # releaseConsumer ( ) } .
* < p > Note that the writing process does not start until the returned { @code Flux } is subscribed
* to .
* < p > Note that the writing process does not start until the returned { @code Flux } is subscribed to .
* @param source the stream of data buffers to be written
* @param channel the channel to write to
* @return a flux containing the same buffers as in { @code source } , that starts the writing
* process when subscribed to , and that publishes any writing errors and the completion signal
* /
public static Flux < DataBuffer > write ( Publisher < DataBuffer > source ,
WritableByteChannel channel ) {
public static Flux < DataBuffer > write ( Publisher < DataBuffer > source , WritableByteChannel channel ) {
Assert . notNull ( source , "'source' must not be null" ) ;
Assert . notNull ( channel , "'channel' must not be null" ) ;
Flux < DataBuffer > flux = Flux . from ( source ) ;
return Flux . create ( sink - >
flux . subscribe ( dataBuffer - > {
try {
@ -357,40 +349,36 @@ public abstract class DataBufferUtils {
@@ -357,40 +349,36 @@ public abstract class DataBufferUtils {
* < strong > not < / strong > { @linkplain # release ( DataBuffer ) release } the data buffers in the
* source . If releasing is required , then subscribe to the returned { @code Flux } with a
* { @link # releaseConsumer ( ) } .
* < p > Note that the writing process does not start until the returned { @code Flux } is subscribed
* to .
* < p > Note that the writing process does not start until the returned { @code Flux } is subscribed to .
* @param source the stream of data buffers to be written
* @param channel the channel to write to
* @return a flux containing the same buffers as in { @code source } , that starts the writing
* process when subscribed to , and that publishes any writing errors and the completion signal
* /
public static Flux < DataBuffer > write ( Publisher < DataBuffer > source , AsynchronousFileChannel channel ,
long position ) {
public static Flux < DataBuffer > write (
Publisher < DataBuffer > source , AsynchronousFileChannel channel , long position ) {
Assert . notNull ( source , "'source' must not be null" ) ;
Assert . notNull ( channel , "'channel' must not be null" ) ;
Assert . isTrue ( position > = 0 , "'position' must be >= 0" ) ;
Flux < DataBuffer > flux = Flux . from ( source ) ;
return Flux . create ( sink - > {
BaseSubscriber < DataBuffer > subscriber =
new AsynchronousFileChannelWriteCompletionHandler ( sink , channel , position ) ;
flux . subscribe ( subscriber ) ;
flux . subscribe ( new AsynchronousFileChannelWriteCompletionHandler ( sink , channel , position ) ) ;
} ) ;
}
private static void closeChannel ( @Nullable Channel channel ) {
try {
if ( channel ! = null & & channel . isOpen ( ) ) {
if ( channel ! = null & & channel . isOpen ( ) ) {
try {
channel . close ( ) ;
}
}
catch ( IOException ignored ) {
catch ( IOException ignored ) {
}
}
}
//---------------------------------------------------------------------
// Various
//---------------------------------------------------------------------
@ -484,10 +472,7 @@ public abstract class DataBufferUtils {
@@ -484,10 +472,7 @@ public abstract class DataBufferUtils {
* @return { @code true } if the buffer was released ; { @code false } otherwise .
* /
public static boolean release ( @Nullable DataBuffer dataBuffer ) {
if ( dataBuffer instanceof PooledDataBuffer ) {
return ( ( PooledDataBuffer ) dataBuffer ) . release ( ) ;
}
return false ;
return ( dataBuffer instanceof PooledDataBuffer & & ( ( PooledDataBuffer ) dataBuffer ) . release ( ) ) ;
}
/ * *
@ -520,8 +505,7 @@ public abstract class DataBufferUtils {
@@ -520,8 +505,7 @@ public abstract class DataBufferUtils {
}
private static class ReadableByteChannelGenerator
implements Consumer < SynchronousSink < DataBuffer > > {
private static class ReadableByteChannelGenerator implements Consumer < SynchronousSink < DataBuffer > > {
private final ReadableByteChannel channel ;
@ -529,9 +513,8 @@ public abstract class DataBufferUtils {
@@ -529,9 +513,8 @@ public abstract class DataBufferUtils {
private final int bufferSize ;
public ReadableByteChannelGenerator ( ReadableByteChannel channel ,
DataBufferFactory dataBufferFactory , int bufferSize ) {
public ReadableByteChannelGenerator (
ReadableByteChannel channel , DataBufferFactory dataBufferFactory , int bufferSize ) {
this . channel = channel ;
this . dataBufferFactory = dataBufferFactory ;
@ -563,7 +546,6 @@ public abstract class DataBufferUtils {
@@ -563,7 +546,6 @@ public abstract class DataBufferUtils {
}
}
}
}
@ -582,10 +564,9 @@ public abstract class DataBufferUtils {
@@ -582,10 +564,9 @@ public abstract class DataBufferUtils {
private final AtomicBoolean disposed = new AtomicBoolean ( ) ;
public AsynchronousFileChannelReadCompletionHandler ( AsynchronousFileChannel channel ,
FluxSink < DataBuffer > sink , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
private AsynchronousFileChannelReadCompletionHandler (
AsynchronousFileChannel channel , FluxSink < DataBuffer > sink ,
long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
this . channel = channel ;
this . sink = sink ;
this . position = new AtomicLong ( position ) ;
@ -599,10 +580,8 @@ public abstract class DataBufferUtils {
@@ -599,10 +580,8 @@ public abstract class DataBufferUtils {
long pos = this . position . addAndGet ( read ) ;
dataBuffer . writePosition ( read ) ;
this . sink . next ( dataBuffer ) ;
if ( ! this . disposed . get ( ) ) {
DataBuffer newDataBuffer =
this . dataBufferFactory . allocateBuffer ( this . bufferSize ) ;
DataBuffer newDataBuffer = this . dataBufferFactory . allocateBuffer ( this . bufferSize ) ;
ByteBuffer newByteBuffer = newDataBuffer . asByteBuffer ( 0 , this . bufferSize ) ;
this . channel . read ( newByteBuffer , pos , newDataBuffer , this ) ;
}
@ -618,12 +597,10 @@ public abstract class DataBufferUtils {
@@ -618,12 +597,10 @@ public abstract class DataBufferUtils {
release ( dataBuffer ) ;
this . sink . error ( exc ) ;
}
}
private static class AsynchronousFileChannelWriteCompletionHandler
extends BaseSubscriber < DataBuffer >
private static class AsynchronousFileChannelWriteCompletionHandler extends BaseSubscriber < DataBuffer >
implements CompletionHandler < Integer , ByteBuffer > {
private final FluxSink < DataBuffer > sink ;
@ -639,6 +616,7 @@ public abstract class DataBufferUtils {
@@ -639,6 +616,7 @@ public abstract class DataBufferUtils {
public AsynchronousFileChannelWriteCompletionHandler (
FluxSink < DataBuffer > sink , AsynchronousFileChannel channel , long position ) {
this . sink = sink ;
this . channel = channel ;
this . position = new AtomicLong ( position ) ;
@ -653,7 +631,6 @@ public abstract class DataBufferUtils {
@@ -653,7 +631,6 @@ public abstract class DataBufferUtils {
protected void hookOnNext ( DataBuffer value ) {
this . dataBuffer = value ;
ByteBuffer byteBuffer = value . asByteBuffer ( ) ;
this . channel . write ( byteBuffer , this . position . get ( ) , byteBuffer , this ) ;
}
@ -678,7 +655,6 @@ public abstract class DataBufferUtils {
@@ -678,7 +655,6 @@ public abstract class DataBufferUtils {
this . channel . write ( byteBuffer , pos , byteBuffer , this ) ;
return ;
}
if ( this . dataBuffer ! = null ) {
this . sink . next ( this . dataBuffer ) ;
this . dataBuffer = null ;
@ -696,4 +672,5 @@ public abstract class DataBufferUtils {
@@ -696,4 +672,5 @@ public abstract class DataBufferUtils {
this . sink . error ( exc ) ;
}
}
}