@ -18,18 +18,24 @@ package org.springframework.core.io.buffer;
@@ -18,18 +18,24 @@ package org.springframework.core.io.buffer;
import java.io.IOException ;
import java.io.InputStream ;
import java.io.OutputStream ;
import java.nio.ByteBuffer ;
import java.nio.channels.AsynchronousFileChannel ;
import java.nio.channels.Channel ;
import java.nio.channels.Channels ;
import java.nio.channels.CompletionHandler ;
import java.nio.channels.ReadableByteChannel ;
import java.nio.channels.WritableByteChannel ;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.function.BiFunction ;
import org.reactivestreams.Publisher ;
import org.reactivestreams.Subscription ;
import reactor.core.publisher.BaseSubscriber ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.FluxSink ;
import reactor.core.publisher.Mono ;
import reactor.core.publisher.MonoSink ;
import reactor.core.publisher.SynchronousSink ;
import org.springframework.lang.Nullable ;
@ -104,7 +110,6 @@ public abstract class DataBufferUtils {
@@ -104,7 +110,6 @@ public abstract class DataBufferUtils {
* @param bufferSize the maximum size of the data buffers
* @return a flux of data buffers read from the given channel
* /
@SuppressWarnings ( "deprecation" )
public static Flux < DataBuffer > read ( AsynchronousFileChannel channel ,
long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
@ -114,15 +119,104 @@ public abstract class DataBufferUtils {
@@ -114,15 +119,104 @@ public abstract class DataBufferUtils {
ByteBuffer byteBuffer = ByteBuffer . allocate ( bufferSize ) ;
return Flux . create ( emitter - > {
emitter . onDispose ( ( ) - > closeChannel ( channel ) ) ;
AsynchronousFileChannelCompletionHandler completionHandler =
new AsynchronousFileChannelCompletionHandler ( emitter , position ,
return Flux . create ( sink - > {
sink . onDispose ( ( ) - > closeChannel ( channel ) ) ;
AsynchronousFileChannelRead CompletionHandler completionHandler =
new AsynchronousFileChannelRead CompletionHandler ( sink , position ,
dataBufferFactory , byteBuffer ) ;
channel . read ( byteBuffer , position , channel , completionHandler ) ;
} ) ;
}
/ * *
* Write the given stream of { @link DataBuffer } s to the given { @code OutputStream } . Does
* < strong > not < / strong > close the output stream when the flux is terminated , but
* < strong > does < / strong > { @linkplain # release ( DataBuffer ) release } the data buffers in the
* source .
* < p > Note that the writing process does not start until the returned { @code Mono } is subscribed
* to .
* @param source the stream of data buffers to be written
* @param outputStream the output stream to write to
* @return a mono that starts the writing process when subscribed to , and that indicates the
* completion of the process
* /
public static Mono < Void > write ( Publisher < DataBuffer > source ,
OutputStream outputStream ) {
Assert . notNull ( source , "'source' must not be null" ) ;
Assert . notNull ( outputStream , "'outputStream' must not be null" ) ;
WritableByteChannel channel = Channels . newChannel ( outputStream ) ;
return write ( source , channel ) ;
}
/ * *
* Write the given stream of { @link DataBuffer } s to the given { @code WritableByteChannel } . Does
* < strong > not < / strong > close the channel when the flux is terminated , but
* < strong > does < / strong > { @linkplain # release ( DataBuffer ) release } the data buffers in the
* source .
* < p > Note that the writing process does not start until the returned { @code Mono } is subscribed
* to .
* @param source the stream of data buffers to be written
* @param channel the channel to write to
* @return a mono that starts the writing process when subscribed to , and that indicates the
* completion of the process
* /
public static Mono < Void > write ( Publisher < DataBuffer > source ,
WritableByteChannel channel ) {
Assert . notNull ( source , "'source' must not be null" ) ;
Assert . notNull ( channel , "'channel' must not be null" ) ;
Flux < DataBuffer > flux = Flux . from ( source ) ;
return Mono . create ( sink - >
flux . subscribe ( dataBuffer - > {
try {
ByteBuffer byteBuffer = dataBuffer . asByteBuffer ( ) ;
while ( byteBuffer . hasRemaining ( ) ) {
channel . write ( byteBuffer ) ;
}
release ( dataBuffer ) ;
}
catch ( IOException ex ) {
sink . error ( ex ) ;
}
} ,
sink : : error ,
sink : : success ) ) ;
}
/ * *
* Write the given stream of { @link DataBuffer } s to the given { @code AsynchronousFileChannel } .
* Does < strong > not < / strong > close the channel when the flux is terminated , but
* < strong > does < / strong > { @linkplain # release ( DataBuffer ) release } the data buffers in the
* source .
* < p > Note that the writing process does not start until the returned { @code Mono } is subscribed
* to .
* @param source the stream of data buffers to be written
* @param channel the channel to write to
* @return a mono that starts the writing process when subscribed to , and that indicates the
* completion of the process
* /
public static Mono < Void > write ( Publisher < DataBuffer > source , AsynchronousFileChannel channel ,
long position ) {
Assert . notNull ( source , "'source' must not be null" ) ;
Assert . notNull ( channel , "'channel' must not be null" ) ;
Assert . isTrue ( position > = 0 , "'position' must be >= 0" ) ;
Flux < DataBuffer > flux = Flux . from ( source ) ;
return Mono . create ( sink - > {
BaseSubscriber < DataBuffer > subscriber =
new AsynchronousFileChannelWriteCompletionHandler ( sink , channel , position ) ;
flux . subscribe ( subscriber ) ;
} ) ;
}
private static void closeChannel ( @Nullable Channel channel ) {
try {
if ( channel ! = null ) {
@ -272,10 +366,10 @@ public abstract class DataBufferUtils {
@@ -272,10 +366,10 @@ public abstract class DataBufferUtils {
}
}
private static class AsynchronousFileChannelCompletionHandler
private static class AsynchronousFileChannelRead CompletionHandler
implements CompletionHandler < Integer , AsynchronousFileChannel > {
private final FluxSink < DataBuffer > emitter ;
private final FluxSink < DataBuffer > sink ;
private final ByteBuffer byteBuffer ;
@ -283,9 +377,9 @@ public abstract class DataBufferUtils {
@@ -283,9 +377,9 @@ public abstract class DataBufferUtils {
private long position ;
private AsynchronousFileChannelCompletionHandler ( FluxSink < DataBuffer > emitter ,
private AsynchronousFileChannelRead CompletionHandler ( FluxSink < DataBuffer > sink ,
long position , DataBufferFactory dataBufferFactory , ByteBuffer byteBuffer ) {
this . emitter = emitter ;
this . sink = sink ;
this . position = position ;
this . dataBufferFactory = dataBufferFactory ;
this . byteBuffer = byteBuffer ;
@ -301,7 +395,7 @@ public abstract class DataBufferUtils {
@@ -301,7 +395,7 @@ public abstract class DataBufferUtils {
try {
dataBuffer . write ( this . byteBuffer ) ;
release = false ;
this . emitter . next ( dataBuffer ) ;
this . sink . next ( dataBuffer ) ;
}
finally {
if ( release ) {
@ -310,20 +404,82 @@ public abstract class DataBufferUtils {
@@ -310,20 +404,82 @@ public abstract class DataBufferUtils {
}
this . byteBuffer . clear ( ) ;
if ( ! this . emitter . isCancelled ( ) ) {
if ( ! this . sink . isCancelled ( ) ) {
channel . read ( this . byteBuffer , this . position , channel , this ) ;
}
}
else {
this . emitter . complete ( ) ;
this . sink . complete ( ) ;
closeChannel ( channel ) ;
}
}
@Override
public void failed ( Throwable exc , AsynchronousFileChannel channel ) {
this . emitter . error ( exc ) ;
this . sink . error ( exc ) ;
closeChannel ( channel ) ;
}
}
private static class AsynchronousFileChannelWriteCompletionHandler
extends BaseSubscriber < DataBuffer >
implements CompletionHandler < Integer , ByteBuffer > {
private final MonoSink < Void > sink ;
private final AsynchronousFileChannel channel ;
private long position ;
@Nullable
private DataBuffer dataBuffer ;
public AsynchronousFileChannelWriteCompletionHandler (
MonoSink < Void > sink , AsynchronousFileChannel channel , long position ) {
this . sink = sink ;
this . channel = channel ;
this . position = position ;
}
@Override
protected void hookOnSubscribe ( Subscription subscription ) {
request ( 1 ) ;
}
@Override
protected void hookOnNext ( DataBuffer value ) {
this . dataBuffer = value ;
ByteBuffer byteBuffer = value . asByteBuffer ( ) ;
this . channel . write ( byteBuffer , this . position , byteBuffer , this ) ;
}
@Override
protected void hookOnError ( Throwable throwable ) {
this . sink . error ( throwable ) ;
}
@Override
protected void hookOnComplete ( ) {
this . sink . success ( ) ;
}
@Override
public void completed ( Integer written , ByteBuffer byteBuffer ) {
this . position + = written ;
if ( byteBuffer . hasRemaining ( ) ) {
this . channel . write ( byteBuffer , this . position , byteBuffer , this ) ;
}
else {
release ( this . dataBuffer ) ;
request ( 1 ) ;
}
}
@Override
public void failed ( Throwable exc , ByteBuffer byteBuffer ) {
this . sink . error ( exc ) ;
}
}
}