diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java index 48df07217..77346a5b3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java @@ -123,6 +123,10 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { ResultProcessor processor = method.getResultProcessor().withDynamicProjection(convertingParamterAccessor); Class typeToRead = processor.getReturnedType().getTypeToRead(); + if(typeToRead == null && method.getReturnType().getComponentType() != null) { + typeToRead = method.getReturnType().getComponentType().getType(); + } + return doExecute(method, processor, convertingParamterAccessor, typeToRead); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java index dddb928fa..9bf3a3381 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java @@ -16,11 +16,11 @@ package org.springframework.data.mongodb.repository.query; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.List; import org.bson.Document; - import org.springframework.data.mongodb.core.ReactiveMongoOperations; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.AggregationOperation; @@ -95,7 +95,8 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery { Flux flux = reactiveMongoOperations.aggregate(aggregation, targetType); if (isSimpleReturnType && !isRawReturnType) { - flux = flux.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter)); + flux = flux.flatMap( + it -> Mono.justOrEmpty(AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter))); } if (method.isCollectionQuery()) { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java index a5640f4e4..86d33abe0 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java @@ -27,6 +27,7 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.util.Arrays; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; @@ -36,7 +37,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.reactivestreams.Publisher; - import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -535,6 +535,46 @@ public class ReactiveMongoRepositoryTests { }).verifyComplete(); } + @Test // DATAMONGO-2153 + public void annotatedAggregationWithAggregationResultAsMap() { + + repository.sumAgeAndReturnSumAsMap() // + .as(StepVerifier::create) // + .consumeNextWith(it -> { + assertThat(it).isInstanceOf(Map.class); + }).verifyComplete(); + } + + @Test // DATAMONGO-2403 + public void annotatedAggregationExtractingSimpleValueEmitsEmptyMonoForEmptyDocument() { + + Person p = new Person("project-on-lastanme", null); + repository.save(p).then().as(StepVerifier::create).verifyComplete(); + + repository.projectToLastnameAndRemoveId(p.getFirstname()) // + .as(StepVerifier::create) // + .verifyComplete(); + } + + @Test // DATAMONGO-2403 + public void annotatedAggregationSkipsEmptyDocumentsWhenExtractingSimpleValue() { + + String firstname = "project-on-lastanme"; + + Person p1 = new Person(firstname, null); + p1.setEmail("p1@example.com"); + Person p2 = new Person(firstname, "lastname"); + p2.setEmail("p2@example.com"); + Person p3 = new Person(firstname, null); + p3.setEmail("p3@example.com"); + + repository.saveAll(Arrays.asList(p1, p2, p3)).then().as(StepVerifier::create).verifyComplete(); + + repository.projectToLastnameAndRemoveId(firstname) // + .as(StepVerifier::create) // + .expectNext("lastname").verifyComplete(); + } + interface ReactivePersonRepository extends ReactiveMongoRepository, ReactiveQuerydslPredicateExecutor { @@ -596,6 +636,13 @@ public class ReactiveMongoRepositoryTests { @Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }") Mono sumAgeAndReturnSumWrapper(); + @Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }") + Mono sumAgeAndReturnSumAsMap(); + + @Aggregation( + pipeline = { "{ '$match' : { 'firstname' : '?0' } }", "{ '$project' : { '_id' : 0, 'lastname' : 1 } }" }) + Mono projectToLastnameAndRemoveId(String firstname); + @Query(value = "{_id:?0}") Mono findDocumentById(String id); }