@ -25,13 +25,18 @@ import kotlinx.coroutines.flow.Flow
@@ -25,13 +25,18 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.reactive.function.client.CoExchangeFilterFunction.Companion.COROUTINE_CONTEXT_ATTRIBUTE
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.function.Function
import kotlin.coroutines.AbstractCoroutineContextElement
@ -41,6 +46,7 @@ import kotlin.coroutines.CoroutineContext
@@ -41,6 +46,7 @@ import kotlin.coroutines.CoroutineContext
* Mock object based tests for [ WebClient ] Kotlin extensions
*
* @author Sebastien Deleuze
* @author Dmitry Sulman
* /
class WebClientExtensionsTests {
@ -226,6 +232,225 @@ class WebClientExtensionsTests {
@@ -226,6 +232,225 @@ class WebClientExtensionsTests {
verify { responseSpec . toEntityFlux ( object : ParameterizedTypeReference < List < Foo > > ( ) { } ) }
}
@Test
fun `ResponseSpec#awaitEntity with coroutine context propagation` ( ) {
val exchangeFunction = mockk < ExchangeFunction > ( )
val mockResponse = mockk < ClientResponse > ( )
val mockClientHeaders = mockk < ClientResponse . Headers > ( )
val foo = mockk < Foo > ( )
val slot = slot < ClientRequest > ( )
every { exchangeFunction . exchange ( capture ( slot ) ) } returns Mono . just ( mockResponse )
every { mockResponse . statusCode ( ) } returns HttpStatus . OK
every { mockResponse . headers ( ) } returns mockClientHeaders
every { mockClientHeaders . asHttpHeaders ( ) } returns HttpHeaders ( )
every { mockResponse . bodyToMono ( Foo :: class . java ) } returns Mono . just ( foo )
runBlocking ( FooContextElement ( foo ) ) {
val responseEntity = WebClient . builder ( )
. exchangeFunction ( exchangeFunction )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
assertThat ( currentCoroutineContext ( ) [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
return next . exchange ( request )
}
} )
. build ( ) . get ( ) . uri ( " /path " ) . retrieve ( ) . awaitEntity < Foo > ( )
val capturedContext = slot . captured . attribute ( COROUTINE _CONTEXT _ATTRIBUTE ) . get ( ) as CoroutineContext
assertThat ( capturedContext [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
assertThat ( responseEntity . body ) . isEqualTo ( foo )
}
}
@Test
fun `ResponseSpec#awaitEntity with coroutine context propagation to multiple CoExchangeFilterFunctions` ( ) {
val exchangeFunction = mockk < ExchangeFunction > ( )
val mockResponse = mockk < ClientResponse > ( )
val mockClientHeaders = mockk < ClientResponse . Headers > ( )
val foo = mockk < Foo > ( )
val slot = slot < ClientRequest > ( )
every { exchangeFunction . exchange ( capture ( slot ) ) } returns Mono . just ( mockResponse )
every { mockResponse . statusCode ( ) } returns HttpStatus . OK
every { mockResponse . headers ( ) } returns mockClientHeaders
every { mockClientHeaders . asHttpHeaders ( ) } returns HttpHeaders ( )
every { mockResponse . bodyToMono ( Foo :: class . java ) } returns Mono . just ( foo )
runBlocking {
val responseEntity = WebClient . builder ( )
. exchangeFunction ( exchangeFunction )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
return withContext ( FooContextElement ( foo ) ) { next . exchange ( request ) }
}
} )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
assertThat ( currentCoroutineContext ( ) [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
return next . exchange ( request )
}
} )
. build ( ) . get ( ) . uri ( " /path " ) . retrieve ( ) . awaitEntity < Foo > ( )
val capturedContext = slot . captured . attribute ( COROUTINE _CONTEXT _ATTRIBUTE ) . get ( ) as CoroutineContext
assertThat ( capturedContext [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
assertThat ( responseEntity . body ) . isEqualTo ( foo )
}
}
@Test
fun `ResponseSpec#toEntity with coroutine context propagation to multiple CoExchangeFilterFunctions` ( ) {
val exchangeFunction = mockk < ExchangeFunction > ( )
val mockResponse = mockk < ClientResponse > ( )
val mockClientHeaders = mockk < ClientResponse . Headers > ( )
val foo = mockk < Foo > ( )
val slot = slot < ClientRequest > ( )
every { exchangeFunction . exchange ( capture ( slot ) ) } returns Mono . just ( mockResponse )
every { mockResponse . statusCode ( ) } returns HttpStatus . OK
every { mockResponse . headers ( ) } returns mockClientHeaders
every { mockClientHeaders . asHttpHeaders ( ) } returns HttpHeaders ( )
every { mockResponse . bodyToMono ( Foo :: class . java ) } returns Mono . just ( foo )
val responseEntity = WebClient . builder ( )
. exchangeFunction ( exchangeFunction )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
return withContext ( FooContextElement ( foo ) ) { next . exchange ( request ) }
}
} )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
assertThat ( currentCoroutineContext ( ) [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
return next . exchange ( request )
}
} )
. build ( ) . get ( ) . uri ( " /path " ) . retrieve ( ) . toEntity ( Foo :: class . java )
. block ( Duration . ofSeconds ( 10 ) )
val capturedContext = slot . captured . attribute ( COROUTINE _CONTEXT _ATTRIBUTE ) . get ( ) as CoroutineContext
assertThat ( capturedContext [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
assertThat ( responseEntity !! . body ) . isEqualTo ( foo )
}
@Test
fun `ResponseSpec#awaitExchange with coroutine context propagation` ( ) {
val exchangeFunction = mockk < ExchangeFunction > ( )
val mockResponse = mockk < ClientResponse > ( )
val foo = mockk < Foo > ( )
val slot = slot < ClientRequest > ( )
every { exchangeFunction . exchange ( capture ( slot ) ) } returns Mono . just ( mockResponse )
every { mockResponse . releaseBody ( ) } returns Mono . empty ( )
runBlocking ( FooContextElement ( foo ) ) {
val responseBody = WebClient . builder ( )
. exchangeFunction ( exchangeFunction )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
assertThat ( currentCoroutineContext ( ) [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
return next . exchange ( request )
}
} )
. build ( ) . get ( ) . uri ( " /path " ) . awaitExchange { foo }
val capturedContext = slot . captured . attribute ( COROUTINE _CONTEXT _ATTRIBUTE ) . get ( ) as CoroutineContext
assertThat ( capturedContext [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
assertThat ( responseBody ) . isEqualTo ( foo )
}
}
@Test
fun `ResponseSpec#awaitExchangeOrNull with coroutine context propagation` ( ) {
val exchangeFunction = mockk < ExchangeFunction > ( )
val mockResponse = mockk < ClientResponse > ( )
val foo = mockk < Foo > ( )
val slot = slot < ClientRequest > ( )
every { exchangeFunction . exchange ( capture ( slot ) ) } returns Mono . just ( mockResponse )
every { mockResponse . releaseBody ( ) } returns Mono . empty ( )
runBlocking ( FooContextElement ( foo ) ) {
val responseBody = WebClient . builder ( )
. exchangeFunction ( exchangeFunction )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
assertThat ( currentCoroutineContext ( ) [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
return next . exchange ( request )
}
} )
. build ( ) . get ( ) . uri ( " /path " ) . awaitExchangeOrNull { foo }
val capturedContext = slot . captured . attribute ( COROUTINE _CONTEXT _ATTRIBUTE ) . get ( ) as CoroutineContext
assertThat ( capturedContext [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
assertThat ( responseBody ) . isEqualTo ( foo )
}
}
@Test
fun `ResponseSpec#awaitBody with coroutine context propagation` ( ) {
val exchangeFunction = mockk < ExchangeFunction > ( )
val mockResponse = mockk < ClientResponse > ( )
val foo = mockk < Foo > ( )
val slot = slot < ClientRequest > ( )
every { exchangeFunction . exchange ( capture ( slot ) ) } returns Mono . just ( mockResponse )
every { mockResponse . statusCode ( ) } returns HttpStatus . OK
every { mockResponse . bodyToMono ( object : ParameterizedTypeReference < Foo > ( ) { } ) } returns Mono . just ( foo )
runBlocking ( FooContextElement ( foo ) ) {
val responseBody = WebClient . builder ( )
. exchangeFunction ( exchangeFunction )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
assertThat ( currentCoroutineContext ( ) [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
return next . exchange ( request )
}
} )
. build ( ) . get ( ) . uri ( " /path " ) . retrieve ( ) . awaitBody < Foo > ( )
val capturedContext = slot . captured . attribute ( COROUTINE _CONTEXT _ATTRIBUTE ) . get ( ) as CoroutineContext
assertThat ( capturedContext [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
assertThat ( responseBody ) . isEqualTo ( foo )
}
}
@Test
fun `ResponseSpec#awaitBodyOrNull with coroutine context propagation` ( ) {
val exchangeFunction = mockk < ExchangeFunction > ( )
val mockResponse = mockk < ClientResponse > ( )
val foo = mockk < Foo > ( )
val slot = slot < ClientRequest > ( )
every { exchangeFunction . exchange ( capture ( slot ) ) } returns Mono . just ( mockResponse )
every { mockResponse . statusCode ( ) } returns HttpStatus . OK
every { mockResponse . bodyToMono ( object : ParameterizedTypeReference < Foo > ( ) { } ) } returns Mono . just ( foo )
runBlocking ( FooContextElement ( foo ) ) {
val responseBody = WebClient . builder ( )
. exchangeFunction ( exchangeFunction )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
assertThat ( currentCoroutineContext ( ) [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
return next . exchange ( request )
}
} )
. build ( ) . get ( ) . uri ( " /path " ) . retrieve ( ) . awaitBodyOrNull < Foo > ( )
val capturedContext = slot . captured . attribute ( COROUTINE _CONTEXT _ATTRIBUTE ) . get ( ) as CoroutineContext
assertThat ( capturedContext [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
assertThat ( responseBody ) . isEqualTo ( foo )
}
}
@Test
fun `ResponseSpec#awaitBodilessEntity with coroutine context propagation` ( ) {
val exchangeFunction = mockk < ExchangeFunction > ( )
val mockResponse = mockk < ClientResponse > ( )
val mockClientHeaders = mockk < ClientResponse . Headers > ( )
val foo = mockk < Foo > ( )
val slot = slot < ClientRequest > ( )
every { exchangeFunction . exchange ( capture ( slot ) ) } returns Mono . just ( mockResponse )
every { mockResponse . statusCode ( ) } returns HttpStatus . OK
every { mockResponse . releaseBody ( ) } returns Mono . empty ( )
every { mockResponse . headers ( ) } returns mockClientHeaders
every { mockClientHeaders . asHttpHeaders ( ) } returns HttpHeaders ( )
runBlocking ( FooContextElement ( foo ) ) {
val responseEntity = WebClient . builder ( )
. exchangeFunction ( exchangeFunction )
. filter ( object : CoExchangeFilterFunction ( ) {
override suspend fun filter ( request : ClientRequest , next : CoExchangeFunction ) : ClientResponse {
assertThat ( currentCoroutineContext ( ) [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
return next . exchange ( request )
}
} )
. build ( ) . get ( ) . uri ( " /path " ) . retrieve ( ) . awaitBodilessEntity ( )
val capturedContext = slot . captured . attribute ( COROUTINE _CONTEXT _ATTRIBUTE ) . get ( ) as CoroutineContext
assertThat ( capturedContext [ FooContextElement . Key ] !! . foo ) . isEqualTo ( foo )
assertThat ( responseEntity . hasBody ( ) ) . isEqualTo ( false )
}
}
class Foo
private data class FooContextElement ( val foo : Foo ) : AbstractCoroutineContextElement ( FooContextElement ) {