From 2faed3cdbb206bb5a92b3bd64bb18b2ad3501900 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Deleuze?= Date: Fri, 12 Sep 2025 10:43:46 +0200 Subject: [PATCH] Refine PropagationContextElement This commit apply several refinements to PropagationContextElement: - Capture the ThreadLocal when instantiating the PropagationContextElement in order to support dispatchers switching threads - Remove the constructor parameter which is not idiomatic and breaks the support when switching threads, and use instead the updateThreadContext(context: CoroutineContext) parameter - Make the kotlinx-coroutines-reactor dependency optional - Make the properties private The Javadoc and tests are also updated to use the `Dispatchers.IO + PropagationContextElement()` pattern performed outside of the suspending lambda, which is the typical use case. Closes gh-35469 --- .../pages/languages/kotlin/coroutines.adoc | 25 ++----- .../propagation/ContextPropagationSample.kt | 16 ++-- .../core/PropagationContextElement.kt | 73 ++++++++++++------- .../core/PropagationContextElementTests.kt | 43 ++++------- 4 files changed, 81 insertions(+), 76 deletions(-) diff --git a/framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc b/framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc index 5b595197308..57d2342c447 100644 --- a/framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc +++ b/framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc @@ -257,29 +257,20 @@ For Kotlin `Flow`, a `Flow.transactional` extension is provided. Spring applications are xref:integration/observability.adoc[instrumented with Micrometer for Observability support]. For tracing support, the current observation is propagated through a `ThreadLocal` for blocking code, or the Reactor `Context` for reactive pipelines. But the current observation also needs to be made available -in the execution context of a suspended function. Without that, the current "traceId" will not be automatically prepended -to logged statements from coroutines. +in the execution context of a suspended function. Without that, the current "traceId" will not be automatically +prepended to logged statements from coroutines. -The `org.springframework.core.PropagationContextElement` operator generally ensures that the +The {spring-framework-api-kdoc}/spring-core/org.springframework.core/-propagation-context-element/index.html[`PropagationContextElement`] operator generally ensures that the {micrometer-context-propagation-docs}/[Micrometer Context Propagation library] works with Kotlin Coroutines. -The `PropagationContextElement` requires the following dependencies: +It requires the `io.micrometer:context-propagation` dependency and optionally the +`org.jetbrains.kotlinx:kotlinx-coroutines-reactor` one. -`build.gradle.kts` -[source,kotlin,indent=0] ----- -dependencies { - implementation("io.micrometer:context-propagation:${contextPropagationVersion}") - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}") - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}") -} ----- - -Applications can then use the `PropagationContextElement` operator to connect the `currentCoroutineContext()` +Applications can then use the `PropagationContextElement` to augment the `CoroutineContext` with the context propagation mechanism: include-code::./ContextPropagationSample[tag=context,indent=0] -Here, assuming that Micrometer Tracing is configured, the resulting logging statement -will show the current "traceId" and unlock better observability for your application. +Here, assuming that Micrometer Tracing is configured, the resulting logging statement will show the current "traceId" +and unlock better observability for your application. diff --git a/framework-docs/src/main/kotlin/org/springframework/docs/languages/kotlin/coroutines/propagation/ContextPropagationSample.kt b/framework-docs/src/main/kotlin/org/springframework/docs/languages/kotlin/coroutines/propagation/ContextPropagationSample.kt index e8f0721511a..06e7926de27 100644 --- a/framework-docs/src/main/kotlin/org/springframework/docs/languages/kotlin/coroutines/propagation/ContextPropagationSample.kt +++ b/framework-docs/src/main/kotlin/org/springframework/docs/languages/kotlin/coroutines/propagation/ContextPropagationSample.kt @@ -16,8 +16,9 @@ package org.springframework.docs.languages.kotlin.coroutines.propagation -import kotlinx.coroutines.currentCoroutineContext -import kotlinx.coroutines.withContext +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import org.apache.commons.logging.Log import org.apache.commons.logging.LogFactory import org.springframework.core.PropagationContextElement @@ -31,10 +32,15 @@ class ContextPropagationSample { } // tag::context[] + fun main() { + runBlocking(Dispatchers.IO + PropagationContextElement()) { + suspendingFunction() + } + } + suspend fun suspendingFunction() { - return withContext(PropagationContextElement(currentCoroutineContext())) { - logger.info("Suspending function with traceId") - } + delay(1) + logger.info("Suspending function with traceId") } // end::context[] } \ No newline at end of file diff --git a/spring-core/src/main/kotlin/org/springframework/core/PropagationContextElement.kt b/spring-core/src/main/kotlin/org/springframework/core/PropagationContextElement.kt index c55d66fd416..ad7ae35948a 100644 --- a/spring-core/src/main/kotlin/org/springframework/core/PropagationContextElement.kt +++ b/spring-core/src/main/kotlin/org/springframework/core/PropagationContextElement.kt @@ -21,55 +21,78 @@ import io.micrometer.context.ContextSnapshot import io.micrometer.context.ContextSnapshotFactory import kotlinx.coroutines.ThreadContextElement import kotlinx.coroutines.reactor.ReactorContext +import org.springframework.util.ClassUtils import reactor.util.context.ContextView import kotlin.coroutines.AbstractCoroutineContextElement import kotlin.coroutines.CoroutineContext /** - * [ThreadContextElement] that restores `ThreadLocals` from the Reactor [ContextSnapshot] - * every time the coroutine with this element in the context is resumed on a thread. + * [ThreadContextElement] that ensures that contexts registered with the + * Micrometer Context Propagation library are captured and restored when + * a coroutine is resumed on a thread. This is typically being used for + * Micrometer Tracing support in Kotlin suspended functions. * - * This effectively ensures that Kotlin Coroutines, Reactor and Micrometer Context Propagation - * work together in an application, typically for observability purposes. + * It requires the `io.micrometer:context-propagation` library. If the + * `org.jetbrains.kotlinx:kotlinx-coroutines-reactor` dependency is also + * on the classpath, this element also supports Reactor `Context`. * - * Applications need to have both `"io.micrometer:context-propagation"` and - * `"org.jetbrains.kotlinx:kotlinx-coroutines-reactor"` on the classpath to use this context element. + * `PropagationContextElement` can be used like this: * - * The `PropagationContextElement` can be used like this: - * * ```kotlin - * suspend fun suspendable() { - * withContext(PropagationContextElement(coroutineContext)) { - * logger.info("Log statement with traceId") - * } + * fun main() { + * runBlocking(Dispatchers.IO + PropagationContextElement()) { + * suspendingFunction() + * } * } + * + * suspend fun suspendingFunction() { + * delay(1) + * logger.info("Log statement with traceId") + * } * ``` * * @author Brian Clozel + * @author Sebastien Deleuze * @since 7.0 */ -class PropagationContextElement(private val context: CoroutineContext) : ThreadContextElement, +class PropagationContextElement : ThreadContextElement, AbstractCoroutineContextElement(Key) { - companion object Key : CoroutineContext.Key + companion object Key : CoroutineContext.Key { - val contextSnapshot: ContextSnapshot - get() { - val contextView: ContextView? = context[ReactorContext]?.context - val contextSnapshotFactory = - ContextSnapshotFactory.builder().contextRegistry(ContextRegistry.getInstance()).build() - if (contextView != null) { - return contextSnapshotFactory.captureFrom(contextView) - } - return contextSnapshotFactory.captureAll() - } + private val contextSnapshotFactory = + ContextSnapshotFactory.builder().contextRegistry(ContextRegistry.getInstance()).build() + + private val coroutinesReactorPresent = + ClassUtils.isPresent("kotlinx.coroutines.reactor.ReactorContext", + PropagationContextElement::class.java.classLoader); + } + + // Context captured from the the ThreadLocal where the PropagationContextElement is instantiated + private val threadLocalContextSnapshot: ContextSnapshot = contextSnapshotFactory.captureAll() override fun restoreThreadContext(context: CoroutineContext, oldState: ContextSnapshot.Scope) { oldState.close() } override fun updateThreadContext(context: CoroutineContext): ContextSnapshot.Scope { + val contextSnapshot = if (coroutinesReactorPresent) { + ReactorDelegate().captureFrom(context) ?: threadLocalContextSnapshot + } else { + threadLocalContextSnapshot + } return contextSnapshot.setThreadLocals() } -} \ No newline at end of file + + private class ReactorDelegate { + + fun captureFrom(context: CoroutineContext): ContextSnapshot? { + val contextView: ContextView? = context[ReactorContext]?.context + if (contextView != null) { + return contextSnapshotFactory.captureFrom(contextView) + } + return null; + } + } +} diff --git a/spring-core/src/test/kotlin/org/springframework/core/PropagationContextElementTests.kt b/spring-core/src/test/kotlin/org/springframework/core/PropagationContextElementTests.kt index d7a7bdf5711..54437996668 100644 --- a/spring-core/src/test/kotlin/org/springframework/core/PropagationContextElementTests.kt +++ b/spring-core/src/test/kotlin/org/springframework/core/PropagationContextElementTests.kt @@ -19,18 +19,14 @@ package org.springframework.core import io.micrometer.observation.Observation import io.micrometer.observation.tck.TestObservationRegistry import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext import org.assertj.core.api.Assertions import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.AfterAll -import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.reactivestreams.Publisher import reactor.core.publisher.Hooks import reactor.core.publisher.Mono -import reactor.core.scheduler.Schedulers import kotlin.coroutines.Continuation @@ -38,32 +34,18 @@ import kotlin.coroutines.Continuation * Kotlin tests for [PropagationContextElement]. * * @author Brian Clozel + * @author Sebastien Deleuze */ class PropagationContextElementTests { private val observationRegistry = TestObservationRegistry.create() - companion object { - - @BeforeAll - @JvmStatic - fun init() { - Hooks.enableAutomaticContextPropagation() - } - - @AfterAll - @JvmStatic - fun cleanup() { - Hooks.disableAutomaticContextPropagation() - } - - } - @Test fun restoresFromThreadLocal() { val observation = Observation.createNotStarted("coroutine", observationRegistry) observation.observe { - val result = runBlocking(Dispatchers.Unconfined) { + val coroutineContext = Dispatchers.IO + PropagationContextElement() + val result = runBlocking(coroutineContext) { suspendingFunction("test") } Assertions.assertThat(result).isEqualTo("coroutine") @@ -74,20 +56,23 @@ class PropagationContextElementTests { @Suppress("UNCHECKED_CAST") fun restoresFromReactorContext() { val method = PropagationContextElementTests::class.java.getDeclaredMethod("suspendingFunction", String::class.java, Continuation::class.java) - val publisher = CoroutinesUtils.invokeSuspendingFunction(method, this, "test", null) as Publisher + val coroutineContext = Dispatchers.IO + PropagationContextElement() + val publisher = CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, this, "test", null) as Publisher val observation = Observation.createNotStarted("coroutine", observationRegistry) + Hooks.enableAutomaticContextPropagation() observation.observe { - val result = Mono.from(publisher).publishOn(Schedulers.boundedElastic()).block() + val mono = Mono.from(publisher) + val result = mono.block() assertThat(result).isEqualTo("coroutine") } + Hooks.disableAutomaticContextPropagation() } suspend fun suspendingFunction(value: String): String? { - return withContext(PropagationContextElement(currentCoroutineContext())) { - val currentObservation = observationRegistry.currentObservation - assertThat(currentObservation).isNotNull - currentObservation?.context?.name - } + delay(1) + val currentObservation = observationRegistry.currentObservation + assertThat(currentObservation).isNotNull + return currentObservation?.context?.name } }