Browse Source

Polish suspending functions support in RSocketServiceMethod

Closes gh-35473
pull/35718/head
Sébastien Deleuze 2 months ago
parent
commit
9f9f0a7014
  1. 12
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java
  2. 25
      spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/service/RSocketServiceMethodKotlinTests.kt

12
spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java

@ -137,13 +137,11 @@ final class RSocketServiceMethod { @@ -137,13 +137,11 @@ final class RSocketServiceMethod {
MethodParameter returnParam = new MethodParameter(method, -1);
Class<?> returnType = returnParam.getParameterType();
boolean isFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName());
boolean isUnwrapped = KotlinDetector.isSuspendingFunction(method) && !isFlowReturnType;
if (isUnwrapped) {
returnType = Mono.class;
}
else if (isFlowReturnType) {
returnType = Flux.class;
boolean isSuspending = KotlinDetector.isSuspendingFunction(method);
boolean hasFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName());
boolean isUnwrapped = isSuspending && !hasFlowReturnType;
if (isSuspending) {
returnType = (hasFlowReturnType ? Flux.class : Mono.class);
}
ReactiveAdapter reactiveAdapter = reactiveRegistry.getAdapter(returnType);

25
spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/service/RSocketServiceMethodKotlinTests.kt

@ -37,6 +37,7 @@ import reactor.core.publisher.Mono @@ -37,6 +37,7 @@ import reactor.core.publisher.Mono
* Kotlin tests for [RSocketServiceMethod].
*
* @author Dmitry Sulman
* @author Sebastien Deleuze
*/
class RSocketServiceMethodKotlinTests {
@ -95,6 +96,23 @@ class RSocketServiceMethodKotlinTests { @@ -95,6 +96,23 @@ class RSocketServiceMethodKotlinTests {
assertThat(rsocket.savedPayload?.dataUtf8).isEqualTo(requestPayload)
}
@Test
fun nonSuspendingRequestStream(): Unit = runBlocking {
val service = proxyFactory.createClient(NonSuspendingFunctionsService::class.java)
val requestPayload = "request"
val responsePayload1 = "response1"
val responsePayload2 = "response2"
rsocket.setPayloadFluxToReturn(
Flux.just(DefaultPayload.create(responsePayload1), DefaultPayload.create(responsePayload2)))
val response = service.requestStream(requestPayload).toList()
assertThat(response).containsExactly(responsePayload1, responsePayload2)
assertThat(rsocket.savedMethodName).isEqualTo("requestStream")
assertThat(rsocket.savedPayload?.metadataUtf8).isEqualTo("rs")
assertThat(rsocket.savedPayload?.dataUtf8).isEqualTo(requestPayload)
}
@Test
fun requestChannel(): Unit = runBlocking {
val service = proxyFactory.createClient(SuspendingFunctionsService::class.java)
@ -131,4 +149,11 @@ class RSocketServiceMethodKotlinTests { @@ -131,4 +149,11 @@ class RSocketServiceMethodKotlinTests {
@RSocketExchange("rc")
suspend fun requestChannel(input: Flow<String>): Flow<String>
}
private interface NonSuspendingFunctionsService {
@RSocketExchange("rs")
fun requestStream(input: String): Flow<String>
}
}
Loading…
Cancel
Save