Browse Source

Refactor String-based repository aggretation methods into common utility callback.

Also, support aggregation result projections for reactive flows.

See #4839
Original pull request: #4841
4.4.x
Mark Paluch 1 year ago
parent
commit
c8e13ae7e2
No known key found for this signature in database
GPG Key ID: 55BC6374BAA9D973
  1. 46
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java
  2. 125
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java
  3. 77
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java
  4. 132
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java
  5. 12
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java
  6. 5
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java
  7. 15
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java
  8. 1
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java

46
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java

@ -17,6 +17,7 @@ package org.springframework.data.mongodb.repository.query; @@ -17,6 +17,7 @@ package org.springframework.data.mongodb.repository.query;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.util.ArrayList;
import java.util.List;
@ -61,7 +62,6 @@ import org.springframework.data.repository.query.RepositoryQuery; @@ -61,7 +62,6 @@ import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ValueExpressionDelegate;
import org.springframework.data.spel.ExpressionDependencies;
import org.springframework.data.util.TypeInformation;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.lang.Nullable;
@ -70,7 +70,6 @@ import org.springframework.util.ObjectUtils; @@ -70,7 +70,6 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import com.mongodb.MongoClientSettings;
import reactor.util.function.Tuple2;
/**
* Base class for reactive {@link RepositoryQuery} implementations for MongoDB.
@ -112,16 +111,20 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { @@ -112,16 +111,20 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
this.method = method;
this.operations = operations;
this.instantiators = new EntityInstantiators();
this.valueExpressionDelegate = new ValueExpressionDelegate(new QueryMethodValueEvaluationContextAccessor(new StandardEnvironment(), evaluationContextProvider.getEvaluationContextProvider()), ValueExpressionParser.create(() -> expressionParser));
this.valueExpressionDelegate = new ValueExpressionDelegate(
new QueryMethodValueEvaluationContextAccessor(new StandardEnvironment(),
evaluationContextProvider.getEvaluationContextProvider()),
ValueExpressionParser.create(() -> expressionParser));
MongoEntityMetadata<?> metadata = method.getEntityInformation();
Class<?> type = metadata.getCollectionEntity().getType();
this.findOperationWithProjection = operations.query(type);
this.updateOps = operations.update(type);
ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate.createValueContextProvider(
method.getParameters());
Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, "ValueEvaluationContextProvider must be reactive");
ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate
.createValueContextProvider(method.getParameters());
Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider,
"ValueEvaluationContextProvider must be reactive");
this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider;
}
@ -151,9 +154,10 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { @@ -151,9 +154,10 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
this.findOperationWithProjection = operations.query(type);
this.updateOps = operations.update(type);
ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate.createValueContextProvider(
method.getParameters());
Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, "ValueEvaluationContextProvider must be reactive");
ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate
.createValueContextProvider(method.getParameters());
Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider,
"ValueEvaluationContextProvider must be reactive");
this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider;
}
@ -182,14 +186,9 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { @@ -182,14 +186,9 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
ConvertingParameterAccessor accessor = new ConvertingParameterAccessor(operations.getConverter(),
parameterAccessor);
TypeInformation<?> returnType = method.getReturnType();
ResultProcessor processor = method.getResultProcessor().withDynamicProjection(accessor);
Class<?> typeToRead = processor.getReturnedType().getTypeToRead();
if (typeToRead == null && returnType.getComponentType() != null) {
typeToRead = returnType.getComponentType().getType();
}
return doExecute(method, processor, accessor, typeToRead);
}
@ -221,11 +220,15 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { @@ -221,11 +220,15 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
String collection = method.getEntityInformation().getCollectionName();
ReactiveMongoQueryExecution execution = getExecution(accessor,
new ResultProcessingConverter(processor, operations, instantiators), find);
getResultProcessing(processor), find);
return execution.execute(query, processor.getReturnedType().getDomainType(), collection);
});
}
ResultProcessingConverter getResultProcessing(ResultProcessor processor) {
return new ResultProcessingConverter(processor, operations, instantiators);
}
/**
* Returns the execution instance to use.
*
@ -439,8 +442,8 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { @@ -439,8 +442,8 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
return getValueExpressionEvaluatorLater(dependencies, accessor).zipWith(Mono.just(codec));
}
private Document decode(Tuple2<ValueExpressionEvaluator, ParameterBindingDocumentCodec> expressionEvaluator, String source, MongoParameterAccessor accessor,
ParameterBindingDocumentCodec codec) {
private Document decode(Tuple2<ValueExpressionEvaluator, ParameterBindingDocumentCodec> expressionEvaluator,
String source, MongoParameterAccessor accessor, ParameterBindingDocumentCodec codec) {
ParameterBindingContext bindingContext = new ParameterBindingContext(accessor::getBindableValue,
expressionEvaluator.getT1());
@ -490,8 +493,8 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { @@ -490,8 +493,8 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
@Override
public <T> T evaluate(String expressionString) {
ValueExpression expression = valueExpressionDelegate.parse(expressionString);
ValueEvaluationContext evaluationContext = valueEvaluationContextProvider.getEvaluationContext(accessor.getValues(),
expression.getExpressionDependencies());
ValueEvaluationContext evaluationContext = valueEvaluationContextProvider
.getEvaluationContext(accessor.getValues(), expression.getExpressionDependencies());
return (T) expression.evaluate(evaluationContext);
}
};
@ -509,8 +512,9 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { @@ -509,8 +512,9 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
protected Mono<ValueExpressionEvaluator> getValueExpressionEvaluatorLater(ExpressionDependencies dependencies,
MongoParameterAccessor accessor) {
return valueEvaluationContextProvider.getEvaluationContextLater(accessor.getValues(), dependencies)
.map(evaluationContext -> new ValueExpressionDelegateValueExpressionEvaluator(valueExpressionDelegate, valueExpression -> evaluationContext));
return valueEvaluationContextProvider.getEvaluationContextLater(accessor.getValues(), dependencies)
.map(evaluationContext -> new ValueExpressionDelegateValueExpressionEvaluator(valueExpressionDelegate,
valueExpression -> evaluationContext));
}
/**

125
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java

@ -17,6 +17,7 @@ package org.springframework.data.mongodb.repository.query; @@ -17,6 +17,7 @@ package org.springframework.data.mongodb.repository.query;
import java.time.Duration;
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;
import java.util.function.LongUnaryOperator;
@ -28,11 +29,18 @@ import org.springframework.data.mapping.model.ValueExpressionEvaluator; @@ -28,11 +29,18 @@ import org.springframework.data.mapping.model.ValueExpressionEvaluator;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.mapping.FieldName;
import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ReturnedType;
import org.springframework.data.util.ReflectionUtils;
import org.springframework.data.util.TypeInformation;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
@ -116,13 +124,15 @@ abstract class AggregationUtils { @@ -116,13 +124,15 @@ abstract class AggregationUtils {
}
/**
* If present apply the preference from the {@link org.springframework.data.mongodb.repository.ReadPreference} annotation.
* If present apply the preference from the {@link org.springframework.data.mongodb.repository.ReadPreference}
* annotation.
*
* @param builder must not be {@literal null}.
* @return never {@literal null}.
* @since 4.2
*/
static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder builder, MongoQueryMethod queryMethod) {
static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder builder,
MongoQueryMethod queryMethod) {
if (!queryMethod.hasAnnotatedReadPreference()) {
return builder;
@ -131,6 +141,93 @@ abstract class AggregationUtils { @@ -131,6 +141,93 @@ abstract class AggregationUtils {
return builder.readPreference(ReadPreference.valueOf(queryMethod.getAnnotatedReadPreference()));
}
static AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor,
AggregationPipeline pipeline, ValueExpressionEvaluator evaluator) {
AggregationOptions.Builder builder = Aggregation.newAggregationOptions();
AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor, evaluator);
AggregationUtils.applyMeta(builder, method);
AggregationUtils.applyHint(builder, method);
AggregationUtils.applyReadPreference(builder, method);
TypeInformation<?> returnType = method.getReturnType();
if (returnType.getComponentType() != null) {
returnType = returnType.getRequiredComponentType();
}
if (ReflectionUtils.isVoid(returnType.getType()) && pipeline.isOutOrMerge()) {
builder.skipOutput();
}
return builder.build();
}
/**
* Prepares the AggregationPipeline including type discovery and calling {@link AggregationCallback} to run the
* aggregation.
*/
@Nullable
static <T> T doAggregate(AggregationPipeline pipeline, MongoQueryMethod method, ResultProcessor processor,
ConvertingParameterAccessor accessor,
Function<MongoParameterAccessor, ValueExpressionEvaluator> evaluatorFunction, AggregationCallback<T> callback) {
Class<?> sourceType = method.getDomainClass();
ReturnedType returnedType = processor.getReturnedType();
// 🙈Interface Projections do not happen on the Aggregation level but through our repository infrastructure.
// Non-projections and raw results (AggregationResults<…>) are handled here. Interface projections read a Document
// and DTO projections read the returned type.
// We also support simple return types (String) that are read from a Document
TypeInformation<?> returnType = method.getReturnType();
Class<?> returnElementType = (returnType.getComponentType() != null ? returnType.getRequiredComponentType()
: returnType).getType();
Class<?> entityType;
boolean isRawAggregationResult = ClassUtils.isAssignable(AggregationResults.class, method.getReturnedObjectType());
if (returnElementType.equals(Document.class)) {
entityType = sourceType;
} else {
entityType = returnElementType;
}
AggregationUtils.appendSortIfPresent(pipeline, accessor, entityType);
if (method.isSliceQuery()) {
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor, LongUnaryOperator.identity(),
limit -> limit + 1);
} else {
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor);
}
AggregationOptions options = AggregationUtils.computeOptions(method, accessor, pipeline,
evaluatorFunction.apply(accessor));
TypedAggregation<?> aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options);
boolean isSimpleReturnType = MongoSimpleTypes.HOLDER.isSimpleType(returnElementType);
Class<?> typeToRead;
if (isSimpleReturnType) {
typeToRead = Document.class;
} else if (isRawAggregationResult) {
typeToRead = returnElementType;
} else {
if (returnedType.isProjecting()) {
typeToRead = returnedType.getReturnedType().isInterface() ? Document.class : returnedType.getReturnedType();
} else {
typeToRead = entityType;
}
}
return callback.doAggregate(aggregation, sourceType, typeToRead, returnElementType, isSimpleReturnType,
isRawAggregationResult);
}
static AggregationPipeline computePipeline(AbstractMongoQuery mongoQuery, MongoQueryMethod method,
ConvertingParameterAccessor accessor) {
return new AggregationPipeline(mongoQuery.parseAggregationPipeline(method.getAnnotatedAggregation(), accessor));
}
/**
* Append {@code $sort} aggregation stage if {@link ConvertingParameterAccessor#getSort()} is present.
*
@ -139,7 +236,7 @@ abstract class AggregationUtils { @@ -139,7 +236,7 @@ abstract class AggregationUtils {
* @param targetType
*/
static void appendSortIfPresent(AggregationPipeline aggregationPipeline, ConvertingParameterAccessor accessor,
Class<?> targetType) {
@Nullable Class<?> targetType) {
if (accessor.getSort().isUnsorted()) {
return;
@ -254,4 +351,26 @@ abstract class AggregationUtils { @@ -254,4 +351,26 @@ abstract class AggregationUtils {
return converter.getConversionService().convert(value, targetType);
}
/**
* Interface to invoke an aggregation along with source, intermediate, and target types.
*
* @param <T>
*/
interface AggregationCallback<T> {
/**
* @param aggregation
* @param domainType
* @param typeToRead
* @param elementType
* @param simpleType whether the aggregation returns {@link Document} or a
* {@link org.springframework.data.mapping.model.SimpleTypeHolder simple type}.
* @param rawResult whether the aggregation returns {@link AggregationResults}.
* @return
*/
@Nullable
T doAggregate(TypedAggregation<?> aggregation, Class<?> domainType, Class<?> typeToRead, Class<?> elementType,
boolean simpleType, boolean rawResult);
}
}

77
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java

@ -24,11 +24,8 @@ import org.bson.Document; @@ -24,11 +24,8 @@ import org.bson.Document;
import org.reactivestreams.Publisher;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes;
import org.springframework.data.mongodb.core.query.Query;
@ -36,9 +33,8 @@ import org.springframework.data.repository.query.ReactiveQueryMethodEvaluationCo @@ -36,9 +33,8 @@ import org.springframework.data.repository.query.ReactiveQueryMethodEvaluationCo
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ValueExpressionDelegate;
import org.springframework.data.util.ReflectionUtils;
import org.springframework.data.util.TypeInformation;
import org.springframework.expression.ExpressionParser;
import org.springframework.util.ClassUtils;
import org.springframework.lang.Nullable;
/**
* A reactive {@link org.springframework.data.repository.query.RepositoryQuery} to use a plain JSON String to create an
@ -87,46 +83,39 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery { @@ -87,46 +83,39 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery {
}
@Override
@SuppressWarnings("ReactiveStreamsNullableInLambdaInTransform")
protected Publisher<Object> doExecute(ReactiveMongoQueryMethod method, ResultProcessor processor,
ConvertingParameterAccessor accessor, Class<?> typeToRead) {
ConvertingParameterAccessor accessor, @Nullable Class<?> ignored) {
return computePipeline(accessor).flatMapMany(it -> {
Class<?> sourceType = method.getDomainClass();
Class<?> targetType = typeToRead;
return AggregationUtils.doAggregate(new AggregationPipeline(it), method, processor, accessor,
this::getValueExpressionEvaluator,
(aggregation, sourceType, typeToRead, elementType, simpleType, rawResult) -> {
AggregationPipeline pipeline = new AggregationPipeline(it);
Flux<?> flux = reactiveMongoOperations.aggregate(aggregation, typeToRead);
if (ReflectionUtils.isVoid(elementType)) {
return flux.then();
}
AggregationUtils.appendSortIfPresent(pipeline, accessor, typeToRead);
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor);
ReactiveMongoQueryExecution.ResultProcessingConverter resultProcessing = getResultProcessing(processor);
boolean isSimpleReturnType = isSimpleReturnType(typeToRead);
boolean isRawReturnType = ClassUtils.isAssignable(org.bson.Document.class, typeToRead);
if (simpleType && !rawResult && !elementType.equals(Document.class)) {
if (isSimpleReturnType || isRawReturnType) {
targetType = Document.class;
}
flux = flux.handle((item, sink) -> {
AggregationOptions options = computeOptions(method, accessor, pipeline);
TypedAggregation<?> aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options);
Object result = AggregationUtils.extractSimpleTypeResult((Document) item, elementType, mongoConverter);
Flux<?> flux = reactiveMongoOperations.aggregate(aggregation, targetType);
if (ReflectionUtils.isVoid(typeToRead)) {
return flux.then();
}
if (result != null) {
sink.next(result);
}
});
}
if (isSimpleReturnType && !isRawReturnType) {
flux = flux.handle((item, sink) -> {
flux = flux.map(resultProcessing::convert);
Object result = AggregationUtils.extractSimpleTypeResult((Document) item, typeToRead, mongoConverter);
if (result != null) {
sink.next(result);
}
});
}
return method.isCollectionQuery() ? flux : flux.next();
return method.isCollectionQuery() ? flux : flux.next();
});
});
}
@ -138,28 +127,6 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery { @@ -138,28 +127,6 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery {
return parseAggregationPipeline(getQueryMethod().getAnnotatedAggregation(), accessor);
}
private AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor,
AggregationPipeline pipeline) {
AggregationOptions.Builder builder = Aggregation.newAggregationOptions();
AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor,
getValueExpressionEvaluator(accessor));
AggregationUtils.applyMeta(builder, method);
AggregationUtils.applyHint(builder, method);
AggregationUtils.applyReadPreference(builder, method);
TypeInformation<?> returnType = method.getReturnType();
if (returnType.getComponentType() != null) {
returnType = returnType.getRequiredComponentType();
}
if (ReflectionUtils.isVoid(returnType.getType()) && pipeline.isOutOrMerge()) {
builder.skipOutput();
}
return builder.build();
}
@Override
protected Mono<Query> createQuery(ConvertingParameterAccessor accessor) {
throw new UnsupportedOperationException("No query support for aggregation");

132
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java

@ -17,7 +17,6 @@ package org.springframework.data.mongodb.repository.query; @@ -17,7 +17,6 @@ package org.springframework.data.mongodb.repository.query;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;
import java.util.stream.Stream;
import org.bson.Document;
@ -26,11 +25,7 @@ import org.springframework.data.domain.Pageable; @@ -26,11 +25,7 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.SliceImpl;
import org.springframework.data.mongodb.InvalidMongoDbApiUsageException;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes;
import org.springframework.data.mongodb.core.query.Query;
@ -41,7 +36,6 @@ import org.springframework.data.repository.query.ValueExpressionDelegate; @@ -41,7 +36,6 @@ import org.springframework.data.repository.query.ValueExpressionDelegate;
import org.springframework.data.util.ReflectionUtils;
import org.springframework.expression.ExpressionParser;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
/**
* {@link AbstractMongoQuery} implementation to run string-based aggregations using
@ -103,87 +97,67 @@ public class StringBasedAggregation extends AbstractMongoQuery { @@ -103,87 +97,67 @@ public class StringBasedAggregation extends AbstractMongoQuery {
this.mongoConverter = mongoOperations.getConverter();
}
@SuppressWarnings("unchecked")
@Override
@Nullable
protected Object doExecute(MongoQueryMethod method, ResultProcessor resultProcessor,
ConvertingParameterAccessor accessor, @Nullable Class<?> typeToRead) {
protected Object doExecute(MongoQueryMethod method, ResultProcessor processor, ConvertingParameterAccessor accessor,
@Nullable Class<?> ignore) {
Class<?> sourceType = method.getDomainClass();
Class<?> targetType = typeToRead;
return AggregationUtils.doAggregate(AggregationUtils.computePipeline(this, method, accessor), method, processor,
accessor, this::getExpressionEvaluatorFor,
(aggregation, sourceType, typeToRead, elementType, simpleType, rawResult) -> {
AggregationPipeline pipeline = computePipeline(method, accessor);
AggregationUtils.appendSortIfPresent(pipeline, accessor, typeToRead);
if (method.isStreamQuery()) {
if (method.isSliceQuery()) {
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor, LongUnaryOperator.identity(),
limit -> limit + 1);
} else {
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor);
}
boolean isSimpleReturnType = typeToRead != null && isSimpleReturnType(typeToRead);
boolean isRawAggregationResult = typeToRead != null && ClassUtils.isAssignable(AggregationResults.class, typeToRead);
Stream<?> stream = mongoOperations.aggregateStream(aggregation, typeToRead);
if (isSimpleReturnType) {
targetType = Document.class;
} else if (isRawAggregationResult) {
if (!simpleType || elementType.equals(Document.class)) {
return stream;
}
// 🙈
targetType = method.getReturnType().getRequiredActualType().getRequiredComponentType().getType();
} else if (resultProcessor.getReturnedType().isProjecting()) {
targetType = resultProcessor.getReturnedType().getReturnedType().isInterface() ? Document.class :resultProcessor.getReturnedType().getReturnedType();
}
return stream
.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, elementType, mongoConverter));
}
AggregationOptions options = computeOptions(method, accessor, pipeline);
TypedAggregation<?> aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options);
AggregationResults<Object> result = (AggregationResults<Object>) mongoOperations.aggregate(aggregation,
typeToRead);
if (method.isStreamQuery()) {
if (ReflectionUtils.isVoid(elementType)) {
return null;
}
Stream<?> stream = mongoOperations.aggregateStream(aggregation, targetType);
if (rawResult) {
return result;
}
if (isSimpleReturnType) {
return stream.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter));
}
List<?> results = result.getMappedResults();
if (method.isCollectionQuery()) {
return simpleType ? convertResults(elementType, (List<Document>) results) : results;
}
return stream;
}
if (method.isSliceQuery()) {
AggregationResults<Object> result = (AggregationResults<Object>) mongoOperations.aggregate(aggregation, targetType);
if (typeToRead != null && ReflectionUtils.isVoid(typeToRead)) {
return null;
}
Pageable pageable = accessor.getPageable();
int pageSize = pageable.getPageSize();
List<Object> resultsToUse = simpleType ? convertResults(elementType, (List<Document>) results)
: (List<Object>) results;
boolean hasNext = resultsToUse.size() > pageSize;
return new SliceImpl<>(hasNext ? resultsToUse.subList(0, pageSize) : resultsToUse, pageable, hasNext);
}
if (isRawAggregationResult) {
return result;
}
List<Object> results = result.getMappedResults();
if (method.isCollectionQuery()) {
return isSimpleReturnType ? convertResults(typeToRead, results) : results;
}
if (method.isSliceQuery()) {
Pageable pageable = accessor.getPageable();
int pageSize = pageable.getPageSize();
List<Object> resultsToUse = isSimpleReturnType ? convertResults(typeToRead, results) : results;
boolean hasNext = resultsToUse.size() > pageSize;
return new SliceImpl<>(hasNext ? resultsToUse.subList(0, pageSize) : resultsToUse, pageable, hasNext);
}
Object uniqueResult = result.getUniqueMappedResult();
Object uniqueResult = result.getUniqueMappedResult();
return isSimpleReturnType
? AggregationUtils.extractSimpleTypeResult((Document) uniqueResult, typeToRead, mongoConverter)
: uniqueResult;
return simpleType
? AggregationUtils.extractSimpleTypeResult((Document) uniqueResult, elementType, mongoConverter)
: uniqueResult;
});
}
private List<Object> convertResults(Class<?> typeToRead, List<Object> mappedResults) {
private List<Object> convertResults(Class<?> targetType, List<Document> mappedResults) {
List<Object> list = new ArrayList<>(mappedResults.size());
for (Object it : mappedResults) {
Object extractSimpleTypeResult = AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead,
mongoConverter);
for (Document it : mappedResults) {
Object extractSimpleTypeResult = AggregationUtils.extractSimpleTypeResult(it, targetType, mongoConverter);
list.add(extractSimpleTypeResult);
}
return list;
@ -193,28 +167,6 @@ public class StringBasedAggregation extends AbstractMongoQuery { @@ -193,28 +167,6 @@ public class StringBasedAggregation extends AbstractMongoQuery {
return MongoSimpleTypes.HOLDER.isSimpleType(targetType);
}
AggregationPipeline computePipeline(MongoQueryMethod method, ConvertingParameterAccessor accessor) {
return new AggregationPipeline(parseAggregationPipeline(method.getAnnotatedAggregation(), accessor));
}
private AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor,
AggregationPipeline pipeline) {
AggregationOptions.Builder builder = Aggregation.newAggregationOptions();
AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor,
getExpressionEvaluatorFor(accessor));
AggregationUtils.applyMeta(builder, method);
AggregationUtils.applyHint(builder, method);
AggregationUtils.applyReadPreference(builder, method);
if (ReflectionUtils.isVoid(method.getReturnType().getType()) && pipeline.isOutOrMerge()) {
builder.skipOutput();
}
return builder.build();
}
@Override
protected Query createQuery(ConvertingParameterAccessor accessor) {
throw new UnsupportedOperationException("No query support for aggregation");

12
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java

@ -1396,6 +1396,18 @@ public abstract class AbstractPersonRepositoryIntegrationTests implements Dirtie @@ -1396,6 +1396,18 @@ public abstract class AbstractPersonRepositoryIntegrationTests implements Dirtie
}
}
@Test // DATAMONGO-4841
void annotatedAggregationStreamWithPlaceholderValue() {
assertThat(repository.groupStreamByLastnameAnd("firstname"))
.contains(new PersonAggregate("Lessard", Collections.singletonList("Stefan"))) //
.contains(new PersonAggregate("Keys", Collections.singletonList("Alicia"))) //
.contains(new PersonAggregate("Tinsley", Collections.singletonList("Boyd"))) //
.contains(new PersonAggregate("Beauford", Collections.singletonList("Carter"))) //
.contains(new PersonAggregate("Moore", Collections.singletonList("Leroi"))) //
.contains(new PersonAggregate("Matthews", Arrays.asList("Dave", "Oliver August")));
}
@Test // DATAMONGO-2153
void annotatedAggregationWithPlaceholderValue() {

5
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java

@ -27,10 +27,10 @@ import org.springframework.data.domain.Limit; @@ -27,10 +27,10 @@ import org.springframework.data.domain.Limit;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Range;
import org.springframework.data.domain.Window;
import org.springframework.data.domain.ScrollPosition;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Window;
import org.springframework.data.geo.Box;
import org.springframework.data.geo.Circle;
import org.springframework.data.geo.Distance;
@ -413,6 +413,9 @@ public interface PersonRepository extends MongoRepository<Person, String>, Query @@ -413,6 +413,9 @@ public interface PersonRepository extends MongoRepository<Person, String>, Query
@Aggregation("{ '$project': { '_id' : '$lastname' } }")
Stream<String> findAllLastnamesAsStream();
@Aggregation("{ '$group': { '_id' : '$lastname', names : { $addToSet : '$?0' } } }")
Stream<PersonAggregate> groupStreamByLastnameAnd(String property);
@Aggregation("{ '$group': { '_id' : '$lastname', names : { $addToSet : '$?0' } } }")
List<PersonAggregate> groupByLastnameAnd(String property);

15
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java

@ -600,6 +600,17 @@ class ReactiveMongoRepositoryTests implements DirtiesStateExtension.StateFunctio @@ -600,6 +600,17 @@ class ReactiveMongoRepositoryTests implements DirtiesStateExtension.StateFunctio
}).verifyComplete();
}
@Test // GH-4839
void annotatedAggregationWithAggregationResultAsClosedInterfaceProjection() {
repository.findAggregatedClosedInterfaceProjectionBy() //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getFirstname()).isIn(dave.getFirstname(), oliver.getFirstname());
assertThat(it.getLastname()).isEqualTo(dave.getLastname());
}).expectNextCount(1).verifyComplete();
}
@Test // DATAMONGO-2403
@DirtiesState
void annotatedAggregationExtractingSimpleValueIsEmptyForEmptyDocument() {
@ -816,6 +827,10 @@ class ReactiveMongoRepositoryTests implements DirtiesStateExtension.StateFunctio @@ -816,6 +827,10 @@ class ReactiveMongoRepositoryTests implements DirtiesStateExtension.StateFunctio
@Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }")
Mono<Map> sumAgeAndReturnSumAsMap();
@Aggregation({ "{ '$match' : { 'lastname' : 'Matthews'} }",
"{ '$project': { _id : 0, firstname : 1, lastname : 1 } }" })
Flux<PersonSummary> findAggregatedClosedInterfaceProjectionBy();
@Aggregation(
pipeline = { "{ '$match' : { 'firstname' : '?0' } }", "{ '$project' : { '_id' : 0, 'lastname' : 1 } }" })
Mono<String> projectToLastnameAndRemoveId(String firstname);

1
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java

@ -32,4 +32,5 @@ public interface ReactivePersonRepository extends ReactiveMongoRepository<Person @@ -32,4 +32,5 @@ public interface ReactivePersonRepository extends ReactiveMongoRepository<Person
* @return
*/
Flux<Person> findByLastname(String lastname);
}

Loading…
Cancel
Save