From c8e13ae7e22388cd3e54d19407effd472bbfe095 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 28 Nov 2024 10:43:34 +0100 Subject: [PATCH] Refactor String-based repository aggretation methods into common utility callback. Also, support aggregation result projections for reactive flows. See #4839 Original pull request: #4841 --- .../query/AbstractReactiveMongoQuery.java | 46 +++--- .../repository/query/AggregationUtils.java | 125 ++++++++++++++++- .../query/ReactiveStringBasedAggregation.java | 77 +++------- .../query/StringBasedAggregation.java | 132 ++++++------------ ...tractPersonRepositoryIntegrationTests.java | 12 ++ .../mongodb/repository/PersonRepository.java | 5 +- .../ReactiveMongoRepositoryTests.java | 15 ++ .../repository/ReactivePersonRepository.java | 1 + 8 files changed, 243 insertions(+), 170 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java index 15ff5e5e2..0209ab05b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java @@ -17,6 +17,7 @@ package org.springframework.data.mongodb.repository.query; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; import java.util.ArrayList; import java.util.List; @@ -61,7 +62,6 @@ import org.springframework.data.repository.query.RepositoryQuery; import org.springframework.data.repository.query.ResultProcessor; import org.springframework.data.repository.query.ValueExpressionDelegate; import org.springframework.data.spel.ExpressionDependencies; -import org.springframework.data.util.TypeInformation; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.lang.Nullable; @@ -70,7 +70,6 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; import com.mongodb.MongoClientSettings; -import reactor.util.function.Tuple2; /** * Base class for reactive {@link RepositoryQuery} implementations for MongoDB. @@ -112,16 +111,20 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { this.method = method; this.operations = operations; this.instantiators = new EntityInstantiators(); - this.valueExpressionDelegate = new ValueExpressionDelegate(new QueryMethodValueEvaluationContextAccessor(new StandardEnvironment(), evaluationContextProvider.getEvaluationContextProvider()), ValueExpressionParser.create(() -> expressionParser)); + this.valueExpressionDelegate = new ValueExpressionDelegate( + new QueryMethodValueEvaluationContextAccessor(new StandardEnvironment(), + evaluationContextProvider.getEvaluationContextProvider()), + ValueExpressionParser.create(() -> expressionParser)); MongoEntityMetadata metadata = method.getEntityInformation(); Class type = metadata.getCollectionEntity().getType(); this.findOperationWithProjection = operations.query(type); this.updateOps = operations.update(type); - ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate.createValueContextProvider( - method.getParameters()); - Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, "ValueEvaluationContextProvider must be reactive"); + ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate + .createValueContextProvider(method.getParameters()); + Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, + "ValueEvaluationContextProvider must be reactive"); this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider; } @@ -151,9 +154,10 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { this.findOperationWithProjection = operations.query(type); this.updateOps = operations.update(type); - ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate.createValueContextProvider( - method.getParameters()); - Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, "ValueEvaluationContextProvider must be reactive"); + ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate + .createValueContextProvider(method.getParameters()); + Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, + "ValueEvaluationContextProvider must be reactive"); this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider; } @@ -182,14 +186,9 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { ConvertingParameterAccessor accessor = new ConvertingParameterAccessor(operations.getConverter(), parameterAccessor); - TypeInformation returnType = method.getReturnType(); ResultProcessor processor = method.getResultProcessor().withDynamicProjection(accessor); Class typeToRead = processor.getReturnedType().getTypeToRead(); - if (typeToRead == null && returnType.getComponentType() != null) { - typeToRead = returnType.getComponentType().getType(); - } - return doExecute(method, processor, accessor, typeToRead); } @@ -221,11 +220,15 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { String collection = method.getEntityInformation().getCollectionName(); ReactiveMongoQueryExecution execution = getExecution(accessor, - new ResultProcessingConverter(processor, operations, instantiators), find); + getResultProcessing(processor), find); return execution.execute(query, processor.getReturnedType().getDomainType(), collection); }); } + ResultProcessingConverter getResultProcessing(ResultProcessor processor) { + return new ResultProcessingConverter(processor, operations, instantiators); + } + /** * Returns the execution instance to use. * @@ -439,8 +442,8 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { return getValueExpressionEvaluatorLater(dependencies, accessor).zipWith(Mono.just(codec)); } - private Document decode(Tuple2 expressionEvaluator, String source, MongoParameterAccessor accessor, - ParameterBindingDocumentCodec codec) { + private Document decode(Tuple2 expressionEvaluator, + String source, MongoParameterAccessor accessor, ParameterBindingDocumentCodec codec) { ParameterBindingContext bindingContext = new ParameterBindingContext(accessor::getBindableValue, expressionEvaluator.getT1()); @@ -490,8 +493,8 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { @Override public T evaluate(String expressionString) { ValueExpression expression = valueExpressionDelegate.parse(expressionString); - ValueEvaluationContext evaluationContext = valueEvaluationContextProvider.getEvaluationContext(accessor.getValues(), - expression.getExpressionDependencies()); + ValueEvaluationContext evaluationContext = valueEvaluationContextProvider + .getEvaluationContext(accessor.getValues(), expression.getExpressionDependencies()); return (T) expression.evaluate(evaluationContext); } }; @@ -509,8 +512,9 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { protected Mono getValueExpressionEvaluatorLater(ExpressionDependencies dependencies, MongoParameterAccessor accessor) { - return valueEvaluationContextProvider.getEvaluationContextLater(accessor.getValues(), dependencies) - .map(evaluationContext -> new ValueExpressionDelegateValueExpressionEvaluator(valueExpressionDelegate, valueExpression -> evaluationContext)); + return valueEvaluationContextProvider.getEvaluationContextLater(accessor.getValues(), dependencies) + .map(evaluationContext -> new ValueExpressionDelegateValueExpressionEvaluator(valueExpressionDelegate, + valueExpression -> evaluationContext)); } /** diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java index bef165930..4d36b0b24 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java @@ -17,6 +17,7 @@ package org.springframework.data.mongodb.repository.query; import java.time.Duration; import java.util.Map; +import java.util.function.Function; import java.util.function.IntUnaryOperator; import java.util.function.LongUnaryOperator; @@ -28,11 +29,18 @@ import org.springframework.data.mapping.model.ValueExpressionEvaluator; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; +import org.springframework.data.mongodb.core.aggregation.AggregationResults; +import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.mapping.FieldName; +import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes; import org.springframework.data.mongodb.core.query.Collation; import org.springframework.data.mongodb.core.query.Meta; import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.repository.query.ResultProcessor; +import org.springframework.data.repository.query.ReturnedType; +import org.springframework.data.util.ReflectionUtils; +import org.springframework.data.util.TypeInformation; import org.springframework.lang.Nullable; import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; @@ -116,13 +124,15 @@ abstract class AggregationUtils { } /** - * If present apply the preference from the {@link org.springframework.data.mongodb.repository.ReadPreference} annotation. + * If present apply the preference from the {@link org.springframework.data.mongodb.repository.ReadPreference} + * annotation. * * @param builder must not be {@literal null}. * @return never {@literal null}. * @since 4.2 */ - static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder builder, MongoQueryMethod queryMethod) { + static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder builder, + MongoQueryMethod queryMethod) { if (!queryMethod.hasAnnotatedReadPreference()) { return builder; @@ -131,6 +141,93 @@ abstract class AggregationUtils { return builder.readPreference(ReadPreference.valueOf(queryMethod.getAnnotatedReadPreference())); } + static AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor, + AggregationPipeline pipeline, ValueExpressionEvaluator evaluator) { + + AggregationOptions.Builder builder = Aggregation.newAggregationOptions(); + + AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor, evaluator); + AggregationUtils.applyMeta(builder, method); + AggregationUtils.applyHint(builder, method); + AggregationUtils.applyReadPreference(builder, method); + + TypeInformation returnType = method.getReturnType(); + if (returnType.getComponentType() != null) { + returnType = returnType.getRequiredComponentType(); + } + if (ReflectionUtils.isVoid(returnType.getType()) && pipeline.isOutOrMerge()) { + builder.skipOutput(); + } + + return builder.build(); + } + + /** + * Prepares the AggregationPipeline including type discovery and calling {@link AggregationCallback} to run the + * aggregation. + */ + @Nullable + static T doAggregate(AggregationPipeline pipeline, MongoQueryMethod method, ResultProcessor processor, + ConvertingParameterAccessor accessor, + Function evaluatorFunction, AggregationCallback callback) { + + Class sourceType = method.getDomainClass(); + ReturnedType returnedType = processor.getReturnedType(); + // 🙈Interface Projections do not happen on the Aggregation level but through our repository infrastructure. + // Non-projections and raw results (AggregationResults<…>) are handled here. Interface projections read a Document + // and DTO projections read the returned type. + // We also support simple return types (String) that are read from a Document + TypeInformation returnType = method.getReturnType(); + Class returnElementType = (returnType.getComponentType() != null ? returnType.getRequiredComponentType() + : returnType).getType(); + Class entityType; + + boolean isRawAggregationResult = ClassUtils.isAssignable(AggregationResults.class, method.getReturnedObjectType()); + + if (returnElementType.equals(Document.class)) { + entityType = sourceType; + } else { + entityType = returnElementType; + } + + AggregationUtils.appendSortIfPresent(pipeline, accessor, entityType); + + if (method.isSliceQuery()) { + AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor, LongUnaryOperator.identity(), + limit -> limit + 1); + } else { + AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor); + } + + AggregationOptions options = AggregationUtils.computeOptions(method, accessor, pipeline, + evaluatorFunction.apply(accessor)); + TypedAggregation aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options); + + boolean isSimpleReturnType = MongoSimpleTypes.HOLDER.isSimpleType(returnElementType); + Class typeToRead; + + if (isSimpleReturnType) { + typeToRead = Document.class; + } else if (isRawAggregationResult) { + typeToRead = returnElementType; + } else { + + if (returnedType.isProjecting()) { + typeToRead = returnedType.getReturnedType().isInterface() ? Document.class : returnedType.getReturnedType(); + } else { + typeToRead = entityType; + } + } + + return callback.doAggregate(aggregation, sourceType, typeToRead, returnElementType, isSimpleReturnType, + isRawAggregationResult); + } + + static AggregationPipeline computePipeline(AbstractMongoQuery mongoQuery, MongoQueryMethod method, + ConvertingParameterAccessor accessor) { + return new AggregationPipeline(mongoQuery.parseAggregationPipeline(method.getAnnotatedAggregation(), accessor)); + } + /** * Append {@code $sort} aggregation stage if {@link ConvertingParameterAccessor#getSort()} is present. * @@ -139,7 +236,7 @@ abstract class AggregationUtils { * @param targetType */ static void appendSortIfPresent(AggregationPipeline aggregationPipeline, ConvertingParameterAccessor accessor, - Class targetType) { + @Nullable Class targetType) { if (accessor.getSort().isUnsorted()) { return; @@ -254,4 +351,26 @@ abstract class AggregationUtils { return converter.getConversionService().convert(value, targetType); } + + /** + * Interface to invoke an aggregation along with source, intermediate, and target types. + * + * @param + */ + interface AggregationCallback { + + /** + * @param aggregation + * @param domainType + * @param typeToRead + * @param elementType + * @param simpleType whether the aggregation returns {@link Document} or a + * {@link org.springframework.data.mapping.model.SimpleTypeHolder simple type}. + * @param rawResult whether the aggregation returns {@link AggregationResults}. + * @return + */ + @Nullable + T doAggregate(TypedAggregation aggregation, Class domainType, Class typeToRead, Class elementType, + boolean simpleType, boolean rawResult); + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java index a74694d96..57c2c1083 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java @@ -24,11 +24,8 @@ import org.bson.Document; import org.reactivestreams.Publisher; import org.springframework.data.mongodb.core.ReactiveMongoOperations; -import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.AggregationOperation; -import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; -import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes; import org.springframework.data.mongodb.core.query.Query; @@ -36,9 +33,8 @@ import org.springframework.data.repository.query.ReactiveQueryMethodEvaluationCo import org.springframework.data.repository.query.ResultProcessor; import org.springframework.data.repository.query.ValueExpressionDelegate; import org.springframework.data.util.ReflectionUtils; -import org.springframework.data.util.TypeInformation; import org.springframework.expression.ExpressionParser; -import org.springframework.util.ClassUtils; +import org.springframework.lang.Nullable; /** * A reactive {@link org.springframework.data.repository.query.RepositoryQuery} to use a plain JSON String to create an @@ -87,46 +83,39 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery { } @Override + @SuppressWarnings("ReactiveStreamsNullableInLambdaInTransform") protected Publisher doExecute(ReactiveMongoQueryMethod method, ResultProcessor processor, - ConvertingParameterAccessor accessor, Class typeToRead) { + ConvertingParameterAccessor accessor, @Nullable Class ignored) { return computePipeline(accessor).flatMapMany(it -> { - Class sourceType = method.getDomainClass(); - Class targetType = typeToRead; + return AggregationUtils.doAggregate(new AggregationPipeline(it), method, processor, accessor, + this::getValueExpressionEvaluator, + (aggregation, sourceType, typeToRead, elementType, simpleType, rawResult) -> { - AggregationPipeline pipeline = new AggregationPipeline(it); + Flux flux = reactiveMongoOperations.aggregate(aggregation, typeToRead); + if (ReflectionUtils.isVoid(elementType)) { + return flux.then(); + } - AggregationUtils.appendSortIfPresent(pipeline, accessor, typeToRead); - AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor); + ReactiveMongoQueryExecution.ResultProcessingConverter resultProcessing = getResultProcessing(processor); - boolean isSimpleReturnType = isSimpleReturnType(typeToRead); - boolean isRawReturnType = ClassUtils.isAssignable(org.bson.Document.class, typeToRead); + if (simpleType && !rawResult && !elementType.equals(Document.class)) { - if (isSimpleReturnType || isRawReturnType) { - targetType = Document.class; - } + flux = flux.handle((item, sink) -> { - AggregationOptions options = computeOptions(method, accessor, pipeline); - TypedAggregation aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options); + Object result = AggregationUtils.extractSimpleTypeResult((Document) item, elementType, mongoConverter); - Flux flux = reactiveMongoOperations.aggregate(aggregation, targetType); - if (ReflectionUtils.isVoid(typeToRead)) { - return flux.then(); - } + if (result != null) { + sink.next(result); + } + }); + } - if (isSimpleReturnType && !isRawReturnType) { - flux = flux.handle((item, sink) -> { + flux = flux.map(resultProcessing::convert); - Object result = AggregationUtils.extractSimpleTypeResult((Document) item, typeToRead, mongoConverter); - - if (result != null) { - sink.next(result); - } - }); - } - - return method.isCollectionQuery() ? flux : flux.next(); + return method.isCollectionQuery() ? flux : flux.next(); + }); }); } @@ -138,28 +127,6 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery { return parseAggregationPipeline(getQueryMethod().getAnnotatedAggregation(), accessor); } - private AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor, - AggregationPipeline pipeline) { - - AggregationOptions.Builder builder = Aggregation.newAggregationOptions(); - - AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor, - getValueExpressionEvaluator(accessor)); - AggregationUtils.applyMeta(builder, method); - AggregationUtils.applyHint(builder, method); - AggregationUtils.applyReadPreference(builder, method); - - TypeInformation returnType = method.getReturnType(); - if (returnType.getComponentType() != null) { - returnType = returnType.getRequiredComponentType(); - } - if (ReflectionUtils.isVoid(returnType.getType()) && pipeline.isOutOrMerge()) { - builder.skipOutput(); - } - - return builder.build(); - } - @Override protected Mono createQuery(ConvertingParameterAccessor accessor) { throw new UnsupportedOperationException("No query support for aggregation"); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java index c2ba74328..00fc79d4a 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java @@ -17,7 +17,6 @@ package org.springframework.data.mongodb.repository.query; import java.util.ArrayList; import java.util.List; -import java.util.function.LongUnaryOperator; import java.util.stream.Stream; import org.bson.Document; @@ -26,11 +25,7 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.domain.SliceImpl; import org.springframework.data.mongodb.InvalidMongoDbApiUsageException; import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.aggregation.Aggregation; -import org.springframework.data.mongodb.core.aggregation.AggregationOptions; -import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; import org.springframework.data.mongodb.core.aggregation.AggregationResults; -import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes; import org.springframework.data.mongodb.core.query.Query; @@ -41,7 +36,6 @@ import org.springframework.data.repository.query.ValueExpressionDelegate; import org.springframework.data.util.ReflectionUtils; import org.springframework.expression.ExpressionParser; import org.springframework.lang.Nullable; -import org.springframework.util.ClassUtils; /** * {@link AbstractMongoQuery} implementation to run string-based aggregations using @@ -103,87 +97,67 @@ public class StringBasedAggregation extends AbstractMongoQuery { this.mongoConverter = mongoOperations.getConverter(); } + @SuppressWarnings("unchecked") @Override @Nullable - protected Object doExecute(MongoQueryMethod method, ResultProcessor resultProcessor, - ConvertingParameterAccessor accessor, @Nullable Class typeToRead) { + protected Object doExecute(MongoQueryMethod method, ResultProcessor processor, ConvertingParameterAccessor accessor, + @Nullable Class ignore) { - Class sourceType = method.getDomainClass(); - Class targetType = typeToRead; + return AggregationUtils.doAggregate(AggregationUtils.computePipeline(this, method, accessor), method, processor, + accessor, this::getExpressionEvaluatorFor, + (aggregation, sourceType, typeToRead, elementType, simpleType, rawResult) -> { - AggregationPipeline pipeline = computePipeline(method, accessor); - AggregationUtils.appendSortIfPresent(pipeline, accessor, typeToRead); + if (method.isStreamQuery()) { - if (method.isSliceQuery()) { - AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor, LongUnaryOperator.identity(), - limit -> limit + 1); - } else { - AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor); - } - - boolean isSimpleReturnType = typeToRead != null && isSimpleReturnType(typeToRead); - boolean isRawAggregationResult = typeToRead != null && ClassUtils.isAssignable(AggregationResults.class, typeToRead); + Stream stream = mongoOperations.aggregateStream(aggregation, typeToRead); - if (isSimpleReturnType) { - targetType = Document.class; - } else if (isRawAggregationResult) { + if (!simpleType || elementType.equals(Document.class)) { + return stream; + } - // 🙈 - targetType = method.getReturnType().getRequiredActualType().getRequiredComponentType().getType(); - } else if (resultProcessor.getReturnedType().isProjecting()) { - targetType = resultProcessor.getReturnedType().getReturnedType().isInterface() ? Document.class :resultProcessor.getReturnedType().getReturnedType(); - } + return stream + .map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, elementType, mongoConverter)); + } - AggregationOptions options = computeOptions(method, accessor, pipeline); - TypedAggregation aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options); + AggregationResults result = (AggregationResults) mongoOperations.aggregate(aggregation, + typeToRead); - if (method.isStreamQuery()) { + if (ReflectionUtils.isVoid(elementType)) { + return null; + } - Stream stream = mongoOperations.aggregateStream(aggregation, targetType); + if (rawResult) { + return result; + } - if (isSimpleReturnType) { - return stream.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter)); - } + List results = result.getMappedResults(); + if (method.isCollectionQuery()) { + return simpleType ? convertResults(elementType, (List) results) : results; + } - return stream; - } + if (method.isSliceQuery()) { - AggregationResults result = (AggregationResults) mongoOperations.aggregate(aggregation, targetType); - if (typeToRead != null && ReflectionUtils.isVoid(typeToRead)) { - return null; - } + Pageable pageable = accessor.getPageable(); + int pageSize = pageable.getPageSize(); + List resultsToUse = simpleType ? convertResults(elementType, (List) results) + : (List) results; + boolean hasNext = resultsToUse.size() > pageSize; + return new SliceImpl<>(hasNext ? resultsToUse.subList(0, pageSize) : resultsToUse, pageable, hasNext); + } - if (isRawAggregationResult) { - return result; - } - - List results = result.getMappedResults(); - if (method.isCollectionQuery()) { - return isSimpleReturnType ? convertResults(typeToRead, results) : results; - } - - if (method.isSliceQuery()) { - - Pageable pageable = accessor.getPageable(); - int pageSize = pageable.getPageSize(); - List resultsToUse = isSimpleReturnType ? convertResults(typeToRead, results) : results; - boolean hasNext = resultsToUse.size() > pageSize; - return new SliceImpl<>(hasNext ? resultsToUse.subList(0, pageSize) : resultsToUse, pageable, hasNext); - } + Object uniqueResult = result.getUniqueMappedResult(); - Object uniqueResult = result.getUniqueMappedResult(); - - return isSimpleReturnType - ? AggregationUtils.extractSimpleTypeResult((Document) uniqueResult, typeToRead, mongoConverter) - : uniqueResult; + return simpleType + ? AggregationUtils.extractSimpleTypeResult((Document) uniqueResult, elementType, mongoConverter) + : uniqueResult; + }); } - private List convertResults(Class typeToRead, List mappedResults) { + private List convertResults(Class targetType, List mappedResults) { List list = new ArrayList<>(mappedResults.size()); - for (Object it : mappedResults) { - Object extractSimpleTypeResult = AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, - mongoConverter); + for (Document it : mappedResults) { + Object extractSimpleTypeResult = AggregationUtils.extractSimpleTypeResult(it, targetType, mongoConverter); list.add(extractSimpleTypeResult); } return list; @@ -193,28 +167,6 @@ public class StringBasedAggregation extends AbstractMongoQuery { return MongoSimpleTypes.HOLDER.isSimpleType(targetType); } - AggregationPipeline computePipeline(MongoQueryMethod method, ConvertingParameterAccessor accessor) { - return new AggregationPipeline(parseAggregationPipeline(method.getAnnotatedAggregation(), accessor)); - } - - private AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor, - AggregationPipeline pipeline) { - - AggregationOptions.Builder builder = Aggregation.newAggregationOptions(); - - AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor, - getExpressionEvaluatorFor(accessor)); - AggregationUtils.applyMeta(builder, method); - AggregationUtils.applyHint(builder, method); - AggregationUtils.applyReadPreference(builder, method); - - if (ReflectionUtils.isVoid(method.getReturnType().getType()) && pipeline.isOutOrMerge()) { - builder.skipOutput(); - } - - return builder.build(); - } - @Override protected Query createQuery(ConvertingParameterAccessor accessor) { throw new UnsupportedOperationException("No query support for aggregation"); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java index 12f782195..247caada0 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java @@ -1396,6 +1396,18 @@ public abstract class AbstractPersonRepositoryIntegrationTests implements Dirtie } } + @Test // DATAMONGO-4841 + void annotatedAggregationStreamWithPlaceholderValue() { + + assertThat(repository.groupStreamByLastnameAnd("firstname")) + .contains(new PersonAggregate("Lessard", Collections.singletonList("Stefan"))) // + .contains(new PersonAggregate("Keys", Collections.singletonList("Alicia"))) // + .contains(new PersonAggregate("Tinsley", Collections.singletonList("Boyd"))) // + .contains(new PersonAggregate("Beauford", Collections.singletonList("Carter"))) // + .contains(new PersonAggregate("Moore", Collections.singletonList("Leroi"))) // + .contains(new PersonAggregate("Matthews", Arrays.asList("Dave", "Oliver August"))); + } + @Test // DATAMONGO-2153 void annotatedAggregationWithPlaceholderValue() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java index de8b1f569..0f731535e 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java @@ -27,10 +27,10 @@ import org.springframework.data.domain.Limit; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Range; -import org.springframework.data.domain.Window; import org.springframework.data.domain.ScrollPosition; import org.springframework.data.domain.Slice; import org.springframework.data.domain.Sort; +import org.springframework.data.domain.Window; import org.springframework.data.geo.Box; import org.springframework.data.geo.Circle; import org.springframework.data.geo.Distance; @@ -413,6 +413,9 @@ public interface PersonRepository extends MongoRepository, Query @Aggregation("{ '$project': { '_id' : '$lastname' } }") Stream findAllLastnamesAsStream(); + @Aggregation("{ '$group': { '_id' : '$lastname', names : { $addToSet : '$?0' } } }") + Stream groupStreamByLastnameAnd(String property); + @Aggregation("{ '$group': { '_id' : '$lastname', names : { $addToSet : '$?0' } } }") List groupByLastnameAnd(String property); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java index fc916b49d..e692d8627 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java @@ -600,6 +600,17 @@ class ReactiveMongoRepositoryTests implements DirtiesStateExtension.StateFunctio }).verifyComplete(); } + @Test // GH-4839 + void annotatedAggregationWithAggregationResultAsClosedInterfaceProjection() { + + repository.findAggregatedClosedInterfaceProjectionBy() // + .as(StepVerifier::create) // + .consumeNextWith(it -> { + assertThat(it.getFirstname()).isIn(dave.getFirstname(), oliver.getFirstname()); + assertThat(it.getLastname()).isEqualTo(dave.getLastname()); + }).expectNextCount(1).verifyComplete(); + } + @Test // DATAMONGO-2403 @DirtiesState void annotatedAggregationExtractingSimpleValueIsEmptyForEmptyDocument() { @@ -816,6 +827,10 @@ class ReactiveMongoRepositoryTests implements DirtiesStateExtension.StateFunctio @Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }") Mono sumAgeAndReturnSumAsMap(); + @Aggregation({ "{ '$match' : { 'lastname' : 'Matthews'} }", + "{ '$project': { _id : 0, firstname : 1, lastname : 1 } }" }) + Flux findAggregatedClosedInterfaceProjectionBy(); + @Aggregation( pipeline = { "{ '$match' : { 'firstname' : '?0' } }", "{ '$project' : { '_id' : 0, 'lastname' : 1 } }" }) Mono projectToLastnameAndRemoveId(String firstname); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java index 126a168b0..45c5f009b 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java @@ -32,4 +32,5 @@ public interface ReactivePersonRepository extends ReactiveMongoRepository findByLastname(String lastname); + }