diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java index 82f9c8bccd5..8b570b0703d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java @@ -137,13 +137,11 @@ final class RSocketServiceMethod { MethodParameter returnParam = new MethodParameter(method, -1); Class returnType = returnParam.getParameterType(); - boolean isFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName()); - boolean isUnwrapped = KotlinDetector.isSuspendingFunction(method) && !isFlowReturnType; - if (isUnwrapped) { - returnType = Mono.class; - } - else if (isFlowReturnType) { - returnType = Flux.class; + boolean isSuspending = KotlinDetector.isSuspendingFunction(method); + boolean hasFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName()); + boolean isUnwrapped = isSuspending && !hasFlowReturnType; + if (isSuspending) { + returnType = (hasFlowReturnType ? Flux.class : Mono.class); } ReactiveAdapter reactiveAdapter = reactiveRegistry.getAdapter(returnType); diff --git a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/service/RSocketServiceMethodKotlinTests.kt b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/service/RSocketServiceMethodKotlinTests.kt index 2764cea7f7a..075906b3fff 100644 --- a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/service/RSocketServiceMethodKotlinTests.kt +++ b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/service/RSocketServiceMethodKotlinTests.kt @@ -37,6 +37,7 @@ import reactor.core.publisher.Mono * Kotlin tests for [RSocketServiceMethod]. * * @author Dmitry Sulman + * @author Sebastien Deleuze */ class RSocketServiceMethodKotlinTests { @@ -95,6 +96,23 @@ class RSocketServiceMethodKotlinTests { assertThat(rsocket.savedPayload?.dataUtf8).isEqualTo(requestPayload) } + @Test + fun nonSuspendingRequestStream(): Unit = runBlocking { + val service = proxyFactory.createClient(NonSuspendingFunctionsService::class.java) + + val requestPayload = "request" + val responsePayload1 = "response1" + val responsePayload2 = "response2" + rsocket.setPayloadFluxToReturn( + Flux.just(DefaultPayload.create(responsePayload1), DefaultPayload.create(responsePayload2))) + val response = service.requestStream(requestPayload).toList() + + assertThat(response).containsExactly(responsePayload1, responsePayload2) + assertThat(rsocket.savedMethodName).isEqualTo("requestStream") + assertThat(rsocket.savedPayload?.metadataUtf8).isEqualTo("rs") + assertThat(rsocket.savedPayload?.dataUtf8).isEqualTo(requestPayload) + } + @Test fun requestChannel(): Unit = runBlocking { val service = proxyFactory.createClient(SuspendingFunctionsService::class.java) @@ -131,4 +149,11 @@ class RSocketServiceMethodKotlinTests { @RSocketExchange("rc") suspend fun requestChannel(input: Flow): Flow } + + private interface NonSuspendingFunctionsService { + + @RSocketExchange("rs") + fun requestStream(input: String): Flow + } + } \ No newline at end of file