@ -22,6 +22,7 @@ import kotlinx.coroutines.flow.Flow
@@ -22,6 +22,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.flow.asFlow
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@ -55,18 +56,45 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket
@@ -55,18 +56,45 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket
connectWebSocket ( uri ) . awaitSingle ( )
/ * *
* Extension for [ RSocketRequester . RequestSpec . data ] providing a ` data < Foo > ( producer ) `
* Extension for [ RSocketRequester . RequestSpec . data ] providing a ` dataWithType < Foo > ( Any ) `
* variant leveraging Kotlin reified type parameters . This extension is not subject to type
* erasure and retains actual generic type arguments .
*
* @param producer the source of payload data value ( s ) . This must be a
* [ Publisher ] or another producer adaptable to a
* [ Publisher ] via [ org . springframework . core . ReactiveAdapterRegistry ]
* @param < T > the type of values to be produced
* @author Sebastien Deleuze
* @since 5.2
* /
@Suppress ( " EXTENSION_SHADOWED_BY_MEMBER " )
@FlowPreview
inline fun < reified T : Any > RSocketRequester . RequestSpec . data ( producer : Any ) : RSocketRequester . ResponseSpec =
inline fun < reified T : Any > RSocketRequester . RequestSpec . dataWithType ( producer : Any ) : RSocketRequester . ResponseSpec =
data ( producer , object : ParameterizedTypeReference < T > ( ) { } )
/ * *
* Extension for [ RSocketRequester . RequestSpec . data ] providing a ` dataWithType ( Publisher < T > ) `
* variant leveraging Kotlin reified type parameters . This extension is not subject to type
* erasure and retains actual generic type arguments .
* @param publisher the source of payload data value ( s )
* @param < T > the type of values to be produced
* @author Sebastien Deleuze
* @since 5.2
* /
inline fun < reified T : Any > RSocketRequester . RequestSpec . dataWithType ( publisher : Publisher < T > ) : RSocketRequester . ResponseSpec =
data ( publisher , object : ParameterizedTypeReference < T > ( ) { } )
/ * *
* Extension for [ RSocketRequester . RequestSpec . data ] providing a ` dataWithType ( Flow < T > ) `
* variant leveraging Kotlin reified type parameters . This extension is not subject to type
* erasure and retains actual generic type arguments .
* @param flow the [ Flow ] to write to the request
* @param < T > the source of payload data value ( s )
* @author Sebastien Deleuze
* @since 5.2
* /
@FlowPreview
inline fun < reified T : Any > RSocketRequester . RequestSpec . dataWithType ( flow : Flow < T > ) : RSocketRequester . ResponseSpec =
data ( flow , object : ParameterizedTypeReference < T > ( ) { } )
/ * *
* Coroutines variant of [ RSocketRequester . ResponseSpec . send ] .
*