Browse Source

Refine PropagationContextElement

This commit apply several refinements to PropagationContextElement:
 - Capture the ThreadLocal when instantiating the
   PropagationContextElement in order to support dispatchers switching
   threads
 - Remove the constructor parameter which is not idiomatic and breaks
   the support when switching threads, and use instead the
   updateThreadContext(context: CoroutineContext) parameter
 - Make the kotlinx-coroutines-reactor dependency optional
 - Make the properties private

The Javadoc and tests are also updated to use the
`Dispatchers.IO + PropagationContextElement()` pattern performed
outside of the suspending lambda, which is the typical use case.

Closes gh-35469
pull/35474/head
Sébastien Deleuze 7 months ago
parent
commit
2faed3cdbb
  1. 25
      framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc
  2. 16
      framework-docs/src/main/kotlin/org/springframework/docs/languages/kotlin/coroutines/propagation/ContextPropagationSample.kt
  3. 73
      spring-core/src/main/kotlin/org/springframework/core/PropagationContextElement.kt
  4. 43
      spring-core/src/test/kotlin/org/springframework/core/PropagationContextElementTests.kt

25
framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc

@ -257,29 +257,20 @@ For Kotlin `Flow`, a `Flow<T>.transactional` extension is provided.
Spring applications are xref:integration/observability.adoc[instrumented with Micrometer for Observability support]. Spring applications are xref:integration/observability.adoc[instrumented with Micrometer for Observability support].
For tracing support, the current observation is propagated through a `ThreadLocal` for blocking code, For tracing support, the current observation is propagated through a `ThreadLocal` for blocking code,
or the Reactor `Context` for reactive pipelines. But the current observation also needs to be made available or the Reactor `Context` for reactive pipelines. But the current observation also needs to be made available
in the execution context of a suspended function. Without that, the current "traceId" will not be automatically prepended in the execution context of a suspended function. Without that, the current "traceId" will not be automatically
to logged statements from coroutines. prepended to logged statements from coroutines.
The `org.springframework.core.PropagationContextElement` operator generally ensures that the The {spring-framework-api-kdoc}/spring-core/org.springframework.core/-propagation-context-element/index.html[`PropagationContextElement`] operator generally ensures that the
{micrometer-context-propagation-docs}/[Micrometer Context Propagation library] works with Kotlin Coroutines. {micrometer-context-propagation-docs}/[Micrometer Context Propagation library] works with Kotlin Coroutines.
The `PropagationContextElement` requires the following dependencies: It requires the `io.micrometer:context-propagation` dependency and optionally the
`org.jetbrains.kotlinx:kotlinx-coroutines-reactor` one.
`build.gradle.kts` Applications can then use the `PropagationContextElement` to augment the `CoroutineContext`
[source,kotlin,indent=0]
----
dependencies {
implementation("io.micrometer:context-propagation:${contextPropagationVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
----
Applications can then use the `PropagationContextElement` operator to connect the `currentCoroutineContext()`
with the context propagation mechanism: with the context propagation mechanism:
include-code::./ContextPropagationSample[tag=context,indent=0] include-code::./ContextPropagationSample[tag=context,indent=0]
Here, assuming that Micrometer Tracing is configured, the resulting logging statement Here, assuming that Micrometer Tracing is configured, the resulting logging statement will show the current "traceId"
will show the current "traceId" and unlock better observability for your application. and unlock better observability for your application.

16
framework-docs/src/main/kotlin/org/springframework/docs/languages/kotlin/coroutines/propagation/ContextPropagationSample.kt

@ -16,8 +16,9 @@
package org.springframework.docs.languages.kotlin.coroutines.propagation package org.springframework.docs.languages.kotlin.coroutines.propagation
import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.apache.commons.logging.Log import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory import org.apache.commons.logging.LogFactory
import org.springframework.core.PropagationContextElement import org.springframework.core.PropagationContextElement
@ -31,10 +32,15 @@ class ContextPropagationSample {
} }
// tag::context[] // tag::context[]
fun main() {
runBlocking(Dispatchers.IO + PropagationContextElement()) {
suspendingFunction()
}
}
suspend fun suspendingFunction() { suspend fun suspendingFunction() {
return withContext(PropagationContextElement(currentCoroutineContext())) { delay(1)
logger.info("Suspending function with traceId") logger.info("Suspending function with traceId")
}
} }
// end::context[] // end::context[]
} }

