From 16a35e032919514383336af34ffd7b1ce3d3cbe0 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Mon, 26 Sep 2022 14:42:24 +0200 Subject: [PATCH] Add support for $densify aggregation stage. See #4139 Original pull request: #4182. --- .../core/aggregation/DensifyOperation.java | 381 ++++++++++++++++++ .../DensifyOperationUnitTests.java | 146 +++++++ 2 files changed, 527 insertions(+) create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/DensifyOperation.java create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/DensifyOperationUnitTests.java diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/DensifyOperation.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/DensifyOperation.java new file mode 100644 index 000000000..253db727c --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/DensifyOperation.java @@ -0,0 +1,381 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core.aggregation; + +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.bson.Document; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + +/*** + * Encapsulates the aggregation framework {@code $densify}-operation. + * + * @author Christoph Strobl + * @since 4.0 + */ +public class DensifyOperation implements AggregationOperation { + + private @Nullable Field field; + private @Nullable List partitionBy; + private @Nullable Range range; + + protected DensifyOperation(@Nullable Field field, @Nullable List partitionBy, @Nullable Range range) { + + this.field = field; + this.partitionBy = partitionBy; + this.range = range; + } + + /** + * Obtain a builder to create the {@link DensifyOperation}. + * + * @return new instance of {@link DensifyOperationBuilder}. + */ + public static DensifyOperationBuilder builder() { + return new DensifyOperationBuilder(); + } + + @Override + public Document toDocument(AggregationOperationContext context) { + + Document densify = new Document(); + densify.put("field", context.getReference(field).getRaw()); + if (!ObjectUtils.isEmpty(partitionBy)) { + densify.put("partitionByFields", partitionBy.stream().map(it -> { + if (it instanceof Field field) { + return context.getReference(field).getRaw(); + } + if (it instanceof AggregationExpression expression) { + return expression.toDocument(context); + } + return it; + }).collect(Collectors.toList())); + } + densify.put("range", range.toDocument(context)); + return new Document("$densify", densify); + } + + /** + * The {@link Range} specifies how the data is densified. + */ + public interface Range { + + /** + * Add documents spanning the range of values within the given lower (inclusive) and upper (exclusive) bound. + * + * @param lower must not be {@literal null}. + * @param upper must not be {@literal null}. + * @return new instance of {@link DensifyRange}. + */ + static DensifyRange bounded(Object lower, Object upper) { + return new BoundedRange(lower, upper, DensifyUnits.NONE); + } + + /** + * Add documents spanning the full value range. + * + * @return new instance of {@link DensifyRange}. + */ + static DensifyRange full() { + + return new DensifyRange(DensifyUnits.NONE) { + + @Override + Object getBounds(AggregationOperationContext ctx) { + return "full"; + } + }; + } + + /** + * Add documents spanning the full value range for each partition. + * + * @return new instance of {@link DensifyRange}. + */ + static DensifyRange partition() { + return new DensifyRange(DensifyUnits.NONE) { + + @Override + Object getBounds(AggregationOperationContext ctx) { + return "partition"; + } + }; + } + + /** + * Obtain the document representation of the window in a default {@link AggregationOperationContext context}. + * + * @return never {@literal null}. + */ + default Document toDocument() { + return toDocument(Aggregation.DEFAULT_CONTEXT); + } + + /** + * Obtain the document representation of the window in the given {@link AggregationOperationContext context}. + * + * @return never {@literal null}. + */ + Document toDocument(AggregationOperationContext ctx); + } + + /** + * Base {@link Range} implementation. + * + * @author Christoph Strobl + */ + public static abstract class DensifyRange implements Range { + + private @Nullable DensifyUnit unit; + private Number step; + + public DensifyRange(DensifyUnit unit) { + this.unit = unit; + } + + @Override + public Document toDocument(AggregationOperationContext ctx) { + + Document range = new Document("step", step); + if (unit != null && !DensifyUnits.NONE.equals(unit)) { + range.put("unit", unit.name().toLowerCase(Locale.US)); + } + range.put("bounds", getBounds(ctx)); + return range; + } + + /** + * Set the increment for the value. + * + * @param step must not be {@literal null}. + * @return this. + */ + public DensifyRange incrementBy(Number step) { + this.step = step; + return this; + } + + /** + * Set the increment for the value. + * + * @param step must not be {@literal null}. + * @return this. + */ + public DensifyRange incrementBy(Number step, DensifyUnit unit) { + this.step = step; + return unit(unit); + } + + /** + * Set the {@link DensifyUnit unit} for the step field. + * + * @param unit + * @return this. + */ + public DensifyRange unit(DensifyUnit unit) { + + this.unit = unit; + return this; + } + + abstract Object getBounds(AggregationOperationContext ctx); + } + + /** + * {@link Range} implementation holding lower and upper bound values. + * + * @author Christoph Strobl + */ + public static class BoundedRange extends DensifyRange { + + private List bounds; + + protected BoundedRange(Object lower, Object upper, DensifyUnit unit) { + + super(unit); + this.bounds = Arrays.asList(lower, upper); + } + + @Override + List getBounds(AggregationOperationContext ctx) { + return bounds.stream().map(it -> { + if (it instanceof AggregationExpression expression) { + return expression.toDocument(ctx); + } + return it; + }).collect(Collectors.toList()); + } + } + + /** + * The actual time unit to apply to a {@link Range}. + */ + public interface DensifyUnit { + + String name(); + + /** + * Converts the given time unit into a {@link DensifyUnit}. Supported units are: days, hours, minutes, seconds, and + * milliseconds. + * + * @param timeUnit the time unit to convert, must not be {@literal null}. + * @return + * @throws IllegalArgumentException if the {@link TimeUnit} is {@literal null} or not supported for conversion. + */ + static DensifyUnit from(TimeUnit timeUnit) { + + Assert.notNull(timeUnit, "TimeUnit must not be null"); + + switch (timeUnit) { + case DAYS: + return DensifyUnits.DAY; + case HOURS: + return DensifyUnits.HOUR; + case MINUTES: + return DensifyUnits.MINUTE; + case SECONDS: + return DensifyUnits.SECOND; + case MILLISECONDS: + return DensifyUnits.MILLISECOND; + } + + throw new IllegalArgumentException(String.format("Cannot create DensifyUnit from %s", timeUnit)); + } + + /** + * Converts the given chrono unit into a {@link DensifyUnit}. Supported units are: years, weeks, months, days, + * hours, minutes, seconds, and millis. + * + * @param chronoUnit the chrono unit to convert, must not be {@literal null}. + * @return + * @throws IllegalArgumentException if the {@link TimeUnit} is {@literal null} or not supported for conversion. + */ + static DensifyUnits from(ChronoUnit chronoUnit) { + + switch (chronoUnit) { + case YEARS: + return DensifyUnits.YEAR; + case WEEKS: + return DensifyUnits.WEEK; + case MONTHS: + return DensifyUnits.MONTH; + case DAYS: + return DensifyUnits.DAY; + case HOURS: + return DensifyUnits.HOUR; + case MINUTES: + return DensifyUnits.MINUTE; + case SECONDS: + return DensifyUnits.SECOND; + case MILLIS: + return DensifyUnits.MILLISECOND; + } + + throw new IllegalArgumentException(String.format("Cannot create DensifyUnit from %s", chronoUnit)); + } + } + + /** + * Quick access to available {@link DensifyUnit units}. + */ + public enum DensifyUnits implements DensifyUnit { + NONE, YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND, MILLISECOND + } + + public static class DensifyOperationBuilder { + + DensifyOperation target; + + public DensifyOperationBuilder() { + this.target = new DensifyOperation(null, Collections.emptyList(), null); + } + + /** + * Set the field to densify. + * + * @param fieldname must not be {@literal null}. + * @return this. + */ + public DensifyOperationBuilder densify(String fieldname) { + this.target.field = Fields.field(fieldname); + return this; + } + + /** + * Set the fields used for grouping documents. + * + * @param fields must not be {@literal null}. + * @return this. + */ + public DensifyOperationBuilder partitionBy(String... fields) { + target.partitionBy = Fields.fields(fields).asList(); + return this; + } + + /** + * Set the operational range. + * + * @param range must not be {@literal null}. + * @return this. + */ + public DensifyOperationBuilder range(Range range) { + + target.range = range; + return this; + } + + /** + * Operate on full range. + * + * @param consumer + * @return this. + */ + public DensifyOperationBuilder fullRange(Consumer consumer) { + + DensifyRange range = Range.full(); + consumer.accept(range); + + return range(range); + } + + /** + * Operate on full range. + * + * @param consumer + * @return this. + */ + public DensifyOperationBuilder partitionRange(Consumer consumer) { + + DensifyRange range = Range.partition(); + consumer.accept(range); + + return range(range); + } + + DensifyOperation build() { + return new DensifyOperation(target.field, target.partitionBy, target.range); + } + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/DensifyOperationUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/DensifyOperationUnitTests.java new file mode 100644 index 000000000..60284074c --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/DensifyOperationUnitTests.java @@ -0,0 +1,146 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core.aggregation; + +import static org.springframework.data.mongodb.test.util.Assertions.*; + +import java.util.Date; + +import org.junit.jupiter.api.Test; +import org.springframework.data.mongodb.core.aggregation.DensifyOperation.DensifyUnits; +import org.springframework.data.mongodb.core.aggregation.DensifyOperation.Range; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver; +import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.data.mongodb.core.mapping.Field; +import org.springframework.data.mongodb.core.mapping.MongoMappingContext; +import org.springframework.lang.Nullable; + +/** + * @author Christoph Strobl + */ +class DensifyOperationUnitTests { + + @Test // GH-4139 + void rendersFieldNamesAsIsForUntypedContext() { + + DensifyOperation densify = DensifyOperation.builder().densify("ts") + .range(Range.bounded("2021-05-18T00:00:00", "2021-05-18T08:00:00").incrementBy(1).unit(DensifyUnits.HOUR)).build(); + + assertThat(densify.toDocument(contextFor(null))).isEqualTo(""" + { + $densify: { + field: "ts", + range: { + step: 1, + unit: "hour", + bounds:[ "2021-05-18T00:00:00", "2021-05-18T08:00:00" ] + } + } + } + """); + } + + @Test // GH-4139 + void rendersFieldNamesCorrectly() { + + DensifyOperation densify = DensifyOperation.builder().densify("ts") + .range(Range.bounded("2021-05-18T00:00:00", "2021-05-18T08:00:00").incrementBy(1).unit(DensifyUnits.HOUR)).build(); + + assertThat(densify.toDocument(contextFor(Weather.class))).isEqualTo(""" + { + $densify: { + field: "timestamp", + range: { + step: 1, + unit: "hour", + bounds:[ "2021-05-18T00:00:00", "2021-05-18T08:00:00" ] + } + } + } + """); + } + + @Test // GH-4139 + void rendersPartitonNamesCorrectly() { + + DensifyOperation densify = DensifyOperation.builder().densify("alt").partitionBy("var") + .fullRange(range -> range.incrementBy(200)).build(); + + assertThat(densify.toDocument(contextFor(Coffee.class))).isEqualTo(""" + { + $densify: { + field: "altitude", + partitionByFields : [ "variety" ], + range: { + step: 200, + bounds: "full" + } + } + } + """); + } + + @Test // GH-4139 + void rendersPartitonRangeCorrectly() { + + DensifyOperation densify = DensifyOperation.builder().densify("alt").partitionBy("var") + .partitionRange(range -> range.incrementBy(200)).build(); + + assertThat(densify.toDocument(contextFor(Coffee.class))).isEqualTo(""" + { + $densify: { + field: "altitude", + partitionByFields : [ "variety" ], + range: { + step: 200, + bounds: "partition" + } + } + } + """); + } + + private static AggregationOperationContext contextFor(@Nullable Class type) { + + if (type == null) { + return Aggregation.DEFAULT_CONTEXT; + } + + MappingMongoConverter mongoConverter = new MappingMongoConverter(NoOpDbRefResolver.INSTANCE, + new MongoMappingContext()); + mongoConverter.afterPropertiesSet(); + + return new TypeBasedAggregationOperationContext(type, mongoConverter.getMappingContext(), + new QueryMapper(mongoConverter)); + } + + class Weather { + + @Field("timestamp") Date ts; + + @Field("temp") Long temperature; + } + + class Coffee { + + @Field("altitude") Long alt; + + @Field("variety") String var; + + Float score; + } +}