diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index b722877d4..004e163bd 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -272,9 +272,17 @@
true
- org.jetbrains.kotlin
- kotlin-test
- test
+ org.jetbrains.kotlinx
+ kotlinx-coroutines-core
+ ${kotlin-coroutines}
+ true
+
+
+
+ org.jetbrains.kotlinx
+ kotlinx-coroutines-reactor
+ ${kotlin-coroutines}
+ true
diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensions.kt
index 3cd79195c..96138a1d4 100644
--- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensions.kt
+++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensions.kt
@@ -15,6 +15,8 @@
*/
package org.springframework.data.mongodb.core
+import kotlinx.coroutines.reactive.awaitFirstOrNull
+import kotlinx.coroutines.reactive.awaitSingle
import kotlin.reflect.KClass
/**
@@ -70,3 +72,39 @@ fun ReactiveFindOperation.DistinctWithProjection.asType(resultType: KC
*/
inline fun ReactiveFindOperation.DistinctWithProjection.asType(): ReactiveFindOperation.DistinctWithProjection =
`as`(T::class.java)
+
+/**
+ * Coroutines variant of [ReactiveFindOperation.TerminatingFind.one].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend inline fun ReactiveFindOperation.TerminatingFind.awaitOne(): T? =
+ one().awaitFirstOrNull()
+
+/**
+ * Coroutines variant of [ReactiveFindOperation.TerminatingFind.first].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend inline fun ReactiveFindOperation.TerminatingFind.awaitFirst(): T? =
+ first().awaitFirstOrNull()
+
+/**
+ * Coroutines variant of [ReactiveFindOperation.TerminatingFind.count].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend fun ReactiveFindOperation.TerminatingFind.awaitCount(): Long =
+ count().awaitSingle()
+
+/**
+ * Coroutines variant of [ReactiveFindOperation.TerminatingFind.exists].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend fun ReactiveFindOperation.TerminatingFind.awaitExists(): Boolean =
+ exists().awaitSingle()
diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensions.kt
index d197f5ada..7c6a40df4 100644
--- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensions.kt
+++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensions.kt
@@ -15,6 +15,7 @@
*/
package org.springframework.data.mongodb.core
+import kotlinx.coroutines.reactive.awaitSingle
import kotlin.reflect.KClass
/**
@@ -34,3 +35,12 @@ fun ReactiveInsertOperation.insert(entityClass: KClass): ReactiveIn
*/
inline fun ReactiveInsertOperation.insert(): ReactiveInsertOperation.ReactiveInsert =
insert(T::class.java)
+
+/**
+ * Coroutines variant of [ReactiveInsertOperation.TerminatingInsert.one].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend inline fun ReactiveInsertOperation.TerminatingInsert.oneAndAwait(o: T): T =
+ one(o).awaitSingle()
diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensions.kt
index 0b5cf6470..79585166a 100644
--- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensions.kt
+++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensions.kt
@@ -15,6 +15,8 @@
*/
package org.springframework.data.mongodb.core
+import com.mongodb.client.result.DeleteResult
+import kotlinx.coroutines.reactive.awaitSingle
import kotlin.reflect.KClass
/**
@@ -34,3 +36,12 @@ fun ReactiveRemoveOperation.remove(entityClass: KClass): ReactiveRe
*/
inline fun ReactiveRemoveOperation.remove(): ReactiveRemoveOperation.ReactiveRemove =
remove(T::class.java)
+
+/**
+ * Coroutines variant of [ReactiveRemoveOperation.TerminatingRemove.all].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend fun ReactiveRemoveOperation.TerminatingRemove.allAndAwait(): DeleteResult =
+ all().awaitSingle()
diff --git a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveUpdateOperationExtensions.kt b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveUpdateOperationExtensions.kt
index 3cbbd02b2..393bdf6f7 100644
--- a/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveUpdateOperationExtensions.kt
+++ b/spring-data-mongodb/src/main/kotlin/org/springframework/data/mongodb/core/ReactiveUpdateOperationExtensions.kt
@@ -15,6 +15,9 @@
*/
package org.springframework.data.mongodb.core
+import com.mongodb.client.result.UpdateResult
+import kotlinx.coroutines.reactive.awaitFirstOrNull
+import kotlinx.coroutines.reactive.awaitSingle
import kotlin.reflect.KClass
/**
@@ -34,3 +37,57 @@ fun ReactiveUpdateOperation.update(entityClass: KClass): ReactiveUp
*/
inline fun ReactiveUpdateOperation.update(): ReactiveUpdateOperation.ReactiveUpdate =
update(T::class.java)
+
+/**
+ * Coroutines variant of [ReactiveUpdateOperation.TerminatingFindAndModify.findModifyAndAwait].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend fun ReactiveUpdateOperation.TerminatingFindAndModify.findModifyAndAwait(): T? =
+ findAndModify().awaitFirstOrNull()
+
+
+/**
+ * Coroutines variant of [ReactiveUpdateOperation.TerminatingFindAndReplace.findAndReplace].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend fun ReactiveUpdateOperation.TerminatingFindAndReplace.findReplaceAndAwait(): T? =
+ findAndReplace().awaitFirstOrNull()
+
+/**
+ * Coroutines variant of [ReactiveUpdateOperation.TerminatingUpdate.all].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend fun ReactiveUpdateOperation.TerminatingUpdate.allAndAwait(): UpdateResult =
+ all().awaitSingle()
+
+/**
+ * Coroutines variant of [ReactiveUpdateOperation.TerminatingUpdate.first].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend fun ReactiveUpdateOperation.TerminatingUpdate.firstAndAwait(): UpdateResult =
+ first().awaitSingle()
+
+/**
+ * Coroutines variant of [ReactiveUpdateOperation.TerminatingUpdate.upsert].
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+suspend fun ReactiveUpdateOperation.TerminatingUpdate.upsertAndAwait(): UpdateResult = upsert().awaitSingle()
+
+/**
+ * Extension for [ReactiveUpdateOperation.FindAndReplaceWithProjection.as] leveraging reified type parameters.
+ *
+ * @author Sebastien Deleuze
+ * @since 2.2
+ */
+inline fun ReactiveUpdateOperation.FindAndReplaceWithProjection.asType(): ReactiveUpdateOperation.FindAndReplaceWithOptions =
+ `as`(T::class.java)
diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensionsTests.kt
index c13ea4f9a..53185d6ce 100644
--- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensionsTests.kt
+++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveFindOperationExtensionsTests.kt
@@ -16,9 +16,13 @@
package org.springframework.data.mongodb.core
import example.first.First
+import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
+import kotlinx.coroutines.runBlocking
+import org.junit.Assert.assertEquals
import org.junit.Test
+import reactor.core.publisher.Mono
/**
* @author Mark Paluch
@@ -73,4 +77,52 @@ class ReactiveFindOperationExtensionsTests {
distinctWithProjection.asType()
verify { distinctWithProjection.`as`(User::class.java) }
}
+
+ @Test
+ fun terminatingFindAwaitOne() {
+ val find = mockk>()
+ every { find.one() } returns Mono.just("foo")
+ runBlocking {
+ assertEquals("foo", find.awaitOne())
+ }
+ verify {
+ find.one()
+ }
+ }
+
+ @Test
+ fun terminatingFindAwaitFirst() {
+ val find = mockk>()
+ every { find.first() } returns Mono.just("foo")
+ runBlocking {
+ assertEquals("foo", find.awaitFirst())
+ }
+ verify {
+ find.first()
+ }
+ }
+
+ @Test
+ fun terminatingFindAwaitCount() {
+ val find = mockk>()
+ every { find.count() } returns Mono.just(1)
+ runBlocking {
+ assertEquals(1, find.awaitCount())
+ }
+ verify {
+ find.count()
+ }
+ }
+
+ @Test
+ fun terminatingFindAwaitExists() {
+ val find = mockk>()
+ every { find.exists() } returns Mono.just(true)
+ runBlocking {
+ assertEquals(true, find.awaitExists())
+ }
+ verify {
+ find.exists()
+ }
+ }
}
diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensionsTests.kt
index ccfa52866..5e2b00507 100644
--- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensionsTests.kt
+++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveInsertOperationExtensionsTests.kt
@@ -16,9 +16,13 @@
package org.springframework.data.mongodb.core
import example.first.First
+import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
+import kotlinx.coroutines.runBlocking
+import org.junit.Assert.assertEquals
import org.junit.Test
+import reactor.core.publisher.Mono
/**
* @author Mark Paluch
@@ -41,4 +45,16 @@ class ReactiveInsertOperationExtensionsTests {
operation.insert()
verify { operation.insert(First::class.java) }
}
+
+ @Test
+ fun terminatingFindAwaitOne() {
+ val find = mockk>()
+ every { find.one("foo") } returns Mono.just("foo")
+ runBlocking {
+ assertEquals("foo", find.oneAndAwait("foo"))
+ }
+ verify {
+ find.one("foo")
+ }
+ }
}
diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensionsTests.kt
index f6fb4f478..3da9615dd 100644
--- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensionsTests.kt
+++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveRemoveOperationExtensionsTests.kt
@@ -15,10 +15,15 @@
*/
package org.springframework.data.mongodb.core
+import com.mongodb.client.result.DeleteResult
import example.first.First
+import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
+import kotlinx.coroutines.runBlocking
+import org.junit.Assert.assertEquals
import org.junit.Test
+import reactor.core.publisher.Mono
/**
* @author Mark Paluch
@@ -41,4 +46,17 @@ class ReactiveRemoveOperationExtensionsTests {
operation.remove()
verify { operation.remove(First::class.java) }
}
+
+ @Test
+ fun allAndAwait() {
+ val remove = mockk>()
+ val result = mockk()
+ every { remove.all() } returns Mono.just(result)
+ runBlocking {
+ assertEquals(result, remove.allAndAwait())
+ }
+ verify {
+ remove.all()
+ }
+ }
}
diff --git a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveUpdateOperationExtensionsTests.kt b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveUpdateOperationExtensionsTests.kt
index 3e571bb84..591d1ef4d 100644
--- a/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveUpdateOperationExtensionsTests.kt
+++ b/spring-data-mongodb/src/test/kotlin/org/springframework/data/mongodb/core/ReactiveUpdateOperationExtensionsTests.kt
@@ -15,10 +15,15 @@
*/
package org.springframework.data.mongodb.core
+import com.mongodb.client.result.UpdateResult
import example.first.First
+import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
+import kotlinx.coroutines.runBlocking
+import org.junit.Assert.assertEquals
import org.junit.Test
+import reactor.core.publisher.Mono
/**
* Unit tests for `ReactiveExecutableUpdateOperationExtensions.kt`.
@@ -43,4 +48,78 @@ class ReactiveUpdateOperationExtensionsTests {
operation.update()
verify { operation.update(First::class.java) }
}
+
+ @Test
+ fun findModifyAndAwait() {
+ val find = mockk>()
+ every { find.findAndModify() } returns Mono.just("foo")
+ runBlocking {
+ assertEquals("foo", find.findModifyAndAwait())
+ }
+ verify {
+ find.findAndModify()
+ }
+ }
+
+ @Test
+ fun findReplaceAndAwait() {
+ val find = mockk>()
+ every { find.findAndReplace() } returns Mono.just("foo")
+ runBlocking {
+ assertEquals("foo", find.findReplaceAndAwait())
+ }
+ verify {
+ find.findAndReplace()
+ }
+ }
+
+ @Test
+ fun allAndAwait() {
+ val update = mockk>()
+ val result = mockk()
+ every { update.all() } returns Mono.just(result)
+ runBlocking {
+ assertEquals(result, update.allAndAwait())
+ }
+ verify {
+ update.all()
+ }
+ }
+
+ @Test
+ fun firstAndAwait() {
+ val update = mockk>()
+ val result = mockk()
+ every { update.first() } returns Mono.just(result)
+ runBlocking {
+ assertEquals(result, update.firstAndAwait())
+ }
+ verify {
+ update.first()
+ }
+ }
+
+ @Test
+ fun upsertAndAwait() {
+ val update = mockk>()
+ val result = mockk()
+ every { update.upsert() } returns Mono.just(result)
+ runBlocking {
+ assertEquals(result, update.upsertAndAwait())
+ }
+ verify {
+ update.upsert()
+ }
+ }
+
+ @Test
+ fun findAndReplaceWithProjectionAsType() {
+ val update = mockk>()
+ val result = mockk>()
+ every { update.`as`(String::class.java) } returns result
+ assertEquals(result, update.asType())
+ verify {
+ update.`as`(String::class.java)
+ }
+ }
}