Browse Source

Refactor String-based repository aggretation methods into common utility callback.

Also, support aggregation result projections for reactive flows.

See #4839
Original pull request: #4841
issue/4851
Mark Paluch 1 year ago
parent
commit
0a29d13774
No known key found for this signature in database
GPG Key ID: 55BC6374BAA9D973
  1. 46
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java
  2. 125
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java
  3. 77
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java
  4. 132
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java
  5. 12
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java
  6. 5
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java
  7. 15
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java
  8. 1
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java

46
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java

@ -17,6 +17,7 @@ package org.springframework.data.mongodb.repository.query;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -61,7 +62,6 @@ import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.data.repository.query.ResultProcessor; import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ValueExpressionDelegate; import org.springframework.data.repository.query.ValueExpressionDelegate;
import org.springframework.data.spel.ExpressionDependencies; import org.springframework.data.spel.ExpressionDependencies;
import org.springframework.data.util.TypeInformation;
import org.springframework.expression.ExpressionParser; import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
@ -70,7 +70,6 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import com.mongodb.MongoClientSettings; import com.mongodb.MongoClientSettings;
import reactor.util.function.Tuple2;
/** /**
* Base class for reactive {@link RepositoryQuery} implementations for MongoDB. * Base class for reactive {@link RepositoryQuery} implementations for MongoDB.
@ -112,16 +111,20 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
this.method = method; this.method = method;
this.operations = operations; this.operations = operations;
this.instantiators = new EntityInstantiators(); this.instantiators = new EntityInstantiators();
this.valueExpressionDelegate = new ValueExpressionDelegate(new QueryMethodValueEvaluationContextAccessor(new StandardEnvironment(), evaluationContextProvider.getEvaluationContextProvider()), ValueExpressionParser.create(() -> expressionParser)); this.valueExpressionDelegate = new ValueExpressionDelegate(
new QueryMethodValueEvaluationContextAccessor(new StandardEnvironment(),
evaluationContextProvider.getEvaluationContextProvider()),
ValueExpressionParser.create(() -> expressionParser));
MongoEntityMetadata<?> metadata = method.getEntityInformation(); MongoEntityMetadata<?> metadata = method.getEntityInformation();
Class<?> type = metadata.getCollectionEntity().getType(); Class<?> type = metadata.getCollectionEntity().getType();
this.findOperationWithProjection = operations.query(type); this.findOperationWithProjection = operations.query(type);
this.updateOps = operations.update(type); this.updateOps = operations.update(type);
ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate.createValueContextProvider( ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate
method.getParameters()); .createValueContextProvider(method.getParameters());
Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, "ValueEvaluationContextProvider must be reactive"); Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider,
"ValueEvaluationContextProvider must be reactive");
this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider; this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider;
} }
@ -151,9 +154,10 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
this.findOperationWithProjection = operations.query(type); this.findOperationWithProjection = operations.query(type);
this.updateOps = operations.update(type); this.updateOps = operations.update(type);
ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate.createValueContextProvider( ValueEvaluationContextProvider valueContextProvider = valueExpressionDelegate
method.getParameters()); .createValueContextProvider(method.getParameters());
Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider, "ValueEvaluationContextProvider must be reactive"); Assert.isInstanceOf(ReactiveValueEvaluationContextProvider.class, valueContextProvider,
"ValueEvaluationContextProvider must be reactive");
this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider; this.valueEvaluationContextProvider = (ReactiveValueEvaluationContextProvider) valueContextProvider;
} }
@ -182,14 +186,9 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
ConvertingParameterAccessor accessor = new ConvertingParameterAccessor(operations.getConverter(), ConvertingParameterAccessor accessor = new ConvertingParameterAccessor(operations.getConverter(),
parameterAccessor); parameterAccessor);
TypeInformation<?> returnType = method.getReturnType();
ResultProcessor processor = method.getResultProcessor().withDynamicProjection(accessor); ResultProcessor processor = method.getResultProcessor().withDynamicProjection(accessor);
Class<?> typeToRead = processor.getReturnedType().getTypeToRead(); Class<?> typeToRead = processor.getReturnedType().getTypeToRead();
if (typeToRead == null && returnType.getComponentType() != null) {
typeToRead = returnType.getComponentType().getType();
}
return doExecute(method, processor, accessor, typeToRead); return doExecute(method, processor, accessor, typeToRead);
} }
@ -221,11 +220,15 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
String collection = method.getEntityInformation().getCollectionName(); String collection = method.getEntityInformation().getCollectionName();
ReactiveMongoQueryExecution execution = getExecution(accessor, ReactiveMongoQueryExecution execution = getExecution(accessor,
new ResultProcessingConverter(processor, operations, instantiators), find); getResultProcessing(processor), find);
return execution.execute(query, processor.getReturnedType().getDomainType(), collection); return execution.execute(query, processor.getReturnedType().getDomainType(), collection);
}); });
} }
ResultProcessingConverter getResultProcessing(ResultProcessor processor) {
return new ResultProcessingConverter(processor, operations, instantiators);
}
/** /**
* Returns the execution instance to use. * Returns the execution instance to use.
* *
@ -439,8 +442,8 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
return getValueExpressionEvaluatorLater(dependencies, accessor).zipWith(Mono.just(codec)); return getValueExpressionEvaluatorLater(dependencies, accessor).zipWith(Mono.just(codec));
} }
private Document decode(Tuple2<ValueExpressionEvaluator, ParameterBindingDocumentCodec> expressionEvaluator, String source, MongoParameterAccessor accessor, private Document decode(Tuple2<ValueExpressionEvaluator, ParameterBindingDocumentCodec> expressionEvaluator,
ParameterBindingDocumentCodec codec) { String source, MongoParameterAccessor accessor, ParameterBindingDocumentCodec codec) {
ParameterBindingContext bindingContext = new ParameterBindingContext(accessor::getBindableValue, ParameterBindingContext bindingContext = new ParameterBindingContext(accessor::getBindableValue,
expressionEvaluator.getT1()); expressionEvaluator.getT1());
@ -490,8 +493,8 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
@Override @Override
public <T> T evaluate(String expressionString) { public <T> T evaluate(String expressionString) {
ValueExpression expression = valueExpressionDelegate.parse(expressionString); ValueExpression expression = valueExpressionDelegate.parse(expressionString);
ValueEvaluationContext evaluationContext = valueEvaluationContextProvider.getEvaluationContext(accessor.getValues(), ValueEvaluationContext evaluationContext = valueEvaluationContextProvider
expression.getExpressionDependencies()); .getEvaluationContext(accessor.getValues(), expression.getExpressionDependencies());
return (T) expression.evaluate(evaluationContext); return (T) expression.evaluate(evaluationContext);
} }
}; };
@ -509,8 +512,9 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
protected Mono<ValueExpressionEvaluator> getValueExpressionEvaluatorLater(ExpressionDependencies dependencies, protected Mono<ValueExpressionEvaluator> getValueExpressionEvaluatorLater(ExpressionDependencies dependencies,
MongoParameterAccessor accessor) { MongoParameterAccessor accessor) {
return valueEvaluationContextProvider.getEvaluationContextLater(accessor.getValues(), dependencies) return valueEvaluationContextProvider.getEvaluationContextLater(accessor.getValues(), dependencies)
.map(evaluationContext -> new ValueExpressionDelegateValueExpressionEvaluator(valueExpressionDelegate, valueExpression -> evaluationContext)); .map(evaluationContext -> new ValueExpressionDelegateValueExpressionEvaluator(valueExpressionDelegate,
valueExpression -> evaluationContext));
} }
/** /**

125
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java

@ -17,6 +17,7 @@ package org.springframework.data.mongodb.repository.query;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
import java.util.function.IntUnaryOperator; import java.util.function.IntUnaryOperator;
import java.util.function.LongUnaryOperator; import java.util.function.LongUnaryOperator;
@ -28,11 +29,18 @@ import org.springframework.data.mapping.model.ValueExpressionEvaluator;
import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.mapping.FieldName; import org.springframework.data.mongodb.core.mapping.FieldName;
import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes;
import org.springframework.data.mongodb.core.query.Collation; import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Meta; import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ReturnedType;
import org.springframework.data.util.ReflectionUtils;
import org.springframework.data.util.TypeInformation;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils; import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
@ -116,13 +124,15 @@ abstract class AggregationUtils {
} }
/** /**
* If present apply the preference from the {@link org.springframework.data.mongodb.repository.ReadPreference} annotation. * If present apply the preference from the {@link org.springframework.data.mongodb.repository.ReadPreference}
* annotation.
* *
* @param builder must not be {@literal null}. * @param builder must not be {@literal null}.
* @return never {@literal null}. * @return never {@literal null}.
* @since 4.2 * @since 4.2
*/ */
static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder builder, MongoQueryMethod queryMethod) { static AggregationOptions.Builder applyReadPreference(AggregationOptions.Builder builder,
MongoQueryMethod queryMethod) {
if (!queryMethod.hasAnnotatedReadPreference()) { if (!queryMethod.hasAnnotatedReadPreference()) {
return builder; return builder;
@ -131,6 +141,93 @@ abstract class AggregationUtils {
return builder.readPreference(ReadPreference.valueOf(queryMethod.getAnnotatedReadPreference())); return builder.readPreference(ReadPreference.valueOf(queryMethod.getAnnotatedReadPreference()));
} }
static AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor,
AggregationPipeline pipeline, ValueExpressionEvaluator evaluator) {
AggregationOptions.Builder builder = Aggregation.newAggregationOptions();
AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor, evaluator);
AggregationUtils.applyMeta(builder, method);
AggregationUtils.applyHint(builder, method);
AggregationUtils.applyReadPreference(builder, method);
TypeInformation<?> returnType = method.getReturnType();
if (returnType.getComponentType() != null) {
returnType = returnType.getRequiredComponentType();
}
if (ReflectionUtils.isVoid(returnType.getType()) && pipeline.isOutOrMerge()) {
builder.skipOutput();
}
return builder.build();
}
/**
* Prepares the AggregationPipeline including type discovery and calling {@link AggregationCallback} to run the
* aggregation.
*/
@Nullable
static <T> T doAggregate(AggregationPipeline pipeline, MongoQueryMethod method, ResultProcessor processor,
ConvertingParameterAccessor accessor,
Function<MongoParameterAccessor, ValueExpressionEvaluator> evaluatorFunction, AggregationCallback<T> callback) {
Class<?> sourceType = method.getDomainClass();
ReturnedType returnedType = processor.getReturnedType();
// 🙈Interface Projections do not happen on the Aggregation level but through our repository infrastructure.
// Non-projections and raw results (AggregationResults<…>) are handled here. Interface projections read a Document
// and DTO projections read the returned type.
// We also support simple return types (String) that are read from a Document
TypeInformation<?> returnType = method.getReturnType();
Class<?> returnElementType = (returnType.getComponentType() != null ? returnType.getRequiredComponentType()
: returnType).getType();
Class<?> entityType;
boolean isRawAggregationResult = ClassUtils.isAssignable(AggregationResults.class, method.getReturnedObjectType());
if (returnElementType.equals(Document.class)) {
entityType = sourceType;
} else {
entityType = returnElementType;
}
AggregationUtils.appendSortIfPresent(pipeline, accessor, entityType);
if (method.isSliceQuery()) {
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor, LongUnaryOperator.identity(),
limit -> limit + 1);
} else {
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor);
}
AggregationOptions options = AggregationUtils.computeOptions(method, accessor, pipeline,
evaluatorFunction.apply(accessor));
TypedAggregation<?> aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options);
boolean isSimpleReturnType = MongoSimpleTypes.HOLDER.isSimpleType(returnElementType);
Class<?> typeToRead;
if (isSimpleReturnType) {
typeToRead = Document.class;
} else if (isRawAggregationResult) {
typeToRead = returnElementType;
} else {
if (returnedType.isProjecting()) {
typeToRead = returnedType.getReturnedType().isInterface() ? Document.class : returnedType.getReturnedType();
} else {
typeToRead = entityType;
}
}
return callback.doAggregate(aggregation, sourceType, typeToRead, returnElementType, isSimpleReturnType,
isRawAggregationResult);
}
static AggregationPipeline computePipeline(AbstractMongoQuery mongoQuery, MongoQueryMethod method,
ConvertingParameterAccessor accessor) {
return new AggregationPipeline(mongoQuery.parseAggregationPipeline(method.getAnnotatedAggregation(), accessor));
}
/** /**
* Append {@code $sort} aggregation stage if {@link ConvertingParameterAccessor#getSort()} is present. * Append {@code $sort} aggregation stage if {@link ConvertingParameterAccessor#getSort()} is present.
* *
@ -139,7 +236,7 @@ abstract class AggregationUtils {
* @param targetType * @param targetType
*/ */
static void appendSortIfPresent(AggregationPipeline aggregationPipeline, ConvertingParameterAccessor accessor, static void appendSortIfPresent(AggregationPipeline aggregationPipeline, ConvertingParameterAccessor accessor,
Class<?> targetType) { @Nullable Class<?> targetType) {
if (accessor.getSort().isUnsorted()) { if (accessor.getSort().isUnsorted()) {
return; return;
@ -254,4 +351,26 @@ abstract class AggregationUtils {
return converter.getConversionService().convert(value, targetType); return converter.getConversionService().convert(value, targetType);
} }
/**
* Interface to invoke an aggregation along with source, intermediate, and target types.
*
* @param <T>
*/
interface AggregationCallback<T> {
/**
* @param aggregation
* @param domainType
* @param typeToRead
* @param elementType
* @param simpleType whether the aggregation returns {@link Document} or a
* {@link org.springframework.data.mapping.model.SimpleTypeHolder simple type}.
* @param rawResult whether the aggregation returns {@link AggregationResults}.
* @return
*/
@Nullable
T doAggregate(TypedAggregation<?> aggregation, Class<?> domainType, Class<?> typeToRead, Class<?> elementType,
boolean simpleType, boolean rawResult);
}
} }

77
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveStringBasedAggregation.java

@ -24,11 +24,8 @@ import org.bson.Document;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.springframework.data.mongodb.core.ReactiveMongoOperations; import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation; import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes; import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
@ -36,9 +33,8 @@ import org.springframework.data.repository.query.ReactiveQueryMethodEvaluationCo
import org.springframework.data.repository.query.ResultProcessor; import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ValueExpressionDelegate; import org.springframework.data.repository.query.ValueExpressionDelegate;
import org.springframework.data.util.ReflectionUtils; import org.springframework.data.util.ReflectionUtils;
import org.springframework.data.util.TypeInformation;
import org.springframework.expression.ExpressionParser; import org.springframework.expression.ExpressionParser;
import org.springframework.util.ClassUtils; import org.springframework.lang.Nullable;
/** /**
* A reactive {@link org.springframework.data.repository.query.RepositoryQuery} to use a plain JSON String to create an * A reactive {@link org.springframework.data.repository.query.RepositoryQuery} to use a plain JSON String to create an
@ -87,46 +83,39 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery {
} }
@Override @Override
@SuppressWarnings("ReactiveStreamsNullableInLambdaInTransform")
protected Publisher<Object> doExecute(ReactiveMongoQueryMethod method, ResultProcessor processor, protected Publisher<Object> doExecute(ReactiveMongoQueryMethod method, ResultProcessor processor,
ConvertingParameterAccessor accessor, Class<?> typeToRead) { ConvertingParameterAccessor accessor, @Nullable Class<?> ignored) {
return computePipeline(accessor).flatMapMany(it -> { return computePipeline(accessor).flatMapMany(it -> {
Class<?> sourceType = method.getDomainClass(); return AggregationUtils.doAggregate(new AggregationPipeline(it), method, processor, accessor,
Class<?> targetType = typeToRead; this::getValueExpressionEvaluator,
(aggregation, sourceType, typeToRead, elementType, simpleType, rawResult) -> {
AggregationPipeline pipeline = new AggregationPipeline(it); Flux<?> flux = reactiveMongoOperations.aggregate(aggregation, typeToRead);
if (ReflectionUtils.isVoid(elementType)) {
return flux.then();
}
AggregationUtils.appendSortIfPresent(pipeline, accessor, typeToRead); ReactiveMongoQueryExecution.ResultProcessingConverter resultProcessing = getResultProcessing(processor);
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor);
boolean isSimpleReturnType = isSimpleReturnType(typeToRead); if (simpleType && !rawResult && !elementType.equals(Document.class)) {
boolean isRawReturnType = ClassUtils.isAssignable(org.bson.Document.class, typeToRead);
if (isSimpleReturnType || isRawReturnType) { flux = flux.handle((item, sink) -> {
targetType = Document.class;
}
AggregationOptions options = computeOptions(method, accessor, pipeline); Object result = AggregationUtils.extractSimpleTypeResult((Document) item, elementType, mongoConverter);
TypedAggregation<?> aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options);
Flux<?> flux = reactiveMongoOperations.aggregate(aggregation, targetType); if (result != null) {
if (ReflectionUtils.isVoid(typeToRead)) { sink.next(result);
return flux.then(); }
} });
}
if (isSimpleReturnType && !isRawReturnType) { flux = flux.map(resultProcessing::convert);
flux = flux.handle((item, sink) -> {
Object result = AggregationUtils.extractSimpleTypeResult((Document) item, typeToRead, mongoConverter); return method.isCollectionQuery() ? flux : flux.next();
});
if (result != null) {
sink.next(result);
}
});
}
return method.isCollectionQuery() ? flux : flux.next();
}); });
} }
@ -138,28 +127,6 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery {
return parseAggregationPipeline(getQueryMethod().getAnnotatedAggregation(), accessor); return parseAggregationPipeline(getQueryMethod().getAnnotatedAggregation(), accessor);
} }
private AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor,
AggregationPipeline pipeline) {
AggregationOptions.Builder builder = Aggregation.newAggregationOptions();
AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor,
getValueExpressionEvaluator(accessor));
AggregationUtils.applyMeta(builder, method);
AggregationUtils.applyHint(builder, method);
AggregationUtils.applyReadPreference(builder, method);
TypeInformation<?> returnType = method.getReturnType();
if (returnType.getComponentType() != null) {
returnType = returnType.getRequiredComponentType();
}
if (ReflectionUtils.isVoid(returnType.getType()) && pipeline.isOutOrMerge()) {
builder.skipOutput();
}
return builder.build();
}
@Override @Override
protected Mono<Query> createQuery(ConvertingParameterAccessor accessor) { protected Mono<Query> createQuery(ConvertingParameterAccessor accessor) {
throw new UnsupportedOperationException("No query support for aggregation"); throw new UnsupportedOperationException("No query support for aggregation");

132
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/StringBasedAggregation.java

@ -17,7 +17,6 @@ package org.springframework.data.mongodb.repository.query;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.function.LongUnaryOperator;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.bson.Document; import org.bson.Document;
@ -26,11 +25,7 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.SliceImpl; import org.springframework.data.domain.SliceImpl;
import org.springframework.data.mongodb.InvalidMongoDbApiUsageException; import org.springframework.data.mongodb.InvalidMongoDbApiUsageException;
import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults; import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes; import org.springframework.data.mongodb.core.mapping.MongoSimpleTypes;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
@ -41,7 +36,6 @@ import org.springframework.data.repository.query.ValueExpressionDelegate;
import org.springframework.data.util.ReflectionUtils; import org.springframework.data.util.ReflectionUtils;
import org.springframework.expression.ExpressionParser; import org.springframework.expression.ExpressionParser;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
/** /**
* {@link AbstractMongoQuery} implementation to run string-based aggregations using * {@link AbstractMongoQuery} implementation to run string-based aggregations using
@ -103,87 +97,67 @@ public class StringBasedAggregation extends AbstractMongoQuery {
this.mongoConverter = mongoOperations.getConverter(); this.mongoConverter = mongoOperations.getConverter();
} }
@SuppressWarnings("unchecked")
@Override @Override
@Nullable @Nullable
protected Object doExecute(MongoQueryMethod method, ResultProcessor resultProcessor, protected Object doExecute(MongoQueryMethod method, ResultProcessor processor, ConvertingParameterAccessor accessor,
ConvertingParameterAccessor accessor, @Nullable Class<?> typeToRead) { @Nullable Class<?> ignore) {
Class<?> sourceType = method.getDomainClass(); return AggregationUtils.doAggregate(AggregationUtils.computePipeline(this, method, accessor), method, processor,
Class<?> targetType = typeToRead; accessor, this::getExpressionEvaluatorFor,
(aggregation, sourceType, typeToRead, elementType, simpleType, rawResult) -> {
AggregationPipeline pipeline = computePipeline(method, accessor); if (method.isStreamQuery()) {
AggregationUtils.appendSortIfPresent(pipeline, accessor, typeToRead);
if (method.isSliceQuery()) { Stream<?> stream = mongoOperations.aggregateStream(aggregation, typeToRead);
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor, LongUnaryOperator.identity(),
limit -> limit + 1);
} else {
AggregationUtils.appendLimitAndOffsetIfPresent(pipeline, accessor);
}
boolean isSimpleReturnType = typeToRead != null && isSimpleReturnType(typeToRead);
boolean isRawAggregationResult = typeToRead != null && ClassUtils.isAssignable(AggregationResults.class, typeToRead);
if (isSimpleReturnType) { if (!simpleType || elementType.equals(Document.class)) {
targetType = Document.class; return stream;
} else if (isRawAggregationResult) { }
// 🙈 return stream
targetType = method.getReturnType().getRequiredActualType().getRequiredComponentType().getType(); .map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, elementType, mongoConverter));
} else if (resultProcessor.getReturnedType().isProjecting()) { }
targetType = resultProcessor.getReturnedType().getReturnedType().isInterface() ? Document.class :resultProcessor.getReturnedType().getReturnedType();
}
AggregationOptions options = computeOptions(method, accessor, pipeline); AggregationResults<Object> result = (AggregationResults<Object>) mongoOperations.aggregate(aggregation,
TypedAggregation<?> aggregation = new TypedAggregation<>(sourceType, pipeline.getOperations(), options); typeToRead);
if (method.isStreamQuery()) { if (ReflectionUtils.isVoid(elementType)) {
return null;
}
Stream<?> stream = mongoOperations.aggregateStream(aggregation, targetType); if (rawResult) {
return result;
}
if (isSimpleReturnType) { List<?> results = result.getMappedResults();
return stream.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter)); if (method.isCollectionQuery()) {
} return simpleType ? convertResults(elementType, (List<Document>) results) : results;
}
return stream; if (method.isSliceQuery()) {
}
AggregationResults<Object> result = (AggregationResults<Object>) mongoOperations.aggregate(aggregation, targetType); Pageable pageable = accessor.getPageable();
if (typeToRead != null && ReflectionUtils.isVoid(typeToRead)) { int pageSize = pageable.getPageSize();
return null; List<Object> resultsToUse = simpleType ? convertResults(elementType, (List<Document>) results)
} : (List<Object>) results;
boolean hasNext = resultsToUse.size() > pageSize;
return new SliceImpl<>(hasNext ? resultsToUse.subList(0, pageSize) : resultsToUse, pageable, hasNext);
}
if (isRawAggregationResult) { Object uniqueResult = result.getUniqueMappedResult();
return result;
}
List<Object> results = result.getMappedResults();
if (method.isCollectionQuery()) {
return isSimpleReturnType ? convertResults(typeToRead, results) : results;
}
if (method.isSliceQuery()) {
Pageable pageable = accessor.getPageable();
int pageSize = pageable.getPageSize();
List<Object> resultsToUse = isSimpleReturnType ? convertResults(typeToRead, results) : results;
boolean hasNext = resultsToUse.size() > pageSize;
return new SliceImpl<>(hasNext ? resultsToUse.subList(0, pageSize) : resultsToUse, pageable, hasNext);
}
Object uniqueResult = result.getUniqueMappedResult(); return simpleType
? AggregationUtils.extractSimpleTypeResult((Document) uniqueResult, elementType, mongoConverter)
return isSimpleReturnType : uniqueResult;
? AggregationUtils.extractSimpleTypeResult((Document) uniqueResult, typeToRead, mongoConverter) });
: uniqueResult;
} }
private List<Object> convertResults(Class<?> typeToRead, List<Object> mappedResults) { private List<Object> convertResults(Class<?> targetType, List<Document> mappedResults) {
List<Object> list = new ArrayList<>(mappedResults.size()); List<Object> list = new ArrayList<>(mappedResults.size());
for (Object it : mappedResults) { for (Document it : mappedResults) {
Object extractSimpleTypeResult = AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, Object extractSimpleTypeResult = AggregationUtils.extractSimpleTypeResult(it, targetType, mongoConverter);
mongoConverter);
list.add(extractSimpleTypeResult); list.add(extractSimpleTypeResult);
} }
return list; return list;
@ -193,28 +167,6 @@ public class StringBasedAggregation extends AbstractMongoQuery {
return MongoSimpleTypes.HOLDER.isSimpleType(targetType); return MongoSimpleTypes.HOLDER.isSimpleType(targetType);
} }
AggregationPipeline computePipeline(MongoQueryMethod method, ConvertingParameterAccessor accessor) {
return new AggregationPipeline(parseAggregationPipeline(method.getAnnotatedAggregation(), accessor));
}
private AggregationOptions computeOptions(MongoQueryMethod method, ConvertingParameterAccessor accessor,
AggregationPipeline pipeline) {
AggregationOptions.Builder builder = Aggregation.newAggregationOptions();
AggregationUtils.applyCollation(builder, method.getAnnotatedCollation(), accessor,
getExpressionEvaluatorFor(accessor));
AggregationUtils.applyMeta(builder, method);
AggregationUtils.applyHint(builder, method);
AggregationUtils.applyReadPreference(builder, method);
if (ReflectionUtils.isVoid(method.getReturnType().getType()) && pipeline.isOutOrMerge()) {
builder.skipOutput();
}
return builder.build();
}
@Override @Override
protected Query createQuery(ConvertingParameterAccessor accessor) { protected Query createQuery(ConvertingParameterAccessor accessor) {
throw new UnsupportedOperationException("No query support for aggregation"); throw new UnsupportedOperationException("No query support for aggregation");

12
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java

@ -1396,6 +1396,18 @@ public abstract class AbstractPersonRepositoryIntegrationTests implements Dirtie
} }
} }
@Test // DATAMONGO-4841
void annotatedAggregationStreamWithPlaceholderValue() {
assertThat(repository.groupStreamByLastnameAnd("firstname"))
.contains(new PersonAggregate("Lessard", Collections.singletonList("Stefan"))) //
.contains(new PersonAggregate("Keys", Collections.singletonList("Alicia"))) //
.contains(new PersonAggregate("Tinsley", Collections.singletonList("Boyd"))) //
.contains(new PersonAggregate("Beauford", Collections.singletonList("Carter"))) //
.contains(new PersonAggregate("Moore", Collections.singletonList("Leroi"))) //
.contains(new PersonAggregate("Matthews", Arrays.asList("Dave", "Oliver August")));
}
@Test // DATAMONGO-2153 @Test // DATAMONGO-2153
void annotatedAggregationWithPlaceholderValue() { void annotatedAggregationWithPlaceholderValue() {

5
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java

@ -27,10 +27,10 @@ import org.springframework.data.domain.Limit;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Range; import org.springframework.data.domain.Range;
import org.springframework.data.domain.Window;
import org.springframework.data.domain.ScrollPosition; import org.springframework.data.domain.ScrollPosition;
import org.springframework.data.domain.Slice; import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Window;
import org.springframework.data.geo.Box; import org.springframework.data.geo.Box;
import org.springframework.data.geo.Circle; import org.springframework.data.geo.Circle;
import org.springframework.data.geo.Distance; import org.springframework.data.geo.Distance;
@ -413,6 +413,9 @@ public interface PersonRepository extends MongoRepository<Person, String>, Query
@Aggregation("{ '$project': { '_id' : '$lastname' } }") @Aggregation("{ '$project': { '_id' : '$lastname' } }")
Stream<String> findAllLastnamesAsStream(); Stream<String> findAllLastnamesAsStream();
@Aggregation("{ '$group': { '_id' : '$lastname', names : { $addToSet : '$?0' } } }")
Stream<PersonAggregate> groupStreamByLastnameAnd(String property);
@Aggregation("{ '$group': { '_id' : '$lastname', names : { $addToSet : '$?0' } } }") @Aggregation("{ '$group': { '_id' : '$lastname', names : { $addToSet : '$?0' } } }")
List<PersonAggregate> groupByLastnameAnd(String property); List<PersonAggregate> groupByLastnameAnd(String property);

15
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java

@ -600,6 +600,17 @@ class ReactiveMongoRepositoryTests implements DirtiesStateExtension.StateFunctio
}).verifyComplete(); }).verifyComplete();
} }
@Test // GH-4839
void annotatedAggregationWithAggregationResultAsClosedInterfaceProjection() {
repository.findAggregatedClosedInterfaceProjectionBy() //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getFirstname()).isIn(dave.getFirstname(), oliver.getFirstname());
assertThat(it.getLastname()).isEqualTo(dave.getLastname());
}).expectNextCount(1).verifyComplete();
}
@Test // DATAMONGO-2403 @Test // DATAMONGO-2403
@DirtiesState @DirtiesState
void annotatedAggregationExtractingSimpleValueIsEmptyForEmptyDocument() { void annotatedAggregationExtractingSimpleValueIsEmptyForEmptyDocument() {
@ -816,6 +827,10 @@ class ReactiveMongoRepositoryTests implements DirtiesStateExtension.StateFunctio
@Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }") @Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }")
Mono<Map> sumAgeAndReturnSumAsMap(); Mono<Map> sumAgeAndReturnSumAsMap();
@Aggregation({ "{ '$match' : { 'lastname' : 'Matthews'} }",
"{ '$project': { _id : 0, firstname : 1, lastname : 1 } }" })
Flux<PersonSummary> findAggregatedClosedInterfaceProjectionBy();
@Aggregation( @Aggregation(
pipeline = { "{ '$match' : { 'firstname' : '?0' } }", "{ '$project' : { '_id' : 0, 'lastname' : 1 } }" }) pipeline = { "{ '$match' : { 'firstname' : '?0' } }", "{ '$project' : { '_id' : 0, 'lastname' : 1 } }" })
Mono<String> projectToLastnameAndRemoveId(String firstname); Mono<String> projectToLastnameAndRemoveId(String firstname);

1
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactivePersonRepository.java

@ -32,4 +32,5 @@ public interface ReactivePersonRepository extends ReactiveMongoRepository<Person
* @return * @return
*/ */
Flux<Person> findByLastname(String lastname); Flux<Person> findByLastname(String lastname);
} }

Loading…
Cancel
Save