Browse Source

DATAMONGO-2341 - Support shard key derivation in save operations via @Sharded annotation.

Spring Data MongoDB uses the @Sharded annotation to identify entities stored in sharded collections.
The shard key consists of a single or multiple properties present in every document within the target collection, and is used to distribute them across shards.

Spring Data MongoDB will do best effort optimisations for sharded scenarios when using repositories by adding required shard key information, if not already present, to replaceOne filter queries when upserting entities. This may require an additional server round trip to determine the actual value of the current shard key.

By setting @Sharded(immutableKey = true) no attempt will be made to check if an entities shard key changed.

Please see the MongoDB Documentation for further details and the list below for which operations are eligible to auto include the shard key.

* Reactive/CrudRepository.save(...)
* Reactive/CrudRepository.saveAll(...)
* Reactive/MongoTemplate.save(...)

Original pull request: #833.
pull/837/head
Christoph Strobl 6 years ago committed by Mark Paluch
parent
commit
6259cd2c3b
No known key found for this signature in database
GPG Key ID: 51A00FA751B91849
  1. 6
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MappedDocument.java
  2. 48
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
  3. 76
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/QueryOperations.java
  4. 44
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  5. 30
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/BasicMongoPersistentEntity.java
  6. 34
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/MongoPersistentEntity.java
  7. 138
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/ShardKey.java
  8. 92
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/Sharded.java
  9. 35
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/ShardingStrategy.java
  10. 4
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/BsonUtils.java
  11. 89
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java
  12. 96
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
  13. 40
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ShardedEntityWithDefaultShardKey.java
  14. 40
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ShardedEntityWithNonDefaultImmutableShardKey.java
  15. 40
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ShardedEntityWithNonDefaultShardKey.java
  16. 43
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ShardedVersionedEntityWithNonDefaultShardKey.java
  17. 150
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/UpdateOperationsUnitTests.java
  18. 41
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/mapping/BasicMongoPersistentEntityUnitTests.java
  19. 1
      src/main/asciidoc/index.adoc
  20. 70
      src/main/asciidoc/reference/sharding.adoc

6
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MappedDocument.java

