diff --git a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java index 7deab36fd1f..b7bd5f5a106 100644 --- a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java +++ b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java @@ -442,6 +442,8 @@ final class HttpServiceMethod { @Nullable ReactiveAdapter returnTypeAdapter, boolean blockForOptional, @Nullable Duration blockTimeout) implements ResponseFunction { + private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow"; + @Override @Nullable public Object execute(HttpRequestValues requestValues) { @@ -472,14 +474,16 @@ final class HttpServiceMethod { MethodParameter returnParam = new MethodParameter(method, -1); Class returnType = returnParam.getParameterType(); boolean isSuspending = KotlinDetector.isSuspendingFunction(method); + boolean hasFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName()); + boolean isUnwrapped = isSuspending && !hasFlowReturnType; if (isSuspending) { - returnType = Mono.class; + returnType = (hasFlowReturnType ? Flux.class : Mono.class); } ReactiveAdapter reactiveAdapter = client.getReactiveAdapterRegistry().getAdapter(returnType); MethodParameter actualParam = (reactiveAdapter != null ? returnParam.nested() : returnParam.nestedIfOptional()); - Class actualType = isSuspending ? actualParam.getParameterType() : actualParam.getNestedParameterType(); + Class actualType = isUnwrapped ? actualParam.getParameterType() : actualParam.getNestedParameterType(); Function> responseFunction; if (ClassUtils.isVoidType(actualType)) { @@ -492,18 +496,18 @@ final class HttpServiceMethod { responseFunction = client::exchangeForHeadersMono; } else if (actualType.equals(ResponseEntity.class)) { - MethodParameter bodyParam = isSuspending ? actualParam : actualParam.nested(); + MethodParameter bodyParam = isUnwrapped ? actualParam : actualParam.nested(); Class bodyType = bodyParam.getNestedParameterType(); if (bodyType.equals(Void.class)) { responseFunction = client::exchangeForBodilessEntityMono; } else { ReactiveAdapter bodyAdapter = client.getReactiveAdapterRegistry().getAdapter(bodyType); - responseFunction = initResponseEntityFunction(client, bodyParam, bodyAdapter, isSuspending); + responseFunction = initResponseEntityFunction(client, bodyParam, bodyAdapter, isUnwrapped); } } else { - responseFunction = initBodyFunction(client, actualParam, reactiveAdapter, isSuspending); + responseFunction = initBodyFunction(client, actualParam, reactiveAdapter, isUnwrapped); } return new ReactorExchangeResponseFunction( @@ -513,7 +517,7 @@ final class HttpServiceMethod { @SuppressWarnings("ConstantConditions") private static Function> initResponseEntityFunction( ReactorHttpExchangeAdapter client, MethodParameter methodParam, - @Nullable ReactiveAdapter reactiveAdapter, boolean isSuspending) { + @Nullable ReactiveAdapter reactiveAdapter, boolean isUnwrapped) { if (reactiveAdapter == null) { return request -> client.exchangeForEntityMono( @@ -524,7 +528,7 @@ final class HttpServiceMethod { "ResponseEntity body must be a concrete value or a multi-value Publisher"); ParameterizedTypeReference bodyType = - ParameterizedTypeReference.forType(isSuspending ? methodParam.nested().getGenericParameterType() : + ParameterizedTypeReference.forType(isUnwrapped ? methodParam.nested().getGenericParameterType() : methodParam.nested().getNestedGenericParameterType()); // Shortcut for Flux diff --git a/spring-web/src/test/kotlin/org/springframework/web/service/invoker/HttpServiceMethodKotlinTests.kt b/spring-web/src/test/kotlin/org/springframework/web/service/invoker/HttpServiceMethodKotlinTests.kt index ae84239f97d..c5a62bd593a 100644 --- a/spring-web/src/test/kotlin/org/springframework/web/service/invoker/HttpServiceMethodKotlinTests.kt +++ b/spring-web/src/test/kotlin/org/springframework/web/service/invoker/HttpServiceMethodKotlinTests.kt @@ -56,6 +56,10 @@ class KotlinHttpServiceMethodTests { assertThat(flowBody.toList()).containsExactly("exchange", "For", "Body", "Flux") verifyClientInvocation("exchangeForBodyFlux", object : ParameterizedTypeReference() {}) + val suspendingFlowBody = service.suspendingFlowBody() + assertThat(suspendingFlowBody.toList()).containsExactly("exchange", "For", "Body", "Flux") + verifyClientInvocation("exchangeForBodyFlux", object : ParameterizedTypeReference() {}) + val stringEntity = service.stringEntity() assertThat(stringEntity).isEqualTo(ResponseEntity.ok("exchangeForEntityMono")) verifyClientInvocation("exchangeForEntityMono", object : ParameterizedTypeReference() {}) @@ -127,6 +131,9 @@ class KotlinHttpServiceMethodTests { @GetExchange suspend fun listBody(): MutableList + @GetExchange + suspend fun suspendingFlowBody(): Flow + @GetExchange suspend fun stringEntity(): ResponseEntity