From ec77bb00329b167faa04eda646961b0ddf90b70e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Deleuze?= Date: Mon, 13 Oct 2025 16:14:18 +0200 Subject: [PATCH] Introduce automatic context propagation in Coroutines Closes gh-35485 --- .../springframework/core/CoroutinesUtils.java | 9 ++++-- .../core/CoroutinesUtilsTests.kt | 30 +++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java b/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java index 92dcefc890f..2d1e509b52d 100644 --- a/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java +++ b/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java @@ -45,6 +45,7 @@ import kotlinx.coroutines.reactor.ReactorFlowKt; import org.jspecify.annotations.Nullable; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import org.springframework.util.Assert; @@ -85,7 +86,9 @@ public abstract class CoroutinesUtils { /** * Invoke a suspending function and convert it to {@link Mono} or {@link Flux}. - * Uses an {@linkplain Dispatchers#getUnconfined() unconfined} dispatcher. + * Uses an {@linkplain Dispatchers#getUnconfined() unconfined} dispatcher, augmented with + * {@link PropagationContextElement} if + * {@linkplain Hooks#isAutomaticContextPropagationEnabled() Reactor automatic context propagation} is enabled. * @param method the suspending function to invoke * @param target the target to invoke {@code method} on * @param args the function arguments. If the {@code Continuation} argument is specified as the last argument @@ -94,7 +97,9 @@ public abstract class CoroutinesUtils { * @throws IllegalArgumentException if {@code method} is not a suspending function */ public static Publisher invokeSuspendingFunction(Method method, Object target, @Nullable Object... args) { - return invokeSuspendingFunction(Dispatchers.getUnconfined(), method, target, args); + CoroutineContext context = Hooks.isAutomaticContextPropagationEnabled() ? + Dispatchers.getUnconfined().plus(new PropagationContextElement()) : Dispatchers.getUnconfined(); + return invokeSuspendingFunction(context, method, target, args); } /** diff --git a/spring-core/src/test/kotlin/org/springframework/core/CoroutinesUtilsTests.kt b/spring-core/src/test/kotlin/org/springframework/core/CoroutinesUtilsTests.kt index 8547e553c46..dccb13d0e83 100644 --- a/spring-core/src/test/kotlin/org/springframework/core/CoroutinesUtilsTests.kt +++ b/spring-core/src/test/kotlin/org/springframework/core/CoroutinesUtilsTests.kt @@ -16,20 +16,26 @@ package org.springframework.core +import io.micrometer.observation.Observation +import io.micrometer.observation.tck.TestObservationRegistry import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import org.assertj.core.api.Assertions +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test +import org.reactivestreams.Publisher import reactor.core.publisher.Flux +import reactor.core.publisher.Hooks import reactor.core.publisher.Mono import reactor.test.StepVerifier import kotlin.coroutines.Continuation import kotlin.coroutines.coroutineContext import kotlin.reflect.full.primaryConstructor import kotlin.reflect.jvm.isAccessible +import kotlin.reflect.jvm.javaMethod /** * Kotlin tests for [CoroutinesUtils]. @@ -38,6 +44,8 @@ import kotlin.reflect.jvm.isAccessible */ class CoroutinesUtilsTests { + private val observationRegistry = TestObservationRegistry.create() + @Test fun deferredToMono() { runBlocking { @@ -285,6 +293,21 @@ class CoroutinesUtilsTests { } } + @Test + @Suppress("UNCHECKED_CAST") + fun invokeSuspendingFunctionWithObservation() { + Hooks.enableAutomaticContextPropagation() + val method = CoroutinesUtilsTests::suspendingObservationFunction::javaMethod.get()!! + val publisher = CoroutinesUtils.invokeSuspendingFunction(method, this, "test", null) as Publisher + val observation = Observation.createNotStarted("coroutine", observationRegistry) + observation.observe { + val mono = Mono.from(publisher) + val result = mono.block() + assertThat(result).isEqualTo("coroutine") + } + Hooks.disableAutomaticContextPropagation() + } + suspend fun suspendingFunction(value: String): String { delay(1) return value @@ -417,4 +440,11 @@ class CoroutinesUtilsTests { class CustomException(message: String) : Throwable(message) + suspend fun suspendingObservationFunction(value: String): String? { + delay(1) + val currentObservation = observationRegistry.currentObservation + assertThat(currentObservation).isNotNull + return currentObservation?.context?.name + } + }