diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ClientSessionException.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ClientSessionException.java new file mode 100644 index 000000000..89db56b52 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ClientSessionException.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb; + +import org.springframework.dao.NonTransientDataAccessException; +import org.springframework.lang.Nullable; + +/** + * {@link NonTransientDataAccessException} specific to MongoDB {@link com.mongodb.session.ClientSession} related data + * access failures such as reading data using an already closed session. + * + * @author Christoph Strobl + * @since 2.1 + */ +public class ClientSessionException extends NonTransientDataAccessException { + + /** + * Constructor for {@link ClientSessionException}. + * + * @param msg the detail message. Must not be {@literal null}. + */ + public ClientSessionException(String msg) { + super(msg); + } + + /** + * Constructor for {@link ClientSessionException}. + * + * @param msg the detail message. Can be {@literal null}. + * @param cause the root cause. Can be {@literal null}. + */ + public ClientSessionException(@Nullable String msg, @Nullable Throwable cause) { + super(msg, cause); + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoDbFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoDbFactory.java index b46e624ce..1f51240f4 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoDbFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoDbFactory.java @@ -20,8 +20,10 @@ import org.springframework.dao.DataAccessException; import org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.data.mongodb.core.MongoExceptionTranslator; +import com.mongodb.ClientSessionOptions; import com.mongodb.DB; import com.mongodb.client.MongoDatabase; +import com.mongodb.session.ClientSession; /** * Interface for factories creating {@link DB} instances. @@ -67,4 +69,35 @@ public interface MongoDbFactory extends CodecRegistryProvider { default CodecRegistry getCodecRegistry() { return getDb().getCodecRegistry(); } + + /** + * Obtain a {@link ClientSession} for given ClientSessionOptions. + * + * @param options must not be {@literal null}. + * @return never {@literal null}. + * @since 2.1 + */ + ClientSession getSession(ClientSessionOptions options); + + /** + * Obtain a {@link ClientSession} bound instance of {@link MongoDbFactory} returning {@link MongoDatabase} instances + * that are aware and bound to a new session with given {@link ClientSessionOptions options}. + * + * @param options must not be {@literal null}. + * @return never {@literal null}. 
+ * @since 2.1 + */ + default MongoDbFactory withSession(ClientSessionOptions options) { + return withSession(getSession(options)); + } + + /** + * Obtain a {@link ClientSession} bound instance of {@link MongoDbFactory} returning {@link MongoDatabase} instances + * that are aware and bound to the given session. + * + * @param options must not be {@literal null}. + * @return never {@literal null}. + * @since 2.1 + */ + MongoDbFactory withSession(ClientSession session); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java index 4ce6a714b..484b1d934 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/ReactiveMongoDatabaseFactory.java @@ -16,12 +16,16 @@ package org.springframework.data.mongodb; +import reactor.core.publisher.Mono; + import org.bson.codecs.configuration.CodecRegistry; import org.springframework.dao.DataAccessException; import org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.data.mongodb.core.MongoExceptionTranslator; +import com.mongodb.ClientSessionOptions; import com.mongodb.reactivestreams.client.MongoDatabase; +import com.mongodb.session.ClientSession; /** * Interface for factories creating reactive {@link MongoDatabase} instances. @@ -65,4 +69,24 @@ public interface ReactiveMongoDatabaseFactory extends CodecRegistryProvider { default CodecRegistry getCodecRegistry() { return getMongoDatabase().getCodecRegistry(); } + + /** + * Obtain a {@link Mono} emitting a {@link ClientSession} for given {@link ClientSessionOptions options}. + * + * @param options must not be {@literal null}. + * @return never {@literal null}. + * @since 2.1 + */ + Mono getSession(ClientSessionOptions options); + + /** + * Obtain a {@link ClientSession} bound instance of {@link ReactiveMongoDatabaseFactory} returning + * {@link MongoDatabase} instances that are aware and bound to the given session. + * + * @param options must not be {@literal null}. + * @return never {@literal null}. + * @since 2.1 + */ + ReactiveMongoDatabaseFactory withSession(ClientSession session); + } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/SessionAwareMethodInterceptor.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/SessionAwareMethodInterceptor.java new file mode 100644 index 000000000..5925115e0 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/SessionAwareMethodInterceptor.java @@ -0,0 +1,173 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb; + +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Optional; +import java.util.function.BiFunction; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.springframework.core.MethodClassKey; +import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; +import org.springframework.util.ConcurrentReferenceHashMap; +import org.springframework.util.ObjectUtils; +import org.springframework.util.ReflectionUtils; + +import com.mongodb.WriteConcern; +import com.mongodb.session.ClientSession; + +/** + * {@link MethodInterceptor} implementation looking up and invoking an alternative target method having + * {@link ClientSession} as its first argument. This allows seamless integration with the existing code base. + *
+ * The {@link MethodInterceptor} is aware of methods on {@code MongoCollection} that my return new instances of itself + * like (eg. {@link com.mongodb.reactivestreams.client.MongoCollection#withWriteConcern(WriteConcern)} and decorate them + * if not already proxied. + * + * @since 2.1 + */ +public class SessionAwareMethodInterceptor implements MethodInterceptor { + + private static final MethodCache METHOD_CACHE = new MethodCache(); + + private final ClientSession session; + private final BiFunction collectionDecorator; + private final BiFunction databaseDecorator; + private final Object target; + private final Class targetType; + private final Class collectionType; + private final Class databaseType; + + /** + * Create a new SessionAwareMethodInterceptor for given target. + * + * @param session the {@link ClientSession} to be used on invocation. + * @param target the original target object. + * @param databaseType the MongoDB database type + * @param databaseDecorator a {@link BiFunction} used to create the proxy for an imperative / reactive + * {@code MongoDatabase}. + * @param collectionType the MongoDB collection type. + * @param collectionCallback a {@link BiFunction} used to create the proxy for an imperative / reactive + * {@code MongoCollection}. + * @param + */ + public SessionAwareMethodInterceptor(ClientSession session, T target, Class databaseType, + BiFunction databaseDecorator, Class collectionType, + BiFunction collectionDecorator) { + + this.session = session; + this.target = target; + this.databaseType = ClassUtils.getUserClass(databaseType); + this.collectionType = ClassUtils.getUserClass(collectionType); + this.collectionDecorator = collectionDecorator; + this.databaseDecorator = databaseDecorator; + + this.targetType = ClassUtils.isAssignable(databaseType, target.getClass()) ? databaseType : collectionType; + } + + /* + * (non-Javadoc) + * @see org.aopalliance.intercept.MethodInterceptor(org.aopalliance.intercept.MethodInvocation) + */ + @Override + public Object invoke(MethodInvocation methodInvocation) throws Throwable { + + if (requiresDecoration(methodInvocation)) { + + Object target = methodInvocation.proceed(); + if (target instanceof Proxy) { + return target; + } + + return decorate(target); + } + + if (!requiresSession(methodInvocation)) { + return methodInvocation.proceed(); + } + + Optional targetMethod = METHOD_CACHE.lookup(methodInvocation.getMethod(), targetType); + + return !targetMethod.isPresent() ? methodInvocation.proceed() + : ReflectionUtils.invokeMethod(targetMethod.get(), target, prependSessionToArguments(methodInvocation)); + } + + private boolean requiresDecoration(MethodInvocation methodInvocation) { + + return ClassUtils.isAssignable(databaseType, methodInvocation.getMethod().getReturnType()) + || ClassUtils.isAssignable(collectionType, methodInvocation.getMethod().getReturnType()); + } + + protected Object decorate(Object target) { + + return ClassUtils.isAssignable(databaseType, target.getClass()) ? 
databaseDecorator.apply(session, target) + : collectionDecorator.apply(session, target); + } + + private boolean requiresSession(MethodInvocation methodInvocation) { + + if (ObjectUtils.isEmpty(methodInvocation.getMethod().getParameterTypes()) + || !ClassUtils.isAssignable(ClientSession.class, methodInvocation.getMethod().getParameterTypes()[0])) { + return true; + } + + return false; + } + + private Object[] prependSessionToArguments(MethodInvocation invocation) { + + Object[] args = new Object[invocation.getArguments().length + 1]; + args[0] = session; + System.arraycopy(invocation.getArguments(), 0, args, 1, invocation.getArguments().length); + return args; + } + + /** + * Simple {@link Method} to {@link Method} caching facility for {@link ClientSession} overloaded targets. + * + * @since 2.1 + * @author Christoph Strobl + */ + static class MethodCache { + + private final ConcurrentReferenceHashMap> cache = new ConcurrentReferenceHashMap<>(); + + Optional lookup(Method method, Class targetClass) { + + return cache.computeIfAbsent(new MethodClassKey(method, targetClass), + val -> Optional.ofNullable(findTargetWithSession(method, targetClass))); + } + + @Nullable + private Method findTargetWithSession(Method sourceMethod, Class targetType) { + + Class[] argTypes = sourceMethod.getParameterTypes(); + Class[] args = new Class[argTypes.length + 1]; + args[0] = ClientSession.class; + System.arraycopy(argTypes, 0, args, 1, argTypes.length); + + return ReflectionUtils.findMethod(targetType, sourceMethod.getName(), args); + } + + boolean contains(Method method, Class targetClass) { + return cache.containsKey(new MethodClassKey(method, targetClass)); + } + } + +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java index 63c3256dc..7bce03edb 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultBulkOperations.java @@ -274,19 +274,10 @@ class DefaultBulkOperations implements BulkOperations { public com.mongodb.bulk.BulkWriteResult execute() { try { - - MongoCollection collection = mongoOperations.getCollection(collectionName); - if (defaultWriteConcern != null) { - collection = collection.withWriteConcern(defaultWriteConcern); - } - - return collection.bulkWrite(models.stream().map(this::mapWriteModel).collect(Collectors.toList()), bulkOptions); - - } catch (BulkWriteException o_O) { - - DataAccessException toThrow = exceptionTranslator.translateExceptionIfPossible(o_O); - throw toThrow == null ? 
o_O : toThrow; - + + return mongoOperations.execute(collectionName, collection -> { + return collection.bulkWrite(models.stream().map(this::mapWriteModel).collect(Collectors.toList()), bulkOptions); + }); } finally { this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode()); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultIndexOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultIndexOperations.java index 7871532f1..3ec168b72 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultIndexOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultIndexOperations.java @@ -15,8 +15,6 @@ */ package org.springframework.data.mongodb.core; -import static org.springframework.data.mongodb.core.MongoTemplate.*; - import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -50,18 +48,22 @@ public class DefaultIndexOperations implements IndexOperations { private static final String PARTIAL_FILTER_EXPRESSION_KEY = "partialFilterExpression"; - private final MongoDbFactory mongoDbFactory; private final String collectionName; private final QueryMapper mapper; private final @Nullable Class type; + private MongoOperations mongoOperations; + /** * Creates a new {@link DefaultIndexOperations}. * * @param mongoDbFactory must not be {@literal null}. * @param collectionName must not be {@literal null}. * @param queryMapper must not be {@literal null}. + * @deprecated since 2.1. Please use + * {@link DefaultIndexOperations#DefaultIndexOperations(MongoOperations, String, Class)}. */ + @Deprecated public DefaultIndexOperations(MongoDbFactory mongoDbFactory, String collectionName, QueryMapper queryMapper) { this(mongoDbFactory, collectionName, queryMapper, null); } @@ -74,7 +76,10 @@ public class DefaultIndexOperations implements IndexOperations { * @param queryMapper must not be {@literal null}. * @param type Type used for mapping potential partial index filter expression. Can be {@literal null}. * @since 1.10 + * @deprecated since 2.1. Please use + * {@link DefaultIndexOperations#DefaultIndexOperations(MongoOperations, String, Class)}. */ + @Deprecated public DefaultIndexOperations(MongoDbFactory mongoDbFactory, String collectionName, QueryMapper queryMapper, @Nullable Class type) { @@ -82,10 +87,26 @@ public class DefaultIndexOperations implements IndexOperations { Assert.notNull(collectionName, "Collection name can not be null!"); Assert.notNull(queryMapper, "QueryMapper must not be null!"); - this.mongoDbFactory = mongoDbFactory; this.collectionName = collectionName; this.mapper = queryMapper; this.type = type; + this.mongoOperations = new MongoTemplate(mongoDbFactory); + } + + /** + * Creates a new {@link DefaultIndexOperations}. + * + * @param mongoOperations must not be {@literal null}. + * @param collectionName must not be {@literal null}. + * @param type can be {@literal null}. 
+ * @since 2.1 + */ + public DefaultIndexOperations(MongoOperations mongoOperations, String collectionName, @Nullable Class type) { + + this.mongoOperations = mongoOperations; + this.mapper = new QueryMapper(mongoOperations.getConverter()); + this.collectionName = collectionName; + this.type = type; } /* @@ -187,11 +208,10 @@ public class DefaultIndexOperations implements IndexOperations { Assert.notNull(callback, "CollectionCallback must not be null!"); - try { - MongoCollection collection = mongoDbFactory.getDb().getCollection(collectionName); - return callback.doInCollection(collection); - } catch (RuntimeException e) { - throw potentiallyConvertRuntimeException(e, mongoDbFactory.getExceptionTranslator()); + if (type != null) { + return mongoOperations.execute(type, callback); } + + return mongoOperations.execute(collectionName, callback); } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoExceptionTranslator.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoExceptionTranslator.java index b6bcdb589..bc679de64 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoExceptionTranslator.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoExceptionTranslator.java @@ -29,6 +29,7 @@ import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.dao.PermissionDeniedDataAccessException; import org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.data.mongodb.BulkOperationException; +import org.springframework.data.mongodb.ClientSessionException; import org.springframework.data.mongodb.UncategorizedMongoDbException; import org.springframework.data.mongodb.util.MongoDbErrorCodes; import org.springframework.lang.Nullable; @@ -119,18 +120,26 @@ public class MongoExceptionTranslator implements PersistenceExceptionTranslator int code = ((MongoException) ex).getCode(); if (MongoDbErrorCodes.isDuplicateKeyCode(code)) { - throw new DuplicateKeyException(ex.getMessage(), ex); + return new DuplicateKeyException(ex.getMessage(), ex); } else if (MongoDbErrorCodes.isDataAccessResourceFailureCode(code)) { - throw new DataAccessResourceFailureException(ex.getMessage(), ex); + return new DataAccessResourceFailureException(ex.getMessage(), ex); } else if (MongoDbErrorCodes.isInvalidDataAccessApiUsageCode(code) || code == 10003 || code == 12001 || code == 12010 || code == 12011 || code == 12012) { - throw new InvalidDataAccessApiUsageException(ex.getMessage(), ex); + return new InvalidDataAccessApiUsageException(ex.getMessage(), ex); } else if (MongoDbErrorCodes.isPermissionDeniedCode(code)) { - throw new PermissionDeniedDataAccessException(ex.getMessage(), ex); + return new PermissionDeniedDataAccessException(ex.getMessage(), ex); } return new UncategorizedMongoDbException(ex.getMessage(), ex); } + if (ex instanceof IllegalStateException) { + for (StackTraceElement elm : ex.getStackTrace()) { + if (elm.getClassName().contains("ClientSession")) { + return new ClientSessionException(ex.getMessage(), ex); + } + } + } + // If we get here, we have an exception that resulted from user code, // rather than the persistence provider, so we return null to indicate // that translation should not occur. 
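The translator above now returns the translated exception instead of throwing it, and maps session related IllegalStateExceptions raised by the driver onto the new ClientSessionException. A minimal sketch of the resulting behaviour (the error code and message below are made up for illustration):

MongoExceptionTranslator translator = new MongoExceptionTranslator();

// Driver exceptions are converted into Spring's DataAccessException hierarchy and returned to the caller.
DataAccessException translated = translator.translateExceptionIfPossible(
		new MongoException(11000, "E11000 duplicate key error"));
// translated is a DuplicateKeyException; an IllegalStateException originating from ClientSession code
// (e.g. reading through an already closed session) now yields a ClientSessionException instead.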
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index 038e998e4..2896cb40e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -18,6 +18,8 @@ package org.springframework.data.mongodb.core; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.bson.Document; import org.springframework.data.geo.GeoResults; @@ -40,11 +42,13 @@ import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.util.CloseableIterator; import org.springframework.lang.Nullable; +import com.mongodb.ClientSessionOptions; import com.mongodb.Cursor; import com.mongodb.ReadPreference; import com.mongodb.client.MongoCollection; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.UpdateResult; +import com.mongodb.session.ClientSession; /** * Interface that specifies a basic set of MongoDB operations. Implemented by {@link MongoTemplate}. Not often used but @@ -151,6 +155,62 @@ public interface MongoOperations extends FluentMongoOperations { @Nullable T execute(String collectionName, CollectionCallback action); + /** + * Obtain a session bound instance of {@link SessionScoped} binding a new {@link ClientSession} with given + * {@literal sessionOptions} to each and every command issued against MongoDB. + * + * @param sessionOptions must not be {@literal null}. + * @return new instance of {@link SessionScoped}. Never {@literal null}. + * @since 2.1 + */ + SessionScoped withSession(ClientSessionOptions sessionOptions); + + /** + * Obtain a session bound instance of {@link SessionScoped} binding the {@link ClientSession} provided by the given + * {@link Supplier} to each and every command issued against MongoDB. + *
+ * Note: It is up to the caller to manage the {@link ClientSession} lifecycle. + * + * @param sessionProvider must not be {@literal null}. + * @param onComplete a simple hook called when done .Must not be {@literal null}. + * @since 2.1 + */ + default SessionScoped withSession(Supplier sessionProvider) { + + return new SessionScoped() { + + private final Object lock = new Object(); + private @Nullable ClientSession session = null; + + @Override + public T execute(SessionCallback action, Consumer onComplete) { + + synchronized (lock) { + if (session == null) { + session = sessionProvider.get(); + } + } + + try { + return action.doInSession(MongoOperations.this.withSession(session)); + } finally { + onComplete.accept(session); + } + } + }; + } + + /** + * Obtain a {@link ClientSession} bound instance of MongoOperations. + *
+ * Note: It is up to the caller to manage the {@link ClientSession} lifecycle. + * + * @param session must not be {@literal null}. + * @return {@link ClientSession} bound instance of {@link MongoOperations}. + * @since 2.1 + */ + MongoOperations withSession(ClientSession session); + /** * Executes the given {@link Query} on the entity collection of the specified {@code entityType} backed by a Mongo DB * {@link Cursor}. @@ -769,8 +829,8 @@ public interface MongoOperations extends FluentMongoOperations { } /** - * Triggers findAndModify - * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}. + * Triggers findAndModify + * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}. * * @param query the {@link Query} class that specifies the {@link Criteria} used to find a record and also an optional * fields specification. Must not be {@literal null}. @@ -782,8 +842,8 @@ public interface MongoOperations extends FluentMongoOperations { T findAndModify(Query query, Update update, Class entityClass); /** - * Triggers findAndModify - * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}. + * Triggers findAndModify + * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}. * * @param query the {@link Query} class that specifies the {@link Criteria} used to find a record and also an optional * fields specification. Must not be {@literal null}. @@ -796,8 +856,8 @@ public interface MongoOperations extends FluentMongoOperations { T findAndModify(Query query, Update update, Class entityClass, String collectionName); /** - * Triggers findAndModify - * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking + * Triggers findAndModify + * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking * {@link FindAndModifyOptions} into account. * * @param query the {@link Query} class that specifies the {@link Criteria} used to find a record and also an optional @@ -813,8 +873,8 @@ public interface MongoOperations extends FluentMongoOperations { T findAndModify(Query query, Update update, FindAndModifyOptions options, Class entityClass); /** - * Triggers findAndModify - * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking + * Triggers findAndModify + * to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking * {@link FindAndModifyOptions} into account. 
* * @param query the {@link Query} class that specifies the {@link Criteria} used to find a record and also an optional diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index 8362a1341..68d281b5b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -124,6 +124,7 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.ResourceUtils; import org.springframework.util.StringUtils; +import com.mongodb.ClientSessionOptions; import com.mongodb.Cursor; import com.mongodb.DBCollection; import com.mongodb.DBCursor; @@ -152,6 +153,7 @@ import com.mongodb.client.model.ValidationAction; import com.mongodb.client.model.ValidationLevel; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.UpdateResult; +import com.mongodb.session.ClientSession; import com.mongodb.util.JSONParseException; /** @@ -219,7 +221,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, * @param databaseName must not be {@literal null} or empty. */ public MongoTemplate(MongoClient mongoClient, String databaseName) { - this(new SimpleMongoDbFactory(mongoClient, databaseName), null); + this(new SimpleMongoDbFactory(mongoClient, databaseName), (MongoConverter) null); } /** @@ -228,7 +230,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, * @param mongoDbFactory must not be {@literal null}. */ public MongoTemplate(MongoDbFactory mongoDbFactory) { - this(mongoDbFactory, null); + this(mongoDbFactory, (MongoConverter) null); } /** @@ -261,6 +263,20 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, } } + private MongoTemplate(MongoDbFactory dbFactory, MongoTemplate that) { + + this.mongoDbFactory = dbFactory; + this.exceptionTranslator = that.exceptionTranslator; + this.mongoConverter = that.mongoConverter instanceof MappingMongoConverter ? getDefaultMongoConverter(dbFactory) + : that.mongoConverter; + this.queryMapper = that.queryMapper; + this.updateMapper = that.updateMapper; + this.schemaMapper = that.schemaMapper; + this.projectionFactory = that.projectionFactory; + + this.mappingContext = that.mappingContext; + } + /** * Configures the {@link WriteResultChecking} to be used with the template. Setting {@literal null} will reset the * default of {@link #DEFAULT_WRITE_RESULT_CHECKING}. 
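Together with the SessionScoped gateway, the withSession(...) methods introduced on MongoOperations above can be used roughly as sketched below. Person and the template variable are illustrative placeholders only; closing the session remains the caller's responsibility, hence the ClientSession::close completion hook.

ClientSessionOptions options = ClientSessionOptions.builder().causallyConsistent(true).build();

List<Person> people = template.withSession(options)
		.execute(ops -> ops.find(new Query(), Person.class), ClientSession::close);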
@@ -394,7 +410,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, Document mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity); FindIterable cursor = new QueryCursorPreparer(query, entityType) - .prepare(collection.find(mappedQuery).projection(mappedFields)); + .prepare(collection.find(mappedQuery, Document.class).projection(mappedFields)); return new CloseableIterableCursorAdapter(cursor, exceptionTranslator, new ProjectingReadCallback<>(mongoConverter, entityType, returnType, collectionName)); @@ -509,7 +525,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, Assert.notNull(action, "DbCallbackmust not be null!"); try { - MongoDatabase db = this.getDb(); + MongoDatabase db = prepareDatabase(this.getDbInternal()); return action.doInDB(db); } catch (RuntimeException e) { throw potentiallyConvertRuntimeException(e, exceptionTranslator); @@ -536,13 +552,31 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, Assert.notNull(callback, "CollectionCallback must not be null!"); try { - MongoCollection collection = getAndPrepareCollection(getDb(), collectionName); + MongoCollection collection = getAndPrepareCollection(getDbInternal(), collectionName); return callback.doInCollection(collection); } catch (RuntimeException e) { throw potentiallyConvertRuntimeException(e, exceptionTranslator); } } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.MongoOperations#withSession(com.mongodb.ClientSessionOptions) + */ + @Override + public SessionScoped withSession(ClientSessionOptions options) { + return withSession(() -> mongoDbFactory.getSession(options)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.MongoOperations#withSession(com.mongodb.session.ClientSession) + */ + @Override + public MongoTemplate withSession(ClientSession session) { + return new SessionBoundMongoTemplate(session, MongoTemplate.this); + } + /* * (non-Javadoc) * @see org.springframework.data.mongodb.core.MongoOperations#createCollection(java.lang.Class) @@ -617,6 +651,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, return execute(new DbCallback() { public Boolean doInDB(MongoDatabase db) throws MongoException, DataAccessException { + for (String name : db.listCollectionNames()) { if (name.equals(collectionName)) { return true; @@ -659,7 +694,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, * @see org.springframework.data.mongodb.core.ExecutableInsertOperation#indexOps(java.lang.String) */ public IndexOperations indexOps(String collectionName) { - return new DefaultIndexOperations(getMongoDbFactory(), collectionName, queryMapper); + return new DefaultIndexOperations(this, collectionName, null); } /* @@ -667,8 +702,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, * @see org.springframework.data.mongodb.core.ExecutableInsertOperation#indexOps(java.lang.Class) */ public IndexOperations indexOps(Class entityClass) { - return new DefaultIndexOperations(getMongoDbFactory(), determineCollectionName(entityClass), queryMapper, - entityClass); + return new DefaultIndexOperations(this, determineCollectionName(entityClass), entityClass); } /* @@ -847,10 +881,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, Class mongoDriverCompatibleType = getMongoDbFactory().getCodecFor(resultClass).map(Codec::getEncoderClass) .orElse((Class) 
BsonValue.class); - MongoIterable result = execute((db) -> { + MongoIterable result = execute(collectionName, (collection) -> { - DistinctIterable iterable = db.getCollection(collectionName).distinct(mappedFieldName, mappedQuery, - mongoDriverCompatibleType); + DistinctIterable iterable = collection.distinct(mappedFieldName, mappedQuery, mongoDriverCompatibleType); return query.getCollation().map(Collation::toMongoCollation).map(iterable::collation).orElse(iterable); }); @@ -1055,10 +1088,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, Assert.notNull(query, "Query must not be null!"); Assert.hasText(collectionName, "Collection name must not be null or empty!"); + CountOptions options = new CountOptions(); + query.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation); + Document document = queryMapper.getMappedObject(query.getQueryObject(), Optional.ofNullable(entityClass).map(it -> mappingContext.getPersistentEntity(entityClass))); - return execute(collectionName, collection -> collection.count(document)); + return execute(collectionName, collection -> collection.count(document, options)); } /* @@ -1105,8 +1141,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, protected MongoCollection prepareCollection(MongoCollection collection) { if (this.readPreference != null) { - return collection.withReadPreference(readPreference); + collection = collection.withReadPreference(readPreference); } + return collection; } @@ -1372,6 +1409,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.INSERT, collectionName, entityClass, document, null); WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction); + if (writeConcernToUse == null) { collection.insertOne(document); } else { @@ -1781,10 +1819,10 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, String mapFunc = replaceWithResourceIfNecessary(mapFunction); String reduceFunc = replaceWithResourceIfNecessary(reduceFunction); - MongoCollection inputCollection = getCollection(inputCollectionName); + MongoCollection inputCollection = getAndPrepareCollection(getDbInternal(), inputCollectionName); // MapReduceOp - MapReduceIterable result = inputCollection.mapReduce(mapFunc, reduceFunc); + MapReduceIterable result = inputCollection.mapReduce(mapFunc, reduceFunc, Document.class); if (query != null && result != null) { if (query.getLimit() > 0 && mapReduceOptions.getLimit() == null) { @@ -1867,13 +1905,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, } if (document.containsKey("$reduce")) { - document.put("$reduce", replaceWithResourceIfNecessary(document.get("$reduce").toString())); + document.put("$reduce", replaceWithResourceIfNecessary(ObjectUtils.nullSafeToString(document.get("$reduce")))); } if (document.containsKey("$keyf")) { - document.put("$keyf", replaceWithResourceIfNecessary(document.get("$keyf").toString())); + document.put("$keyf", replaceWithResourceIfNecessary(ObjectUtils.nullSafeToString(document.get("$keyf")))); } if (document.containsKey("finalize")) { - document.put("finalize", replaceWithResourceIfNecessary(document.get("finalize").toString())); + document.put("finalize", replaceWithResourceIfNecessary(ObjectUtils.nullSafeToString(document.get("finalize")))); } Document commandObject = new Document("group", document); @@ -1882,7 +1920,7 @@ public class 
MongoTemplate implements MongoOperations, ApplicationContextAware, LOGGER.debug("Executing Group with Document [{}]", serializeToJsonSafely(commandObject)); } - Document commandResult = executeCommand(commandObject); + Document commandResult = executeCommand(commandObject, this.readPreference); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Group command result = [{}]", commandResult); @@ -2107,7 +2145,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, return execute(collectionName, (CollectionCallback>) collection -> { - AggregateIterable cursor = collection.aggregate(pipeline) // + AggregateIterable cursor = collection.aggregate(pipeline, Document.class) // .allowDiskUse(options.isAllowDiskUse()) // .useCursor(true); @@ -2214,9 +2252,17 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, } public MongoDatabase getDb() { + return getDbInternal(); + } + + protected MongoDatabase getDbInternal() { return mongoDbFactory.getDb(); } + protected MongoDatabase prepareDatabase(MongoDatabase database) { + return database; + } + protected void maybeEmitEvent(MongoMappingEvent event) { if (null != eventPublisher) { eventPublisher.publishEvent(event); @@ -2559,7 +2605,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, try { T result = objectCallback - .doWith(collectionCallback.doInCollection(getAndPrepareCollection(getDb(), collectionName))); + .doWith(collectionCallback.doInCollection(getAndPrepareCollection(getDbInternal(), collectionName))); return result; } catch (RuntimeException e) { throw potentiallyConvertRuntimeException(e, exceptionTranslator); @@ -2594,7 +2640,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, try { FindIterable iterable = collectionCallback - .doInCollection(getAndPrepareCollection(getDb(), collectionName)); + .doInCollection(getAndPrepareCollection(getDbInternal(), collectionName)); if (preparer != null) { iterable = preparer.prepare(iterable); @@ -2630,7 +2676,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, try { FindIterable iterable = collectionCallback - .doInCollection(getAndPrepareCollection(getDb(), collectionName)); + .doInCollection(getAndPrepareCollection(getDbInternal(), collectionName)); if (preparer != null) { iterable = preparer.prepare(iterable); @@ -2775,7 +2821,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, public Document doInCollection(MongoCollection collection) throws MongoException, DataAccessException { - FindIterable iterable = collection.find(query); + FindIterable iterable = collection.find(query, Document.class); if (LOGGER.isDebugEnabled()) { @@ -2820,7 +2866,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, public FindIterable doInCollection(MongoCollection collection) throws MongoException, DataAccessException { - return collection.find(query).projection(fields); + return collection.find(query, Document.class).projection(fields); } } @@ -3374,4 +3420,53 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware, return ((Document) commandResult.get(CURSOR_FIELD)).get("id"); } } + + /** + * {@link MongoTemplate} extension bound to a specific {@link ClientSession} that is applied when interacting with the + * server through the driver API. + *
+ * The prepare steps for {@link MongoDatabase} and {@link MongoCollection} proxy the target and invoke the desired + * target method matching the actual arguments plus a {@link ClientSession}. + * + * @author Christoph Strobl + * @since 2.1 + */ + static class SessionBoundMongoTemplate extends MongoTemplate { + + private final MongoTemplate delegate; + + /** + * @param session must not be {@literal null}. + * @param mongoDbFactory must not be {@literal null}. + * @param mongoConverter must not be {@literal null}. + */ + SessionBoundMongoTemplate(ClientSession session, MongoTemplate that) { + + super(that.getMongoDbFactory().withSession(session), that); + + this.delegate = that; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.MongoTemplate#getCollection(java.lang.String) + */ + @Override + public MongoCollection getCollection(String collectionName) { + + // native MongoDB objects that offer methods with ClientSession must not be proxied. + return delegate.getCollection(collectionName); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.MongoTemplate#getDb() + */ + @Override + public MongoDatabase getDb() { + + // native MongoDB objects that offer methods with ClientSession must not be proxied. + return delegate.getDb(); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java index 9797979b0..99aee5584 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoOperations.java @@ -20,6 +20,8 @@ import reactor.core.publisher.Mono; import java.util.Collection; import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.bson.Document; import org.reactivestreams.Publisher; @@ -37,11 +39,14 @@ import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import com.mongodb.ClientSessionOptions; import com.mongodb.ReadPreference; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.UpdateResult; import com.mongodb.reactivestreams.client.MongoCollection; +import com.mongodb.session.ClientSession; /** * Interface that specifies a basic set of MongoDB operations executed in a reactive way. @@ -141,6 +146,64 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations { */ Flux execute(String collectionName, ReactiveCollectionCallback action); + /** + * Obtain a session bound instance of {@link SessionScoped} binding the {@link ClientSession} provided by the given + * {@link Supplier} to each and every command issued against MongoDB. + *
+ * Note: It is up to the caller to manage the {@link ClientSession} lifecycle. Use + * {@link #withSession(Supplier, Consumer)} to provide a hook for processing the {@link ClientSession} when done. + * + * @param sessionProvider must not be {@literal null}. + * @return new instance of {@link SessionScoped}. Never {@literal null}. + * @since 2.1 + */ + default ReactiveSessionScoped withSession(Supplier sessionProvider) { + + Assert.notNull(sessionProvider, "SessionProvider must not be null!"); + + return withSession(Mono.fromSupplier(sessionProvider)); + } + + /** + * Obtain a session bound instance of {@link SessionScoped} binding a new {@link ClientSession} with given + * {@literal sessionOptions} to each and every command issued against MongoDB. + * + * @param sessionOptions must not be {@literal null}. + * @return new instance of {@link SessionScoped}. Never {@literal null}. + * @since 2.1 + */ + ReactiveSessionScoped withSession(ClientSessionOptions sessionOptions); + + /** + * Obtain a session bound instance of {@link SessionScoped} binding the {@link ClientSession} provided by the given + * {@link Supplier} to each and every command issued against MongoDB. + *
+ * Note: It is up to the caller to manage the {@link ClientSession} lifecycle. Use the + * {@litera onComplete} hook to potentially close the {@link ClientSession}. + * + * @param sessionProvider must not be {@literal null}. + * @return new instance of {@link SessionScoped}. Never {@literal null}. + * @since 2.1 + */ + default ReactiveSessionScoped withSession(Publisher sessionProvider) { + + return new ReactiveSessionScoped() { + + private final Mono cachedSession = Mono.from(sessionProvider).cache(); + + @Override + public Flux execute(ReactiveSessionCallback action, Consumer doFinally) { + + return cachedSession.flatMapMany(session -> { + return Flux.from(action.doInSession(ReactiveMongoOperations.this.withSession(session))) + .doFinally((signalType) -> doFinally.accept(session)); + }); + } + }; + } + + ReactiveMongoOperations withSession(ClientSession session); + /** * Create an uncapped collection with a name based on the provided entity class. * diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index f7e31741e..9fb8e80cf 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -26,6 +26,7 @@ import reactor.util.function.Tuple2; import java.util.*; import java.util.Map.Entry; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -105,6 +106,7 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; import com.mongodb.BasicDBObject; +import com.mongodb.ClientSessionOptions; import com.mongodb.CursorType; import com.mongodb.DBCollection; import com.mongodb.DBCursor; @@ -113,6 +115,7 @@ import com.mongodb.Mongo; import com.mongodb.MongoException; import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; +import com.mongodb.client.model.CountOptions; import com.mongodb.client.model.CreateCollectionOptions; import com.mongodb.client.model.DeleteOptions; import com.mongodb.client.model.Filters; @@ -132,6 +135,7 @@ import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.reactivestreams.client.Success; +import com.mongodb.session.ClientSession; import com.mongodb.util.JSONParseException; /** @@ -191,7 +195,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati * @param databaseName must not be {@literal null} or empty. */ public ReactiveMongoTemplate(MongoClient mongoClient, String databaseName) { - this(new SimpleReactiveMongoDatabaseFactory(mongoClient, databaseName), null); + this(new SimpleReactiveMongoDatabaseFactory(mongoClient, databaseName), (MongoConverter) null); } /** @@ -200,7 +204,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati * @param mongoDatabaseFactory must not be {@literal null}. 
*/ public ReactiveMongoTemplate(ReactiveMongoDatabaseFactory mongoDatabaseFactory) { - this(mongoDatabaseFactory, null); + this(mongoDatabaseFactory, (MongoConverter) null); } /** @@ -236,6 +240,19 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati } } + private ReactiveMongoTemplate(ReactiveMongoDatabaseFactory dbFactory, ReactiveMongoTemplate that) { + + this.mongoDatabaseFactory = dbFactory; + this.exceptionTranslator = that.exceptionTranslator; + this.mongoConverter = that.mongoConverter; + this.queryMapper = that.queryMapper; + this.updateMapper = that.updateMapper; + this.schemaMapper = that.schemaMapper; + this.projectionFactory = that.projectionFactory; + + this.mappingContext = that.mappingContext; + } + /** * Configures the {@link WriteResultChecking} to be used with the template. Setting {@literal null} will reset the * default of {@link ReactiveMongoTemplate#DEFAULT_WRITE_RESULT_CHECKING}. @@ -406,6 +423,46 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return createFlux(collectionName, callback); } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#withSession(org.reactivestreams.Publisher, java.util.function.Consumer) + */ + @Override + public ReactiveSessionScoped withSession(Publisher sessionProvider) { + + return new ReactiveSessionScoped() { + + private final Mono cachedSession = Mono.from(sessionProvider).cache(); + + @Override + public Flux execute(ReactiveSessionCallback action, Consumer doFinally) { + + return cachedSession.flatMapMany(session -> { + return Flux + .from(action.doInSession(new ReactiveSessionBoundMongoTemplate(session, ReactiveMongoTemplate.this))) + .doFinally((signalType) -> doFinally.accept(session)); + }); + } + }; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#withSession(com.mongodb.session.ClientSession) + */ + public ReactiveMongoOperations withSession(ClientSession session) { + return new ReactiveSessionBoundMongoTemplate(session, ReactiveMongoTemplate.this); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveMongoOperations#withSession(com.mongodb.ClientSessionOptions) + */ + @Override + public ReactiveSessionScoped withSession(ClientSessionOptions sessionOptions) { + return withSession(mongoDatabaseFactory.getSession(sessionOptions)); + } + /** * Create a reusable Flux for a {@link ReactiveDatabaseCallback}. It's up to the developer to choose to obtain a new * {@link Flux} or to reuse the {@link Flux}. 
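The reactive counterpart introduced above can be used in a similar fashion; again Person and the reactiveTemplate variable are illustrative placeholders, and the session obtained for the scope is closed in the doFinally hook once the resulting Flux terminates.

Flux<Person> people = reactiveTemplate
		.withSession(ClientSessionOptions.builder().causallyConsistent(true).build())
		.execute(ops -> ops.find(new Query(), Person.class), ClientSession::close);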
@@ -417,7 +474,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!"); - return Flux.defer(() -> callback.doInDB(getMongoDatabase())).onErrorMap(translateException()); + return Flux.defer(() -> callback.doInDB(prepareDatabase(getDbInternal()))).onErrorMap(translateException()); } /** @@ -431,7 +488,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!"); - return Mono.defer(() -> Mono.from(callback.doInDB(getMongoDatabase()))).onErrorMap(translateException()); + return Mono.defer(() -> Mono.from(callback.doInDB(prepareDatabase(getDbInternal())))) + .onErrorMap(translateException()); } /** @@ -447,7 +505,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati Assert.notNull(callback, "ReactiveDatabaseCallback must not be null!"); Mono> collectionPublisher = Mono - .fromCallable(() -> getAndPrepareCollection(getMongoDatabase(), collectionName)); + .fromCallable(() -> getAndPrepareCollection(getDbInternal(), collectionName)); return collectionPublisher.flatMapMany(callback::doInCollection).onErrorMap(translateException()); } @@ -466,7 +524,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati Assert.notNull(callback, "ReactiveCollectionCallback must not be null!"); Mono> collectionPublisher = Mono - .fromCallable(() -> getAndPrepareCollection(getMongoDatabase(), collectionName)); + .fromCallable(() -> getAndPrepareCollection(getDbInternal(), collectionName)); return collectionPublisher.flatMap(collection -> Mono.from(callback.doInCollection(collection))) .onErrorMap(translateException()); @@ -548,7 +606,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati */ public Mono dropCollection(final String collectionName) { - return createMono(db -> db.getCollection(collectionName).drop()).doOnSuccess(success -> { + return createMono(collectionName, collection -> collection.drop()).doOnSuccess(success -> { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Dropped collection [" + collectionName + "]"); } @@ -564,6 +622,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati } public MongoDatabase getMongoDatabase() { + return getDbInternal(); + } + + protected MongoDatabase getDbInternal() { return mongoDatabaseFactory.getMongoDatabase(); } @@ -619,7 +681,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return createFlux(collectionName, collection -> { Document mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), getPersistentEntity(entityClass)); - FindPublisher findPublisher = collection.find(mappedQuery).projection(new Document("_id", 1)); + FindPublisher findPublisher = collection.find(mappedQuery, Document.class) + .projection(new Document("_id", 1)); findPublisher = query.getCollation().map(Collation::toMongoCollation).map(findPublisher::collation) .orElse(findPublisher); @@ -821,8 +884,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati private Flux aggregateAndMap(MongoCollection collection, List pipeline, AggregationOptions options, ReadDocumentCallback readCallback) { - AggregatePublisher cursor = collection.aggregate(pipeline).allowDiskUse(options.isAllowDiskUse()) - .useCursor(true); + AggregatePublisher cursor = collection.aggregate(pipeline, Document.class) + 
.allowDiskUse(options.isAllowDiskUse()).useCursor(true); if (options.getCollation().isPresent()) { cursor = cursor.collation(options.getCollation().map(Collation::toMongoCollation).get()); @@ -988,7 +1051,12 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati : queryMapper.getMappedObject(query.getQueryObject(), entityClass == null ? null : mappingContext.getPersistentEntity(entityClass)); - return collection.count(Document); + CountOptions options = new CountOptions(); + if (query != null) { + query.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation); + } + + return collection.count(Document, options); }); } @@ -2197,7 +2265,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati private MongoCollection getAndPrepareCollection(MongoDatabase db, String collectionName) { try { - MongoCollection collection = db.getCollection(collectionName); + MongoCollection collection = db.getCollection(collectionName, Document.class); return prepareCollection(collection); } catch (RuntimeException e) { throw potentiallyConvertRuntimeException(e, exceptionTranslator); @@ -2235,6 +2303,15 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return collection; } + /** + * @param database + * @return + * @since 2.1 + */ + protected MongoDatabase prepareDatabase(MongoDatabase database) { + return database; + } + /** * Prepare the WriteConcern before any processing is done using it. This allows a convenient way to apply custom * settings in sub-classes.
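The prepareDatabase(MongoDatabase) hook added above is the seam a session-bound template variant can use to decorate the database. A rough sketch of such a subclass override using the SessionAwareMethodInterceptor introduced earlier; the wiring is illustrative only ('session' is assumed to be a ClientSession field of the hypothetical subclass, and the identity decorators skip the recursive proxying a real implementation would perform):

@Override
protected MongoDatabase prepareDatabase(MongoDatabase database) {

	// org.springframework.aop.framework.ProxyFactory creating a JDK proxy over the database interface.
	ProxyFactory proxyFactory = new ProxyFactory(database);
	proxyFactory.addAdvice(new SessionAwareMethodInterceptor(session, database, MongoDatabase.class,
			(s, db) -> db, MongoCollection.class, (s, collection) -> collection));

	return (MongoDatabase) proxyFactory.getProxy();
}

Calls on the returned proxy are then re-routed by the interceptor to the matching ClientSession overload on the target database or collection.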
@@ -2323,7 +2400,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati Assert.notNull(action, "MongoDatabaseCallback must not be null!"); try { - MongoDatabase db = this.getMongoDatabase(); + MongoDatabase db = this.getDbInternal(); return action.doInDatabase(db); } catch (RuntimeException e) { throw potentiallyConvertRuntimeException(e, exceptionTranslator); @@ -2484,7 +2561,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati public Publisher doInCollection(MongoCollection collection) throws MongoException, DataAccessException { - FindPublisher publisher = collection.find(query); + FindPublisher publisher = collection.find(query, Document.class); if (LOGGER.isDebugEnabled()) { @@ -2526,13 +2603,13 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati public FindPublisher doInCollection(MongoCollection collection) { FindPublisher findPublisher; - if (query == null || query.isEmpty()) { - findPublisher = collection.find(); + if (ObjectUtils.isEmpty(query)) { + findPublisher = collection.find(Document.class); } else { - findPublisher = collection.find(query); + findPublisher = collection.find(query, Document.class); } - if (fields == null || fields.isEmpty()) { + if (ObjectUtils.isEmpty(fields)) { return findPublisher; } else { return findPublisher.projection(fields); @@ -2869,6 +2946,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati */ static class NoOpDbRefResolver implements DbRefResolver { + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.convert.DbRefResolver#resolveDbRef(org.springframework.data.mongodb.core.mapping.MongoPersistentProperty, org.springframework.data.mongodb.core.convert.DbRefResolverCallback) + */ @Override @Nullable public Object resolveDbRef(@Nonnull MongoPersistentProperty property, @Nonnull DBRef dbref, @@ -2876,6 +2957,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return null; } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.convert.DbRefResolver#created(org.springframework.data.mongodb.core.mapping.MongoPersistentProperty, org.springframework.data.mongodb.core.mapping.MongoPersistentEntity, java.lang.Object) + */ @Override @Nullable public DBRef createDbRef(org.springframework.data.mongodb.core.mapping.DBRef annotation, @@ -2883,14 +2968,71 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati return null; } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.convert.DbRefResolver#fetch(com.mongodb.DBRef) + */ @Override public Document fetch(DBRef dbRef) { return null; } + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.convert.DbRefResolver#bulkFetch(java.util.List) + */ @Override public List bulkFetch(List dbRefs) { return Collections.emptyList(); } } + + /** + * {@link MongoTemplate} extension bound to a specific {@link ClientSession} that is applied when interacting with the + * server through the driver API. + *
+ * The prepare steps for {@link MongoDatabase} and {@link MongoCollection} proxy the target and invoke the desired + * target method matching the actual arguments plus a {@link ClientSession}. + * + * @author Christoph Strobl + * @since 2.1 + */ + static class ReactiveSessionBoundMongoTemplate extends ReactiveMongoTemplate { + + private final ReactiveMongoTemplate delegate; + + /** + * @param session must not be {@literal null}. + * @param mongoDbFactory must not be {@literal null}. + * @param mongoConverter must not be {@literal null}. + */ + ReactiveSessionBoundMongoTemplate(ClientSession session, ReactiveMongoTemplate that) { + + super(that.mongoDatabaseFactory.withSession(session), that); + + this.delegate = that; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#getCollection(java.lang.String) + */ + @Override + public MongoCollection getCollection(String collectionName) { + + // native MongoDB objects that offer methods with ClientSession must not be proxied. + return delegate.getCollection(collectionName); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.core.ReactiveMongoTemplate#getMongoDatabase() + */ + @Override + public MongoDatabase getMongoDatabase() { + + // native MongoDB objects that offer methods with ClientSession must not be proxied. + return delegate.getMongoDatabase(); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveSessionCallback.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveSessionCallback.java new file mode 100644 index 000000000..2fd58c862 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveSessionCallback.java @@ -0,0 +1,45 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import org.reactivestreams.Publisher; +import org.springframework.data.mongodb.core.query.Query; + +/** + * Callback interface for executing operations within a {@link com.mongodb.session.ClientSession} reactively. + * + * @author Christoph Strobl + * @since 2.1 + */ +@FunctionalInterface +public interface ReactiveSessionCallback { + + /** + * Execute operations against a MongoDB instance via session bound {@link ReactiveMongoOperations}. The session is + * inferred directly into the operation so that no further interaction is necessary. + *

+ * Please note that only Spring Data specific abstractions like {@link ReactiveMongoOperations#find(Query, Class)} and + * others are enhanced with the {@link com.mongodb.session.ClientSession}. When obtaining plain MongoDB gateway + * objects like {@link com.mongodb.reactivestreams.client.MongoCollection} or + * {@link com.mongodb.reactivestreams.client.MongoDatabase} via e.g. + * {@link ReactiveMongoOperations#getCollection(String)}, we leave responsibility for + * {@link com.mongodb.session.ClientSession} again up to the caller. + * + * @param operations will never be {@literal null}. + * @return can be {@literal null}. + */ + Publisher<T> doInSession(ReactiveMongoOperations operations); +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveSessionScoped.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveSessionScoped.java new file mode 100644 index 000000000..d5cc3d04f --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveSessionScoped.java @@ -0,0 +1,59 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import reactor.core.publisher.Flux; + +import java.util.function.Consumer; + +import com.mongodb.session.ClientSession; + +/** + * Gateway interface to execute {@link ClientSession} bound operations against MongoDB via a + * {@link ReactiveSessionCallback}. + * + * @author Christoph Strobl + * @since 2.1 + */ +public interface ReactiveSessionScoped { + + /** + * Executes the given {@link ReactiveSessionCallback} within the {@link com.mongodb.session.ClientSession}. + *
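Illustrative sketch of executing a ReactiveSessionCallback through this gateway, mirroring the usage exercised in ReactiveClientSessionTests further down; "client" is assumed to be a com.mongodb.reactivestreams.client.MongoClient and "template" a ReactiveMongoTemplate.

    ClientSession session = Mono
        .from(client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())).block();

    Flux<Document> docs = template.withSession(() -> session)
        .execute(ops -> ops.findOne(new Query(), Document.class, "test"));

    docs.blockLast();
    session.close(); // closing the session stays with the caller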

+ * It is up to the caller to make sure the {@link com.mongodb.session.ClientSession} is {@link ClientSession#close() + * closed} when done. + * + * @param action callback object that specifies the MongoDB action. Must not be {@literal null}. + * @param <T> return type. + * @return a result object returned by the action. Can be {@literal null}. + */ + default <T> Flux<T> execute(ReactiveSessionCallback<T> action) { + return execute(action, (session) -> {}); + } + + /** + * Executes the given {@link ReactiveSessionCallback} within the {@link com.mongodb.session.ClientSession}. + *
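A sketch of the two-argument variant under the same assumptions, using the Consumer to close the session once processing has finished:

    Flux<Document> docs = template.withSession(() -> session)
        .execute(ops -> ops.findAll(Document.class, "test"), ClientSession::close);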

+ * It is up to the caller to make sure the {@link com.mongodb.session.ClientSession} is {@link ClientSession#close() + * closed} when done. + * + * @param action callback object that specifies the MongoDB action. Must not be {@literal null}. + * @param doFinally consumer notified with the {@link ClientSession} after invoking the callback, e.g. to + * {@link ClientSession#close() close} it. Must not be {@literal null}. + * @param <T> return type. + * @return a result object returned by the action. Can be {@literal null}. + */ + <T> Flux<T> execute(ReactiveSessionCallback<T> action, Consumer<ClientSession> doFinally); +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SessionCallback.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SessionCallback.java new file mode 100644 index 000000000..af1963353 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SessionCallback.java @@ -0,0 +1,44 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.lang.Nullable; + +/** + * Callback interface for executing operations within a {@link com.mongodb.session.ClientSession}. + * + * @author Christoph Strobl + * @since 2.1 + */ +public interface SessionCallback<T> { + + /** + * Execute operations against a MongoDB instance via session bound {@link MongoOperations}. The session is applied + * directly to the operation so that no further interaction is necessary. + *
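Illustrative blocking counterpart, mirroring ClientSessionTests further down; "client" is assumed to be a com.mongodb.MongoClient and "template" a MongoTemplate.

    ClientSession session = client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build());

    Document doc = template.withSession(() -> session)
        .execute(ops -> ops.findOne(new Query(), Document.class, "test"));

    session.close();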

+ * Please note that only Spring Data specific abstractions like {@link MongoOperations#find(Query, Class)} and others + * are enhanced with the {@link com.mongodb.session.ClientSession}. When obtaining plain MongoDB gateway objects like + * {@link com.mongodb.client.MongoCollection} or {@link com.mongodb.client.MongoDatabase} via e.g. + * {@link MongoOperations#getCollection(String)}, we leave responsibility for {@link com.mongodb.session.ClientSession} + * again up to the caller. + * + * @param operations will never be {@literal null}. + * @return can be {@literal null}. + */ + @Nullable + T doInSession(MongoOperations operations); +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SessionScoped.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SessionScoped.java new file mode 100644 index 000000000..ecc6893c4 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SessionScoped.java @@ -0,0 +1,62 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.core; + +import java.util.function.Consumer; + +import org.springframework.lang.Nullable; + +import com.mongodb.session.ClientSession; + +/** + * Gateway interface to execute {@link ClientSession} bound operations against MongoDB via a {@link SessionCallback}. + *

+ * The very same bound {@link ClientSession} is used for all invocations of {@code execute} on the instance. + * + * @author Christoph Strobl + * @since 2.1 + */ +public interface SessionScoped { + + /** + * Executes the given {@link SessionCallback} within the {@link com.mongodb.session.ClientSession}. + *
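Illustrative sketch of that reuse guarantee, assuming MongoTemplate#withSession(Supplier) hands out a SessionScoped just like the reactive variant shown in ReactiveClientSessionTests below:

    SessionScoped sessionScoped = template.withSession(() -> session);

    // both callbacks operate on the very same bound ClientSession
    Document first = sessionScoped.execute(ops -> ops.findOne(new Query(), Document.class, "test"));
    Long count = sessionScoped.execute(ops -> ops.count(new Query(), "test"));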

+ * It is up to the caller to make sure the {@link com.mongodb.session.ClientSession} is {@link ClientSession#close() + * closed} when done. + * + * @param action callback object that specifies the MongoDB action. Must not be {@literal null}. + * @param <T> return type. + * @return a result object returned by the action. Can be {@literal null}. + */ + @Nullable + default <T> T execute(SessionCallback<T> action) { + return execute(action, session -> {}); + } + + /** + * Executes the given {@link SessionCallback} within the {@link com.mongodb.session.ClientSession}. + *

+ * It is up to the caller to make sure the {@link com.mongodb.session.ClientSession} is {@link ClientSession#close() + * closed} when done. + * + * @param action callback object that specifies the MongoDB action. Must not be {@literal null}. + * @param doFinally consumer notified with the {@link ClientSession} after invoking the callback, e.g. to + * {@link ClientSession#close() close} it. Must not be {@literal null}. + * @param <T> return type. + * @return a result object returned by the action. Can be {@literal null}. + */ + @Nullable + <T> T execute(SessionCallback<T> action, Consumer<ClientSession> doFinally); +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleMongoDbFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleMongoDbFactory.java index a2d0d1ef7..f1ccf6f20 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleMongoDbFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleMongoDbFactory.java @@ -17,18 +17,23 @@ package org.springframework.data.mongodb.core; import java.net.UnknownHostException; +import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.dao.DataAccessException; import org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.SessionAwareMethodInterceptor; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import com.mongodb.ClientSessionOptions; import com.mongodb.DB; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.WriteConcern; +import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; +import com.mongodb.session.ClientSession; /** * Factory to create {@link DB} instances from a {@link MongoClient} instance. @@ -147,4 +152,120 @@ public class SimpleMongoDbFactory implements DisposableBean, MongoDbFactory { public DB getLegacyDb() { return mongoClient.getDB(databaseName); } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.MongoDbFactory#getSession(com.mongodb.ClientSessionOptions) + */ + @Override + public ClientSession getSession(ClientSessionOptions options) { + return mongoClient.startSession(options); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.MongoDbFactory#withSession(com.mongodb.session.ClientSession) + */ + @Override + public MongoDbFactory withSession(ClientSession session) { + return new ClientSessionBoundMongoDbFactory(session, this); + } + + /** + * {@link ClientSession} bound {@link MongoDbFactory} decorating the database with a + * {@link SessionAwareMethodInterceptor}.
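Illustrative sketch of the factory-level API added here, assuming a SimpleMongoDbFactory instance named "factory"; databases obtained from the session-bound factory are proxied so that driver methods offering a ClientSession overload receive the bound session.

    ClientSession session = factory.getSession(ClientSessionOptions.builder().causallyConsistent(true).build());

    MongoDbFactory sessionBound = factory.withSession(session);

    // calls made through this MongoDatabase proxy are routed to the ClientSession overloads
    MongoDatabase database = sessionBound.getDb();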
+ * + * @author Christoph Strobl + * @since 2.1 + */ + static class ClientSessionBoundMongoDbFactory implements MongoDbFactory { + + private final ClientSession session; + private final MongoDbFactory delegate; + + ClientSessionBoundMongoDbFactory(ClientSession session, MongoDbFactory delegate) { + + this.session = session; + this.delegate = delegate; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.MongoDbFactory#getDb() + */ + @Override + public MongoDatabase getDb() throws DataAccessException { + return proxyMongoDatabase(delegate.getDb()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.MongoDbFactory#getDb(java.lang.String) + */ + @Override + public MongoDatabase getDb(String dbName) throws DataAccessException { + return proxyMongoDatabase(delegate.getDb(dbName)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.MongoDbFactory#getExceptionTranslator() + */ + @Override + public PersistenceExceptionTranslator getExceptionTranslator() { + return delegate.getExceptionTranslator(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.MongoDbFactory#getLegacyDb() + */ + @Override + public DB getLegacyDb() { + return delegate.getLegacyDb(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.MongoDbFactory#getSession(com.mongodb.ClientSessionOptions) + */ + @Override + public ClientSession getSession(ClientSessionOptions options) { + return delegate.getSession(options); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.MongoDbFactory#withSession(com.mongodb.session.ClientSession) + */ + @Override + public MongoDbFactory withSession(ClientSession session) { + return delegate.withSession(session); + } + + private MongoDatabase proxyMongoDatabase(MongoDatabase database) { + return createProxyInstance(session, database, MongoDatabase.class); + } + + private MongoDatabase proxyDatabase(ClientSession session, MongoDatabase database) { + return createProxyInstance(session, database, MongoDatabase.class); + } + + private MongoCollection proxyCollection(ClientSession session, MongoCollection collection) { + return createProxyInstance(session, collection, MongoCollection.class); + } + + private T createProxyInstance(ClientSession session, T target, Class targetType) { + + ProxyFactory factory = new ProxyFactory(); + factory.setTarget(target); + factory.setInterfaces(targetType); + factory.setOpaque(true); + + factory.addAdvice(new SessionAwareMethodInterceptor<>(session, target, MongoDatabase.class, this::proxyDatabase, + MongoCollection.class, this::proxyCollection)); + + return (T) factory.getProxy(); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java index c84ffcc46..7207c620e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/SimpleReactiveMongoDatabaseFactory.java @@ -15,20 +15,27 @@ */ package org.springframework.data.mongodb.core; +import reactor.core.publisher.Mono; + import java.net.UnknownHostException; +import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.dao.DataAccessException; import 
org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; +import org.springframework.data.mongodb.SessionAwareMethodInterceptor; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import com.mongodb.ClientSessionOptions; import com.mongodb.ConnectionString; import com.mongodb.WriteConcern; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoClients; +import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; +import com.mongodb.session.ClientSession; /** * Factory to create {@link MongoDatabase} instances from a {@link MongoClient} instance. @@ -129,4 +136,111 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React public PersistenceExceptionTranslator getExceptionTranslator() { return this.exceptionTranslator; } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.ReactiveMongoDbFactory#getSession(com.mongodb.ClientSessionOptions) + */ + @Override + public Mono getSession(ClientSessionOptions options) { + return Mono.from(mongo.startSession(options)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.ReactiveMongoDbFactory#withSession(com.mongodb.session.ClientSession) + */ + @Override + public ReactiveMongoDatabaseFactory withSession(ClientSession session) { + return new ClientSessionBoundMongoDbFactory(session, this); + } + + /** + * {@link ClientSession} bound {@link ReactiveMongoDatabaseFactory} decorating the database with a + * {@link SessionAwareMethodInterceptor}. + * + * @author Christoph Strobl + * @since 2.1 + */ + static class ClientSessionBoundMongoDbFactory implements ReactiveMongoDatabaseFactory { + + private final ClientSession session; + private final ReactiveMongoDatabaseFactory delegate; + + ClientSessionBoundMongoDbFactory(ClientSession session, ReactiveMongoDatabaseFactory delegate) { + + this.session = session; + this.delegate = delegate; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getMongoDatabase() + */ + @Override + public MongoDatabase getMongoDatabase() throws DataAccessException { + return decorateDatabase(delegate.getMongoDatabase()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getMongoDatabase(java.lang.String) + */ + @Override + public MongoDatabase getMongoDatabase(String dbName) throws DataAccessException { + return decorateDatabase(delegate.getMongoDatabase(dbName)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getExceptionTranslator() + */ + @Override + public PersistenceExceptionTranslator getExceptionTranslator() { + return delegate.getExceptionTranslator(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#getSession(com.mongodb.ClientSessionOptions) + */ + @Override + public Mono getSession(ClientSessionOptions options) { + return delegate.getSession(options); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.ReactiveMongoDatabaseFactory#withSession(com.mongodb.session.ClientSession) + */ + @Override + public ReactiveMongoDatabaseFactory withSession(ClientSession session) { + return delegate.withSession(session); + } + + private MongoDatabase decorateDatabase(MongoDatabase database) { + return createProxyInstance(session, database, 
MongoDatabase.class); + } + + private MongoDatabase proxyDatabase(ClientSession session, MongoDatabase database) { + return createProxyInstance(session, database, MongoDatabase.class); + } + + private MongoCollection proxyCollection(ClientSession session, MongoCollection collection) { + return createProxyInstance(session, collection, MongoCollection.class); + } + + private T createProxyInstance(ClientSession session, T target, Class targetType) { + + ProxyFactory factory = new ProxyFactory(); + factory.setTarget(target); + factory.setInterfaces(targetType); + factory.setOpaque(true); + + factory.addAdvice(new SessionAwareMethodInterceptor<>(session, target, MongoDatabase.class, this::proxyDatabase, + MongoCollection.class, this::proxyCollection)); + + return (T) factory.getProxy(); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/DefaultDbRefResolver.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/DefaultDbRefResolver.java index e9b186570..cea30568d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/DefaultDbRefResolver.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/DefaultDbRefResolver.java @@ -40,6 +40,7 @@ import org.springframework.cglib.proxy.MethodProxy; import org.springframework.dao.DataAccessException; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.dao.support.PersistenceExceptionTranslator; +import org.springframework.data.mongodb.ClientSessionException; import org.springframework.data.mongodb.LazyLoadingException; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; @@ -51,7 +52,7 @@ import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; import com.mongodb.DBRef; -import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoCollection; import com.mongodb.client.model.Filters; /** @@ -84,6 +85,18 @@ public class DefaultDbRefResolver implements DbRefResolver { this.objenesis = new ObjenesisStd(true); } + /** + * Creates a new {@link DefaultDbRefResolver} with the given {@link MongoDbFactory}. + * + * @param mongoDbFactory must not be {@literal null}. + */ + private DefaultDbRefResolver(DefaultDbRefResolver delegate) { + + this.mongoDbFactory = delegate.mongoDbFactory; + this.exceptionTranslator = delegate.exceptionTranslator; + this.objenesis = delegate.objenesis; + } + /* * (non-Javadoc) * @see org.springframework.data.mongodb.core.convert.DbRefResolver#resolveDbRef(org.springframework.data.mongodb.core.mapping.MongoPersistentProperty, org.springframework.data.mongodb.core.convert.DbRefResolverCallback) @@ -126,9 +139,7 @@ public class DefaultDbRefResolver implements DbRefResolver { public Document fetch(DBRef dbRef) { StringUtils.hasText(dbRef.getDatabaseName()); - return (StringUtils.hasText(dbRef.getDatabaseName()) ? 
mongoDbFactory.getDb(dbRef.getDatabaseName()) - : mongoDbFactory.getDb()).getCollection(dbRef.getCollectionName(), Document.class) - .find(Filters.eq("_id", dbRef.getId())).first(); + return getCollection(dbRef).find(Filters.eq("_id", dbRef.getId())).first(); } /* @@ -158,9 +169,7 @@ public class DefaultDbRefResolver implements DbRefResolver { ids.add(ref.getId()); } - MongoDatabase db = mongoDbFactory.getDb(); - - List result = db.getCollection(collection) // + List result = getCollection(refs.iterator().next()) // .find(new Document("_id", new Document("$in", ids))) // .into(new ArrayList<>()); @@ -466,6 +475,11 @@ public class DefaultDbRefResolver implements DbRefResolver { } catch (RuntimeException ex) { DataAccessException translatedException = this.exceptionTranslator.translateExceptionIfPossible(ex); + + if (translatedException instanceof ClientSessionException) { + throw new LazyLoadingException("Unable to lazily resolve DBRef! Invalid session state.", ex); + } + throw new LazyLoadingException("Unable to lazily resolve DBRef!", translatedException != null ? translatedException : ex); } @@ -474,4 +488,17 @@ public class DefaultDbRefResolver implements DbRefResolver { return result; } } + + /** + * Customization hook for obtaining the {@link MongoCollection} for a given {@link DBRef}. + * + * @param dbref must not be {@literal null}. + * @return the {@link MongoCollection} the given {@link DBRef} points to. + * @since 2.1 + */ + protected MongoCollection getCollection(DBRef dbref) { + + return (StringUtils.hasText(dbref.getDatabaseName()) ? mongoDbFactory.getDb(dbref.getDatabaseName()) + : mongoDbFactory.getDb()).getCollection(dbref.getCollectionName(), Document.class); + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MappingMongoConverter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MappingMongoConverter.java index c3e71b063..bd02a3ad1 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MappingMongoConverter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MappingMongoConverter.java @@ -147,8 +147,7 @@ public class MappingMongoConverter extends AbstractMongoConverter implements App */ public void setTypeMapper(@Nullable MongoTypeMapper typeMapper) { this.typeMapper = typeMapper == null - ? new DefaultMongoTypeMapper(DefaultMongoTypeMapper.DEFAULT_TYPE_KEY, mappingContext) - : typeMapper; + ? new DefaultMongoTypeMapper(DefaultMongoTypeMapper.DEFAULT_TYPE_KEY, mappingContext) : typeMapper; } /* @@ -557,8 +556,7 @@ public class MappingMongoConverter extends AbstractMongoConverter implements App } MongoPersistentEntity entity = isSubtype(prop.getType(), obj.getClass()) - ? mappingContext.getRequiredPersistentEntity(obj.getClass()) - : mappingContext.getRequiredPersistentEntity(type); + ? mappingContext.getRequiredPersistentEntity(obj.getClass()) : mappingContext.getRequiredPersistentEntity(type); Object existingValue = accessor.get(prop); Document document = existingValue instanceof Document ? (Document) existingValue : new Document(); @@ -777,8 +775,7 @@ public class MappingMongoConverter extends AbstractMongoConverter implements App } return conversions.hasCustomWriteTarget(key.getClass(), String.class) - ? (String) getPotentiallyConvertedSimpleWrite(key) - : key.toString(); + ? 
(String) getPotentiallyConvertedSimpleWrite(key) : key.toString(); } /** @@ -1438,8 +1435,7 @@ public class MappingMongoConverter extends AbstractMongoConverter implements App } List referencedRawDocuments = dbrefs.size() == 1 - ? Collections.singletonList(readRef(dbrefs.iterator().next())) - : bulkReadRefs(dbrefs); + ? Collections.singletonList(readRef(dbrefs.iterator().next())) : bulkReadRefs(dbrefs); String collectionName = dbrefs.iterator().next().getCollectionName(); List targeList = new ArrayList<>(dbrefs.size()); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MongoConverter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MongoConverter.java index 41d5aa62e..c007193b3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MongoConverter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MongoConverter.java @@ -69,7 +69,6 @@ public interface MongoConverter Assert.notNull(targetType, "TargetType must not be null!"); Assert.notNull(dbRefResolver, "DbRefResolver must not be null!"); - if (targetType != Object.class && ClassUtils.isAssignable(targetType, source.getClass())) { return (T) source; } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/SessionAwareMethodInterceptorUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/SessionAwareMethodInterceptorUnitTests.java new file mode 100644 index 000000000..f2b2c75f2 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/SessionAwareMethodInterceptorUnitTests.java @@ -0,0 +1,182 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; + +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.bson.Document; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.aop.framework.ProxyFactory; +import org.springframework.data.mongodb.SessionAwareMethodInterceptor.MethodCache; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.util.ClassUtils; + +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.session.ClientSession; + +/** + * @author Christoph Strobl + */ +@RunWith(MockitoJUnitRunner.class) +public class SessionAwareMethodInterceptorUnitTests { + + @Mock ClientSession session; + @Mock MongoCollection targetCollection; + @Mock MongoDatabase targetDatabase; + + MongoCollection collection; + MongoDatabase database; + + @Before + public void setUp() { + + collection = createProxyInstance(session, targetCollection, MongoCollection.class); + database = createProxyInstance(session, targetDatabase, MongoDatabase.class); + } + + @Test // DATAMONGO-1880 + public void proxyFactoryOnCollectionDelegatesToMethodWithSession() { + + collection.find(); + + verify(targetCollection).find(eq(session)); + } + + @Test // DATAMONGO-1880 + public void proxyFactoryOnCollectionWithSessionInArgumentListProceedsWithExecution() { + + ClientSession yetAnotherSession = mock(ClientSession.class); + collection.find(yetAnotherSession); + + verify(targetCollection).find(eq(yetAnotherSession)); + } + + @Test // DATAMONGO-1880 + public void proxyFactoryOnDatabaseDelegatesToMethodWithSession() { + + database.drop(); + + verify(targetDatabase).drop(eq(session)); + } + + @Test // DATAMONGO-1880 + public void proxyFactoryOnDatabaseWithSessionInArgumentListProceedsWithExecution() { + + ClientSession yetAnotherSession = mock(ClientSession.class); + database.drop(yetAnotherSession); + + verify(targetDatabase).drop(eq(yetAnotherSession)); + } + + @Test // DATAMONGO-1880 + public void justMoveOnIfNoOverloadWithSessionAvailable() { + + collection.getReadPreference(); + + verify(targetCollection).getReadPreference(); + } + + @Test // DATAMONGO-1880 + public void usesCacheForMethodLookup() { + + MethodCache cache = (MethodCache) ReflectionTestUtils.getField(SessionAwareMethodInterceptor.class, "METHOD_CACHE"); + Method countMethod = ClassUtils.getMethod(MongoCollection.class, "count"); + + assertThat(cache.contains(countMethod, MongoCollection.class)).isFalse(); + + collection.count(); + + assertThat(cache.contains(countMethod, MongoCollection.class)).isTrue(); + } + + @Test // DATAMONGO-1880 + public void cachesNullForMethodsThatDoNotHaveASessionOverload() { + + MethodCache cache = (MethodCache) ReflectionTestUtils.getField(SessionAwareMethodInterceptor.class, "METHOD_CACHE"); + Method readConcernMethod = ClassUtils.getMethod(MongoCollection.class, "getReadConcern"); + + assertThat(cache.contains(readConcernMethod, MongoCollection.class)).isFalse(); + + collection.getReadConcern(); + + collection.getReadConcern(); + + assertThat(cache.contains(readConcernMethod, MongoCollection.class)).isTrue(); + assertThat(cache.lookup(readConcernMethod, MongoCollection.class)).isEmpty(); + 
} + + @Test // DATAMONGO-1880 + public void proxiesNewDbInstanceReturnedByMethdod() { + + MongoDatabase otherDb = mock(MongoDatabase.class); + when(targetDatabase.withCodecRegistry(any())).thenReturn(otherDb); + + MongoDatabase target = database.withCodecRegistry(MongoClient.getDefaultCodecRegistry()); + assertThat(target).isInstanceOf(Proxy.class).isNotSameAs(database).isNotSameAs(targetDatabase); + + target.drop(); + + verify(otherDb).drop(eq(session)); + } + + @Test // DATAMONGO-1880 + public void proxiesNewCollectionInstanceReturnedByMethdod() { + + MongoCollection otherCollection = mock(MongoCollection.class); + when(targetCollection.withCodecRegistry(any())).thenReturn(otherCollection); + + MongoCollection target = collection.withCodecRegistry(MongoClient.getDefaultCodecRegistry()); + assertThat(target).isInstanceOf(Proxy.class).isNotSameAs(collection).isNotSameAs(targetCollection); + + target.drop(); + + verify(otherCollection).drop(eq(session)); + } + + private MongoDatabase proxyDatabase(ClientSession session, MongoDatabase database) { + return createProxyInstance(session, database, MongoDatabase.class); + } + + private MongoCollection proxyCollection(ClientSession session, MongoCollection collection) { + return createProxyInstance(session, collection, MongoCollection.class); + } + + private T createProxyInstance(ClientSession session, T target, Class targetType) { + + ProxyFactory factory = new ProxyFactory(); + factory.setTarget(target); + factory.setInterfaces(targetType); + factory.setOpaque(true); + + factory.addAdvice(new SessionAwareMethodInterceptor<>(session, target, MongoDatabase.class, this::proxyDatabase, + MongoCollection.class, this::proxyCollection)); + + return (T) factory.getProxy(); + } + +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ClientSessionTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ClientSessionTests.java new file mode 100644 index 000000000..0eeea77d6 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ClientSessionTests.java @@ -0,0 +1,68 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb.core; + +import static org.assertj.core.api.Assertions.*; + +import org.bson.Document; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.test.util.MongoVersionRule; +import org.springframework.data.util.Version; + +import com.mongodb.ClientSessionOptions; +import com.mongodb.MongoClient; +import com.mongodb.session.ClientSession; + +/** + * @author Christoph Strobl + */ +public class ClientSessionTests { + + public static @ClassRule MongoVersionRule REQUIRES_AT_LEAST_3_6_0 = MongoVersionRule.atLeast(Version.parse("3.6.0")); + + MongoTemplate template; + MongoClient client; + + @Before + public void setUp() { + + client = new MongoClient(); + template = new MongoTemplate(client, "reflective-client-session-tests"); + template.getDb().getCollection("test").drop(); + + template.getDb().getCollection("test").insertOne(new Document("_id", "id-1").append("value", "spring")); + } + + @Test // DATAMONGO-1880 + public void shouldApplyClientSession() { + + ClientSession session = client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build()); + + assertThat(session.getOperationTime()).isNull(); + + Document doc = template.withSession(() -> session) + .execute(action -> action.findOne(new Query(), Document.class, "test")); + + assertThat(doc).isNotNull(); + assertThat(session.getOperationTime()).isNotNull(); + assertThat(session.getServerSession().isClosed()).isFalse(); + + session.close(); + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultBulkOperationsIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultBulkOperationsIntegrationTests.java index 3ad96d7e2..6f5fe3eec 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultBulkOperationsIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultBulkOperationsIntegrationTests.java @@ -23,11 +23,14 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +import com.mongodb.BulkWriteException; import org.bson.Document; +import org.hamcrest.core.IsInstanceOf; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import org.springframework.data.mongodb.core.BulkOperations.BulkMode; import org.springframework.data.mongodb.core.DefaultBulkOperations.BulkOperationContext; import org.springframework.data.mongodb.core.convert.QueryMapper; @@ -102,10 +105,12 @@ public class DefaultBulkOperationsIntegrationTests { try { createBulkOps(BulkMode.ORDERED).insert(documents).execute(); fail(); - } catch (MongoBulkWriteException e) { - assertThat(e.getWriteResult().getInsertedCount(), is(1)); // fails after first error - assertThat(e.getWriteErrors(), notNullValue()); - assertThat(e.getWriteErrors().size(), is(1)); + } catch (DuplicateKeyException e) { + + assertThat(e.getCause(), IsInstanceOf.instanceOf(MongoBulkWriteException.class)); + assertThat(((MongoBulkWriteException)e.getCause()).getWriteResult().getInsertedCount(), is(1)); // fails after first error + assertThat(((MongoBulkWriteException)e.getCause()).getWriteErrors(), notNullValue()); + assertThat(((MongoBulkWriteException)e.getCause()).getWriteErrors().size(), is(1)); } } 
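As the adjusted assertions show, bulk write failures are now translated to DuplicateKeyException with the driver's MongoBulkWriteException kept as the cause; a rough sketch of how calling code might unwrap it ("bulkOps" and "documents" assumed):

    try {
        bulkOps.insert(documents).execute();
    } catch (DuplicateKeyException e) {

        MongoBulkWriteException cause = (MongoBulkWriteException) e.getCause();
        int insertedBeforeFailure = cause.getWriteResult().getInsertedCount();
        List<BulkWriteError> writeErrors = cause.getWriteErrors();
        // inspect insertedBeforeFailure / writeErrors, or rethrow
    }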
@@ -125,10 +130,12 @@ public class DefaultBulkOperationsIntegrationTests { try { createBulkOps(BulkMode.UNORDERED).insert(documents).execute(); fail(); - } catch (MongoBulkWriteException e) { - assertThat(e.getWriteResult().getInsertedCount(), is(2)); // two docs were inserted - assertThat(e.getWriteErrors(), notNullValue()); - assertThat(e.getWriteErrors().size(), is(1)); + } catch (DuplicateKeyException e) { + + assertThat(e.getCause(), IsInstanceOf.instanceOf(MongoBulkWriteException.class)); + assertThat(((MongoBulkWriteException)e.getCause()).getWriteResult().getInsertedCount(), is(2)); // two docs were inserted + assertThat(((MongoBulkWriteException)e.getCause()).getWriteErrors(), notNullValue()); + assertThat(((MongoBulkWriteException)e.getCause()).getWriteErrors().size(), is(1)); } } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultBulkOperationsUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultBulkOperationsUnitTests.java index 9f36332ed..797bfb08c 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultBulkOperationsUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/DefaultBulkOperationsUnitTests.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; import static org.springframework.data.mongodb.core.query.Criteria.*; import static org.springframework.data.mongodb.core.query.Query.*; @@ -34,6 +35,7 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.BulkOperations.BulkMode; import org.springframework.data.mongodb.core.DefaultBulkOperations.BulkOperationContext; import org.springframework.data.mongodb.core.convert.DbRefResolver; @@ -48,6 +50,7 @@ import org.springframework.data.mongodb.core.query.Collation; import org.springframework.data.mongodb.core.query.Update; import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.DeleteManyModel; import com.mongodb.client.model.UpdateManyModel; import com.mongodb.client.model.UpdateOneModel; @@ -62,8 +65,10 @@ import com.mongodb.client.model.WriteModel; @RunWith(MockitoJUnitRunner.class) public class DefaultBulkOperationsUnitTests { - @Mock MongoTemplate template; + MongoTemplate template; + @Mock MongoDatabase database; @Mock MongoCollection collection; + @Mock MongoDbFactory factory; @Mock DbRefResolver dbRefResolver; @Captor ArgumentCaptor>> captor; MongoConverter converter; @@ -79,7 +84,10 @@ public class DefaultBulkOperationsUnitTests { converter = new MappingMongoConverter(dbRefResolver, mappingContext); - when(template.getCollection(anyString())).thenReturn(collection); + template = new MongoTemplate(factory, converter); + + when(factory.getDb()).thenReturn(database); + when(database.getCollection(anyString(), eq(Document.class))).thenReturn(collection); ops = new DefaultBulkOperations(template, "collection-1", new BulkOperationContext(BulkMode.ORDERED, diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoExceptionTranslatorUnitTests.java 
b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoExceptionTranslatorUnitTests.java index b7fe8d583..272c5eac4 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoExceptionTranslatorUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoExceptionTranslatorUnitTests.java @@ -137,14 +137,13 @@ public class MongoExceptionTranslatorUnitTests { private void checkTranslatedMongoException(Class clazz, int code) { - try { - translator.translateExceptionIfPossible(new MongoException(code, "")); - fail("Expected exception of type " + clazz.getName() + "!"); - } catch (NestedRuntimeException e) { - Throwable cause = e.getRootCause(); - assertThat(cause, is(instanceOf(MongoException.class))); - assertThat(((MongoException) cause).getCode(), is(code)); - } + DataAccessException translated = translator.translateExceptionIfPossible(new MongoException(code, "")); + + assertThat("Expected exception of type " + clazz.getName() + "!", translated, is(not(nullValue()))); + + Throwable cause = translated.getRootCause(); + assertThat(cause, is(instanceOf(MongoException.class))); + assertThat(((MongoException) cause).getCode(), is(code)); } private static void expectExceptionWithCauseMessage(NestedRuntimeException e, diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java index 5c9ffaa10..28c3b6072 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUnitTests.java @@ -129,8 +129,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { when(factory.getExceptionTranslator()).thenReturn(exceptionTranslator); when(db.getCollection(Mockito.any(String.class), eq(Document.class))).thenReturn(collection); when(db.runCommand(Mockito.any(), Mockito.any(Class.class))).thenReturn(commandResultDocument); - when(collection.find(Mockito.any(org.bson.Document.class))).thenReturn(findIterable); - when(collection.mapReduce(Mockito.any(), Mockito.any())).thenReturn(mapReduceIterable); + when(collection.find(Mockito.any(org.bson.Document.class), any(Class.class))).thenReturn(findIterable); + when(collection.mapReduce(Mockito.any(), Mockito.any(), eq(Document.class))).thenReturn(mapReduceIterable); when(collection.count(any(Bson.class), any(CountOptions.class))).thenReturn(1L); when(collection.aggregate(any(List.class), any())).thenReturn(aggregateIterable); when(collection.withReadPreference(any())).thenReturn(collection); @@ -324,7 +324,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { BasicQuery query = new BasicQuery("{'foo':'bar'}"); template.findAllAndRemove(query, VersionedEntity.class); - verify(collection, times(1)).find(Matchers.eq(query.getQueryObject())); + verify(collection, times(1)).find(Mockito.eq(query.getQueryObject()), Mockito.any(Class.class)); } @Test // DATAMONGO-566 @@ -445,7 +445,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { when(output.iterator()).thenReturn(cursor); when(cursor.hasNext()).thenReturn(false); - when(collection.mapReduce(anyString(), anyString())).thenReturn(output); + when(collection.mapReduce(anyString(), anyString(), eq(Document.class))).thenReturn(output); Query query = new BasicQuery("{'foo':'bar'}"); 
query.limit(100); @@ -466,7 +466,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { when(output.iterator()).thenReturn(cursor); when(cursor.hasNext()).thenReturn(false); - when(collection.mapReduce(anyString(), anyString())).thenReturn(output); + when(collection.mapReduce(anyString(), anyString(), eq(Document.class))).thenReturn(output); Query query = new BasicQuery("{'foo':'bar'}"); @@ -487,7 +487,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { when(output.iterator()).thenReturn(cursor); when(cursor.hasNext()).thenReturn(false); - when(collection.mapReduce(anyString(), anyString())).thenReturn(output); + when(collection.mapReduce(anyString(), anyString(), eq(Document.class))).thenReturn(output); template.mapReduce("collection", "function(){}", "function(key,values){}", new MapReduceOptions().limit(1000), Wrapper.class); @@ -506,7 +506,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { when(output.iterator()).thenReturn(cursor); when(cursor.hasNext()).thenReturn(false); - when(collection.mapReduce(anyString(), anyString())).thenReturn(output); + when(collection.mapReduce(anyString(), anyString(), eq(Document.class))).thenReturn(output); Query query = new BasicQuery("{'foo':'bar'}"); query.limit(100); @@ -810,6 +810,18 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests { equalTo(new Document("locale", "fr"))); } + @Test // DATAMONGO-1880 + public void countShouldUseCollationWhenPresent() { + + template.count(new BasicQuery("{}").collation(Collation.of("fr")), AutogenerateableId.class); + + ArgumentCaptor options = ArgumentCaptor.forClass(CountOptions.class); + verify(collection).count(any(), options.capture()); + + assertThat(options.getValue().getCollation(), + is(equalTo(com.mongodb.client.model.Collation.builder().locale("fr").build()))); + } + @Test // DATAMONGO-1733 public void appliesFieldsWhenInterfaceProjectionIsClosedAndQueryDoesNotDefineFields() { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveClientSessionTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveClientSessionTests.java new file mode 100644 index 000000000..95d7fe4f2 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveClientSessionTests.java @@ -0,0 +1,134 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb.core; + +import static org.assertj.core.api.Assertions.*; + +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import org.bson.Document; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.test.util.MongoVersionRule; +import org.springframework.data.util.Version; + +import com.mongodb.ClientSessionOptions; +import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoClients; +import com.mongodb.session.ClientSession; + +/** + * @author Christoph Strobl + */ +public class ReactiveClientSessionTests { + + public static @ClassRule MongoVersionRule REQUIRES_AT_LEAST_3_6_0 = MongoVersionRule.atLeast(Version.parse("3.6.0")); + + MongoClient client; + ReactiveMongoTemplate template; + + @Before + public void setUp() { + + client = MongoClients.create(); + + template = new ReactiveMongoTemplate(client, "reflective-client-session-tests"); + + StepVerifier.create(template.dropCollection("test")).verifyComplete(); + + StepVerifier.create(template.insert(new Document("_id", "id-1").append("value", "spring"), "test")) + .expectNextCount(1).verifyComplete(); + } + + @Test // DATAMONGO-1880 + public void shouldApplyClientSession() { + + ClientSession session = Mono + .from(client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())).block(); + + assertThat(session.getOperationTime()).isNull(); + + StepVerifier.create(template.withSession(() -> session).execute(action -> action.findAll(Document.class, "test"))) + .expectNextCount(1).verifyComplete(); + + assertThat(session.getOperationTime()).isNotNull(); + assertThat(session.getServerSession().isClosed()).isFalse(); + + session.close(); + } + + @Test // DATAMONGO-1880 + public void useMonoInCallback() { + + ClientSession session = Mono + .from(client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())).block(); + + assertThat(session.getOperationTime()).isNull(); + + StepVerifier + .create( + template.withSession(() -> session).execute(action -> action.findOne(new Query(), Document.class, "test"))) + .expectNextCount(1).verifyComplete(); + + assertThat(session.getOperationTime()).isNotNull(); + assertThat(session.getServerSession().isClosed()).isFalse(); + + session.close(); + } + + @Test // DATAMONGO-1880 + public void reusesClientSessionInSessionScopedCallback() { + + ClientSession session = Mono + .from(client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())).block(); + CountingSessionSupplier sessionSupplier = new CountingSessionSupplier(session); + + ReactiveSessionScoped sessionScoped = template.withSession(sessionSupplier); + + sessionScoped.execute(action -> action.findOne(new Query(), Document.class, "test")).blockFirst(); + assertThat(sessionSupplier.getInvocationCount()).isEqualTo(1); + + sessionScoped.execute(action -> action.findOne(new Query(), Document.class, "test")).blockFirst(); + assertThat(sessionSupplier.getInvocationCount()).isEqualTo(1); + } + + static class CountingSessionSupplier implements Supplier { + + AtomicInteger invocationCount = new AtomicInteger(0); + final ClientSession session; + + public CountingSessionSupplier(ClientSession session) { + this.session = session; + } + + @Override + public ClientSession get() { + + 
invocationCount.incrementAndGet(); + return session; + } + + int getInvocationCount() { + return invocationCount.get(); + } + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java index ff3a31179..06f445430 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java @@ -88,8 +88,8 @@ public class ReactiveMongoTemplateUnitTests { when(db.getCollection(any())).thenReturn(collection); when(db.getCollection(any(), any())).thenReturn(collection); when(db.runCommand(any(), any(Class.class))).thenReturn(runCommandPublisher); - when(collection.find()).thenReturn(findPublisher); - when(collection.find(Mockito.any(Document.class))).thenReturn(findPublisher); + when(collection.find(any(Class.class))).thenReturn(findPublisher); + when(collection.find(any(Document.class), any(Class.class))).thenReturn(findPublisher); when(findPublisher.projection(any())).thenReturn(findPublisher); when(findPublisher.limit(anyInt())).thenReturn(findPublisher); when(findPublisher.collation(any())).thenReturn(findPublisher); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveSessionBoundMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveSessionBoundMongoTemplateUnitTests.java new file mode 100644 index 000000000..6a0b244f2 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveSessionBoundMongoTemplateUnitTests.java @@ -0,0 +1,319 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb.core; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyList; +import static org.mockito.Mockito.anyString; + +import java.lang.reflect.Proxy; + +import com.mongodb.reactivestreams.client.MongoClient; +import org.bson.Document; +import org.bson.codecs.BsonValueCodec; +import org.bson.codecs.configuration.CodecRegistry; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.reactivestreams.Publisher; +import org.springframework.data.geo.Metrics; +import org.springframework.data.geo.Point; +import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; +import org.springframework.data.mongodb.core.ReactiveMongoTemplate.NoOpDbRefResolver; +import org.springframework.data.mongodb.core.ReactiveMongoTemplate.ReactiveSessionBoundMongoTemplate; +import org.springframework.data.mongodb.core.SimpleReactiveMongoDatabaseFactory.ClientSessionBoundMongoDbFactory; +import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.mapping.MongoMappingContext; +import org.springframework.data.mongodb.core.query.NearQuery; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; + +import com.mongodb.client.model.CountOptions; +import com.mongodb.client.model.DeleteOptions; +import com.mongodb.client.model.FindOneAndUpdateOptions; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.reactivestreams.client.AggregatePublisher; +import com.mongodb.reactivestreams.client.DistinctPublisher; +import com.mongodb.reactivestreams.client.FindPublisher; +import com.mongodb.reactivestreams.client.MongoCollection; +import com.mongodb.reactivestreams.client.MongoDatabase; +import com.mongodb.session.ClientSession; + +/** + * @author Christoph Strobl + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class ReactiveSessionBoundMongoTemplateUnitTests { + + private static final String COLLECTION_NAME = "collection-1"; + + ReactiveSessionBoundMongoTemplate template; + MongoMappingContext mappingContext; + MappingMongoConverter converter; + + ReactiveMongoDatabaseFactory factory; + + @Mock MongoCollection collection; + @Mock MongoDatabase database; + @Mock ClientSession clientSession; + @Mock FindPublisher findPublisher; + @Mock AggregatePublisher aggregatePublisher; + @Mock DistinctPublisher distinctPublisher; + @Mock Publisher resultPublisher; + @Mock MongoClient client; + @Mock CodecRegistry codecRegistry; + + @Before + public void setUp() { + + when(client.getDatabase(anyString())).thenReturn(database); + when(codecRegistry.get(any(Class.class))).thenReturn(new BsonValueCodec()); + when(database.getCodecRegistry()).thenReturn(codecRegistry); + when(database.getCollection(anyString())).thenReturn(collection); + when(database.getCollection(anyString(), any())).thenReturn(collection); + when(database.listCollectionNames(any(ClientSession.class))).thenReturn(findPublisher); + when(database.createCollection(any(ClientSession.class), any(), any())).thenReturn(resultPublisher); + 
when(database.runCommand(any(ClientSession.class), any(), any(Class.class))).thenReturn(resultPublisher); + when(collection.find(any(ClientSession.class))).thenReturn(findPublisher); + when(collection.find(any(ClientSession.class), any(Document.class))).thenReturn(findPublisher); + when(collection.find(any(ClientSession.class), any(Class.class))).thenReturn(findPublisher); + when(collection.find(any(ClientSession.class), any(), any())).thenReturn(findPublisher); + when(collection.deleteMany(any(ClientSession.class), any(), any())).thenReturn(resultPublisher); + when(collection.insertOne(any(ClientSession.class), any(Document.class))).thenReturn(resultPublisher); + when(collection.aggregate(any(ClientSession.class), anyList(), any(Class.class))).thenReturn(aggregatePublisher); + when(collection.count(any(ClientSession.class), any(), any(CountOptions.class))).thenReturn(resultPublisher); + when(collection.drop(any(ClientSession.class))).thenReturn(resultPublisher); + when(collection.findOneAndUpdate(any(ClientSession.class), any(), any(), any())).thenReturn(resultPublisher); + when(collection.distinct(any(ClientSession.class), any(), any(), any())).thenReturn(distinctPublisher); + when(collection.updateOne(any(ClientSession.class), any(), any(), any(UpdateOptions.class))) + .thenReturn(resultPublisher); + when(collection.updateMany(any(ClientSession.class), any(), any(), any(UpdateOptions.class))) + .thenReturn(resultPublisher); + when(collection.dropIndex(any(ClientSession.class), anyString())).thenReturn(resultPublisher); + when(findPublisher.projection(any())).thenReturn(findPublisher); + when(findPublisher.limit(anyInt())).thenReturn(findPublisher); + when(findPublisher.collation(any())).thenReturn(findPublisher); + when(findPublisher.first()).thenReturn(resultPublisher); + when(aggregatePublisher.allowDiskUse(anyBoolean())).thenReturn(aggregatePublisher); + when(aggregatePublisher.useCursor(anyBoolean())).thenReturn(aggregatePublisher); + + factory = new SimpleReactiveMongoDatabaseFactory(client, "foo"); + + this.mappingContext = new MongoMappingContext(); + this.converter = new MappingMongoConverter(new NoOpDbRefResolver(), mappingContext); + this.template = new ReactiveSessionBoundMongoTemplate(clientSession, new ReactiveMongoTemplate(factory, converter)); + } + + @Test // DATAMONGO-1880 + public void executeUsesProxiedCollectionInCallback() { + + template.execute("collection", MongoCollection::find).subscribe(); + + verify(collection, never()).find(); + verify(collection).find(eq(clientSession)); + } + + @Test // DATAMONGO-1880 + public void executeUsesProxiedDatabaseInCallback() { + + template.execute(MongoDatabase::listCollectionNames).subscribe(); + + verify(database, never()).listCollectionNames(); + verify(database).listCollectionNames(eq(clientSession)); + } + + @Test // DATAMONGO-1880 + public void findOneUsesProxiedCollection() { + + template.findOne(new Query(), Person.class).subscribe(); + + verify(collection).find(eq(clientSession), any(), any()); + } + + @Test // DATAMONGO-1880 + public void findShouldUseProxiedCollection() { + + template.find(new Query(), Person.class).subscribe(); + + verify(collection).find(eq(clientSession), any(Class.class)); + } + + @Test // DATAMONGO-1880 + public void findAllShouldUseProxiedCollection() { + + template.findAll(Person.class).subscribe(); + + verify(collection).find(eq(clientSession), eq(Document.class)); + } + + @Test // DATAMONGO-1880 + public void executeCommandShouldUseProxiedDatabase() { + + 
template.executeCommand("{}").subscribe(); + + verify(database).runCommand(eq(clientSession), any(), any(Class.class)); + } + + @Test // DATAMONGO-1880 + public void removeShouldUseProxiedCollection() { + + template.remove(new Query(), Person.class).subscribe(); + + verify(collection).deleteMany(eq(clientSession), any(), any(DeleteOptions.class)); + } + + @Test // DATAMONGO-1880 + public void insertShouldUseProxiedCollection() { + + template.insert(new Person()).subscribe(); + + verify(collection).insertOne(eq(clientSession), any(Document.class)); + } + + @Test // DATAMONGO-1880 + public void aggregateShouldUseProxiedCollection() { + + template.aggregate(Aggregation.newAggregation(Aggregation.project("foo")), COLLECTION_NAME, Person.class) + .subscribe(); + + verify(collection).aggregate(eq(clientSession), anyList(), eq(Document.class)); + } + + @Test // DATAMONGO-1880 + public void collectionExistsShouldUseProxiedDatabase() { + + template.collectionExists(Person.class).subscribe(); + + verify(database).listCollectionNames(eq(clientSession)); + } + + @Test // DATAMONGO-1880 + public void countShouldUseProxiedCollection() { + + template.count(new Query(), Person.class).subscribe(); + + verify(collection).count(eq(clientSession), any(), any(CountOptions.class)); + } + + @Test // DATAMONGO-1880 + public void createCollectionShouldUseProxiedDatabase() { + + template.createCollection(Person.class).subscribe(); + + verify(database).createCollection(eq(clientSession), anyString(), any()); + } + + @Test // DATAMONGO-1880 + public void dropShouldUseProxiedCollection() { + + template.dropCollection(Person.class).subscribe(); + + verify(collection).drop(eq(clientSession)); + } + + @Test // DATAMONGO-1880 + public void findAndModifyShouldUseProxiedCollection() { + + template.findAndModify(new Query(), new Update().set("foo", "bar"), Person.class).subscribe(); + + verify(collection).findOneAndUpdate(eq(clientSession), any(), any(), any(FindOneAndUpdateOptions.class)); + } + + @Test // DATAMONGO-1880 + public void findDistinctShouldUseProxiedCollection() { + + template.findDistinct(new Query(), "firstName", Person.class, String.class).subscribe(); + + verify(collection).distinct(eq(clientSession), anyString(), any(), any()); + } + + @Test // DATAMONGO-1880 + public void geoNearShouldUseProxiedDatabase() { + + template.geoNear(NearQuery.near(new Point(0, 0), Metrics.NEUTRAL), Person.class).subscribe(); + + verify(database).runCommand(eq(clientSession), any(), eq(Document.class)); + } + + @Test // DATAMONGO-1880, DATAMONGO-1889 + @Ignore("No group by yet - DATAMONGO-1889") + public void groupShouldUseProxiedDatabase() { + + // template.group(COLLECTION_NAME, GroupBy.key("firstName"), Person.class).subscribe(); + + verify(database).runCommand(eq(clientSession), any(), eq(Document.class)); + } + + @Test // DATAMONGO-1880, DATAMONGO-1890 + @Ignore("No map reduce yet on template - DATAMONGO-1890") + public void mapReduceShouldUseProxiedCollection() { + + // template.mapReduce(COLLECTION_NAME, "foo", "bar", Person.class); + + verify(collection).mapReduce(eq(clientSession), anyString(), anyString(), eq(Document.class)); + } + + @Test // DATAMONGO-1880 + public void updateFirstShouldUseProxiedCollection() { + + template.updateFirst(new Query(), Update.update("foo", "bar"), Person.class).subscribe(); + + verify(collection).updateOne(eq(clientSession), any(), any(), any(UpdateOptions.class)); + } + + @Test // DATAMONGO-1880 + public void updateMultiShouldUseProxiedCollection() { + + template.updateMulti(new 
Query(), Update.update("foo", "bar"), Person.class).subscribe(); + + verify(collection).updateMany(eq(clientSession), any(), any(), any(UpdateOptions.class)); + } + + @Test // DATAMONGO-1880 + public void upsertShouldUseProxiedCollection() { + + template.upsert(new Query(), Update.update("foo", "bar"), Person.class).subscribe(); + + verify(collection).updateOne(eq(clientSession), any(), any(), any(UpdateOptions.class)); + } + + @Test // DATAMONGO-1880 + public void getCollectionShouldShouldJustReturnTheCollection/*No ClientSession binding*/() { + assertThat(template.getCollection(COLLECTION_NAME)).isNotInstanceOf(Proxy.class); + } + + @Test // DATAMONGO-1880 + public void getDbShouldJustReturnTheDatabase/*No ClientSession binding*/() { + assertThat(template.getMongoDatabase()).isNotInstanceOf(Proxy.class); + } + + @Test // DATAMONGO-1880 + public void indexOpsShouldUseProxiedCollection() { + + template.indexOps(COLLECTION_NAME).dropIndex("index-name").subscribe(); + + verify(collection).dropIndex(eq(clientSession), eq("index-name")); + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java new file mode 100644 index 000000000..a9ff4353c --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateTests.java @@ -0,0 +1,298 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb.core; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +import lombok.Data; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.aopalliance.aop.Advice; +import org.bson.Document; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; +import org.springframework.aop.Advisor; +import org.springframework.aop.framework.Advised; +import org.springframework.dao.DataAccessException; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.ClientSessionException; +import org.springframework.data.mongodb.LazyLoadingException; +import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.SessionAwareMethodInterceptor; +import org.springframework.data.mongodb.core.MongoTemplate.SessionBoundMongoTemplate; +import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.convert.DbRefResolver; +import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.convert.MongoCustomConversions; +import org.springframework.data.mongodb.core.mapping.DBRef; +import org.springframework.data.mongodb.core.mapping.MongoMappingContext; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.test.util.MongoVersionRule; +import org.springframework.data.util.Version; +import org.springframework.test.util.ReflectionTestUtils; + +import com.mongodb.ClientSessionOptions; +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.session.ClientSession; + +/** + * Integration tests for {@link SessionBoundMongoTemplate} operating up an active {@link ClientSession}. 
+ * + * @author Christoph Strobl + */ +public class SessionBoundMongoTemplateTests { + + public static @ClassRule MongoVersionRule REQUIRES_AT_LEAST_3_6_0 = MongoVersionRule.atLeast(Version.parse("3.6.0")); + + public @Rule ExpectedException exception = ExpectedException.none(); + + MongoTemplate template; + SessionBoundMongoTemplate sessionBoundTemplate; + ClientSession session; + volatile List> spiedCollections = new ArrayList<>(); + volatile List spiedDatabases = new ArrayList<>(); + + @Before + public void setUp() { + + MongoClient client = new MongoClient(); + + MongoDbFactory factory = new SimpleMongoDbFactory(client, "session-bound-mongo-template-tests") { + + @Override + public MongoDatabase getDb() throws DataAccessException { + + MongoDatabase spiedDatabse = Mockito.spy(super.getDb()); + spiedDatabases.add(spiedDatabse); + return spiedDatabse; + } + }; + + session = client.startSession(ClientSessionOptions.builder().build()); + + this.template = new MongoTemplate(factory); + + this.sessionBoundTemplate = new SessionBoundMongoTemplate(session, + new MongoTemplate(factory, getDefaultMongoConverter(factory))) { + + @Override + protected MongoCollection prepareCollection(MongoCollection collection) { + + InvocationHandler handler = Proxy.getInvocationHandler(collection); + + Advised advised = (Advised) ReflectionTestUtils.getField(handler, "advised"); + + for (Advisor advisor : advised.getAdvisors()) { + Advice advice = advisor.getAdvice(); + if (advice instanceof SessionAwareMethodInterceptor) { + + MongoCollection spiedCollection = Mockito + .spy((MongoCollection) ReflectionTestUtils.getField(advice, "target")); + spiedCollections.add(spiedCollection); + + ReflectionTestUtils.setField(advice, "target", spiedCollection); + } + } + + return super.prepareCollection(collection); + } + }; + } + + @After + public void tearDown() { + session.close(); + } + + @Test // DATAMONGO-1880 + public void findDelegatesToMethodWithSession() { + + sessionBoundTemplate.find(new Query(), Person.class); + + verify(operation(0)).find(eq(session), any(), any()); + } + + @Test // DATAMONGO-1880 + public void fluentFindDelegatesToMethodWithSession() { + + sessionBoundTemplate.query(Person.class).all(); + + verify(operation(0)).find(eq(session), any(), any()); + } + + @Test // DATAMONGO-1880 + public void aggregateDelegatesToMethoddWithSession() { + + sessionBoundTemplate.aggregate(Aggregation.newAggregation(Aggregation.project("firstName")), Person.class, + Person.class); + + verify(operation(0)).aggregate(eq(session), any(), any()); + } + + @Test // DATAMONGO-1880 + public void collectionExistsDelegatesToMethodWithSession() { + + sessionBoundTemplate.collectionExists(Person.class); + + verify(command(0)).listCollectionNames(eq(session)); + } + + @Test // DATAMONGO-1880 + public void shouldLoadDbRefWhenSessionIsActive() { + + Person person = new Person("Kylar Stern"); + + template.save(person); + + WithDbRef wdr = new WithDbRef(); + wdr.id = "id-1"; + wdr.personRef = person; + + template.save(wdr); + + WithDbRef result = sessionBoundTemplate.findById(wdr.id, WithDbRef.class); + + assertThat(result.personRef).isEqualTo(person); + } + + @Test // DATAMONGO-1880 + public void shouldErrorOnLoadDbRefWhenSessionIsClosed() { + + exception.expect(ClientSessionException.class); + + Person person = new Person("Kylar Stern"); + + template.save(person); + + WithDbRef wdr = new WithDbRef(); + wdr.id = "id-1"; + wdr.personRef = person; + + template.save(wdr); + + session.close(); + + sessionBoundTemplate.findById(wdr.id, 
WithDbRef.class); + } + + @Test // DATAMONGO-1880 + public void shouldLoadLazyDbRefWhenSessionIsActive() { + + Person person = new Person("Kylar Stern"); + + template.save(person); + + WithLazyDbRef wdr = new WithLazyDbRef(); + wdr.id = "id-1"; + wdr.personRef = person; + + template.save(wdr); + + WithLazyDbRef result = sessionBoundTemplate.findById(wdr.id, WithLazyDbRef.class); + + assertThat(result.getPersonRef()).isEqualTo(person); + } + + @Test // DATAMONGO-1880 + public void shouldErrorOnLoadLazyDbRefWhenSessionIsClosed() { + + exception.expect(LazyLoadingException.class); + exception.expectMessage("Invalid session state"); + + Person person = new Person("Kylar Stern"); + + template.save(person); + + WithLazyDbRef wdr = new WithLazyDbRef(); + wdr.id = "id-1"; + wdr.personRef = person; + + template.save(wdr); + + WithLazyDbRef result = null; + try { + result = sessionBoundTemplate.findById(wdr.id, WithLazyDbRef.class); + } catch (Exception e) { + fail("Someting went wrong, seems the session was already closed when reading.", e); + } + + session.close(); // now close the session + + assertThat(result.getPersonRef()).isEqualTo(person); // resolve the lazy loading proxy + } + + @Data + static class WithDbRef { + + @Id String id; + @DBRef Person personRef; + } + + @Data + static class WithLazyDbRef { + + @Id String id; + @DBRef(lazy = true) Person personRef; + + public Person getPersonRef() { + return personRef; + } + } + + // --> Just some helpers for testing + + MongoCollection operation(int index) { + return spiedCollections.get(index); + } + + MongoDatabase command(int index) { + return spiedDatabases.get(index); + } + + private MongoConverter getDefaultMongoConverter(MongoDbFactory factory) { + + DbRefResolver dbRefResolver = new DefaultDbRefResolver(factory); + MongoCustomConversions conversions = new MongoCustomConversions(Collections.emptyList()); + + MongoMappingContext mappingContext = new MongoMappingContext(); + mappingContext.setSimpleTypeHolder(conversions.getSimpleTypeHolder()); + mappingContext.afterPropertiesSet(); + + MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, mappingContext); + converter.setCustomConversions(conversions); + converter.afterPropertiesSet(); + + return converter; + } + +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateUnitTests.java new file mode 100644 index 000000000..aeced5321 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/SessionBoundMongoTemplateUnitTests.java @@ -0,0 +1,358 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb.core; + +import static org.mockito.Mockito.*; +import static org.springframework.data.mongodb.test.util.Assertions.*; + +import java.lang.reflect.Proxy; +import java.util.Collections; + +import org.bson.Document; +import org.bson.codecs.BsonValueCodec; +import org.bson.codecs.configuration.CodecRegistry; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.data.geo.Metrics; +import org.springframework.data.geo.Point; +import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.core.BulkOperations.BulkMode; +import org.springframework.data.mongodb.core.MongoTemplate.SessionBoundMongoTemplate; +import org.springframework.data.mongodb.core.aggregation.Aggregation; +import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.mapping.MongoMappingContext; +import org.springframework.data.mongodb.core.mapreduce.GroupBy; +import org.springframework.data.mongodb.core.query.NearQuery; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; + +import com.mongodb.MongoClient; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.DistinctIterable; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MapReduceIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; +import com.mongodb.client.model.CountOptions; +import com.mongodb.client.model.DeleteOptions; +import com.mongodb.client.model.FindOneAndUpdateOptions; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.session.ClientSession; + +/** + * Unit test for {@link SessionBoundMongoTemplate} making sure a proxied {@link MongoCollection} and + * {@link MongoDatabase} is used for executing high level commands like {@link MongoOperations#find(Query, Class)} + * provided by Spring Data. Those commands simply handing over MongoDB base types for interaction like when obtaining a + * {@link MongoCollection} via {@link MongoOperations#getCollection(String)} shall not be proxied as the user can + * control the behavior by using the methods dedicated for {@link ClientSession} directly. 
+ * + * @author Christoph Strobl + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class SessionBoundMongoTemplateUnitTests { + + private static final String COLLECTION_NAME = "collection-1"; + + SessionBoundMongoTemplate template; + + MongoDbFactory factory; + + @Mock MongoCollection collection; + @Mock MongoDatabase database; + @Mock MongoClient client; + @Mock ClientSession clientSession; + @Mock FindIterable findIterable; + @Mock MongoIterable mongoIterable; + @Mock DistinctIterable distinctIterable; + @Mock AggregateIterable aggregateIterable; + @Mock MapReduceIterable mapReduceIterable; + @Mock MongoCursor cursor; + @Mock CodecRegistry codecRegistry; + + MappingMongoConverter converter; + MongoMappingContext mappingContext; + + @Before + public void setUp() { + + when(client.getDatabase(anyString())).thenReturn(database); + when(codecRegistry.get(any(Class.class))).thenReturn(new BsonValueCodec()); + when(database.getCodecRegistry()).thenReturn(codecRegistry); + when(database.getCollection(anyString(), any())).thenReturn(collection); + when(database.listCollectionNames(any(ClientSession.class))).thenReturn(mongoIterable); + when(collection.find(any(ClientSession.class), any(), any())).thenReturn(findIterable); + when(collection.aggregate(any(ClientSession.class), anyList(), any())).thenReturn(aggregateIterable); + when(collection.distinct(any(ClientSession.class), any(), any(), any())).thenReturn(distinctIterable); + when(collection.mapReduce(any(ClientSession.class), any(), any(), any())).thenReturn(mapReduceIterable); + when(findIterable.iterator()).thenReturn(cursor); + when(aggregateIterable.collation(any())).thenReturn(aggregateIterable); + when(aggregateIterable.allowDiskUse(anyBoolean())).thenReturn(aggregateIterable); + when(aggregateIterable.batchSize(anyInt())).thenReturn(aggregateIterable); + when(aggregateIterable.map(any())).thenReturn(aggregateIterable); + when(aggregateIterable.useCursor(anyBoolean())).thenReturn(aggregateIterable); + when(aggregateIterable.into(any())).thenReturn(Collections.emptyList()); + when(mongoIterable.iterator()).thenReturn(cursor); + when(distinctIterable.map(any())).thenReturn(distinctIterable); + when(distinctIterable.into(any())).thenReturn(Collections.emptyList()); + when(mapReduceIterable.sort(any())).thenReturn(mapReduceIterable); + when(mapReduceIterable.filter(any())).thenReturn(mapReduceIterable); + when(mapReduceIterable.map(any())).thenReturn(mapReduceIterable); + when(mapReduceIterable.iterator()).thenReturn(cursor); + when(cursor.hasNext()).thenReturn(false); + when(findIterable.projection(any())).thenReturn(findIterable); + + factory = new SimpleMongoDbFactory(client, "foo"); + + this.mappingContext = new MongoMappingContext(); + this.converter = new MappingMongoConverter(new DefaultDbRefResolver(factory), mappingContext); + this.template = new SessionBoundMongoTemplate(clientSession, new MongoTemplate(factory, converter)); + } + + @Test // DATAMONGO-1880 + public void executeUsesProxiedCollectionInCallback() { + + template.execute("collection", MongoCollection::find); + + verify(collection, never()).find(); + verify(collection).find(eq(clientSession)); + } + + @Test // DATAMONGO-1880 + public void executeUsesProxiedDatabaseInCallback() { + + template.execute(MongoDatabase::listCollectionNames); + + verify(database, never()).listCollectionNames(); + verify(database).listCollectionNames(eq(clientSession)); + } + + @Test // DATAMONGO-1880 + public void findOneUsesProxiedCollection() { + + template.findOne(new Query(), 
Person.class); + + verify(collection).find(eq(clientSession), any(), any()); + } + + @Test // DATAMONGO-1880 + public void findShouldUseProxiedCollection() { + + template.find(new Query(), Person.class); + + verify(collection).find(eq(clientSession), any(), any()); + } + + @Test // DATAMONGO-1880 + public void findAllShouldUseProxiedCollection() { + + template.findAll(Person.class); + + verify(collection).find(eq(clientSession), any(), any()); + } + + @Test // DATAMONGO-1880 + public void executeCommandShouldUseProxiedDatabase() { + + template.executeCommand("{}"); + + verify(database).runCommand(eq(clientSession), any(), any(Class.class)); + } + + @Test // DATAMONGO-1880 + public void removeShouldUseProxiedCollection() { + + template.remove(new Query(), Person.class); + + verify(collection).deleteMany(eq(clientSession), any(), any(DeleteOptions.class)); + } + + @Test // DATAMONGO-1880 + public void insertShouldUseProxiedCollection() { + + template.insert(new Person()); + + verify(collection).insertOne(eq(clientSession), any(Document.class)); + } + + @Test // DATAMONGO-1880 + public void aggregateShouldUseProxiedCollection() { + + template.aggregate(Aggregation.newAggregation(Aggregation.project("foo")), COLLECTION_NAME, Person.class); + + verify(collection).aggregate(eq(clientSession), anyList(), eq(Document.class)); + } + + @Test // DATAMONGO-1880 + public void aggregateStreamShouldUseProxiedCollection() { + + template.aggregateStream(Aggregation.newAggregation(Aggregation.project("foo")), COLLECTION_NAME, Person.class); + + verify(collection).aggregate(eq(clientSession), anyList(), eq(Document.class)); + } + + @Test // DATAMONGO-1880 + public void collectionExistsShouldUseProxiedDatabase() { + + template.collectionExists(Person.class); + + verify(database).listCollectionNames(eq(clientSession)); + } + + @Test // DATAMONGO-1880 + public void countShouldUseProxiedCollection() { + + template.count(new Query(), Person.class); + + verify(collection).count(eq(clientSession), any(), any(CountOptions.class)); + } + + @Test // DATAMONGO-1880 + public void createCollectionShouldUseProxiedDatabase() { + + template.createCollection(Person.class); + + verify(database).createCollection(eq(clientSession), anyString(), any()); + } + + @Test // DATAMONGO-1880 + public void dropShouldUseProxiedCollection() { + + template.dropCollection(Person.class); + + verify(collection).drop(eq(clientSession)); + } + + @Test // DATAMONGO-1880 + public void findAndModifyShouldUseProxiedCollection() { + + template.findAndModify(new Query(), new Update().set("foo", "bar"), Person.class); + + verify(collection).findOneAndUpdate(eq(clientSession), any(), any(), any(FindOneAndUpdateOptions.class)); + } + + @Test // DATAMONGO-1880 + public void findDistinctShouldUseProxiedCollection() { + + template.findDistinct(new Query(), "firstName", Person.class, String.class); + + verify(collection).distinct(eq(clientSession), anyString(), any(), any()); + } + + @Test // DATAMONGO-1880 + public void geoNearShouldUseProxiedDatabase() { + + when(database.runCommand(any(ClientSession.class), any(), eq(Document.class))) + .thenReturn(new Document("results", Collections.emptyList())); + template.geoNear(NearQuery.near(new Point(0, 0), Metrics.NEUTRAL), Person.class); + + verify(database).runCommand(eq(clientSession), any(), eq(Document.class)); + } + + @Test // DATAMONGO-1880 + public void groupShouldUseProxiedDatabase() { + + when(database.runCommand(any(ClientSession.class), any(), eq(Document.class))) + .thenReturn(new Document("retval", 
Collections.emptyList())); + + template.group(COLLECTION_NAME, GroupBy.key("firstName"), Person.class); + + verify(database).runCommand(eq(clientSession), any(), eq(Document.class)); + } + + @Test // DATAMONGO-1880 + public void mapReduceShouldUseProxiedCollection() { + + template.mapReduce(COLLECTION_NAME, "foo", "bar", Person.class); + + verify(collection).mapReduce(eq(clientSession), anyString(), anyString(), eq(Document.class)); + } + + @Test // DATAMONGO-1880 + public void streamShouldUseProxiedCollection() { + + template.stream(new Query(), Person.class); + + verify(collection).find(eq(clientSession), any(), eq(Document.class)); + } + + @Test // DATAMONGO-1880 + public void updateFirstShouldUseProxiedCollection() { + + template.updateFirst(new Query(), Update.update("foo", "bar"), Person.class); + + verify(collection).updateOne(eq(clientSession), any(), any(), any(UpdateOptions.class)); + } + + @Test // DATAMONGO-1880 + public void updateMultiShouldUseProxiedCollection() { + + template.updateMulti(new Query(), Update.update("foo", "bar"), Person.class); + + verify(collection).updateMany(eq(clientSession), any(), any(), any(UpdateOptions.class)); + } + + @Test // DATAMONGO-1880 + public void upsertShouldUseProxiedCollection() { + + template.upsert(new Query(), Update.update("foo", "bar"), Person.class); + + verify(collection).updateOne(eq(clientSession), any(), any(), any(UpdateOptions.class)); + } + + @Test // DATAMONGO-1880 + public void getCollectionShouldShouldJustReturnTheCollection/*No ClientSession binding*/() { + assertThat(template.getCollection(COLLECTION_NAME)).isNotInstanceOf(Proxy.class); + } + + @Test // DATAMONGO-1880 + public void getDbShouldJustReturnTheDatabase/*No ClientSession binding*/() { + assertThat(template.getDb()).isNotInstanceOf(Proxy.class); + } + + @Test // DATAMONGO-1880 + public void indexOpsShouldUseProxiedCollection() { + + template.indexOps(COLLECTION_NAME).dropIndex("index-name"); + + verify(collection).dropIndex(eq(clientSession), eq("index-name")); + } + + @Test // DATAMONGO-1880 + public void bulkOpsShouldUseProxiedCollection() { + + BulkOperations bulkOps = template.bulkOps(BulkMode.ORDERED, COLLECTION_NAME); + bulkOps.insert(new Document()); + + bulkOps.execute(); + + verify(collection).bulkWrite(eq(clientSession), anyList(), any()); + } + + @Test // DATAMONGO-1880 + public void scriptOpsShouldUseProxiedDatabase() { + + when(database.runCommand(eq(clientSession), any())).thenReturn(new Document("retval", new Object())); + template.scriptOps().call("W-O-P-R"); + + verify(database).runCommand(eq(clientSession), any()); + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/ReactiveAggregationUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/ReactiveAggregationUnitTests.java index 2c4ecad13..1a3aa210f 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/ReactiveAggregationUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/aggregation/ReactiveAggregationUnitTests.java @@ -59,8 +59,8 @@ public class ReactiveAggregationUnitTests { template = new ReactiveMongoTemplate(factory); when(mongoClient.getDatabase("db")).thenReturn(db); - when(db.getCollection(INPUT_COLLECTION)).thenReturn(collection); - when(collection.aggregate(any())).thenReturn(publisher); + when(db.getCollection(eq(INPUT_COLLECTION), any(Class.class))).thenReturn(collection); + 
when(collection.aggregate(anyList(), any(Class.class))).thenReturn(publisher); when(publisher.allowDiskUse(any())).thenReturn(publisher); when(publisher.useCursor(any())).thenReturn(publisher); when(publisher.collation(any())).thenReturn(publisher); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/convert/DefaultDbRefResolverUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/convert/DefaultDbRefResolverUnitTests.java index 00117f0e7..c01adb9e5 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/convert/DefaultDbRefResolverUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/convert/DefaultDbRefResolverUnitTests.java @@ -16,7 +16,8 @@ package org.springframework.data.mongodb.core.convert; import static org.assertj.core.api.Assertions.*; -import static org.mockito.ArgumentMatchers.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; import java.util.Arrays; @@ -59,8 +60,8 @@ public class DefaultDbRefResolverUnitTests { public void setUp() { when(factoryMock.getDb()).thenReturn(dbMock); - when(dbMock.getCollection(anyString())).thenReturn(collectionMock); - when(collectionMock.find(Mockito.any(Document.class))).thenReturn(cursorMock); + when(dbMock.getCollection(anyString(), any(Class.class))).thenReturn(collectionMock); + when(collectionMock.find(any(Document.class))).thenReturn(cursorMock); resolver = new DefaultDbRefResolver(factoryMock); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/MongoPersistentEntityIndexCreatorUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/MongoPersistentEntityIndexCreatorUnitTests.java index 1ce59c964..b64729597 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/MongoPersistentEntityIndexCreatorUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/index/MongoPersistentEntityIndexCreatorUnitTests.java @@ -66,7 +66,6 @@ import com.mongodb.client.model.IndexOptions; public class MongoPersistentEntityIndexCreatorUnitTests { private @Mock MongoDbFactory factory; - private @Mock ApplicationContext context; private @Mock MongoDatabase db; private @Mock MongoCollection collection; private MongoTemplate mongoTemplate; @@ -84,7 +83,7 @@ public class MongoPersistentEntityIndexCreatorUnitTests { when(factory.getDb()).thenReturn(db); when(factory.getExceptionTranslator()).thenReturn(new MongoExceptionTranslator()); - when(db.getCollection(collectionCaptor.capture())).thenReturn(collection); + when(db.getCollection(collectionCaptor.capture(), Mockito.eq(org.bson.Document.class))).thenReturn((MongoCollection) collection); mongoTemplate = new MongoTemplate(factory); @@ -192,7 +191,7 @@ public class MongoPersistentEntityIndexCreatorUnitTests { ArgumentCaptor collectionNameCapturer = ArgumentCaptor.forClass(String.class); - verify(db, times(1)).getCollection(collectionNameCapturer.capture()); + verify(db, times(1)).getCollection(collectionNameCapturer.capture(), Mockito.any()); assertThat(collectionNameCapturer.getValue(), equalTo("wrapper")); } @@ -204,14 +203,13 @@ public class MongoPersistentEntityIndexCreatorUnitTests { ArgumentCaptor collectionNameCapturer = ArgumentCaptor.forClass(String.class); - verify(db, times(1)).getCollection(collectionNameCapturer.capture()); + 
verify(db, times(1)).getCollection(collectionNameCapturer.capture(), Mockito.any()); assertThat(collectionNameCapturer.getValue(), equalTo("indexedDocumentWrapper")); } @Test(expected = DataAccessException.class) // DATAMONGO-1125 public void createIndexShouldUsePersistenceExceptionTranslatorForNonDataIntegrityConcerns() { - when(factory.getExceptionTranslator()).thenReturn(new MongoExceptionTranslator()); doThrow(new MongoException(6, "HostUnreachable")).when(collection).createIndex(Mockito.any(org.bson.Document.class), Mockito.any(IndexOptions.class)); @@ -223,7 +221,6 @@ public class MongoPersistentEntityIndexCreatorUnitTests { @Test(expected = ClassCastException.class) // DATAMONGO-1125 public void createIndexShouldNotConvertUnknownExceptionTypes() { - when(factory.getExceptionTranslator()).thenReturn(new MongoExceptionTranslator()); doThrow(new ClassCastException("o_O")).when(collection).createIndex(Mockito.any(org.bson.Document.class), Mockito.any(IndexOptions.class)); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/performance/ReactivePerformanceTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/performance/ReactivePerformanceTests.java index 37951e21b..7fecca17a 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/performance/ReactivePerformanceTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/performance/ReactivePerformanceTests.java @@ -74,7 +74,7 @@ public class ReactivePerformanceTests { private static final StopWatch watch = new StopWatch(); private static final Collection IGNORED_WRITE_CONCERNS = Arrays.asList("MAJORITY", "REPLICAS_SAFE", "FSYNC_SAFE", "FSYNCED", "JOURNAL_SAFE", "JOURNALED", "REPLICA_ACKNOWLEDGED"); - private static final int COLLECTION_SIZE = 1024-2018 * 1024-2018 * 256; // 256 MB + private static final int COLLECTION_SIZE = 1024 - 2018 * 1024 - 2018 * 256; // 256 MB private static final Collection COLLECTION_NAMES = Arrays.asList("template", "driver", "person"); MongoClient mongo; @@ -96,8 +96,8 @@ public class ReactivePerformanceTests { converter = new MappingMongoConverter(new DbRefResolver() { @Override - public Object resolveDbRef(MongoPersistentProperty property, DBRef dbref, - DbRefResolverCallback callback, DbRefProxyHandler proxyHandler) { + public Object resolveDbRef(MongoPersistentProperty property, DBRef dbref, DbRefResolverCallback callback, + DbRefProxyHandler proxyHandler) { return null; } diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index 4166c6eff..155dccdf3 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -27,6 +27,7 @@ include::{spring-data-commons-docs}/repositories.adoc[] include::reference/introduction.adoc[] include::reference/mongodb.adoc[] include::reference/reactive-mongodb.adoc[] +include::reference/client-session.adoc[] include::reference/mongo-repositories.adoc[] include::reference/reactive-mongo-repositories.adoc[] include::{spring-data-commons-docs}/auditing.adoc[] diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc index d7628f27a..f248ca094 100644 --- a/src/main/asciidoc/new-features.adoc +++ b/src/main/asciidoc/new-features.adoc @@ -9,6 +9,7 @@ * <> for queries and collection creation. * <> for imperative and reactive drivers. * Tailable cursors for imperative driver. +* <> support for the imperative and reactive Template API. 
[[new-features.2-0-0]]
== What's new in Spring Data MongoDB 2.0
diff --git a/src/main/asciidoc/reference/client-session.adoc b/src/main/asciidoc/reference/client-session.adoc
new file mode 100644
index 000000000..6cf23f921
--- /dev/null
+++ b/src/main/asciidoc/reference/client-session.adoc
@@ -0,0 +1,79 @@
+[[mongo.sessions]]
+= MongoDB Sessions
+
+As of version 3.6, MongoDB supports the concept of sessions. Using sessions enables MongoDB's https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency[Causal Consistency] model, which guarantees that operations are executed in an order that respects their causal relationships. Sessions are split into ``ServerSession``s and ``ClientSession``s. In the following, when we speak of a session, we refer to `ClientSession`.
+
+WARNING: Operations within a client session are not isolated from operations outside the session.
+
+Both `MongoOperations` and `ReactiveMongoOperations` provide gateway methods for tying a `ClientSession` to the operations themselves. Within the callback, all operations on `MongoCollection` and `MongoDatabase` are invoked with the provided session via a `Proxy`, without the need to add it manually. This means that a call to `MongoCollection#find()` is delegated to `MongoCollection#find(ClientSession)`.
+
+NOTE: Methods such as `(Reactive)MongoOperations#getCollection` that return native MongoDB Java driver gateway objects (for example `MongoCollection`), which themselves offer dedicated methods for `ClientSession`, are *NOT* wrapped by the `Proxy`. Make sure to provide the `ClientSession` where needed when interacting directly with a `MongoCollection` or `MongoDatabase` rather than through one of the `#execute` callbacks on `MongoOperations`.
+
+.ClientSession with MongoOperations.
+====
+[source,java]
+----
+ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
+    .causallyConsistent(true)
+    .build();
+
+ClientSession session = client.startSession(sessionOptions); <1>
+
+template.withSession(() -> session)
+    .execute(action -> {
+
+        Query query = query(where("name").is("Durzo Blint"));
+        Person durzo = action.findOne(query, Person.class); <2>
+
+        Person azoth = new Person("Kylar Stern");
+        azoth.setMaster(durzo);
+
+        action.insert(azoth); <2>
+
+        return azoth;
+    });
+
+session.close(); <3>
+----
+<1> Obtain a new session from the server.
+<2> Use `MongoOperations` methods as before. The `ClientSession` gets applied automatically.
+<3> Important! Do not forget to close the session.
+====
+
+WARNING: When dealing with ``DBRef``s, especially lazily loaded ones, it is essential to **not** close the `ClientSession` before all data is loaded.
+
+The reactive counterpart uses the very same building blocks as the imperative one.
+
+.ClientSession with ReactiveMongoOperations.
+====
+[source,java]
+----
+ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
+    .causallyConsistent(true)
+    .build();
+
+Publisher<ClientSession> session = client.startSession(sessionOptions); <1>
+
+template.withSession(session)
+    .execute(action -> {
+
+        Query query = query(where("name").is("Durzo Blint"));
+        return action.findOne(query, Person.class)
+            .flatMap(durzo -> {
+
+                Person azoth = new Person("Kylar Stern");
+                azoth.setMaster(durzo);
+
+                return action.insert(azoth); <2>
+            });
+    }, ClientSession::close) <3>
+    .subscribe();
+----
+<1> Obtain a `Publisher` for new session retrieval.
+<2> Use `MongoOperations` methods as before. The `ClientSession` is obtained and applied automatically.
+<3> Important! Do not forget to close the session.
+====
+
+By using a `Publisher` that provides the actual session, you can defer session acquisition to the point of actual subscription.
+Still, you need to close the session when done, so as not to pollute the server with stale sessions. Use the `doFinally` hook on `execute` to call `ClientSession#close()` when you no longer need the session.
+If you prefer to have more control over the session itself, you can always obtain the `ClientSession` through the driver and provide it through a `Supplier`.
\ No newline at end of file
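
The closing paragraph of client-session.adoc suggests obtaining the `ClientSession` from the driver and handing it to the template through a `Supplier`. A minimal sketch of that approach is shown below; it assumes an already configured imperative `MongoClient` (`client`), a `MongoTemplate` (`template`), a mapped `Person` class, and the usual `query`/`where` static imports, and it mirrors the `withSession(() -> session)` example above rather than prescribing a definitive API.

[source,java]
----
// Sketch only: "client", "template" and "Person" are assumed to exist in the surrounding
// configuration; query(..) and where(..) are the usual static imports from Query and Criteria.
ClientSessionOptions options = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();

ClientSession session = client.startSession(options); // session created through the driver

try {
    template.withSession(() -> session)                // the Supplier hands the session to the template
        .execute(action -> action.findOne(query(where("name").is("Durzo Blint")), Person.class));
} finally {
    session.close();                                   // always release the server-side session
}
----

Closing the session in a `finally` block keeps the server free of stale sessions even when the callback fails.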
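
The NOTE in client-session.adoc states that native driver objects returned by `getCollection` are not session-proxied. Assuming the same `template` and `session` as in the previous sketch and a hypothetical `persons` collection, explicit session passing could then look roughly like this:

[source,java]
----
// Sketch only: the raw MongoCollection returned by getCollection(..) is not wrapped by the
// session proxy, so the ClientSession must be passed to the driver methods explicitly.
MongoCollection<Document> persons = template.getCollection("persons");

Document durzo = persons
    .find(session, new Document("name", "Durzo Blint")) // explicit ClientSession
    .first();
----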