Browse Source

Introduce AggregationStage API

With the introduction of AggregationStage we move the API closer to the MongoDB terminology removing kognitive overhead.
Also the change allows us to switch back and forth with the default implementations of toDocument and toDocuments which let's us remove the deprecation warnings having dedicated interfaces that indicate what to implement in order to comply with the usage pattern.
issue/4306
Christoph Strobl 3 years ago
parent
commit
38e406867a
No known key found for this signature in database
GPG Key ID: 8CC1AB53391458C8
  1. 14
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java
  2. 4
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
  3. 15
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java
  4. 4
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  5. 80
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java
  6. 21
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java
  7. 10
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java
  8. 83
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java
  9. 45
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationStage.java
  10. 18
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java
  11. 37
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java
  12. 13
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java
  13. 103
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStage.java
  14. 19
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java
  15. 13
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java
  16. 68
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStageUnitTests.java

14
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java

@ -30,6 +30,7 @@ import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults; import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate; import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@ -296,6 +297,19 @@ public interface MongoOperations extends FluentMongoOperations {
return createView(name, source, AggregationPipeline.of(stages)); return createView(name, source, AggregationPipeline.of(stages));
} }
/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the views source collection.
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default MongoCollection<Document> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}
/** /**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on * Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}. * another collection or view identified by the given {@link #getCollectionName(Class) source type}.

4
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

@ -648,7 +648,7 @@ public class MongoTemplate
@Nullable ViewOptions options) { @Nullable ViewOptions options) {
return createView(name, getCollectionName(source), return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source), queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options); options);
} }
@ -657,7 +657,7 @@ public class MongoTemplate
@Nullable ViewOptions options) { @Nullable ViewOptions options) {
return createView(name, source, return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null), queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options); options);
} }

15
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java

@ -25,13 +25,13 @@ import java.util.function.Supplier;
import org.bson.Document; import org.bson.Document;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.springframework.data.geo.GeoResult; import org.springframework.data.geo.GeoResult;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation; import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline; import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate; import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@ -256,6 +256,19 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
return createView(name, source, AggregationPipeline.of(stages)); return createView(name, source, AggregationPipeline.of(stages));
} }
/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the views source collection.
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}
/** /**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on * Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}. * another collection or view identified by the given {@link #getCollectionName(Class) source type}.

4
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -676,7 +676,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
@Nullable ViewOptions options) { @Nullable ViewOptions options) {
return createView(name, getCollectionName(source), return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source), queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options); options);
} }
@ -685,7 +685,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
@Nullable ViewOptions options) { @Nullable ViewOptions options) {
return createView(name, source, return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null), queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options); options);
} }

80
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/Aggregation.java

@ -102,12 +102,12 @@ public class Aggregation {
private final AggregationOptions options; private final AggregationOptions options;
/** /**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s. * Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
* *
* @param operations must not be {@literal null} or empty. * @param operations must not be {@literal null} or empty.
*/ */
public static Aggregation newAggregation(List<? extends AggregationOperation> operations) { public static Aggregation newAggregation(List<? extends AggregationStage> operations) {
return newAggregation(operations.toArray(new AggregationOperation[operations.size()])); return newAggregation(operations.toArray(AggregationStage[]::new));
} }
/** /**
@ -119,6 +119,16 @@ public class Aggregation {
return new Aggregation(operations); return new Aggregation(operations);
} }
/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static Aggregation newAggregation(AggregationStage... stages) {
return new Aggregation(stages);
}
/** /**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s. * Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
* *
@ -130,6 +140,17 @@ public class Aggregation {
return AggregationUpdate.from(Arrays.asList(operations)); return AggregationUpdate.from(Arrays.asList(operations));
} }
/**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
*
* @param operations can be {@literal empty} but must not be {@literal null}.
* @return new instance of {@link AggregationUpdate}.
* @since 4.1
*/
public static AggregationUpdate newUpdate(AggregationStage... operations) {
return AggregationUpdate.updateFrom(Arrays.asList(operations));
}
/** /**
* Returns a copy of this {@link Aggregation} with the given {@link AggregationOptions} set. Note that options are * Returns a copy of this {@link Aggregation} with the given {@link AggregationOptions} set. Note that options are
* supported in MongoDB version 2.6+. * supported in MongoDB version 2.6+.
@ -141,7 +162,7 @@ public class Aggregation {
public Aggregation withOptions(AggregationOptions options) { public Aggregation withOptions(AggregationOptions options) {
Assert.notNull(options, "AggregationOptions must not be null"); Assert.notNull(options, "AggregationOptions must not be null");
return new Aggregation(this.pipeline.getOperations(), options); return new Aggregation(this.pipeline.getStages(), options);
} }
/** /**
@ -150,8 +171,8 @@ public class Aggregation {
* @param type must not be {@literal null}. * @param type must not be {@literal null}.
* @param operations must not be {@literal null} or empty. * @param operations must not be {@literal null} or empty.
*/ */
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationOperation> operations) { public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationStage> operations) {
return newAggregation(type, operations.toArray(new AggregationOperation[operations.size()])); return newAggregation(type, operations.toArray(AggregationStage[]::new));
} }
/** /**
@ -164,6 +185,17 @@ public class Aggregation {
return new TypedAggregation<T>(type, operations); return new TypedAggregation<T>(type, operations);
} }
/**
* Creates a new {@link TypedAggregation} for the given type and {@link AggregationOperation}s.
*
* @param type must not be {@literal null}.
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationStage... stages) {
return new TypedAggregation<>(type, stages);
}
/** /**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s. * Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
* *
@ -173,6 +205,15 @@ public class Aggregation {
this(asAggregationList(aggregationOperations)); this(asAggregationList(aggregationOperations));
} }
/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
* @param aggregationOperations must not be {@literal null} or empty.
*/
protected Aggregation(AggregationStage... aggregationOperations) {
this(Arrays.asList(aggregationOperations));
}
/** /**
* @param aggregationOperations must not be {@literal null} or empty. * @param aggregationOperations must not be {@literal null} or empty.
* @return * @return
@ -189,7 +230,7 @@ public class Aggregation {
* *
* @param aggregationOperations must not be {@literal null} or empty. * @param aggregationOperations must not be {@literal null} or empty.
*/ */
protected Aggregation(List<AggregationOperation> aggregationOperations) { protected Aggregation(List<? extends AggregationStage> aggregationOperations) {
this(aggregationOperations, DEFAULT_OPTIONS); this(aggregationOperations, DEFAULT_OPTIONS);
} }
@ -199,7 +240,7 @@ public class Aggregation {
* @param aggregationOperations must not be {@literal null}. * @param aggregationOperations must not be {@literal null}.
* @param options must not be {@literal null} or empty. * @param options must not be {@literal null} or empty.
*/ */
protected Aggregation(List<AggregationOperation> aggregationOperations, AggregationOptions options) { protected Aggregation(List<? extends AggregationStage> aggregationOperations, AggregationOptions options) {
Assert.notNull(aggregationOperations, "AggregationOperations must not be null"); Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
Assert.notNull(options, "AggregationOptions must not be null"); Assert.notNull(options, "AggregationOptions must not be null");
@ -638,6 +679,17 @@ public class Aggregation {
return facet().and(aggregationOperations); return facet().and(aggregationOperations);
} }
/**
* Creates a new {@link FacetOperationBuilder} given {@link Aggregation}.
*
* @param stages the sub-pipeline, must not be {@literal null}.
* @return new instance of {@link FacetOperation}.
* @since 4.1
*/
public static FacetOperationBuilder facet(AggregationStage... stages) {
return facet().and(stages);
}
/** /**
* Creates a new {@link LookupOperation}. * Creates a new {@link LookupOperation}.
* *
@ -668,14 +720,14 @@ public class Aggregation {
/** /**
* Entrypoint for creating {@link LookupOperation $lookup} using a fluent builder API. * Entrypoint for creating {@link LookupOperation $lookup} using a fluent builder API.
*
* <pre class="code"> * <pre class="code">
* Aggregation.lookup().from("restaurants") * Aggregation.lookup().from("restaurants").localField("restaurant_name").foreignField("name")
* .localField("restaurant_name") * .let(newVariable("orders_drink").forField("drink"))
* .foreignField("name") * .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
* .let(newVariable("orders_drink").forField("drink")) * .as("matches")
* .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
* .as("matches")
* </pre> * </pre>
*
* @return new instance of {@link LookupOperationBuilder}. * @return new instance of {@link LookupOperationBuilder}.
* @since 4.1 * @since 4.1
*/ */

21
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperation.java

@ -15,7 +15,6 @@
*/ */
package org.springframework.data.mongodb.core.aggregation; package org.springframework.data.mongodb.core.aggregation;
import java.util.Collections;
import java.util.List; import java.util.List;
import org.bson.Document; import org.bson.Document;
@ -30,30 +29,24 @@ import org.springframework.util.CollectionUtils;
* @author Christoph Strobl * @author Christoph Strobl
* @since 1.3 * @since 1.3
*/ */
public interface AggregationOperation { public interface AggregationOperation extends MultiOperationAggregationStage {
/** /**
* Turns the {@link AggregationOperation} into a {@link Document} by using the given
* {@link AggregationOperationContext}.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}. * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the Document * @return
* @deprecated since 2.2 in favor of {@link #toPipelineStages(AggregationOperationContext)}.
*/ */
@Deprecated @Override
Document toDocument(AggregationOperationContext context); Document toDocument(AggregationOperationContext context);
/** /**
* Turns the {@link AggregationOperation} into list of {@link Document stages} by using the given * More the exception than the default.
* {@link AggregationOperationContext}. This allows a single {@link AggregationOptions} to add additional stages for
* eg. {@code $sort} or {@code $limit}.
* *
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}. * @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the pipeline stages to run through. Never {@literal null}. * @return never {@literal null}.
* @since 2.2
*/ */
@Override
default List<Document> toPipelineStages(AggregationOperationContext context) { default List<Document> toPipelineStages(AggregationOperationContext context) {
return Collections.singletonList(toDocument(context)); return List.of(toDocument(context));
} }
/** /**

10
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationOperationRenderer.java

@ -45,15 +45,19 @@ class AggregationOperationRenderer {
* @param rootContext must not be {@literal null}. * @param rootContext must not be {@literal null}.
* @return the {@link List} of {@link Document}. * @return the {@link List} of {@link Document}.
*/ */
static List<Document> toDocument(List<AggregationOperation> operations, AggregationOperationContext rootContext) { static List<Document> toDocument(List<? extends AggregationStage> operations, AggregationOperationContext rootContext) {
List<Document> operationDocuments = new ArrayList<Document>(operations.size()); List<Document> operationDocuments = new ArrayList<Document>(operations.size());
AggregationOperationContext contextToUse = rootContext; AggregationOperationContext contextToUse = rootContext;
for (AggregationOperation operation : operations) { for (AggregationStage operation : operations) {
operationDocuments.addAll(operation.toPipelineStages(contextToUse)); if(operation instanceof MultiOperationAggregationStage mops) {
operationDocuments.addAll(mops.toPipelineStages(contextToUse));
} else {
operationDocuments.add(operation.toDocument(contextToUse));
}
if (operation instanceof FieldsExposingAggregationOperation) { if (operation instanceof FieldsExposingAggregationOperation) {

83
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationPipeline.java

@ -23,9 +23,10 @@ import java.util.function.Predicate;
import org.bson.Document; import org.bson.Document;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/** /**
* The {@link AggregationPipeline} holds the collection of {@link AggregationOperation aggregation stages}. * The {@link AggregationPipeline} holds the collection of {@link AggregationStage aggregation stages}.
* *
* @author Christoph Strobl * @author Christoph Strobl
* @author Mark Paluch * @author Mark Paluch
@ -33,12 +34,23 @@ import org.springframework.util.Assert;
*/ */
public class AggregationPipeline { public class AggregationPipeline {
private final List<AggregationOperation> pipeline; private final List<AggregationStage> pipeline;
public static AggregationPipeline of(AggregationOperation... stages) { public static AggregationPipeline of(AggregationOperation... stages) {
return new AggregationPipeline(Arrays.asList(stages)); return new AggregationPipeline(Arrays.asList(stages));
} }
/**
* Create a new {@link AggregationPipeline} out of the given {@link AggregationStage stages}.
*
* @param stages the pipeline stages.
* @return new instance of {@link AggregationPipeline}.
* @since 4.1
*/
public static AggregationPipeline of(AggregationStage... stages) {
return new AggregationPipeline(Arrays.asList(stages));
}
/** /**
* Create an empty pipeline * Create an empty pipeline
*/ */
@ -49,12 +61,12 @@ public class AggregationPipeline {
/** /**
* Create a new pipeline with given {@link AggregationOperation stages}. * Create a new pipeline with given {@link AggregationOperation stages}.
* *
* @param aggregationOperations must not be {@literal null}. * @param aggregationStages must not be {@literal null}.
*/ */
public AggregationPipeline(List<AggregationOperation> aggregationOperations) { public AggregationPipeline(List<? extends AggregationStage> aggregationStages) {
Assert.notNull(aggregationOperations, "AggregationOperations must not be null"); Assert.notNull(aggregationStages, "AggregationStages must not be null");
pipeline = new ArrayList<>(aggregationOperations); pipeline = new ArrayList<>(aggregationStages);
} }
/** /**
@ -64,10 +76,21 @@ public class AggregationPipeline {
* @return this. * @return this.
*/ */
public AggregationPipeline add(AggregationOperation aggregationOperation) { public AggregationPipeline add(AggregationOperation aggregationOperation) {
return add((AggregationStage) aggregationOperation);
}
/**
* Append the given {@link AggregationOperation stage} to the pipeline.
*
* @param stage must not be {@literal null}.
* @return this.
* @since 4.1
*/
public AggregationPipeline add(AggregationStage stage) {
Assert.notNull(aggregationOperation, "AggregationOperation must not be null"); Assert.notNull(stage, "AggregationOperation must not be null");
pipeline.add(aggregationOperation); pipeline.add(stage);
return this; return this;
} }
@ -76,7 +99,17 @@ public class AggregationPipeline {
* *
* @return never {@literal null}. * @return never {@literal null}.
*/ */
public List<AggregationOperation> getOperations() { public List<AggregationStage> getOperations() {
return getStages();
}
/**
* Get the list of {@link AggregationOperation aggregation stages}.
*
* @return never {@literal null}.
* @since 4.1
*/
public List<AggregationStage> getStages() {
return Collections.unmodifiableList(pipeline); return Collections.unmodifiableList(pipeline);
} }
@ -95,14 +128,14 @@ public class AggregationPipeline {
return false; return false;
} }
AggregationOperation operation = pipeline.get(pipeline.size() - 1); AggregationStage operation = pipeline.get(pipeline.size() - 1);
return isOut(operation) || isMerge(operation); return isOut(operation) || isMerge(operation);
} }
void verify() { void verify() {
// check $out/$merge is the last operation if it exists // check $out/$merge is the last operation if it exists
for (AggregationOperation operation : pipeline) { for (AggregationStage operation : pipeline) {
if (isOut(operation) && !isLast(operation)) { if (isOut(operation) && !isLast(operation)) {
throw new IllegalArgumentException("The $out operator must be the last stage in the pipeline"); throw new IllegalArgumentException("The $out operator must be the last stage in the pipeline");
@ -134,13 +167,13 @@ public class AggregationPipeline {
return pipeline.isEmpty(); return pipeline.isEmpty();
} }
private boolean containsOperation(Predicate<AggregationOperation> predicate) { private boolean containsOperation(Predicate<AggregationStage> predicate) {
if (isEmpty()) { if (isEmpty()) {
return false; return false;
} }
for (AggregationOperation element : pipeline) { for (AggregationStage element : pipeline) {
if (predicate.test(element)) { if (predicate.test(element)) {
return true; return true;
} }
@ -149,19 +182,29 @@ public class AggregationPipeline {
return false; return false;
} }
private boolean isLast(AggregationOperation aggregationOperation) { private boolean isLast(AggregationStage aggregationOperation) {
return pipeline.indexOf(aggregationOperation) == pipeline.size() - 1; return pipeline.indexOf(aggregationOperation) == pipeline.size() - 1;
} }
private static boolean isUnionWith(AggregationOperation operator) { private static boolean isUnionWith(AggregationStage stage) {
return operator instanceof UnionWithOperation || operator.getOperator().equals("$unionWith"); return isSpecificStage(stage, UnionWithOperation.class, "$unionWith");
}
private static boolean isMerge(AggregationStage stage) {
return isSpecificStage(stage, MergeOperation.class, "$merge");
} }
private static boolean isMerge(AggregationOperation operator) { private static boolean isOut(AggregationStage stage) {
return operator instanceof MergeOperation || operator.getOperator().equals("$merge"); return isSpecificStage(stage, OutOperation.class, "$out");
} }
private static boolean isOut(AggregationOperation operator) { private static boolean isSpecificStage(AggregationStage stage, Class<?> type, String operator) {
return operator instanceof OutOperation || operator.getOperator().equals("$out"); if (ClassUtils.isAssignable(type, stage.getClass())) {
return true;
}
if (stage instanceof AggregationOperation operation) {
return operation.getOperator().equals(operator);
}
return stage.toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next().equals(operator);
} }
} }

45
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationStage.java

@ -0,0 +1,45 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import org.bson.Document;
/**
* Abstraction for a single
* <a href="https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/#stages">Aggregation Pipeline
* Stage</a> to be used within an {@link AggregationPipeline}.
* <p>
* An {@link AggregationStage} may operate upon domain specific types but will render to a ready to use store native
* representation within a given {@link AggregationOperationContext context}. The most straight forward way of writing a
* custom {@link AggregationStage} is just returning the raw document.
*
* <pre class="code">
* AggregationStage stage = (ctx) -> Document.parse("{ $sort : { borough : 1 } }");
* </pre>
*
* @author Christoph Strobl
* @since 4.1
*/
public interface AggregationStage {
/**
* Turns the {@link AggregationStage} into a {@link Document} by using the given {@link AggregationOperationContext}.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the ready to use {@link Document} representing the stage.
*/
Document toDocument(AggregationOperationContext context);
}

18
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/AggregationUpdate.java

@ -25,7 +25,6 @@ import java.util.StringJoiner;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.bson.Document; import org.bson.Document;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.SerializationUtils; import org.springframework.data.mongodb.core.query.SerializationUtils;
import org.springframework.data.mongodb.core.query.UpdateDefinition; import org.springframework.data.mongodb.core.query.UpdateDefinition;
@ -71,7 +70,8 @@ import org.springframework.util.Assert;
* *
* @author Christoph Strobl * @author Christoph Strobl
* @author Mark Paluch * @author Mark Paluch
* @see <a href="https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB * @see <a href=
* "https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB
* Reference Documentation</a> * Reference Documentation</a>
* @since 3.0 * @since 3.0
*/ */
@ -92,11 +92,11 @@ public class AggregationUpdate extends Aggregation implements UpdateDefinition {
* *
* @param pipeline must not be {@literal null}. * @param pipeline must not be {@literal null}.
*/ */
protected AggregationUpdate(List<AggregationOperation> pipeline) { protected AggregationUpdate(List<? extends AggregationStage> pipeline) {
super(pipeline); super(pipeline);
for (AggregationOperation operation : pipeline) { for (AggregationStage operation : pipeline) {
if (operation instanceof FieldsExposingAggregationOperation) { if (operation instanceof FieldsExposingAggregationOperation) {
((FieldsExposingAggregationOperation) operation).getFields().forEach(it -> { ((FieldsExposingAggregationOperation) operation).getFields().forEach(it -> {
keysTouched.add(it.getName()); keysTouched.add(it.getName());
@ -123,6 +123,16 @@ public class AggregationUpdate extends Aggregation implements UpdateDefinition {
return new AggregationUpdate(pipeline); return new AggregationUpdate(pipeline);
} }
/**
* Create a new AggregationUpdate from the given {@link AggregationStage stages}.
*
* @return new instance of {@link AggregationUpdate}.
* @since 4.1
*/
public static AggregationUpdate updateFrom(List<? extends AggregationStage> stages) {
return new AggregationUpdate(stages);
}
/** /**
* Adds new fields to documents. {@code $set} outputs documents that contain all existing fields from the input * Adds new fields to documents. {@code $set} outputs documents that contain all existing fields from the input
* documents and newly added fields. * documents and newly added fields.

37
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/FacetOperation.java

@ -78,6 +78,23 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
return new FacetOperationBuilder(facets, Arrays.asList(operations)); return new FacetOperationBuilder(facets, Arrays.asList(operations));
} }
/**
* Creates a new {@link FacetOperationBuilder} to append a new facet using {@literal operations}. <br />
* {@link FacetOperationBuilder} takes a pipeline of {@link AggregationStage stages} to categorize documents into a
* single facet.
*
* @param stages must not be {@literal null} or empty.
* @return
* @since 4.1
*/
public FacetOperationBuilder and(AggregationStage... stages) {
Assert.notNull(stages, "Stages must not be null");
Assert.notEmpty(stages, "Stages must not be empty");
return new FacetOperationBuilder(facets, Arrays.asList(stages));
}
@Override @Override
public Document toDocument(AggregationOperationContext context) { public Document toDocument(AggregationOperationContext context) {
return new Document(getOperator(), facets.toDocument(context)); return new Document(getOperator(), facets.toDocument(context));
@ -102,11 +119,11 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
public static class FacetOperationBuilder { public static class FacetOperationBuilder {
private final Facets current; private final Facets current;
private final List<AggregationOperation> operations; private final List<AggregationStage> operations;
private FacetOperationBuilder(Facets current, List<AggregationOperation> operations) { private FacetOperationBuilder(Facets current, List<? extends AggregationStage> operations) {
this.current = current; this.current = current;
this.operations = operations; this.operations = new ArrayList<>(operations);
} }
/** /**
@ -176,7 +193,7 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
* @param operations must not be {@literal null}. * @param operations must not be {@literal null}.
* @return the new {@link Facets}. * @return the new {@link Facets}.
*/ */
Facets and(String fieldName, List<AggregationOperation> operations) { Facets and(String fieldName, List<? extends AggregationStage> operations) {
Assert.hasText(fieldName, "FieldName must not be null or empty"); Assert.hasText(fieldName, "FieldName must not be null or empty");
Assert.notNull(operations, "AggregationOperations must not be null"); Assert.notNull(operations, "AggregationOperations must not be null");
@ -197,21 +214,21 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
private static class Facet { private static class Facet {
private final ExposedField exposedField; private final ExposedField exposedField;
private final List<AggregationOperation> operations; private final List<AggregationStage> stages;
/** /**
* Creates a new {@link Facet} given {@link ExposedField} and {@link AggregationOperation} pipeline. * Creates a new {@link Facet} given {@link ExposedField} and {@link AggregationOperation} pipeline.
* *
* @param exposedField must not be {@literal null}. * @param exposedField must not be {@literal null}.
* @param operations must not be {@literal null}. * @param stages must not be {@literal null}.
*/ */
Facet(ExposedField exposedField, List<AggregationOperation> operations) { Facet(ExposedField exposedField, List<? extends AggregationStage> stages) {
Assert.notNull(exposedField, "ExposedField must not be null"); Assert.notNull(exposedField, "ExposedField must not be null");
Assert.notNull(operations, "AggregationOperations must not be null"); Assert.notNull(stages, "AggregationOperations must not be null");
this.exposedField = exposedField; this.exposedField = exposedField;
this.operations = operations; this.stages = new ArrayList<>(stages);
} }
ExposedField getExposedField() { ExposedField getExposedField() {
@ -219,7 +236,7 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
} }
protected List<Document> toDocuments(AggregationOperationContext context) { protected List<Document> toDocuments(AggregationOperationContext context) {
return AggregationOperationRenderer.toDocument(operations, context); return AggregationOperationRenderer.toDocument(stages, context);
} }
} }
} }

13
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java

@ -230,7 +230,7 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
AsBuilder pipeline(AggregationPipeline pipeline); AsBuilder pipeline(AggregationPipeline pipeline);
/** /**
* Specifies the {@link AggregationPipeline#getOperations() stages} that determine the resulting documents. * Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
* *
* @param stages must not be {@literal null} can be empty. * @param stages must not be {@literal null} can be empty.
* @return never {@literal null}. * @return never {@literal null}.
@ -239,6 +239,17 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
return pipeline(AggregationPipeline.of(stages)); return pipeline(AggregationPipeline.of(stages));
} }
/**
* Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
*
* @param stages must not be {@literal null} can be empty.
* @return never {@literal null}.
* @since 4.1
*/
default AsBuilder pipeline(AggregationStage... stages) {
return pipeline(AggregationPipeline.of(stages));
}
/** /**
* @param name the name of the new array field to add to the input documents, must not be {@literal null} or empty. * @param name the name of the new array field to add to the input documents, must not be {@literal null} or empty.
* @return new instance of {@link LookupOperation}. * @return new instance of {@link LookupOperation}.

103
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStage.java

@ -0,0 +1,103 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.bson.Document;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* An {@link AggregationStage} that may consist of a main operation and potential follow up stages for eg. {@code $sort}
* or {@code $limit}.
* <p>
* The {@link MultiOperationAggregationStage} may operate upon domain specific types but will render to the store native
* representation within a given {@link AggregationOperationContext context}.
* <p>
* {@link #toDocument(AggregationOperationContext)} will render a synthetic {@link Document} that contains the ordered
* stages. The list returned from {@link #toPipelineStages(AggregationOperationContext)}
*
* <pre class="code">
* [
* { $match: { $text: { $search: "operating" } } },
* { $sort: { score: { $meta: "textScore" }, posts: -1 } }
* ]
* </pre>
*
* will be represented as
*
* <pre class="code">
* {
* $match: { $text: { $search: "operating" } },
* $sort: { score: { $meta: "textScore" }, posts: -1 }
* }
* </pre>
*
* In case stages appear multiple times the order no longer can be guaranteed when calling
* {@link #toDocument(AggregationOperationContext)}, so consumers of the API should rely on
* {@link #toPipelineStages(AggregationOperationContext)}. Nevertheless, by default the values will be collected into a
* list rendering to
*
* <pre class="code">
* {
* $match: [{ $text: { $search: "operating" } }, { $text: ... }],
* $sort: { score: { $meta: "textScore" }, posts: -1 }
* }
* </pre>
*
* @author Christoph Strobl
* @since 4.1
*/
public interface MultiOperationAggregationStage extends AggregationStage {
/**
* Returns a synthetic {@link Document stage} that contains the {@link #toPipelineStages(AggregationOperationContext)
* actual stages} by folding them into a single {@link Document}. In case of colliding entries, those used multiple
* times thus having the same key, the entries will be held as a list for the given operator.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return never {@literal null}.
*/
@Override
default Document toDocument(AggregationOperationContext context) {
List<Document> documents = toPipelineStages(context);
if (documents.size() == 1) {
return documents.get(0);
}
MultiValueMap<String, Document> stages = new LinkedMultiValueMap<>(documents.size());
for (Document current : documents) {
String key = current.keySet().iterator().next();
stages.add(key, current.get(key, Document.class));
}
return new Document(stages.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, v -> v.getValue().size() == 1 ? v.getValue().get(0) : v.getValue())));
}
/**
* Turns the {@link MultiOperationAggregationStage} into list of {@link Document stages} by using the given
* {@link AggregationOperationContext}. This allows an {@link AggregationStage} to add follow up stages for eg.
* {@code $sort} or {@code $limit}.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the pipeline stages to run through. Never {@literal null}.
*/
List<Document> toPipelineStages(AggregationOperationContext context);
}

19
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/TypedAggregation.java

@ -15,6 +15,7 @@
*/ */
package org.springframework.data.mongodb.core.aggregation; package org.springframework.data.mongodb.core.aggregation;
import java.util.Arrays;
import java.util.List; import java.util.List;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -24,6 +25,7 @@ import org.springframework.util.Assert;
* *
* @author Thomas Darimont * @author Thomas Darimont
* @author Oliver Gierke * @author Oliver Gierke
* @author Christoph Strobl
*/ */
public class TypedAggregation<I> extends Aggregation { public class TypedAggregation<I> extends Aggregation {
@ -39,13 +41,24 @@ public class TypedAggregation<I> extends Aggregation {
this(inputType, asAggregationList(operations)); this(inputType, asAggregationList(operations));
} }
/**
* Creates a new {@link TypedAggregation} from the given {@link AggregationStage stages}.
*
* @param inputType must not be {@literal null}.
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public TypedAggregation(Class<I> inputType, AggregationStage... stages) {
this(inputType, Arrays.asList(stages));
}
/** /**
* Creates a new {@link TypedAggregation} from the given {@link AggregationOperation}s. * Creates a new {@link TypedAggregation} from the given {@link AggregationOperation}s.
* *
* @param inputType must not be {@literal null}. * @param inputType must not be {@literal null}.
* @param operations must not be {@literal null} or empty. * @param operations must not be {@literal null} or empty.
*/ */
public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations) { public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations) {
this(inputType, operations, DEFAULT_OPTIONS); this(inputType, operations, DEFAULT_OPTIONS);
} }
@ -57,7 +70,7 @@ public class TypedAggregation<I> extends Aggregation {
* @param operations must not be {@literal null} or empty. * @param operations must not be {@literal null} or empty.
* @param options must not be {@literal null}. * @param options must not be {@literal null}.
*/ */
public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations, AggregationOptions options) { public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations, AggregationOptions options) {
super(operations, options); super(operations, options);
@ -77,6 +90,6 @@ public class TypedAggregation<I> extends Aggregation {
public TypedAggregation<I> withOptions(AggregationOptions options) { public TypedAggregation<I> withOptions(AggregationOptions options) {
Assert.notNull(options, "AggregationOptions must not be null"); Assert.notNull(options, "AggregationOptions must not be null");
return new TypedAggregation<I>(inputType, pipeline.getOperations(), options); return new TypedAggregation<>(inputType, pipeline.getStages(), options);
} }
} }

13
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AggregationUtils.java

@ -27,6 +27,7 @@ import org.springframework.data.domain.Sort.Order;
import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation; import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions; import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Collation; import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Meta; import org.springframework.data.mongodb.core.query.Meta;
@ -109,14 +110,14 @@ abstract class AggregationUtils {
* @param accessor * @param accessor
* @param targetType * @param targetType
*/ */
static void appendSortIfPresent(List<AggregationOperation> aggregationPipeline, ConvertingParameterAccessor accessor, static void appendSortIfPresent(List<? extends AggregationStage> aggregationPipeline, ConvertingParameterAccessor accessor,
Class<?> targetType) { Class<?> targetType) {
if (accessor.getSort().isUnsorted()) { if (accessor.getSort().isUnsorted()) {
return; return;
} }
aggregationPipeline.add(ctx -> { ((List<AggregationStage>) aggregationPipeline).add(ctx -> {
Document sort = new Document(); Document sort = new Document();
for (Order order : accessor.getSort()) { for (Order order : accessor.getSort()) {
@ -134,7 +135,7 @@ abstract class AggregationUtils {
* @param aggregationPipeline * @param aggregationPipeline
* @param accessor * @param accessor
*/ */
static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline, static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
ConvertingParameterAccessor accessor) { ConvertingParameterAccessor accessor) {
appendLimitAndOffsetIfPresent(aggregationPipeline, accessor, LongUnaryOperator.identity(), appendLimitAndOffsetIfPresent(aggregationPipeline, accessor, LongUnaryOperator.identity(),
IntUnaryOperator.identity()); IntUnaryOperator.identity());
@ -150,7 +151,7 @@ abstract class AggregationUtils {
* @param limitOperator * @param limitOperator
* @since 3.3 * @since 3.3
*/ */
static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline, static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
ConvertingParameterAccessor accessor, LongUnaryOperator offsetOperator, IntUnaryOperator limitOperator) { ConvertingParameterAccessor accessor, LongUnaryOperator offsetOperator, IntUnaryOperator limitOperator) {
Pageable pageable = accessor.getPageable(); Pageable pageable = accessor.getPageable();
@ -159,10 +160,10 @@ abstract class AggregationUtils {
} }
if (pageable.getOffset() > 0) { if (pageable.getOffset() > 0) {
aggregationPipeline.add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset()))); ((List<AggregationStage>) aggregationPipeline).add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset())));
} }
aggregationPipeline.add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize()))); ((List<AggregationStage>) aggregationPipeline).add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize())));
} }
/** /**

68
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/MultiOperationAggregationStageUnitTests.java

@ -0,0 +1,68 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import static org.springframework.data.mongodb.test.util.Assertions.*;
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.Test;
/**
* @author Christoph Strobl
*/
class MultiOperationAggregationStageUnitTests {
@Test // GH-4306
void toDocumentRendersSingleOperation() {
MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"));
assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("{ $text: { $search: 'operating' } }");
}
@Test // GH-4306
void toDocumentRendersMultiOperation() {
MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"));
assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
{
$text: { $search: 'operating' },
$sort: { score: { $meta: 'textScore' } }
}
""");
}
@Test // GH-4306
void toDocumentCollectsDuplicateOperation() {
MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"), Document.parse("{ $sort: { posts: -1 } }"));
assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
{
$text: { $search: 'operating' },
$sort: [
{ score: { $meta: 'textScore' } },
{ posts: -1 }
]
}
""");
}
}
Loading…
Cancel
Save