Browse Source

Use Dispatchers.Unconfined for Coroutines

As of Coroutines 1.2.0-alpha, Dispatchers.Unconfined
is a stable API so we can leverage it in order to get
better performances in our Reactive to Coroutines
bridge.

See gh-19975
pull/27391/head
Sebastien Deleuze 7 years ago
parent
commit
beb491b840
  1. 10
      spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt
  2. 3
      spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt
  3. 5
      spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt
  4. 3
      spring-webflux/src/main/kotlin/org/springframework/web/reactive/server/ServerWebExchangeExtensions.kt

10
spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt

@ -18,6 +18,7 @@ @@ -18,6 +18,7 @@
package org.springframework.core
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.reactive.awaitFirstOrNull
@ -36,7 +37,8 @@ import kotlin.reflect.jvm.kotlinFunction @@ -36,7 +37,8 @@ import kotlin.reflect.jvm.kotlinFunction
* @author Sebastien Deleuze
* @since 5.2
*/
internal fun <T: Any> deferredToMono(source: Deferred<T>) = GlobalScope.mono { source.await() }
internal fun <T: Any> deferredToMono(source: Deferred<T>) =
GlobalScope.mono(Dispatchers.Unconfined) { source.await() }
/**
* Convert a [Mono] instance to a [Deferred] one.
@ -44,7 +46,8 @@ internal fun <T: Any> deferredToMono(source: Deferred<T>) = GlobalScope.mono { s @@ -44,7 +46,8 @@ internal fun <T: Any> deferredToMono(source: Deferred<T>) = GlobalScope.mono { s
* @author Sebastien Deleuze
* @since 5.2
*/
internal fun <T: Any> monoToDeferred(source: Mono<T>) = GlobalScope.async { source.awaitFirstOrNull() }
internal fun <T: Any> monoToDeferred(source: Mono<T>) =
GlobalScope.async(Dispatchers.Unconfined) { source.awaitFirstOrNull() }
/**
* Invoke an handler method converting suspending method to [Mono] if necessary.
@ -55,7 +58,8 @@ internal fun <T: Any> monoToDeferred(source: Mono<T>) = GlobalScope.async { sour @@ -55,7 +58,8 @@ internal fun <T: Any> monoToDeferred(source: Mono<T>) = GlobalScope.async { sour
internal fun invokeHandlerMethod(method: Method, bean: Any, vararg args: Any?): Any? {
val function = method.kotlinFunction!!
return if (function.isSuspend) {
GlobalScope.mono { function.callSuspend(bean, *args.sliceArray(0..(args.size-2)))
GlobalScope.mono(Dispatchers.Unconfined) {
function.callSuspend(bean, *args.sliceArray(0..(args.size-2)))
.let { if (it == Unit) null else it} }
.onErrorMap(InvocationTargetException::class) { it.targetException }
}

3
spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
package org.springframework.web.reactive.function.client
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.mono
@ -77,7 +78,7 @@ suspend fun WebClient.RequestHeadersSpec<out WebClient.RequestHeadersSpec<*>>.aw @@ -77,7 +78,7 @@ suspend fun WebClient.RequestHeadersSpec<out WebClient.RequestHeadersSpec<*>>.aw
* @since 5.2
*/
inline fun <reified T: Any> WebClient.RequestBodySpec.body(crossinline supplier: suspend () -> T)
= body(GlobalScope.mono { supplier.invoke() })
= body(GlobalScope.mono(Dispatchers.Unconfined) { supplier.invoke() })
/**
* Coroutines variant of [WebClient.ResponseSpec.bodyToMono].

5
spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
package org.springframework.web.reactive.function.server
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.reactor.mono
import org.springframework.core.io.Resource
@ -389,7 +390,7 @@ open class CoRouterFunctionDsl(private val init: (CoRouterFunctionDsl.() -> Unit @@ -389,7 +390,7 @@ open class CoRouterFunctionDsl(private val init: (CoRouterFunctionDsl.() -> Unit
*/
fun resources(lookupFunction: suspend (ServerRequest) -> Resource?) {
builder.resources {
GlobalScope.mono {
GlobalScope.mono(Dispatchers.Unconfined) {
lookupFunction.invoke(it)
}
}
@ -404,7 +405,7 @@ open class CoRouterFunctionDsl(private val init: (CoRouterFunctionDsl.() -> Unit @@ -404,7 +405,7 @@ open class CoRouterFunctionDsl(private val init: (CoRouterFunctionDsl.() -> Unit
}
private fun asHandlerFunction(init: suspend (ServerRequest) -> ServerResponse) = HandlerFunction {
GlobalScope.mono {
GlobalScope.mono(Dispatchers.Unconfined) {
init(it)
}
}

3
spring-webflux/src/main/kotlin/org/springframework/web/reactive/server/ServerWebExchangeExtensions.kt

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
package org.springframework.web.reactive.server
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.mono
@ -68,4 +69,4 @@ suspend fun ServerWebExchange.awaitSession(): WebSession = @@ -68,4 +69,4 @@ suspend fun ServerWebExchange.awaitSession(): WebSession =
* @since 5.2
*/
fun ServerWebExchange.Builder.principal(supplier: suspend () -> Principal): ServerWebExchange.Builder
= principal(GlobalScope.mono { supplier.invoke() })
= principal(GlobalScope.mono(Dispatchers.Unconfined) { supplier.invoke() })

Loading…
Cancel
Save