73
spring-core/src/main/kotlin/org/springframework/core/PropagationContextElement.kt

@ -21,55 +21,78 @@ import io.micrometer.context.ContextSnapshot
import io.micrometer.context.ContextSnapshotFactory import io.micrometer.context.ContextSnapshotFactory
import kotlinx.coroutines.ThreadContextElement import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.reactor.ReactorContext import kotlinx.coroutines.reactor.ReactorContext
import org.springframework.util.ClassUtils
import reactor.util.context.ContextView import reactor.util.context.ContextView
import kotlin.coroutines.AbstractCoroutineContextElement import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
/** /**
* [ThreadContextElement] that restores `ThreadLocals` from the Reactor [ContextSnapshot] * [ThreadContextElement] that ensures that contexts registered with the
* every time the coroutine with this element in the context is resumed on a thread. * Micrometer Context Propagation library are captured and restored when
* a coroutine is resumed on a thread. This is typically being used for
* Micrometer Tracing support in Kotlin suspended functions.
* *
* This effectively ensures that Kotlin Coroutines, Reactor and Micrometer Context Propagation * It requires the `io.micrometer:context-propagation` library. If the
* work together in an application, typically for observability purposes. * `org.jetbrains.kotlinx:kotlinx-coroutines-reactor` dependency is also
* on the classpath, this element also supports Reactor `Context`.
* *
* Applications need to have both `"io.micrometer:context-propagation"` and * `PropagationContextElement` can be used like this:
* `"org.jetbrains.kotlinx:kotlinx-coroutines-reactor"` on the classpath to use this context element.
* *
* The `PropagationContextElement` can be used like this:
*
* ```kotlin * ```kotlin
* suspend fun suspendable() { * fun main() {
* withContext(PropagationContextElement(coroutineContext)) { * runBlocking(Dispatchers.IO + PropagationContextElement()) {
* logger.info("Log statement with traceId") * suspendingFunction()
* } * }
* } * }
*
* suspend fun suspendingFunction() {
* delay(1)
* logger.info("Log statement with traceId")
* }
* ``` * ```
* *
* @author Brian Clozel * @author Brian Clozel
* @author Sebastien Deleuze
* @since 7.0 * @since 7.0
*/ */
class PropagationContextElement(private val context: CoroutineContext) : ThreadContextElement<ContextSnapshot.Scope>, class PropagationContextElement : ThreadContextElement<ContextSnapshot.Scope>,
AbstractCoroutineContextElement(Key) { AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<PropagationContextElement> companion object Key : CoroutineContext.Key<PropagationContextElement> {
val contextSnapshot: ContextSnapshot private val contextSnapshotFactory =
get() { ContextSnapshotFactory.builder().contextRegistry(ContextRegistry.getInstance()).build()
val contextView: ContextView? = context[ReactorContext]?.context
val contextSnapshotFactory = private val coroutinesReactorPresent =
ContextSnapshotFactory.builder().contextRegistry(ContextRegistry.getInstance()).build() ClassUtils.isPresent("kotlinx.coroutines.reactor.ReactorContext",
if (contextView != null) { PropagationContextElement::class.java.classLoader);
return contextSnapshotFactory.captureFrom(contextView) }
}
return contextSnapshotFactory.captureAll() // Context captured from the the ThreadLocal where the PropagationContextElement is instantiated
} private val threadLocalContextSnapshot: ContextSnapshot = contextSnapshotFactory.captureAll()
override fun restoreThreadContext(context: CoroutineContext, oldState: ContextSnapshot.Scope) { override fun restoreThreadContext(context: CoroutineContext, oldState: ContextSnapshot.Scope) {
oldState.close() oldState.close()
} }
override fun updateThreadContext(context: CoroutineContext): ContextSnapshot.Scope { override fun updateThreadContext(context: CoroutineContext): ContextSnapshot.Scope {
val contextSnapshot = if (coroutinesReactorPresent) {
ReactorDelegate().captureFrom(context) ?: threadLocalContextSnapshot
} else {
threadLocalContextSnapshot
}
return contextSnapshot.setThreadLocals() return contextSnapshot.setThreadLocals()
} }
}
private class ReactorDelegate {
fun captureFrom(context: CoroutineContext): ContextSnapshot? {
val contextView: ContextView? = context[ReactorContext]?.context
if (contextView != null) {
return contextSnapshotFactory.captureFrom(contextView)
}
return null;
}
}
}

