@ -24,6 +24,7 @@ import java.nio.channels.AsynchronousFileChannel;
@@ -24,6 +24,7 @@ import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler ;
import java.nio.channels.FileChannel ;
import java.nio.channels.ReadableByteChannel ;
import java.nio.channels.SeekableByteChannel ;
import java.nio.channels.WritableByteChannel ;
import java.nio.charset.StandardCharsets ;
import java.nio.file.Files ;
@ -43,6 +44,7 @@ import reactor.core.publisher.BaseSubscriber;
@@ -43,6 +44,7 @@ import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
import reactor.test.StepVerifier ;
import reactor.util.context.Context ;
import org.springframework.core.io.ByteArrayResource ;
import org.springframework.core.io.ClassPathResource ;
@ -940,6 +942,53 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
@@ -940,6 +942,53 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
release ( foo ) ;
}
@ParameterizedDataBufferAllocatingTest
void propagateContextByteChannel ( String displayName , DataBufferFactory bufferFactory ) throws IOException {
Path path = Paths . get ( this . resource . getURI ( ) ) ;
try ( SeekableByteChannel out = Files . newByteChannel ( this . tempFile , StandardOpenOption . WRITE , StandardOpenOption . TRUNCATE_EXISTING ) ) {
Flux < DataBuffer > result = DataBufferUtils . read ( path , bufferFactory , 1024 , StandardOpenOption . READ )
. transformDeferredContextual ( ( f , ctx ) - > {
assertThat ( ctx . getOrDefault ( "key" , "EMPTY" ) ) . isEqualTo ( "TEST" ) ;
return f ;
} )
. transform ( f - > DataBufferUtils . write ( f , out ) )
. transformDeferredContextual ( ( f , ctx ) - > {
assertThat ( ctx . getOrDefault ( "key" , "EMPTY" ) ) . isEqualTo ( "TEST" ) ;
return f ;
} )
. contextWrite ( Context . of ( "key" , "TEST" ) ) ;
StepVerifier . create ( result )
. consumeNextWith ( DataBufferUtils : : release )
. verifyComplete ( ) ;
}
}
@ParameterizedDataBufferAllocatingTest
void propagateContextAsynchronousFileChannel ( String displayName , DataBufferFactory bufferFactory ) throws IOException {
Path path = Paths . get ( this . resource . getURI ( ) ) ;
try ( AsynchronousFileChannel out = AsynchronousFileChannel . open ( this . tempFile , StandardOpenOption . WRITE , StandardOpenOption . TRUNCATE_EXISTING ) ) {
Flux < DataBuffer > result = DataBufferUtils . read ( path , bufferFactory , 1024 , StandardOpenOption . READ )
. transformDeferredContextual ( ( f , ctx ) - > {
assertThat ( ctx . getOrDefault ( "key" , "EMPTY" ) ) . isEqualTo ( "TEST" ) ;
return f ;
} )
. transform ( f - > DataBufferUtils . write ( f , out ) )
. transformDeferredContextual ( ( f , ctx ) - > {
assertThat ( ctx . getOrDefault ( "key" , "EMPTY" ) ) . isEqualTo ( "TEST" ) ;
return f ;
} )
. contextWrite ( Context . of ( "key" , "TEST" ) ) ;
StepVerifier . create ( result )
. consumeNextWith ( DataBufferUtils : : release )
. verifyComplete ( ) ;
}
}
private static class ZeroDemandSubscriber extends BaseSubscriber < DataBuffer > {