diff --git a/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt index 60a10fe8004..db5b9ad48a0 100644 --- a/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt +++ b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle import kotlinx.coroutines.reactive.flow.asFlow +import org.reactivestreams.Publisher import org.springframework.core.ParameterizedTypeReference import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -55,18 +56,45 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket connectWebSocket(uri).awaitSingle() /** - * Extension for [RSocketRequester.RequestSpec.data] providing a `data(producer)` + * Extension for [RSocketRequester.RequestSpec.data] providing a `dataWithType(Any)` * variant leveraging Kotlin reified type parameters. This extension is not subject to type * erasure and retains actual generic type arguments. - * + * @param producer the source of payload data value(s). This must be a + * [Publisher] or another producer adaptable to a + * [Publisher] via [org.springframework.core.ReactiveAdapterRegistry] + * @param the type of values to be produced * @author Sebastien Deleuze * @since 5.2 */ -@Suppress("EXTENSION_SHADOWED_BY_MEMBER") -@FlowPreview -inline fun RSocketRequester.RequestSpec.data(producer: Any): RSocketRequester.ResponseSpec = +inline fun RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.ResponseSpec = data(producer, object : ParameterizedTypeReference() {}) +/** + * Extension for [RSocketRequester.RequestSpec.data] providing a `dataWithType(Publisher)` + * variant leveraging Kotlin reified type parameters. This extension is not subject to type + * erasure and retains actual generic type arguments. + * @param publisher the source of payload data value(s) + * @param the type of values to be produced + * @author Sebastien Deleuze + * @since 5.2 + */ +inline fun RSocketRequester.RequestSpec.dataWithType(publisher: Publisher): RSocketRequester.ResponseSpec = + data(publisher, object : ParameterizedTypeReference() {}) + +/** + * Extension for [RSocketRequester.RequestSpec.data] providing a `dataWithType(Flow)` + * variant leveraging Kotlin reified type parameters. This extension is not subject to type + * erasure and retains actual generic type arguments. + * @param flow the [Flow] to write to the request + * @param the source of payload data value(s) + * @author Sebastien Deleuze + * @since 5.2 + */ +@FlowPreview +inline fun RSocketRequester.RequestSpec.dataWithType(flow: Flow): RSocketRequester.ResponseSpec = + data(flow, object : ParameterizedTypeReference() {}) + + /** * Coroutines variant of [RSocketRequester.ResponseSpec.send]. * diff --git a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt index 9c4297b4b5e..46f870e3301 100644 --- a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt +++ b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt @@ -3,6 +3,7 @@ package org.springframework.messaging.rsocket import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import org.junit.Assert.assertEquals @@ -12,6 +13,7 @@ import org.reactivestreams.Publisher import org.springframework.core.ParameterizedTypeReference import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.concurrent.CompletableFuture /** * Mock object based tests for [RSocketRequester] Kotlin extensions @@ -55,11 +57,30 @@ class RSocketRequesterExtensionsTests { } @Test - fun dataFlowWithType() { + fun `dataWithType with Publisher`() { val requestSpec = mockk() val responseSpec = mockk() + val data = mockk>() every { requestSpec.data(any>(), match>(stringTypeRefMatcher)) } returns responseSpec - assertEquals(responseSpec, requestSpec.data(mockk())) + assertEquals(responseSpec, requestSpec.dataWithType(data)) + } + + @Test + fun `dataWithType with Flow`() { + val requestSpec = mockk() + val responseSpec = mockk() + val data = mockk>() + every { requestSpec.data(any>(), match>(stringTypeRefMatcher)) } returns responseSpec + assertEquals(responseSpec, requestSpec.dataWithType(data)) + } + + @Test + fun `dataWithType with CompletableFuture`() { + val requestSpec = mockk() + val responseSpec = mockk() + val data = mockk>() + every { requestSpec.data(any>(), match>(stringTypeRefMatcher)) } returns responseSpec + assertEquals(responseSpec, requestSpec.dataWithType(data)) } @Test