@ -80,7 +80,11 @@ public class MappedDocument { @@ -80,7 +80,11 @@ public class MappedDocument {
}
public Bson getIdFilter() {
return Filters.eq(ID_FIELD, document.get(ID_FIELD));
return new Document(ID_FIELD, document.get(ID_FIELD));
}
public Object get(String key) {
return document.get(key);
}
public UpdateDefinition updateWithoutId() {

48
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

@ -33,7 +33,6 @@ import org.bson.Document; @@ -33,7 +33,6 @@ import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
@ -1480,23 +1479,38 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @@ -1480,23 +1479,38 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
}
return execute(collectionName, collection -> {
MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.SAVE, collectionName, entityClass,
dbDoc, null);
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
MappedDocument mapped = MappedDocument.of(dbDoc);
MongoCollection<Document> collectionToUse = writeConcernToUse == null //
? collection //
: collection.withWriteConcern(writeConcernToUse);
if (!mapped.hasId()) {
if (writeConcernToUse == null) {
collection.insertOne(dbDoc);
} else {
collection.withWriteConcern(writeConcernToUse).insertOne(dbDoc);
}
} else if (writeConcernToUse == null) {
collection.replaceOne(mapped.getIdFilter(), dbDoc, new ReplaceOptions().upsert(true));
collectionToUse.insertOne(dbDoc);
} else {
collection.withWriteConcern(writeConcernToUse).replaceOne(mapped.getIdFilter(), dbDoc,
new ReplaceOptions().upsert(true));
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
UpdateContext updateContext = queryOperations.replaceSingleContext(mapped, true);
Document replacement = updateContext.getMappedUpdate(entity);
Document filter = updateContext.getMappedQuery(entity);
if (updateContext.requiresShardKey(filter, entity)) {
if (entity.getShardKey().isImmutable()) {
filter = updateContext.applyShardKey(entity, filter, null);
} else {
filter = updateContext.applyShardKey(entity, filter,
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first());
}
}
collectionToUse.replaceOne(filter, replacement, new ReplaceOptions().upsert(true));
}
return mapped.getId();
});
@ -1615,8 +1629,20 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, @@ -1615,8 +1629,20 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
if (!UpdateMapper.isUpdateObject(updateObj)) {
Document filter = new Document(queryObj);
if (updateContext.requiresShardKey(filter, entity)) {
if (entity.getShardKey().isImmutable()) {
filter = updateContext.applyShardKey(entity, filter, null);
} else {
filter = updateContext.applyShardKey(entity, filter,
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first());
}
}
ReplaceOptions replaceOptions = updateContext.getReplaceOptions(entityClass);
return collection.replaceOne(queryObj, updateObj, replaceOptions);
return collection.replaceOne(filter, updateObj, replaceOptions);
} else {
return multi ? collection.updateMany(queryObj, updateObj, opts)
: collection.updateOne(queryObj, updateObj, opts);

76
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/QueryOperations.java

@ -16,7 +16,10 @@ @@ -16,7 +16,10 @@
package org.springframework.data.mongodb.core;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -154,6 +157,15 @@ class QueryOperations { @@ -154,6 +157,15 @@ class QueryOperations {
return new UpdateContext(updateDefinition, query, false, upsert);
}
/**
* @param replacement the {@link MappedDocument mapped replacement} document.
* @param upsert use {@literal true} to insert diff when no existing document found.
* @return new instance of {@link UpdateContext}.
*/
UpdateContext replaceSingleContext(MappedDocument replacement, boolean upsert) {
return new UpdateContext(replacement, upsert);
}
/**
* Create a new {@link DeleteContext} instance removing all matching documents.
*
@ -253,7 +265,6 @@ class QueryOperations { @@ -253,7 +265,6 @@ class QueryOperations {
*/
Document getMappedSort(@Nullable MongoPersistentEntity<?> entity) {
return queryMapper.getMappedSort(query.getSortObject(), entity);
}
/**
@ -353,7 +364,6 @@ class QueryOperations { @@ -353,7 +364,6 @@ class QueryOperations {
if (ClassUtils.isAssignable(requestedTargetType, propertyType)) {
conversionTargetType = propertyType;
}
} catch (PropertyReferenceException e) {
// just don't care about it as we default to Object.class anyway.
}
@ -491,7 +501,9 @@ class QueryOperations { @@ -491,7 +501,9 @@ class QueryOperations {
private final boolean multi;
private final boolean upsert;
private final UpdateDefinition update;
private final @Nullable UpdateDefinition update;
private final @Nullable MappedDocument mappedDocument;
private final Map<Class<?>, Document> mappedShardKey = new ConcurrentHashMap<>(1);
/**
* Create a new {@link UpdateContext} instance.
@ -520,6 +532,16 @@ class QueryOperations { @@ -520,6 +532,16 @@ class QueryOperations {
this.multi = multi;
this.upsert = upsert;
this.update = update;
this.mappedDocument = null;
}
UpdateContext(MappedDocument update, boolean upsert) {
super(new BasicQuery(new Document(BsonUtils.asMap(update.getIdFilter()))));
this.multi = false;
this.upsert = upsert;
this.mappedDocument = update;
this.update = null;
}
/**
@ -544,7 +566,7 @@ class QueryOperations { @@ -544,7 +566,7 @@ class QueryOperations {
UpdateOptions options = new UpdateOptions();
options.upsert(upsert);
if (update.hasArrayFilters()) {
if (update != null && update.hasArrayFilters()) {
options
.arrayFilters(update.getArrayFilters().stream().map(ArrayFilter::asDocument).collect(Collectors.toList()));
}
@ -602,6 +624,45 @@ class QueryOperations { @@ -602,6 +624,45 @@ class QueryOperations {
return mappedQuery;
}
<T> Document applyShardKey(@Nullable MongoPersistentEntity<T> domainType, Document filter,
@Nullable Document existing) {
Document shardKeySource = existing != null ? existing
: mappedDocument != null ? mappedDocument.getDocument() : getMappedUpdate(domainType);
Document filterWithShardKey = new Document(filter);
for (String key : getMappedShardKeyFields(domainType)) {
if (!filterWithShardKey.containsKey(key)) {
filterWithShardKey.append(key, shardKeySource.get(key));
}
}
return filterWithShardKey;
}
<T> boolean requiresShardKey(Document filter, @Nullable MongoPersistentEntity<T> domainType) {
if (multi || domainType == null || !domainType.isSharded() || domainType.idPropertyIsShardKey()) {
return false;
}
if (filter.keySet().containsAll(getMappedShardKeyFields(domainType))) {
return false;
}
return true;
}
Set<String> getMappedShardKeyFields(@Nullable MongoPersistentEntity<?> entity) {
return getMappedShardKey(entity).keySet();
}
Document getMappedShardKey(@Nullable MongoPersistentEntity<?> entity) {
return mappedShardKey.computeIfAbsent(entity.getType(),
key -> queryMapper.getMappedFields(entity.getShardKey().getDocument(), entity));
}
/**
* Get the already mapped aggregation pipeline to use with an {@link #isAggregationUpdate()}.
*
@ -625,8 +686,11 @@ class QueryOperations { @@ -625,8 +686,11 @@ class QueryOperations {
*/
Document getMappedUpdate(@Nullable MongoPersistentEntity<?> entity) {
return update instanceof MappedUpdate ? update.getUpdateObject()
: updateMapper.getMappedObject(update.getUpdateObject(), entity);
if (update != null) {
return update instanceof MappedUpdate ? update.getUpdateObject()
: updateMapper.getMappedObject(update.getUpdateObject(), entity);
}
return mappedDocument.getDocument();
}
/**

44
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -39,7 +39,6 @@ import org.reactivestreams.Publisher; @@ -39,7 +39,6 @@ import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
@ -1638,9 +1637,31 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1638,9 +1637,31 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
? collection //
: collection.withWriteConcern(writeConcernToUse);
Publisher<?> publisher = !mapped.hasId() //
? collectionToUse.insertOne(document) //
: collectionToUse.replaceOne(mapped.getIdFilter(), document, new ReplaceOptions().upsert(true));
Publisher<?> publisher = null;
if (!mapped.hasId()) {
publisher = collectionToUse.insertOne(document);
} else {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
UpdateContext updateContext = queryOperations.replaceSingleContext(mapped, true);
Document filter = updateContext.getMappedQuery(entity);
Document replacement = updateContext.getMappedUpdate(entity);
Mono<Document> theFilter = Mono.just(filter);
if(updateContext.requiresShardKey(filter, entity)) {
if (entity.getShardKey().isImmutable()) {
theFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
} else {
theFilter = Mono.from(
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first())
.defaultIfEmpty(replacement).map(it -> updateContext.applyShardKey(entity, filter, it));
}
}
publisher = theFilter.flatMap(
it -> Mono.from(collectionToUse.replaceOne(it, replacement, updateContext.getReplaceOptions(entityClass))));
}
return Mono.from(publisher).map(o -> mapped.getId());
});
@ -1778,8 +1799,21 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -1778,8 +1799,21 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
if (!UpdateMapper.isUpdateObject(updateObj)) {
Document filter = new Document(queryObj);
Mono<Document> theFilter = Mono.just(filter);
if(updateContext.requiresShardKey(filter, entity)) {
if (entity.getShardKey().isImmutable()) {
theFilter = Mono.just(updateContext.applyShardKey(entity, filter, null));
} else {
theFilter = Mono.from(
collection.find(filter, Document.class).projection(updateContext.getMappedShardKey(entity)).first())
.defaultIfEmpty(updateObj).map(it -> updateContext.applyShardKey(entity, filter, it));
}
}
ReplaceOptions replaceOptions = updateContext.getReplaceOptions(entityClass);
return collectionToUse.replaceOne(queryObj, updateObj, replaceOptions);
return theFilter.flatMap(it -> Mono.from(collectionToUse.replaceOne(it, updateObj, replaceOptions)));
}
return multi ? collectionToUse.updateMany(queryObj, updateObj, updateOptions)

30
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/BasicMongoPersistentEntity.java

@ -37,6 +37,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; @@ -37,6 +37,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
@ -63,6 +64,8 @@ public class BasicMongoPersistentEntity<T> extends BasicPersistentEntity<T, Mong @@ -63,6 +64,8 @@ public class BasicMongoPersistentEntity<T> extends BasicPersistentEntity<T, Mong
private final @Nullable String collation;
private final @Nullable Expression collationExpression;
private final ShardKey shardKey;
/**
* Creates a new {@link BasicMongoPersistentEntity} with the given {@link TypeInformation}. Will default the
* collection name to the entities simple type name.
@ -92,6 +95,8 @@ public class BasicMongoPersistentEntity<T> extends BasicPersistentEntity<T, Mong @@ -92,6 +95,8 @@ public class BasicMongoPersistentEntity<T> extends BasicPersistentEntity<T, Mong
this.collation = null;
this.collationExpression = null;
}
this.shardKey = detectShardKey(this);
}
/*
@ -160,6 +165,11 @@ public class BasicMongoPersistentEntity<T> extends BasicPersistentEntity<T, Mong @@ -160,6 +165,11 @@ public class BasicMongoPersistentEntity<T> extends BasicPersistentEntity<T, Mong
: null;
}
@Override
public ShardKey getShardKey() {
return shardKey;
}
/*
* (non-Javadoc)
* @see org.springframework.data.mapping.model.BasicPersistentEntity#verify()
@ -297,6 +307,26 @@ public class BasicMongoPersistentEntity<T> extends BasicPersistentEntity<T, Mong @@ -297,6 +307,26 @@ public class BasicMongoPersistentEntity<T> extends BasicPersistentEntity<T, Mong
return expression instanceof LiteralExpression ? null : expression;
}
@Nullable
private static ShardKey detectShardKey(BasicMongoPersistentEntity<?> entity) {
if (!entity.isAnnotationPresent(Sharded.class)) {
return ShardKey.none();
}
Sharded sharded = entity.getRequiredAnnotation(Sharded.class);
String[] keyProperties = sharded.shardKey();
if (ObjectUtils.isEmpty(keyProperties)) {
keyProperties = new String[] { "_id" };
}
ShardKey shardKey = ShardingStrategy.HASH.equals(sharded.shardingStrategy()) ? ShardKey.hash(keyProperties)
: ShardKey.range(keyProperties);
return sharded.immutableKey() ? ShardKey.immutable(shardKey) : shardKey;
}
/**
* Handler to collect {@link MongoPersistentProperty} instances and check that each of them is mapped to a distinct
* field name.

34
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/MongoPersistentEntity.java

@ -77,4 +77,38 @@ public interface MongoPersistentEntity<T> extends PersistentEntity<T, MongoPersi @@ -77,4 +77,38 @@ public interface MongoPersistentEntity<T> extends PersistentEntity<T, MongoPersi
return getCollation() != null;
}
/**
* Get the entities shard key if defined.
*
* @return {@link ShardKey#none()} if not not set.
* @since 3.0
*/
ShardKey getShardKey();
/**
* @return {@literal true} if the {@link #getShardKey() shard key} does not match {@link ShardKey#none()}.
* @since 3.0
*/
default boolean isSharded() {
return !ShardKey.none().equals(getShardKey());
}
/**
* @return {@literal true} if the {@link #getShardKey() shard key} is the entities {@literal id} property.
* @since 3.0
*/
default boolean idPropertyIsShardKey() {
ShardKey shardKey = getShardKey();
if (shardKey.size() != 1) {
return false;
}
String key = shardKey.getPropertyNames().iterator().next();
if ("_id".equals(key)) {
return true;
}
MongoPersistentProperty idProperty = getIdProperty();
return idProperty != null && idProperty.getName().equals(key);
}
}

138
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/ShardKey.java

@ -0,0 +1,138 @@ @@ -0,0 +1,138 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.mapping;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.bson.Document;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
/**
* Value object representing an entities <a href="https://docs.mongodb.com/manual/core/sharding-shard-key/">Shard
* Key</a> used to distribute documents across a sharded MongoDB cluster.
* <p />
* {@link ShardKey#isImmutable() Immutable} shard keys indicate a fixed value that is not updated (see
* <a href="https://docs.mongodb.com/manual/core/sharding-shard-key/#change-a-document-s-shard-key-value">MongoDB
* Reference: Change a Documents Shard Key Value</a>), which allows to skip server round trips in cases where a
* potential shard key change might have occurred.
*
* @author Christoph Strobl
* @since 3.0
*/
public class ShardKey {
private static final ShardKey NONE = new ShardKey(Collections.emptyList(), null, true);
private final List<String> propertyNames;
private final @Nullable ShardingStrategy shardingStrategy;
private final boolean immutable;
private ShardKey(List<String> propertyNames, @Nullable ShardingStrategy shardingStrategy, boolean immutable) {
this.propertyNames = propertyNames;
this.shardingStrategy = shardingStrategy;
this.immutable = immutable;
}
/**
* @return the number of properties used to form the shard key.
*/
public int size() {
return propertyNames.size();
}
/**
* @return the unmodifiable collection of property names forming the shard key.
*/
public Collection<String> getPropertyNames() {
return propertyNames;
}
/**
* @return {@literal true} if the shard key of an document does not change.
* @see <a href="https://docs.mongodb.com/manual/core/sharding-shard-key/#change-a-document-s-shard-key-value">MongoDB
* Reference: Change a Documents Shard Key Value</a>
*/
public boolean isImmutable() {
return immutable;
}
/**
* Get the unmapped MongoDB representation of the {@link ShardKey}.
*
* @return never {@literal null}.
*/
public Document getDocument() {
Document doc = new Document();
for (String field : propertyNames) {
doc.append(field, shardingValue());
}
return doc;
}
private Object shardingValue() {
return ObjectUtils.nullSafeEquals(ShardingStrategy.HASH, shardingStrategy) ? "hash" : 1;
}
/**
* {@link ShardKey} indicating no shard key has been defined.
*
* @return {@link #NONE}
*/
public static ShardKey none() {
return NONE;
}
/**
* Create a new {@link ShardingStrategy#RANGE} shard key.
*
* @param propertyNames must not be {@literal null}.
* @return new instance of {@link ShardKey}.
*/
public static ShardKey range(String... propertyNames) {
return new ShardKey(Arrays.asList(propertyNames), ShardingStrategy.RANGE, false);
}
/**
* Create a new {@link ShardingStrategy#RANGE} shard key.
*
* @param propertyNames must not be {@literal null}.
* @return new instance of {@link ShardKey}.
*/
public static ShardKey hash(String... propertyNames) {
return new ShardKey(Arrays.asList(propertyNames), ShardingStrategy.HASH, false);
}
/**
* Turn the given {@link ShardKey} into an {@link #isImmutable() immutable} one.
*
* @param shardKey must not be {@literal null}.
* @return new instance of {@link ShardKey} if the given shard key is not already immutable.
*/
public static ShardKey immutable(ShardKey shardKey) {
if (shardKey.isImmutable()) {
return shardKey;
}
return new ShardKey(shardKey.propertyNames, shardKey.shardingStrategy, true);
}
}

92
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/Sharded.java

@ -0,0 +1,92 @@ @@ -0,0 +1,92 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.mapping;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;
import org.springframework.data.annotation.Persistent;
/**
* The {@link Sharded} annotation provides meta information about the actual distribution of data across multiple
* machines. The {@link #shardKey()} is used to distribute documents across shards. <br />
* Please visit the <a href="https://docs.mongodb.com/manual/sharding/">MongoDB Documentation</a> for more information
* about requirements and limitations of sharding. <br />
* Spring Data will automatically add the shard key to filter queries used for
* {@link com.mongodb.client.MongoCollection#replaceOne(org.bson.conversions.Bson, Object)} operations triggered by
* {@code save} operations on {@link org.springframework.data.mongodb.core.MongoOperations} and
* {@link org.springframework.data.mongodb.core.ReactiveMongoOperations} as well as {@code update/upsert} operation
* replacing/upserting a single existing document as long as the given
* {@link org.springframework.data.mongodb.core.query.UpdateDefinition} holds a full copy of the entity. <br />
* All other operations that require the presence of the {@literal shard key} in the filter query need to provide the
* information via the {@link org.springframework.data.mongodb.core.query.Query} parameter when invoking the method.
*
* @author Christoph Strobl
* @since 3.0
*/
@Persistent
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
public @interface Sharded {
/**
* Alias for {@link #shardKey()}.
*
* @return {@literal _id} by default.
* @see #shardKey()
*/
@AliasFor("shardKey")
String[] value() default {};
/**
* The shard key determines the distribution of the collections documents among the clusters shards. The shard key
* is either a single or multiple indexed properties that exist in every document in the collection. <br />
* By default the {@literal id} property is used for sharding. <br />
* <strong>NOTE</strong> Required indexes will not be created automatically. Use
* {@link org.springframework.data.mongodb.core.index.Indexed} or
* {@link org.springframework.data.mongodb.core.index.CompoundIndex} along with enabled
* {@link org.springframework.data.mongodb.config.MongoConfigurationSupport#autoIndexCreation() auto index creation}
* or set up them up via
* {@link org.springframework.data.mongodb.core.index.IndexOperations#ensureIndex(org.springframework.data.mongodb.core.index.IndexDefinition)}.
*
* @return an empty key by default. Which indicates to use the entities {@literal id} property.
*/
@AliasFor("value")
String[] shardKey() default {};
/**
* The sharding strategy to use for distributing data across sharded clusters.
*
* @return {@link ShardingStrategy#RANGE} by default
*/
ShardingStrategy shardingStrategy() default ShardingStrategy.RANGE;
/**
* As of MongoDB 4.2 it is possible to change the shard key using update. Using immutable shard keys avoids server
* round trips to obtain an entities actual shard key from the database.
*
* @return {@literal false} by default;
* @see <a href="https://docs.mongodb.com/manual/core/sharding-shard-key/#change-a-document-s-shard-key-value">MongoDB
* Reference: Change a Documents Shard Key Value</a>
*/
boolean immutableKey() default false;
}

35
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/mapping/ShardingStrategy.java

@ -0,0 +1,35 @@ @@ -0,0 +1,35 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.mapping;
/**
* @author Christoph Strobl
* @since 3.0
*/
public enum ShardingStrategy {
/**
* Ranged sharding involves dividing data into ranges based on the shard key values. Each chunk is then assigned a
* range based on the shard key values.
*/
RANGE,
/**
* Hashed Sharding involves computing a hash of the shard key fields value. Each chunk is then assigned a range based
* on the hashed shard key values.
*/
HASH
}

4
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/BsonUtils.java

@ -54,13 +54,15 @@ public class BsonUtils { @@ -54,13 +54,15 @@ public class BsonUtils {
}
public static Map<String, Object> asMap(Bson bson) {
if (bson instanceof Document) {
return (Document) bson;
}
if (bson instanceof BasicDBObject) {
return ((BasicDBObject) bson);
}
throw new IllegalArgumentException("o_O what's that? Cannot read values from " + bson.getClass());
return (Map) bson.toBsonDocument(Document.class, MongoClientSettings.getDefaultCodecRegistry());
}
public static void addToMap(Bson bson, String key, @Nullable Object value) {

89
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java

@ -1829,6 +1829,95 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { @@ -1829,6 +1829,95 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
assertThat(captor.getValue()).isEqualTo(Collections.singletonList(Document.parse("{ $unset : \"firstname\" }")));
}
@Test // DATAMONGO-2341
void saveShouldAppendNonDefaultShardKeyIfNotPresentInFilter() {
template.save(new ShardedEntityWithNonDefaultShardKey("id-1", "AT", 4230));
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
verify(collection).replaceOne(filter.capture(), any(), any());
assertThat(filter.getValue()).isEqualTo(new Document("_id", "id-1").append("country", "AT").append("userid", 4230));
}
@Test // DATAMONGO-2341
void saveShouldAppendNonDefaultShardKeyToVersionedEntityIfNotPresentInFilter() {
when(collection.replaceOne(any(), any(), any(ReplaceOptions.class))).thenReturn(UpdateResult.acknowledged(1, 1L, null));
template.save(new ShardedVersionedEntityWithNonDefaultShardKey("id-1", 1L, "AT", 4230));
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
verify(collection).replaceOne(filter.capture(), any(), any());
assertThat(filter.getValue()).isEqualTo(new Document("_id", "id-1").append("version", 1L).append("country", "AT").append("userid", 4230));
}
@Test // DATAMONGO-2341
void saveShouldAppendNonDefaultShardKeyFromExistingDocumentIfNotPresentInFilter() {
when(findIterable.first()).thenReturn(new Document("_id", "id-1").append("country", "US").append("userid", 4230));
template.save(new ShardedEntityWithNonDefaultShardKey("id-1", "AT", 4230));
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
ArgumentCaptor<Document> replacement = ArgumentCaptor.forClass(Document.class);
verify(collection).replaceOne(filter.capture(), replacement.capture(), any());
assertThat(filter.getValue()).isEqualTo(new Document("_id", "id-1").append("country", "US").append("userid", 4230));
assertThat(replacement.getValue()).containsEntry("country", "AT").containsEntry("userid", 4230);
}
@Test // DATAMONGO-2341
void saveShouldAppendNonDefaultShardKeyFromGivenDocumentIfShardKeyIsImmutable() {
template.save(new ShardedEntityWithNonDefaultImmutableShardKey("id-1", "AT", 4230));
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
ArgumentCaptor<Document> replacement = ArgumentCaptor.forClass(Document.class);
verify(collection).replaceOne(filter.capture(), replacement.capture(), any());
assertThat(filter.getValue()).isEqualTo(new Document("_id", "id-1").append("country", "AT").append("userid", 4230));
assertThat(replacement.getValue()).containsEntry("country", "AT").containsEntry("userid", 4230);
verifyNoInteractions(findIterable);
}
@Test // DATAMONGO-2341
void saveShouldAppendDefaultShardKeyIfNotPresentInFilter() {
template.save(new ShardedEntityWithDefaultShardKey("id-1", "AT", 4230));
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
verify(collection).replaceOne(filter.capture(), any(), any());
assertThat(filter.getValue()).isEqualTo(new Document("_id", "id-1"));
verify(findIterable, never()).first();
}
@Test // DATAMONGO-2341
void saveShouldProjectOnShardKeyWhenLoadingExistingDocument() {
when(findIterable.first()).thenReturn(new Document("_id", "id-1").append("country", "US").append("userid", 4230));
template.save(new ShardedEntityWithNonDefaultShardKey("id-1", "AT", 4230));
verify(findIterable).projection(new Document("country", 1).append("userid", 1));
}
@Test // DATAMONGO-2341
void saveVersionedShouldProjectOnShardKeyWhenLoadingExistingDocument() {
when(collection.replaceOne(any(), any(), any(ReplaceOptions.class))).thenReturn(UpdateResult.acknowledged(1, 1L, null));
when(findIterable.first()).thenReturn(new Document("_id", "id-1").append("country", "US").append("userid", 4230));
template.save(new ShardedVersionedEntityWithNonDefaultShardKey("id-1", 1L, "AT", 4230));
verify(findIterable).projection(new Document("country", 1).append("userid", 1));
}
class AutogenerateableId {
@Id BigInteger id;

96
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core; @@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
import static org.springframework.data.mongodb.test.util.Assertions.assertThat;
import lombok.Data;
import reactor.core.publisher.Mono;
@ -999,6 +1000,101 @@ public class ReactiveMongoTemplateUnitTests { @@ -999,6 +1000,101 @@ public class ReactiveMongoTemplateUnitTests {
assertThat(captor.getValue()).isEqualTo(Collections.singletonList(Document.parse("{ $unset : \"firstname\" }")));
}
@Test // DATAMONGO-2341
void saveShouldAppendNonDefaultShardKeyIfNotPresentInFilter() {
when(findPublisher.first()).thenReturn(Mono.empty());
template.save(new ShardedEntityWithNonDefaultShardKey("id-1", "AT", 4230)).subscribe();
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
verify(collection).replaceOne(filter.capture(), any(), any());
assertThat(filter.getValue()).isEqualTo(new Document("_id", "id-1").append("country", "AT").append("userid", 4230));
}
@Test // DATAMONGO-2341
void saveShouldAppendNonDefaultShardKeyFromGivenDocumentIfShardKeyIsImmutable() {
template.save(new ShardedEntityWithNonDefaultImmutableShardKey("id-1", "AT", 4230)).subscribe();
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
ArgumentCaptor<Document> replacement = ArgumentCaptor.forClass(Document.class);
verify(collection).replaceOne(filter.capture(), replacement.capture(), any());
assertThat(filter.getValue()).isEqualTo(new Document("_id", "id-1").append("country", "AT").append("userid", 4230));
assertThat(replacement.getValue()).containsEntry("country", "AT").containsEntry("userid", 4230);
verifyNoInteractions(findPublisher);
}
@Test // DATAMONGO-2341
void saveShouldAppendNonDefaultShardKeyToVersionedEntityIfNotPresentInFilter() {
when(collection.replaceOne(any(Bson.class), any(Document.class), any(ReplaceOptions.class)))
.thenReturn(Mono.just(UpdateResult.acknowledged(1, 1L, null)));
when(findPublisher.first()).thenReturn(Mono.empty());
template.save(new ShardedVersionedEntityWithNonDefaultShardKey("id-1", 1L, "AT", 4230)).subscribe();
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
verify(collection).replaceOne(filter.capture(), any(), any());
assertThat(filter.getValue())
.isEqualTo(new Document("_id", "id-1").append("version", 1L).append("country", "AT").append("userid", 4230));
}
@Test // DATAMONGO-2341
void saveShouldAppendNonDefaultShardKeyFromExistingDocumentIfNotPresentInFilter() {
when(findPublisher.first())
.thenReturn(Mono.just(new Document("_id", "id-1").append("country", "US").append("userid", 4230)));
template.save(new ShardedEntityWithNonDefaultShardKey("id-1", "AT", 4230)).subscribe();
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
ArgumentCaptor<Document> replacement = ArgumentCaptor.forClass(Document.class);
verify(collection).replaceOne(filter.capture(), replacement.capture(), any());
assertThat(filter.getValue()).isEqualTo(new Document("_id", "id-1").append("country", "US").append("userid", 4230));
assertThat(replacement.getValue()).containsEntry("country", "AT").containsEntry("userid", 4230);
}
@Test // DATAMONGO-2341
void saveShouldAppendDefaultShardKeyIfNotPresentInFilter() {
template.save(new ShardedEntityWithDefaultShardKey("id-1", "AT", 4230)).subscribe();
ArgumentCaptor<Bson> filter = ArgumentCaptor.forClass(Bson.class);
verify(collection).replaceOne(filter.capture(), any(), any());
assertThat(filter.getValue()).isEqualTo(new Document("_id", "id-1"));
}
@Test // DATAMONGO-2341
void saveShouldProjectOnShardKeyWhenLoadingExistingDocument() {
when(findPublisher.first()).thenReturn(Mono.just(new Document("_id", "id-1").append("country", "US").append("userid", 4230)));
template.save(new ShardedEntityWithNonDefaultShardKey("id-1", "AT", 4230)).subscribe();
verify(findPublisher).projection(new Document("country", 1).append("userid", 1));
}
@Test // DATAMONGO-2341
void saveVersionedShouldProjectOnShardKeyWhenLoadingExistingDocument() {
when(collection.replaceOne(any(Bson.class), any(Document.class), any(ReplaceOptions.class)))
.thenReturn(Mono.just(UpdateResult.acknowledged(1, 1L, null)));
when(findPublisher.first()).thenReturn(Mono.empty());
template.save(new ShardedVersionedEntityWithNonDefaultShardKey("id-1", 1L, "AT", 4230)).subscribe();
verify(findPublisher).projection(new Document("country", 1).append("userid", 1));
}
@Data
@org.springframework.data.mongodb.core.mapping.Document(collection = "star-wars")
static class Person {

40
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ShardedEntityWithDefaultShardKey.java

@ -0,0 +1,40 @@ @@ -0,0 +1,40 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.mapping.Sharded;
/**
* @author Christoph Strobl
*/
@Data
@AllArgsConstructor
@Sharded
public class ShardedEntityWithDefaultShardKey {
private @Id String id;
private String country;
@Field("userid") //
private Integer userId;
}

40
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ShardedEntityWithNonDefaultImmutableShardKey.java

@ -0,0 +1,40 @@ @@ -0,0 +1,40 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.mapping.Sharded;
/**
* @author Christoph Strobl
*/
@Data
@AllArgsConstructor
@Sharded(shardKey = { "country", "userId" }, immutableKey = true)
public class ShardedEntityWithNonDefaultImmutableShardKey {
private @Id String id;
private String country;
@Field("userid") //
private Integer userId;
}

40
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ShardedEntityWithNonDefaultShardKey.java

@ -0,0 +1,40 @@ @@ -0,0 +1,40 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.mapping.Sharded;
/**
* @author Christoph Strobl
*/
@Data
@AllArgsConstructor
@Sharded(shardKey = { "country", "userId" })
public class ShardedEntityWithNonDefaultShardKey {
private @Id String id;
private String country;
@Field("userid") //
private Integer userId;
}

43
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ShardedVersionedEntityWithNonDefaultShardKey.java

@ -0,0 +1,43 @@ @@ -0,0 +1,43 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.mapping.Sharded;
/**
* @author Christoph Strobl
*/
@Data
@AllArgsConstructor
@Sharded(shardKey = { "country", "userId" })
public class ShardedVersionedEntityWithNonDefaultShardKey {
private @Id String id;
private @Version Long version;
private String country;
@Field("userid") //
private Integer userId;
}

150
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/UpdateOperationsUnitTests.java

@ -0,0 +1,150 @@ @@ -0,0 +1,150 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import static org.assertj.core.api.Assertions.*;
import java.util.Arrays;
import org.bson.Document;
import org.junit.jupiter.api.Test;
import org.springframework.data.mongodb.CodecRegistryProvider;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.convert.UpdateMapper;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import com.mongodb.MongoClientSettings;
/**
* @author Christoph Strobl
*/
class UpdateOperationsUnitTests {
static final Document SHARD_KEY = new Document("country", "AT").append("userid", "4230");
static final Document SOURCE_DOC = appendShardKey(new Document("_id", "id-1"));
MongoMappingContext mappingContext = new MongoMappingContext();
MongoConverter mongoConverter = new MappingMongoConverter(NoOpDbRefResolver.INSTANCE, mappingContext);
QueryMapper queryMapper = new QueryMapper(mongoConverter);
UpdateMapper updateMapper = new UpdateMapper(mongoConverter);
EntityOperations entityOperations = new EntityOperations(mappingContext);
ExtendedQueryOperations queryOperations = new ExtendedQueryOperations(queryMapper, updateMapper, entityOperations,
MongoClientSettings::getDefaultCodecRegistry);
@Test // DATAMONGO-2341
void appliesShardKeyToFilter() {
Document sourceFilter = new Document("name", "kaladin");
assertThat(shardedFilter(sourceFilter, ShardedEntityWithNonDefaultShardKey.class, null))
.isEqualTo(appendShardKey(sourceFilter));
}
@Test
void applyShardKeyDoesNotAlterSourceFilter() {
Document sourceFilter = new Document("name", "kaladin");
shardedFilter(sourceFilter, ShardedEntityWithNonDefaultShardKey.class, null);
assertThat(sourceFilter).isEqualTo(new Document("name", "kaladin"));
}
@Test // DATAMONGO-2341
void appliesExistingShardKeyToFilter() {
Document sourceFilter = new Document("name", "kaladin");
Document existing = new Document("country", "GB").append("userid", "007");
assertThat(shardedFilter(sourceFilter, ShardedEntityWithNonDefaultShardKey.class, existing))
.isEqualTo(new Document(existing).append("name", "kaladin"));
}
@Test // DATAMONGO-2341
void recognizesExistingShardKeyInFilter() {
Document sourceFilter = appendShardKey(new Document("name", "kaladin"));
assertThat(queryOperations.replaceSingleContextFor(SOURCE_DOC).requiresShardKey(sourceFilter,
entityOf(ShardedEntityWithNonDefaultShardKey.class))).isFalse();
}
@Test // DATAMONGO-2341
void recognizesIdPropertyAsShardKey() {
Document sourceFilter = new Document("_id", "id-1");
assertThat(queryOperations.replaceSingleContextFor(SOURCE_DOC).requiresShardKey(sourceFilter,
entityOf(ShardedEntityWithDefaultShardKey.class))).isFalse();
}
@Test // DATAMONGO-2341
void returnsMappedShardKey() {
queryOperations.replaceSingleContextFor(SOURCE_DOC)
.getMappedShardKeyFields(entityOf(ShardedEntityWithDefaultShardKey.class))
.containsAll(Arrays.asList("country", "userid"));
}
@NonNull
private Document shardedFilter(Document sourceFilter, Class<?> entity, Document existing) {
return queryOperations.replaceSingleContextFor(SOURCE_DOC).applyShardKey(entity, sourceFilter, existing);
}
private static Document appendShardKey(Document source) {
Document target = new Document(source);
target.putAll(SHARD_KEY);
return target;
}
MongoPersistentEntity<?> entityOf(Class<?> type) {
return mappingContext.getPersistentEntity(type);
}
class ExtendedQueryOperations extends QueryOperations {
ExtendedQueryOperations(QueryMapper queryMapper, UpdateMapper updateMapper, EntityOperations entityOperations,
CodecRegistryProvider codecRegistryProvider) {
super(queryMapper, updateMapper, entityOperations, codecRegistryProvider);
}
@NonNull
private ExtendedUpdateContext replaceSingleContextFor(Document source) {
return new ExtendedUpdateContext(MappedDocument.of(source), true);
}
MongoPersistentEntity<?> entityOf(Class<?> type) {
return mappingContext.getPersistentEntity(type);
}
class ExtendedUpdateContext extends UpdateContext {
ExtendedUpdateContext(MappedDocument update, boolean upsert) {
super(update, upsert);
}
<T> Document applyShardKey(@Nullable Class<T> domainType, Document filter, @Nullable Document existing) {
return applyShardKey(entityOf(domainType), filter, existing);
}
}
}
}

41
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/mapping/BasicMongoPersistentEntityUnitTests.java

@ -255,6 +255,38 @@ public class BasicMongoPersistentEntityUnitTests { @@ -255,6 +255,38 @@ public class BasicMongoPersistentEntityUnitTests {
assertThat(entity.getCollation()).isEqualTo(org.springframework.data.mongodb.core.query.Collation.of("en_US"));
}
@Test // DATAMONGO-2341
public void detectsShardedEntityCorrectly() {
assertThat(entityOf(WithDefaultShardKey.class).isSharded()).isTrue();
assertThat(entityOf(Contact.class).isSharded()).isFalse();
}
@Test // DATAMONGO-2341
public void readsDefaultShardKey() {
assertThat(entityOf(WithDefaultShardKey.class).getShardKey().getDocument())
.isEqualTo(new org.bson.Document("_id", 1));
}
@Test // DATAMONGO-2341
public void readsSingleShardKey() {
assertThat(entityOf(WithSingleShardKey.class).getShardKey().getDocument())
.isEqualTo(new org.bson.Document("country", 1));
}
@Test // DATAMONGO-2341
public void readsMultiShardKey() {
assertThat(entityOf(WithMultiShardKey.class).getShardKey().getDocument())
.isEqualTo(new org.bson.Document("country", 1).append("userid", 1));
}
static <T> BasicMongoPersistentEntity<T> entityOf(Class<T> type) {
return new BasicMongoPersistentEntity<>(ClassTypeInformation.from(type));
}
@Document("contacts")
class Contact {}
@ -313,6 +345,15 @@ public class BasicMongoPersistentEntityUnitTests { @@ -313,6 +345,15 @@ public class BasicMongoPersistentEntityUnitTests {
@Document(collation = "{ 'locale' : 'en_US' }")
class WithDocumentCollation {}
@Sharded
class WithDefaultShardKey {}
@Sharded("country")
class WithSingleShardKey {}
@Sharded({ "country", "userid" })
class WithMultiShardKey {}
static class SampleExtension implements EvaluationContextExtension {
/*

1
src/main/asciidoc/index.adoc

@ -30,6 +30,7 @@ include::reference/reactive-mongo-repositories.adoc[leveloffset=+1] @@ -30,6 +30,7 @@ include::reference/reactive-mongo-repositories.adoc[leveloffset=+1]
include::{spring-data-commons-docs}/auditing.adoc[leveloffset=+1]
include::reference/mongo-auditing.adoc[leveloffset=+1]
include::reference/mapping.adoc[leveloffset=+1]
include::reference/sharding.adoc[leveloffset=+1]
include::reference/kotlin.adoc[leveloffset=+1]
include::reference/cross-store.adoc[leveloffset=+1]
include::reference/jmx.adoc[leveloffset=+1]

70
src/main/asciidoc/reference/sharding.adoc

@ -0,0 +1,70 @@ @@ -0,0 +1,70 @@
[[sharding]]
= Sharding
MongoDB supports large data sets via sharding, a method for distributing data across multiple machines. Please refer to the https://docs.mongodb.com/manual/sharding/[MongoDB Documentation] to learn how to set up a sharded cluster, its requirements and limitations.
Spring Data MongoDB uses the `@Sharded` annotation to identify entities stored in sharded collections as shown below.
====
[source, java]
----
@Document("users")
@Sharded(shardKey = { "country", "userId" }) <1>
public class User {
@Id
Long id;
@Field("userid")
String userId;
String country;
}
----
<1> The properties of the shard key are mapped to the actual field names. See
====
[[sharding.sharded-collections]]
== Sharded Collections
Spring Data MongoDB does not auto set up sharding for collections nor indexes required for it. The snippet below shows how to do so using the MongoDB client API.
====
[source, java]
----
MongoDatabase adminDB = template.getMongoDbFactory()
.getMongoDatabase("admin"); <1>
adminDB.runCommand(new Document("enableSharding", "db")); <2>
Document shardCmd = new Document("shardCollection", "db.users") <3>
.append("key", new Document("country", 1).append("userid", 1)); <4>
adminDB.runCommand(shardCmd);
----
<1> Sharding commands need to be run against the _admin_ database.
<2> Enable sharding for a specific database if necessary.
<3> Shard a collection within the database having sharding enabled.
<4> Set the shard key (Range based sharding in this case).
====
[[sharding.shard-key]]
== Shard Key Handling
The shard key consists of a single or multiple properties present in every document within the target collection, and is used to distribute them across shards.
Adding the `@Sharded` annotation to an entity enables Spring Data MongoDB to do best effort optimisations required for sharded scenarios when using repositories.
This means essentially adding required shard key information, if not already present, to `replaceOne` filter queries when upserting entities. This may require an additional server round trip to determine the actual value of the current shard key.
TIP: By setting `@Sharded(immutableKey = true)` no attempt will be made to check if an entities shard key changed.
Please see the https://docs.mongodb.com/manual/reference/method/db.collection.replaceOne/#upsert[MongoDB Documentation] for further details and the list below for which operations are eligible for auto include the shard key.
* `Reactive/CrudRepository.save(...)`
* `Reactive/CrudRepository.saveAll(...)`
* `Reactive/MongoTemplate.save(...)`
Loading…
Cancel
Save