From d3976f51992d2c3a1c78f3b64ee5aa6c0304a968 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 27 Jun 2018 15:43:10 +0200 Subject: [PATCH] DATAMONGO-1311 - Add configuration options for query batch size. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We now allow configuration of the find cursor/find publisher batch sizes using Query.cursorBatchSize(…). Configuring the batch size gives users more fine grained control over the fetch behavior especially in reactive usage scenarios as the batch size defaults in FindPublisher to the remaining demand. This can cause several roundtrips in cases the remaining demand is small and the emitted elements are dropped rapidly (e.g. using filter(…)). On the repository level @Meta allows now configuration of the cursor batch size for derived finder methods. interface PersonRepository extends Repository { @Meta(cursorBatchSize = 100) Stream findAllByLastname(String lastname); } Original Pull Request: #575 --- .../data/mongodb/core/MongoTemplate.java | 21 +++++++++------ .../mongodb/core/ReactiveMongoTemplate.java | 17 +++++++++--- .../data/mongodb/core/query/Meta.java | 22 +++++++++++++++- .../data/mongodb/core/query/Query.java | 12 +++++++++ .../data/mongodb/repository/Meta.java | 8 ++++++ .../repository/query/MongoQueryMethod.java | 4 +++ .../mongodb/core/MongoTemplateUnitTests.java | 17 +++++++++--- .../core/ReactiveMongoTemplateUnitTests.java | 19 ++++++++++++++ .../ReactiveAggregationUnitTests.java | 26 ++++++++++++------- .../query/MongoQueryMethodUnitTests.java | 12 +++++++++ 10 files changed, 132 insertions(+), 26 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index f0ac8e53a..6e561d59d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -3279,8 +3279,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, return cursor; } + Meta meta = query.getMeta(); if (query.getSkip() <= 0 && query.getLimit() <= 0 && ObjectUtils.isEmpty(query.getSortObject()) - && !StringUtils.hasText(query.getHint()) && !query.getMeta().hasValues() + && !StringUtils.hasText(query.getHint()) && !meta.hasValues() && !query.getCollation().isPresent()) { return cursor; } @@ -3301,18 +3302,18 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, cursorToUse = cursorToUse.sort(sort); } - Document meta = new Document(); + Document metaDocument = new Document(); if (StringUtils.hasText(query.getHint())) { - meta.put("$hint", query.getHint()); + metaDocument.put("$hint", query.getHint()); } - if (query.getMeta().hasValues()) { + if (meta.hasValues()) { - for (Entry entry : query.getMeta().values()) { - meta.put(entry.getKey(), entry.getValue()); + for (Entry entry : meta.values()) { + metaDocument.put(entry.getKey(), entry.getValue()); } - for (Meta.CursorOption option : query.getMeta().getFlags()) { + for (Meta.CursorOption option : meta.getFlags()) { switch (option) { @@ -3326,9 +3327,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, throw new IllegalArgumentException(String.format("%s is no supported flag.", option)); } } + + if (meta.getCursorBatchSize() != null) { + cursorToUse = cursorToUse.batchSize(meta.getCursorBatchSize()); + } } - cursorToUse = cursorToUse.modifiers(meta); + cursorToUse = cursorToUse.modifiers(metaDocument); } catch (RuntimeException e) { throw potentiallyConvertRuntimeException(e, exceptionTranslator); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 2a5ba2a8a..99948eef4 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -98,6 +98,7 @@ import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions; import org.springframework.data.mongodb.core.query.Collation; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.CriteriaDefinition; +import org.springframework.data.mongodb.core.query.Meta; import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; @@ -971,7 +972,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati List pipeline = aggregationUtil.createPipeline(aggregation, rootContext); Assert.isTrue(!options.isExplain(), "Cannot use explain option with streaming!"); - Assert.isNull(options.getCursorBatchSize(), "Cannot use batchSize cursor option with streaming!"); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Streaming aggregation: {} in collection {}", serializeToJsonSafely(pipeline), collectionName); @@ -987,6 +987,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati AggregatePublisher cursor = collection.aggregate(pipeline, Document.class) .allowDiskUse(options.isAllowDiskUse()); + if (options.getCursorBatchSize() != null) { + cursor = cursor.batchSize(options.getCursorBatchSize()); + } + if (options.getCollation().isPresent()) { cursor = cursor.collation(options.getCollation().map(Collation::toMongoCollation).get()); } @@ -3216,8 +3220,9 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati findPublisherToUse = query.getCollation().map(Collation::toMongoCollation).map(findPublisher::collation) .orElse(findPublisher); + Meta meta = query.getMeta(); if (query.getSkip() <= 0 && query.getLimit() <= 0 && ObjectUtils.isEmpty(query.getSortObject()) - && !StringUtils.hasText(query.getHint()) && !query.getMeta().hasValues()) { + && !StringUtils.hasText(query.getHint()) && !meta.hasValues()) { return findPublisherToUse; } @@ -3238,10 +3243,14 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati modifiers.append("$hint", query.getHint()); } - if (query.getMeta().hasValues()) { - for (Entry entry : query.getMeta().values()) { + if (meta.hasValues()) { + for (Entry entry : meta.values()) { modifiers.append(entry.getKey(), entry.getValue()); } + + if (meta.getCursorBatchSize() != null) { + findPublisherToUse = findPublisherToUse.batchSize(meta.getCursorBatchSize()); + } } if (!modifiers.isEmpty()) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Meta.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Meta.java index a9b4ba486..bb49e83e9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Meta.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Meta.java @@ -50,6 +50,7 @@ public class Meta { private final Map values = new LinkedHashMap(2); private final Set flags = new LinkedHashSet(); + private Integer cursorBatchSize; /** * @return {@literal null} if not set. @@ -128,6 +129,25 @@ public class Meta { return getValue(MetaKey.SNAPSHOT.key, false); } + /** + * @return {@literal null} if not set. + * @since 2.1 + */ + @Nullable + public Integer getCursorBatchSize() { + return cursorBatchSize; + } + + /** + * Apply the batch size for a query. + * + * @param cursorBatchSize + * @since 2.1 + */ + public void setCursorBatchSize(int cursorBatchSize) { + this.cursorBatchSize = cursorBatchSize; + } + /** * Add {@link CursorOption} influencing behavior of the {@link com.mongodb.DBCursor}. * @@ -153,7 +173,7 @@ public class Meta { * @return */ public boolean hasValues() { - return !this.values.isEmpty() || !this.flags.isEmpty(); + return !this.values.isEmpty() || !this.flags.isEmpty() || this.cursorBatchSize != null; } /** diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Query.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Query.java index af8a61613..d4b18b123 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Query.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/query/Query.java @@ -347,6 +347,18 @@ public class Query { return this; } + /** + * @param batchSize + * @return + * @see Meta#setCursorBatchSize(int) + * @since 2.1 + */ + public Query cursorBatchSize(int batchSize) { + + meta.setCursorBatchSize(batchSize); + return this; + } + /** * @return * @see org.springframework.data.mongodb.core.query.Meta.CursorOption#NO_TIMEOUT diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/Meta.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/Meta.java index c350151b9..acfabe827 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/Meta.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/Meta.java @@ -50,6 +50,14 @@ public @interface Meta { */ long maxScanDocuments() default -1; + /** + * Sets the number of documents to return per batch. + * + * @return + * @since 2.1 + */ + int cursorBatchSize() default -1; + /** * Add a comment to the query. * diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java index eb61f3c85..bc6375e82 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java @@ -274,6 +274,10 @@ public class MongoQueryMethod extends QueryMethod { metaAttributes.setMaxScan(meta.maxScanDocuments()); } + if (meta.cursorBatchSize() > 0) { + metaAttributes.setCursorBatchSize(meta.cursorBatchSize()); + } + if (StringUtils.hasText(meta.comment())) { metaAttributes.setComment(meta.comment()); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java index 8bdaf46ff..b20e558b6 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.any; import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; import static org.springframework.data.mongodb.test.util.IsBsonObject.*; -import com.mongodb.client.model.ReplaceOptions; import lombok.Data; import java.math.BigInteger; @@ -93,6 +92,7 @@ import com.mongodb.client.model.CountOptions; import com.mongodb.client.model.DeleteOptions; import com.mongodb.client.model.FindOneAndDeleteOptions; import com.mongodb.client.model.FindOneAndUpdateOptions; +import com.mongodb.client.model.ReplaceOptions; import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.result.UpdateResult; @@ -161,7 +161,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { new MongoTemplate(mongo, null); } - @Test(expected = IllegalArgumentException.class) // DATAMONGO-1968 + @Test(expected = IllegalArgumentException.class) // DATAMONGO-1968 public void rejectsNullMongo() { new MongoTemplate((MongoClient) null, "database"); } @@ -653,6 +653,17 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { assertThat(updateCaptor.getValue(), isBsonObject().containing("$set.jon", "snow").notContaining("$isolated")); } + @Test // DATAMONGO-1311 + public void executeQueryShouldUseBatchSizeWhenPresent() { + + when(findIterable.batchSize(anyInt())).thenReturn(findIterable); + + Query query = new Query().cursorBatchSize(1234); + template.find(query, Person.class); + + verify(findIterable).batchSize(1234); + } + @Test // DATAMONGO-1518 public void executeQueryShouldUseCollationWhenPresent() { @@ -969,7 +980,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { return template; } - /* + /* * (non-Javadoc) * @see org.springframework.data.mongodb.core.core.MongoOperationsUnitTests#getOperations() */ diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index fd44ad563..7f4f4e40a 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -22,7 +22,9 @@ import static org.mockito.Mockito.any; import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.reactivestreams.client.AggregatePublisher; import lombok.Data; +import org.springframework.data.mongodb.core.query.Query; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -79,6 +81,7 @@ public class ReactiveMongoTemplateUnitTests { @Mock MongoDatabase db; @Mock MongoCollection collection; @Mock FindPublisher findPublisher; + @Mock AggregatePublisher aggregatePublisher; @Mock Publisher runCommandPublisher; MongoExceptionTranslator exceptionTranslator = new MongoExceptionTranslator(); @@ -95,10 +98,15 @@ public class ReactiveMongoTemplateUnitTests { when(db.runCommand(any(), any(Class.class))).thenReturn(runCommandPublisher); when(collection.find(any(Class.class))).thenReturn(findPublisher); when(collection.find(any(Document.class), any(Class.class))).thenReturn(findPublisher); + when(collection.aggregate(anyList())).thenReturn(aggregatePublisher); + when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher); when(findPublisher.projection(any())).thenReturn(findPublisher); when(findPublisher.limit(anyInt())).thenReturn(findPublisher); when(findPublisher.collation(any())).thenReturn(findPublisher); when(findPublisher.first()).thenReturn(findPublisher); + when(aggregatePublisher.allowDiskUse(anyBoolean())).thenReturn(aggregatePublisher); + when(aggregatePublisher.collation(any())).thenReturn(aggregatePublisher); + when(aggregatePublisher.first()).thenReturn(findPublisher); this.mappingContext = new MongoMappingContext(); this.converter = new MappingMongoConverter(new NoOpDbRefResolver(), mappingContext); @@ -136,6 +144,17 @@ public class ReactiveMongoTemplateUnitTests { }).verifyComplete(); } + @Test // DATAMONGO-1311 + public void executeQueryShouldUseBatchSizeWhenPresent() { + + when(findPublisher.batchSize(anyInt())).thenReturn(findPublisher); + + Query query = new Query().cursorBatchSize(1234); + template.find(query, Person.class).subscribe(); + + verify(findPublisher).batchSize(1234); + } + @Test // DATAMONGO-1518 public void findShouldUseCollationWhenPresent() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/ReactiveAggregationUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/ReactiveAggregationUnitTests.java index d751f1320..9b83ef1be 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/ReactiveAggregationUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/ReactiveAggregationUnitTests.java @@ -18,6 +18,8 @@ package org.springframework.data.mongodb.core.aggregation; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyList; import static org.springframework.data.mongodb.core.aggregation.Aggregation.*; import org.bson.Document; @@ -80,16 +82,6 @@ public class ReactiveAggregationUnitTests { template.aggregate(newAggregation(), INPUT_COLLECTION, null); } - @Test(expected = IllegalArgumentException.class) // DATAMONGO-1646 - public void errorsOnCursorBatchSizeUsage() { - - template.aggregate( - newAggregation(Product.class, // - project("name", "netPrice")) // - .withOptions(AggregationOptions.builder().cursorBatchSize(10).build()), - INPUT_COLLECTION, TagCount.class).subscribe(); - } - @Test(expected = IllegalArgumentException.class) // DATAMONGO-1646 public void errorsOnExplainUsage() { @@ -101,6 +93,20 @@ public class ReactiveAggregationUnitTests { .subscribe(); } + @Test // DATAMONGO-1646, DATAMONGO-1311 + public void appliesBatchSizeWhenPresent() { + + when(publisher.batchSize(anyInt())).thenReturn(publisher); + + AggregationOptions options = AggregationOptions.builder().cursorBatchSize(1234).build(); + template.aggregate(newAggregation(Product.class, // + project("name", "netPrice")) // + .withOptions(options), + INPUT_COLLECTION, TagCount.class).subscribe(); + + verify(publisher).batchSize(1234); + } + @Test // DATAMONGO-1646 public void appliesCollationCorrectlyWhenPresent() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/MongoQueryMethodUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/MongoQueryMethodUnitTests.java index 4ea7793df..087ca84e0 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/MongoQueryMethodUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/query/MongoQueryMethodUnitTests.java @@ -146,6 +146,15 @@ public class MongoQueryMethodUnitTests { assertThat(method.getQueryMetaAttributes().getMaxTimeMsec(), is(100L)); } + @Test // DATAMONGO-1311 + public void createsMongoQueryMethodWithBatchSizeCorrectly() throws Exception { + + MongoQueryMethod method = queryMethod(PersonRepository.class, "batchSize"); + + assertThat(method.hasQueryMetaAttributes(), is(true)); + assertThat(method.getQueryMetaAttributes().getCursorBatchSize(), is(100)); + } + @Test // DATAMONGO-1403 public void createsMongoQueryMethodWithSpellFixedMaxExecutionTimeCorrectly() throws Exception { @@ -233,6 +242,9 @@ public class MongoQueryMethodUnitTests { @Meta List emptyMetaAnnotation(); + @Meta(cursorBatchSize = 100) + List batchSize(); + @Meta(maxExecutionTimeMs = 100) List metaWithMaxExecutionTime();