@ -3281,8 +3281,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
Meta meta = query . getMeta ( ) ;
Meta meta = query . getMeta ( ) ;
if ( query . getSkip ( ) < = 0 & & query . getLimit ( ) < = 0 & & ObjectUtils . isEmpty ( query . getSortObject ( ) )
if ( query . getSkip ( ) < = 0 & & query . getLimit ( ) < = 0 & & ObjectUtils . isEmpty ( query . getSortObject ( ) )
& & ! StringUtils . hasText ( query . getHint ( ) ) & & ! meta . hasValues ( )
& & ! StringUtils . hasText ( query . getHint ( ) ) & & ! meta . hasValues ( ) & & ! query . getCollation ( ) . isPresent ( ) ) {
& & ! query . getCollation ( ) . isPresent ( ) ) {
return cursor ;
return cursor ;
}
}
@ -3302,15 +3301,30 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
cursorToUse = cursorToUse . sort ( sort ) ;
cursorToUse = cursorToUse . sort ( sort ) ;
}
}
Document metaDocument = new Document ( ) ;
if ( StringUtils . hasText ( query . getHint ( ) ) ) {
if ( StringUtils . hasText ( query . getHint ( ) ) ) {
metaDocument . put ( "$hint" , query . getHint ( ) ) ;
cursorToUse = cursorToUse . hint ( Document . parse ( query . getHint ( ) ) ) ;
}
}
if ( meta . hasValues ( ) ) {
if ( meta . hasValues ( ) ) {
for ( Entry < String , Object > entry : meta . values ( ) ) {
if ( StringUtils . hasText ( meta . getComment ( ) ) ) {
metaDocument . put ( entry . getKey ( ) , entry . getValue ( ) ) ;
cursorToUse = cursorToUse . comment ( meta . getComment ( ) ) ;
}
if ( meta . getSnapshot ( ) ) {
cursorToUse = cursorToUse . snapshot ( meta . getSnapshot ( ) ) ;
}
if ( meta . getMaxScan ( ) ! = null ) {
cursorToUse = cursorToUse . maxScan ( meta . getMaxScan ( ) ) ;
}
if ( meta . getMaxTimeMsec ( ) ! = null ) {
cursorToUse = cursorToUse . maxTime ( meta . getMaxTimeMsec ( ) , TimeUnit . MILLISECONDS ) ;
}
if ( meta . getCursorBatchSize ( ) ! = null ) {
cursorToUse = cursorToUse . batchSize ( meta . getCursorBatchSize ( ) ) ;
}
}
for ( Meta . CursorOption option : meta . getFlags ( ) ) {
for ( Meta . CursorOption option : meta . getFlags ( ) ) {
@ -3327,13 +3341,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
throw new IllegalArgumentException ( String . format ( "%s is no supported flag." , option ) ) ;
throw new IllegalArgumentException ( String . format ( "%s is no supported flag." , option ) ) ;
}
}
}
}
if ( meta . getCursorBatchSize ( ) ! = null ) {
cursorToUse = cursorToUse . batchSize ( meta . getCursorBatchSize ( ) ) ;
}
}
}
cursorToUse = cursorToUse . modifiers ( metaDocument ) ;
} catch ( RuntimeException e ) {
} catch ( RuntimeException e ) {
throw potentiallyConvertRuntimeException ( e , exceptionTranslator ) ;
throw potentiallyConvertRuntimeException ( e , exceptionTranslator ) ;
}
}
@ -3464,149 +3473,6 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return mongoDbFactory ;
return mongoDbFactory ;
}
}
/ * *
* { @link BatchAggregationLoader } is a little helper that can process cursor results returned by an aggregation
* command execution . On presence of a { @literal nextBatch } indicated by presence of an { @code id } field in the
* { @code cursor } another { @code getMore } command gets executed reading the next batch of documents until all results
* are loaded .
*
* @author Christoph Strobl
* @since 1 . 10
* /
static class BatchAggregationLoader {
private static final String CURSOR_FIELD = "cursor" ;
private static final String RESULT_FIELD = "result" ;
private static final String BATCH_SIZE_FIELD = "batchSize" ;
private static final String FIRST_BATCH = "firstBatch" ;
private static final String NEXT_BATCH = "nextBatch" ;
private static final String SERVER_USED = "serverUsed" ;
private static final String OK = "ok" ;
private final MongoTemplate template ;
private final ReadPreference readPreference ;
private final int batchSize ;
BatchAggregationLoader ( MongoTemplate template , ReadPreference readPreference , int batchSize ) {
this . template = template ;
this . readPreference = readPreference ;
this . batchSize = batchSize ;
}
/ * *
* Run aggregation command and fetch all results .
* /
Document aggregate ( String collectionName , Aggregation aggregation , AggregationOperationContext context ) {
Document command = prepareAggregationCommand ( collectionName , aggregation , context , batchSize ) ;
if ( LOGGER . isDebugEnabled ( ) ) {
LOGGER . debug ( "Executing aggregation: {}" , serializeToJsonSafely ( command ) ) ;
}
return mergeAggregationResults ( aggregateBatched ( command , collectionName , batchSize ) ) ;
}
/ * *
* Pre process the aggregation command sent to the server by adding { @code cursor } options to match execution on
* different server versions .
* /
private static Document prepareAggregationCommand ( String collectionName , Aggregation aggregation ,
@Nullable AggregationOperationContext context , int batchSize ) {
AggregationOperationContext rootContext = context = = null ? Aggregation . DEFAULT_CONTEXT : context ;
Document command = aggregation . toDocument ( collectionName , rootContext ) ;
if ( ! aggregation . getOptions ( ) . isExplain ( ) ) {
command . put ( CURSOR_FIELD , new Document ( BATCH_SIZE_FIELD , batchSize ) ) ;
}
return command ;
}
private List < Document > aggregateBatched ( Document command , String collectionName , int batchSize ) {
List < Document > results = new ArrayList < > ( ) ;
Document commandResult = template . executeCommand ( command , readPreference ) ;
results . add ( postProcessResult ( commandResult ) ) ;
while ( hasNext ( commandResult ) ) {
Document getMore = new Document ( "getMore" , getNextBatchId ( commandResult ) ) //
. append ( "collection" , collectionName ) //
. append ( BATCH_SIZE_FIELD , batchSize ) ;
commandResult = template . executeCommand ( getMore , this . readPreference ) ;
results . add ( postProcessResult ( commandResult ) ) ;
}
return results ;
}
private static Document postProcessResult ( Document commandResult ) {
if ( ! commandResult . containsKey ( CURSOR_FIELD ) ) {
return commandResult ;
}
Document resultObject = new Document ( SERVER_USED , commandResult . get ( SERVER_USED ) ) ;
resultObject . put ( OK , commandResult . get ( OK ) ) ;
Document cursor = ( Document ) commandResult . get ( CURSOR_FIELD ) ;
if ( cursor . containsKey ( FIRST_BATCH ) ) {
resultObject . put ( RESULT_FIELD , cursor . get ( FIRST_BATCH ) ) ;
} else {
resultObject . put ( RESULT_FIELD , cursor . get ( NEXT_BATCH ) ) ;
}
return resultObject ;
}
private static Document mergeAggregationResults ( List < Document > batchResults ) {
if ( batchResults . size ( ) = = 1 ) {
return batchResults . iterator ( ) . next ( ) ;
}
Document commandResult = new Document ( ) ;
List < Object > allResults = new ArrayList < > ( ) ;
for ( Document batchResult : batchResults ) {
Collection documents = ( Collection < ? > ) batchResult . get ( RESULT_FIELD ) ;
if ( ! CollectionUtils . isEmpty ( documents ) ) {
allResults . addAll ( documents ) ;
}
}
// take general info from first batch
commandResult . put ( SERVER_USED , batchResults . iterator ( ) . next ( ) . get ( SERVER_USED ) ) ;
commandResult . put ( OK , batchResults . iterator ( ) . next ( ) . get ( OK ) ) ;
// and append the merged batchResults
commandResult . put ( RESULT_FIELD , allResults ) ;
return commandResult ;
}
private static boolean hasNext ( Document commandResult ) {
if ( ! commandResult . containsKey ( CURSOR_FIELD ) ) {
return false ;
}
Object next = getNextBatchId ( commandResult ) ;
return next ! = null & & ( ( Number ) next ) . longValue ( ) ! = 0L ;
}
@Nullable
private static Object getNextBatchId ( Document commandResult ) {
return ( ( Document ) commandResult . get ( CURSOR_FIELD ) ) . get ( "id" ) ;
}
}
/ * *
/ * *
* { @link MongoTemplate } extension bound to a specific { @link ClientSession } that is applied when interacting with the
* { @link MongoTemplate } extension bound to a specific { @link ClientSession } that is applied when interacting with the
* server through the driver API .
* server through the driver API .