43
spring-core/src/test/kotlin/org/springframework/core/PropagationContextElementTests.kt

@ -19,18 +19,14 @@ package org.springframework.core
import io.micrometer.observation.Observation import io.micrometer.observation.Observation
import io.micrometer.observation.tck.TestObservationRegistry import io.micrometer.observation.tck.TestObservationRegistry
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.assertj.core.api.Assertions import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import reactor.core.publisher.Hooks import reactor.core.publisher.Hooks
import reactor.core.publisher.Mono import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import kotlin.coroutines.Continuation import kotlin.coroutines.Continuation
@ -38,32 +34,18 @@ import kotlin.coroutines.Continuation
* Kotlin tests for [PropagationContextElement]. * Kotlin tests for [PropagationContextElement].
* *
* @author Brian Clozel * @author Brian Clozel
* @author Sebastien Deleuze
*/ */
class PropagationContextElementTests { class PropagationContextElementTests {
private val observationRegistry = TestObservationRegistry.create() private val observationRegistry = TestObservationRegistry.create()
companion object {
@BeforeAll
@JvmStatic
fun init() {
Hooks.enableAutomaticContextPropagation()
}
@AfterAll
@JvmStatic
fun cleanup() {
Hooks.disableAutomaticContextPropagation()
}
}
@Test @Test
fun restoresFromThreadLocal() { fun restoresFromThreadLocal() {
val observation = Observation.createNotStarted("coroutine", observationRegistry) val observation = Observation.createNotStarted("coroutine", observationRegistry)
observation.observe { observation.observe {
val result = runBlocking(Dispatchers.Unconfined) { val coroutineContext = Dispatchers.IO + PropagationContextElement()
val result = runBlocking(coroutineContext) {
suspendingFunction("test") suspendingFunction("test")
} }
Assertions.assertThat(result).isEqualTo("coroutine") Assertions.assertThat(result).isEqualTo("coroutine")
@ -74,20 +56,23 @@ class PropagationContextElementTests {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
fun restoresFromReactorContext() { fun restoresFromReactorContext() {
val method = PropagationContextElementTests::class.java.getDeclaredMethod("suspendingFunction", String::class.java, Continuation::class.java) val method = PropagationContextElementTests::class.java.getDeclaredMethod("suspendingFunction", String::class.java, Continuation::class.java)
val publisher = CoroutinesUtils.invokeSuspendingFunction(method, this, "test", null) as Publisher<String> val coroutineContext = Dispatchers.IO + PropagationContextElement()
val publisher = CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, this, "test", null) as Publisher<String>
val observation = Observation.createNotStarted("coroutine", observationRegistry) val observation = Observation.createNotStarted("coroutine", observationRegistry)
Hooks.enableAutomaticContextPropagation()
observation.observe { observation.observe {
val result = Mono.from<String>(publisher).publishOn(Schedulers.boundedElastic()).block() val mono = Mono.from<String>(publisher)
val result = mono.block()
assertThat(result).isEqualTo("coroutine") assertThat(result).isEqualTo("coroutine")
} }
Hooks.disableAutomaticContextPropagation()
} }
suspend fun suspendingFunction(value: String): String? { suspend fun suspendingFunction(value: String): String? {
return withContext(PropagationContextElement(currentCoroutineContext())) { delay(1)
val currentObservation = observationRegistry.currentObservation val currentObservation = observationRegistry.currentObservation
assertThat(currentObservation).isNotNull assertThat(currentObservation).isNotNull
currentObservation?.context?.name return currentObservation?.context?.name
}
} }
} }

Loading…
Cancel
Save