Browse Source
We now support ClientSession via MongoOperations and ReactiveMongoOperations. Client sessions introduce causal consistency and retryable writes. A client Session can be either provided by application code or managed by specifying ClientSessionOptions. Binding a ClientSession via MongoOperations.withSession(…) provides access to a Session-bound MongoOperations instance that associates the session with each MongoDB operation.
ClientSession support applies only to MongoOperations and ReactiveMongoOperations and is not yet available via repositories.
ClientSession session = client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build());
Person person = template.withSession(() -> session)
.execute(action -> {
action.insert(new Person("wohoo"));
return action.findOne(query(where("id").is("wohoo")), Person.class);
});
session.close();
Original pull request: #536.
pull/536/merge
38 changed files with 2723 additions and 134 deletions
@ -0,0 +1,48 @@
@@ -0,0 +1,48 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb; |
||||
|
||||
import org.springframework.dao.NonTransientDataAccessException; |
||||
import org.springframework.lang.Nullable; |
||||
|
||||
/** |
||||
* {@link NonTransientDataAccessException} specific to MongoDB {@link com.mongodb.session.ClientSession} related data |
||||
* access failures such as reading data using an already closed session. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 2.1 |
||||
*/ |
||||
public class ClientSessionException extends NonTransientDataAccessException { |
||||
|
||||
/** |
||||
* Constructor for {@link ClientSessionException}. |
||||
* |
||||
* @param msg the detail message. Must not be {@literal null}. |
||||
*/ |
||||
public ClientSessionException(String msg) { |
||||
super(msg); |
||||
} |
||||
|
||||
/** |
||||
* Constructor for {@link ClientSessionException}. |
||||
* |
||||
* @param msg the detail message. Can be {@literal null}. |
||||
* @param cause the root cause. Can be {@literal null}. |
||||
*/ |
||||
public ClientSessionException(@Nullable String msg, @Nullable Throwable cause) { |
||||
super(msg, cause); |
||||
} |
||||
} |
||||
@ -0,0 +1,173 @@
@@ -0,0 +1,173 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb; |
||||
|
||||
import java.lang.reflect.Method; |
||||
import java.lang.reflect.Proxy; |
||||
import java.util.Optional; |
||||
import java.util.function.BiFunction; |
||||
|
||||
import org.aopalliance.intercept.MethodInterceptor; |
||||
import org.aopalliance.intercept.MethodInvocation; |
||||
import org.springframework.core.MethodClassKey; |
||||
import org.springframework.lang.Nullable; |
||||
import org.springframework.util.ClassUtils; |
||||
import org.springframework.util.ConcurrentReferenceHashMap; |
||||
import org.springframework.util.ObjectUtils; |
||||
import org.springframework.util.ReflectionUtils; |
||||
|
||||
import com.mongodb.WriteConcern; |
||||
import com.mongodb.session.ClientSession; |
||||
|
||||
/** |
||||
* {@link MethodInterceptor} implementation looking up and invoking an alternative target method having |
||||
* {@link ClientSession} as its first argument. This allows seamless integration with the existing code base. |
||||
* <p /> |
||||
* The {@link MethodInterceptor} is aware of methods on {@code MongoCollection} that my return new instances of itself |
||||
* like (eg. {@link com.mongodb.reactivestreams.client.MongoCollection#withWriteConcern(WriteConcern)} and decorate them |
||||
* if not already proxied. |
||||
* |
||||
* @since 2.1 |
||||
*/ |
||||
public class SessionAwareMethodInterceptor<D, C> implements MethodInterceptor { |
||||
|
||||
private static final MethodCache METHOD_CACHE = new MethodCache(); |
||||
|
||||
private final ClientSession session; |
||||
private final BiFunction collectionDecorator; |
||||
private final BiFunction databaseDecorator; |
||||
private final Object target; |
||||
private final Class<?> targetType; |
||||
private final Class<?> collectionType; |
||||
private final Class<?> databaseType; |
||||
|
||||
/** |
||||
* Create a new SessionAwareMethodInterceptor for given target. |
||||
* |
||||
* @param session the {@link ClientSession} to be used on invocation. |
||||
* @param target the original target object. |
||||
* @param databaseType the MongoDB database type |
||||
* @param databaseDecorator a {@link BiFunction} used to create the proxy for an imperative / reactive |
||||
* {@code MongoDatabase}. |
||||
* @param collectionType the MongoDB collection type. |
||||
* @param collectionCallback a {@link BiFunction} used to create the proxy for an imperative / reactive |
||||
* {@code MongoCollection}. |
||||
* @param <T> |
||||
*/ |
||||
public <T> SessionAwareMethodInterceptor(ClientSession session, T target, Class<D> databaseType, |
||||
BiFunction<ClientSession, D, D> databaseDecorator, Class<C> collectionType, |
||||
BiFunction<ClientSession, C, C> collectionDecorator) { |
||||
|
||||
this.session = session; |
||||
this.target = target; |
||||
this.databaseType = ClassUtils.getUserClass(databaseType); |
||||
this.collectionType = ClassUtils.getUserClass(collectionType); |
||||
this.collectionDecorator = collectionDecorator; |
||||
this.databaseDecorator = databaseDecorator; |
||||
|
||||
this.targetType = ClassUtils.isAssignable(databaseType, target.getClass()) ? databaseType : collectionType; |
||||
} |
||||
|
||||
/* |
||||
* (non-Javadoc) |
||||
* @see org.aopalliance.intercept.MethodInterceptor(org.aopalliance.intercept.MethodInvocation) |
||||
*/ |
||||
@Override |
||||
public Object invoke(MethodInvocation methodInvocation) throws Throwable { |
||||
|
||||
if (requiresDecoration(methodInvocation)) { |
||||
|
||||
Object target = methodInvocation.proceed(); |
||||
if (target instanceof Proxy) { |
||||
return target; |
||||
} |
||||
|
||||
return decorate(target); |
||||
} |
||||
|
||||
if (!requiresSession(methodInvocation)) { |
||||
return methodInvocation.proceed(); |
||||
} |
||||
|
||||
Optional<Method> targetMethod = METHOD_CACHE.lookup(methodInvocation.getMethod(), targetType); |
||||
|
||||
return !targetMethod.isPresent() ? methodInvocation.proceed() |
||||
: ReflectionUtils.invokeMethod(targetMethod.get(), target, prependSessionToArguments(methodInvocation)); |
||||
} |
||||
|
||||
private boolean requiresDecoration(MethodInvocation methodInvocation) { |
||||
|
||||
return ClassUtils.isAssignable(databaseType, methodInvocation.getMethod().getReturnType()) |
||||
|| ClassUtils.isAssignable(collectionType, methodInvocation.getMethod().getReturnType()); |
||||
} |
||||
|
||||
protected Object decorate(Object target) { |
||||
|
||||
return ClassUtils.isAssignable(databaseType, target.getClass()) ? databaseDecorator.apply(session, target) |
||||
: collectionDecorator.apply(session, target); |
||||
} |
||||
|
||||
private boolean requiresSession(MethodInvocation methodInvocation) { |
||||
|
||||
if (ObjectUtils.isEmpty(methodInvocation.getMethod().getParameterTypes()) |
||||
|| !ClassUtils.isAssignable(ClientSession.class, methodInvocation.getMethod().getParameterTypes()[0])) { |
||||
return true; |
||||
} |
||||
|
||||
return false; |
||||
} |
||||
|
||||
private Object[] prependSessionToArguments(MethodInvocation invocation) { |
||||
|
||||
Object[] args = new Object[invocation.getArguments().length + 1]; |
||||
args[0] = session; |
||||
System.arraycopy(invocation.getArguments(), 0, args, 1, invocation.getArguments().length); |
||||
return args; |
||||
} |
||||
|
||||
/** |
||||
* Simple {@link Method} to {@link Method} caching facility for {@link ClientSession} overloaded targets. |
||||
* |
||||
* @since 2.1 |
||||
* @author Christoph Strobl |
||||
*/ |
||||
static class MethodCache { |
||||
|
||||
private final ConcurrentReferenceHashMap<MethodClassKey, Optional<Method>> cache = new ConcurrentReferenceHashMap<>(); |
||||
|
||||
Optional<Method> lookup(Method method, Class<?> targetClass) { |
||||
|
||||
return cache.computeIfAbsent(new MethodClassKey(method, targetClass), |
||||
val -> Optional.ofNullable(findTargetWithSession(method, targetClass))); |
||||
} |
||||
|
||||
@Nullable |
||||
private Method findTargetWithSession(Method sourceMethod, Class<?> targetType) { |
||||
|
||||
Class<?>[] argTypes = sourceMethod.getParameterTypes(); |
||||
Class<?>[] args = new Class<?>[argTypes.length + 1]; |
||||
args[0] = ClientSession.class; |
||||
System.arraycopy(argTypes, 0, args, 1, argTypes.length); |
||||
|
||||
return ReflectionUtils.findMethod(targetType, sourceMethod.getName(), args); |
||||
} |
||||
|
||||
boolean contains(Method method, Class<?> targetClass) { |
||||
return cache.containsKey(new MethodClassKey(method, targetClass)); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,45 @@
@@ -0,0 +1,45 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
|
||||
/** |
||||
* Callback interface for executing operations within a {@link com.mongodb.session.ClientSession} reactively. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 2.1 |
||||
*/ |
||||
@FunctionalInterface |
||||
public interface ReactiveSessionCallback<T> { |
||||
|
||||
/** |
||||
* Execute operations against a MongoDB instance via session bound {@link ReactiveMongoOperations}. The session is |
||||
* inferred directly into the operation so that no further interaction is necessary. |
||||
* <p /> |
||||
* Please note that only Spring Data specific abstractions like {@link ReactiveMongoOperations#find(Query, Class)} and |
||||
* others are enhanced with the {@link com.mongodb.session.ClientSession}. When obtaining plain MongoDB gateway |
||||
* objects like {@link com.mongodb.reactivestreams.client.MongoCollection} or |
||||
* {@link om.mongodb.reactivestreams.client.MongoDatabase} via eg. |
||||
* {@link ReactiveMongoOperations#getCollection(String)} we leave responsibility for |
||||
* {@link com.mongodb.session.ClientSession} again up to the caller. |
||||
* |
||||
* @param operations will never be {@literal null}. |
||||
* @return can be {@literal null}. |
||||
*/ |
||||
Publisher<T> doInSession(ReactiveMongoOperations operations); |
||||
} |
||||
@ -0,0 +1,59 @@
@@ -0,0 +1,59 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import reactor.core.publisher.Flux; |
||||
|
||||
import java.util.function.Consumer; |
||||
|
||||
import com.mongodb.session.ClientSession; |
||||
|
||||
/** |
||||
* Gateway interface to execute {@link ClientSession} bound operations against MongoDB via a |
||||
* {@link ReactiveSessionCallback}. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 2.1 |
||||
*/ |
||||
public interface ReactiveSessionScoped { |
||||
|
||||
/** |
||||
* Executes the given {@link ReactiveSessionCallback} within the {@link com.mongodb.session.ClientSession} |
||||
* <p/> |
||||
* It is up to the caller to make sure the {@link com.mongodb.session.ClientSession} is {@link ClientSession#close() |
||||
* closed} when done. |
||||
* |
||||
* @param action callback object that specifies the MongoDB action the callback action. Must not be {@literal null}. |
||||
* @param <T> return type. |
||||
* @return a result object returned by the action. Can be {@literal null}. |
||||
*/ |
||||
default <T> Flux<T> execute(ReactiveSessionCallback<T> action) { |
||||
return execute(action, (session) -> {}); |
||||
} |
||||
|
||||
/** |
||||
* Executes the given {@link ReactiveSessionCallback} within the {@link com.mongodb.session.ClientSession} |
||||
* <p/> |
||||
* It is up to the caller to make sure the {@link com.mongodb.session.ClientSession} is {@link ClientSession#close() |
||||
* closed} when done. |
||||
* |
||||
* @param action callback object that specifies the MongoDB action the callback action. Must not be {@literal null}. |
||||
* @param doFinally |
||||
* @param <T> return type. |
||||
* @return a result object returned by the action. Can be {@literal null}. |
||||
*/ |
||||
<T> Flux<T> execute(ReactiveSessionCallback<T> action, Consumer<ClientSession> doFinally); |
||||
} |
||||
@ -0,0 +1,44 @@
@@ -0,0 +1,44 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.lang.Nullable; |
||||
|
||||
/** |
||||
* Callback interface for executing operations within a {@link com.mongodb.session.ClientSession}. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 2.1 |
||||
*/ |
||||
public interface SessionCallback<T> { |
||||
|
||||
/** |
||||
* Execute operations against a MongoDB instance via session bound {@link MongoOperations}. The session is inferred |
||||
* directly into the operation so that no further interaction is necessary. |
||||
* <p /> |
||||
* Please note that only Spring Data specific abstractions like {@link MongoOperations#find(Query, Class)} and others |
||||
* are enhanced with the {@link com.mongodb.session.ClientSession}. When obtaining plain MongoDB gateway objects like |
||||
* {@link com.mongodb.client.MongoCollection} or {@link com.mongodb.client.MongoDatabase} via eg. |
||||
* {@link MongoOperations#getCollection(String)} we leave responsibility for {@link com.mongodb.session.ClientSession} |
||||
* again up to the caller. |
||||
* |
||||
* @param operations will never be {@literal null}. |
||||
* @return can be {@literal null}. |
||||
*/ |
||||
@Nullable |
||||
T doInSession(MongoOperations operations); |
||||
} |
||||
@ -0,0 +1,62 @@
@@ -0,0 +1,62 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import java.util.function.Consumer; |
||||
|
||||
import org.springframework.lang.Nullable; |
||||
|
||||
import com.mongodb.session.ClientSession; |
||||
|
||||
/** |
||||
* Gateway interface to execute {@link ClientSession} bound operations against MongoDB via a {@link SessionCallback}. |
||||
* <p /> |
||||
* The very same bound {@link ClientSession} is used for all invocations of {@code execute} on the instance. |
||||
* |
||||
* @author Christoph Strobl |
||||
* @since 2.1 |
||||
*/ |
||||
public interface SessionScoped { |
||||
|
||||
/** |
||||
* Executes the given {@link SessionCallback} within the {@link com.mongodb.session.ClientSession} |
||||
* <p/> |
||||
* It is up to the caller to make sure the {@link com.mongodb.session.ClientSession} is {@link ClientSession#close() |
||||
* closed} when done. |
||||
* |
||||
* @param action callback object that specifies the MongoDB action the callback action. Must not be {@literal null}. |
||||
* @param <T> return type. |
||||
* @return a result object returned by the action. Can be {@literal null}. |
||||
*/ |
||||
@Nullable |
||||
default <T> T execute(SessionCallback<T> action) { |
||||
return execute(action, session -> {}); |
||||
} |
||||
|
||||
/** |
||||
* Executes the given {@link SessionCallback} within the {@link com.mongodb.session.ClientSession} |
||||
* <p/> |
||||
* It is up to the caller to make sure the {@link com.mongodb.session.ClientSession} is {@link ClientSession#close() |
||||
* closed} when done. |
||||
* |
||||
* @param action callback object that specifies the MongoDB action the callback action. Must not be {@literal null}. |
||||
* @param doFinally |
||||
* @param <T> return type. |
||||
* @return a result object returned by the action. Can be {@literal null}. |
||||
*/ |
||||
@Nullable |
||||
<T> T execute(SessionCallback<T> action, Consumer<ClientSession> doFinally); |
||||
} |
||||
@ -0,0 +1,182 @@
@@ -0,0 +1,182 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb; |
||||
|
||||
import static org.assertj.core.api.Assertions.*; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.Mockito.*; |
||||
import static org.mockito.Mockito.any; |
||||
|
||||
import java.lang.reflect.Method; |
||||
import java.lang.reflect.Proxy; |
||||
|
||||
import org.bson.Document; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
import org.springframework.aop.framework.ProxyFactory; |
||||
import org.springframework.data.mongodb.SessionAwareMethodInterceptor.MethodCache; |
||||
import org.springframework.test.util.ReflectionTestUtils; |
||||
import org.springframework.util.ClassUtils; |
||||
|
||||
import com.mongodb.MongoClient; |
||||
import com.mongodb.client.MongoCollection; |
||||
import com.mongodb.client.MongoDatabase; |
||||
import com.mongodb.session.ClientSession; |
||||
|
||||
/** |
||||
* @author Christoph Strobl |
||||
*/ |
||||
@RunWith(MockitoJUnitRunner.class) |
||||
public class SessionAwareMethodInterceptorUnitTests { |
||||
|
||||
@Mock ClientSession session; |
||||
@Mock MongoCollection<Document> targetCollection; |
||||
@Mock MongoDatabase targetDatabase; |
||||
|
||||
MongoCollection collection; |
||||
MongoDatabase database; |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
|
||||
collection = createProxyInstance(session, targetCollection, MongoCollection.class); |
||||
database = createProxyInstance(session, targetDatabase, MongoDatabase.class); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void proxyFactoryOnCollectionDelegatesToMethodWithSession() { |
||||
|
||||
collection.find(); |
||||
|
||||
verify(targetCollection).find(eq(session)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void proxyFactoryOnCollectionWithSessionInArgumentListProceedsWithExecution() { |
||||
|
||||
ClientSession yetAnotherSession = mock(ClientSession.class); |
||||
collection.find(yetAnotherSession); |
||||
|
||||
verify(targetCollection).find(eq(yetAnotherSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void proxyFactoryOnDatabaseDelegatesToMethodWithSession() { |
||||
|
||||
database.drop(); |
||||
|
||||
verify(targetDatabase).drop(eq(session)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void proxyFactoryOnDatabaseWithSessionInArgumentListProceedsWithExecution() { |
||||
|
||||
ClientSession yetAnotherSession = mock(ClientSession.class); |
||||
database.drop(yetAnotherSession); |
||||
|
||||
verify(targetDatabase).drop(eq(yetAnotherSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void justMoveOnIfNoOverloadWithSessionAvailable() { |
||||
|
||||
collection.getReadPreference(); |
||||
|
||||
verify(targetCollection).getReadPreference(); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void usesCacheForMethodLookup() { |
||||
|
||||
MethodCache cache = (MethodCache) ReflectionTestUtils.getField(SessionAwareMethodInterceptor.class, "METHOD_CACHE"); |
||||
Method countMethod = ClassUtils.getMethod(MongoCollection.class, "count"); |
||||
|
||||
assertThat(cache.contains(countMethod, MongoCollection.class)).isFalse(); |
||||
|
||||
collection.count(); |
||||
|
||||
assertThat(cache.contains(countMethod, MongoCollection.class)).isTrue(); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void cachesNullForMethodsThatDoNotHaveASessionOverload() { |
||||
|
||||
MethodCache cache = (MethodCache) ReflectionTestUtils.getField(SessionAwareMethodInterceptor.class, "METHOD_CACHE"); |
||||
Method readConcernMethod = ClassUtils.getMethod(MongoCollection.class, "getReadConcern"); |
||||
|
||||
assertThat(cache.contains(readConcernMethod, MongoCollection.class)).isFalse(); |
||||
|
||||
collection.getReadConcern(); |
||||
|
||||
collection.getReadConcern(); |
||||
|
||||
assertThat(cache.contains(readConcernMethod, MongoCollection.class)).isTrue(); |
||||
assertThat(cache.lookup(readConcernMethod, MongoCollection.class)).isEmpty(); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void proxiesNewDbInstanceReturnedByMethdod() { |
||||
|
||||
MongoDatabase otherDb = mock(MongoDatabase.class); |
||||
when(targetDatabase.withCodecRegistry(any())).thenReturn(otherDb); |
||||
|
||||
MongoDatabase target = database.withCodecRegistry(MongoClient.getDefaultCodecRegistry()); |
||||
assertThat(target).isInstanceOf(Proxy.class).isNotSameAs(database).isNotSameAs(targetDatabase); |
||||
|
||||
target.drop(); |
||||
|
||||
verify(otherDb).drop(eq(session)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void proxiesNewCollectionInstanceReturnedByMethdod() { |
||||
|
||||
MongoCollection otherCollection = mock(MongoCollection.class); |
||||
when(targetCollection.withCodecRegistry(any())).thenReturn(otherCollection); |
||||
|
||||
MongoCollection target = collection.withCodecRegistry(MongoClient.getDefaultCodecRegistry()); |
||||
assertThat(target).isInstanceOf(Proxy.class).isNotSameAs(collection).isNotSameAs(targetCollection); |
||||
|
||||
target.drop(); |
||||
|
||||
verify(otherCollection).drop(eq(session)); |
||||
} |
||||
|
||||
private MongoDatabase proxyDatabase(ClientSession session, MongoDatabase database) { |
||||
return createProxyInstance(session, database, MongoDatabase.class); |
||||
} |
||||
|
||||
private MongoCollection proxyCollection(ClientSession session, MongoCollection collection) { |
||||
return createProxyInstance(session, collection, MongoCollection.class); |
||||
} |
||||
|
||||
private <T> T createProxyInstance(ClientSession session, T target, Class<T> targetType) { |
||||
|
||||
ProxyFactory factory = new ProxyFactory(); |
||||
factory.setTarget(target); |
||||
factory.setInterfaces(targetType); |
||||
factory.setOpaque(true); |
||||
|
||||
factory.addAdvice(new SessionAwareMethodInterceptor<>(session, target, MongoDatabase.class, this::proxyDatabase, |
||||
MongoCollection.class, this::proxyCollection)); |
||||
|
||||
return (T) factory.getProxy(); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,68 @@
@@ -0,0 +1,68 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import static org.assertj.core.api.Assertions.*; |
||||
|
||||
import org.bson.Document; |
||||
import org.junit.Before; |
||||
import org.junit.ClassRule; |
||||
import org.junit.Test; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.test.util.MongoVersionRule; |
||||
import org.springframework.data.util.Version; |
||||
|
||||
import com.mongodb.ClientSessionOptions; |
||||
import com.mongodb.MongoClient; |
||||
import com.mongodb.session.ClientSession; |
||||
|
||||
/** |
||||
* @author Christoph Strobl |
||||
*/ |
||||
public class ClientSessionTests { |
||||
|
||||
public static @ClassRule MongoVersionRule REQUIRES_AT_LEAST_3_6_0 = MongoVersionRule.atLeast(Version.parse("3.6.0")); |
||||
|
||||
MongoTemplate template; |
||||
MongoClient client; |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
|
||||
client = new MongoClient(); |
||||
template = new MongoTemplate(client, "reflective-client-session-tests"); |
||||
template.getDb().getCollection("test").drop(); |
||||
|
||||
template.getDb().getCollection("test").insertOne(new Document("_id", "id-1").append("value", "spring")); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void shouldApplyClientSession() { |
||||
|
||||
ClientSession session = client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build()); |
||||
|
||||
assertThat(session.getOperationTime()).isNull(); |
||||
|
||||
Document doc = template.withSession(() -> session) |
||||
.execute(action -> action.findOne(new Query(), Document.class, "test")); |
||||
|
||||
assertThat(doc).isNotNull(); |
||||
assertThat(session.getOperationTime()).isNotNull(); |
||||
assertThat(session.getServerSession().isClosed()).isFalse(); |
||||
|
||||
session.close(); |
||||
} |
||||
} |
||||
@ -0,0 +1,134 @@
@@ -0,0 +1,134 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import static org.assertj.core.api.Assertions.*; |
||||
|
||||
import reactor.core.publisher.Mono; |
||||
import reactor.test.StepVerifier; |
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
import java.util.function.Supplier; |
||||
|
||||
import org.bson.Document; |
||||
import org.junit.Before; |
||||
import org.junit.ClassRule; |
||||
import org.junit.Test; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.test.util.MongoVersionRule; |
||||
import org.springframework.data.util.Version; |
||||
|
||||
import com.mongodb.ClientSessionOptions; |
||||
import com.mongodb.reactivestreams.client.MongoClient; |
||||
import com.mongodb.reactivestreams.client.MongoClients; |
||||
import com.mongodb.session.ClientSession; |
||||
|
||||
/** |
||||
* @author Christoph Strobl |
||||
*/ |
||||
public class ReactiveClientSessionTests { |
||||
|
||||
public static @ClassRule MongoVersionRule REQUIRES_AT_LEAST_3_6_0 = MongoVersionRule.atLeast(Version.parse("3.6.0")); |
||||
|
||||
MongoClient client; |
||||
ReactiveMongoTemplate template; |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
|
||||
client = MongoClients.create(); |
||||
|
||||
template = new ReactiveMongoTemplate(client, "reflective-client-session-tests"); |
||||
|
||||
StepVerifier.create(template.dropCollection("test")).verifyComplete(); |
||||
|
||||
StepVerifier.create(template.insert(new Document("_id", "id-1").append("value", "spring"), "test")) |
||||
.expectNextCount(1).verifyComplete(); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void shouldApplyClientSession() { |
||||
|
||||
ClientSession session = Mono |
||||
.from(client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())).block(); |
||||
|
||||
assertThat(session.getOperationTime()).isNull(); |
||||
|
||||
StepVerifier.create(template.withSession(() -> session).execute(action -> action.findAll(Document.class, "test"))) |
||||
.expectNextCount(1).verifyComplete(); |
||||
|
||||
assertThat(session.getOperationTime()).isNotNull(); |
||||
assertThat(session.getServerSession().isClosed()).isFalse(); |
||||
|
||||
session.close(); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void useMonoInCallback() { |
||||
|
||||
ClientSession session = Mono |
||||
.from(client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())).block(); |
||||
|
||||
assertThat(session.getOperationTime()).isNull(); |
||||
|
||||
StepVerifier |
||||
.create( |
||||
template.withSession(() -> session).execute(action -> action.findOne(new Query(), Document.class, "test"))) |
||||
.expectNextCount(1).verifyComplete(); |
||||
|
||||
assertThat(session.getOperationTime()).isNotNull(); |
||||
assertThat(session.getServerSession().isClosed()).isFalse(); |
||||
|
||||
session.close(); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void reusesClientSessionInSessionScopedCallback() { |
||||
|
||||
ClientSession session = Mono |
||||
.from(client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build())).block(); |
||||
CountingSessionSupplier sessionSupplier = new CountingSessionSupplier(session); |
||||
|
||||
ReactiveSessionScoped sessionScoped = template.withSession(sessionSupplier); |
||||
|
||||
sessionScoped.execute(action -> action.findOne(new Query(), Document.class, "test")).blockFirst(); |
||||
assertThat(sessionSupplier.getInvocationCount()).isEqualTo(1); |
||||
|
||||
sessionScoped.execute(action -> action.findOne(new Query(), Document.class, "test")).blockFirst(); |
||||
assertThat(sessionSupplier.getInvocationCount()).isEqualTo(1); |
||||
} |
||||
|
||||
static class CountingSessionSupplier implements Supplier<ClientSession> { |
||||
|
||||
AtomicInteger invocationCount = new AtomicInteger(0); |
||||
final ClientSession session; |
||||
|
||||
public CountingSessionSupplier(ClientSession session) { |
||||
this.session = session; |
||||
} |
||||
|
||||
@Override |
||||
public ClientSession get() { |
||||
|
||||
invocationCount.incrementAndGet(); |
||||
return session; |
||||
} |
||||
|
||||
int getInvocationCount() { |
||||
return invocationCount.get(); |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,319 @@
@@ -0,0 +1,319 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import static org.assertj.core.api.Assertions.*; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.Mockito.*; |
||||
import static org.mockito.Mockito.any; |
||||
import static org.mockito.Mockito.anyBoolean; |
||||
import static org.mockito.Mockito.anyInt; |
||||
import static org.mockito.Mockito.anyList; |
||||
import static org.mockito.Mockito.anyString; |
||||
|
||||
import java.lang.reflect.Proxy; |
||||
|
||||
import com.mongodb.reactivestreams.client.MongoClient; |
||||
import org.bson.Document; |
||||
import org.bson.codecs.BsonValueCodec; |
||||
import org.bson.codecs.configuration.CodecRegistry; |
||||
import org.junit.Before; |
||||
import org.junit.Ignore; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
import org.reactivestreams.Publisher; |
||||
import org.springframework.data.geo.Metrics; |
||||
import org.springframework.data.geo.Point; |
||||
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; |
||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate.NoOpDbRefResolver; |
||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate.ReactiveSessionBoundMongoTemplate; |
||||
import org.springframework.data.mongodb.core.SimpleReactiveMongoDatabaseFactory.ClientSessionBoundMongoDbFactory; |
||||
import org.springframework.data.mongodb.core.aggregation.Aggregation; |
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter; |
||||
import org.springframework.data.mongodb.core.mapping.MongoMappingContext; |
||||
import org.springframework.data.mongodb.core.query.NearQuery; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.core.query.Update; |
||||
|
||||
import com.mongodb.client.model.CountOptions; |
||||
import com.mongodb.client.model.DeleteOptions; |
||||
import com.mongodb.client.model.FindOneAndUpdateOptions; |
||||
import com.mongodb.client.model.UpdateOptions; |
||||
import com.mongodb.reactivestreams.client.AggregatePublisher; |
||||
import com.mongodb.reactivestreams.client.DistinctPublisher; |
||||
import com.mongodb.reactivestreams.client.FindPublisher; |
||||
import com.mongodb.reactivestreams.client.MongoCollection; |
||||
import com.mongodb.reactivestreams.client.MongoDatabase; |
||||
import com.mongodb.session.ClientSession; |
||||
|
||||
/** |
||||
* @author Christoph Strobl |
||||
*/ |
||||
@RunWith(MockitoJUnitRunner.Silent.class) |
||||
public class ReactiveSessionBoundMongoTemplateUnitTests { |
||||
|
||||
private static final String COLLECTION_NAME = "collection-1"; |
||||
|
||||
ReactiveSessionBoundMongoTemplate template; |
||||
MongoMappingContext mappingContext; |
||||
MappingMongoConverter converter; |
||||
|
||||
ReactiveMongoDatabaseFactory factory; |
||||
|
||||
@Mock MongoCollection collection; |
||||
@Mock MongoDatabase database; |
||||
@Mock ClientSession clientSession; |
||||
@Mock FindPublisher findPublisher; |
||||
@Mock AggregatePublisher aggregatePublisher; |
||||
@Mock DistinctPublisher distinctPublisher; |
||||
@Mock Publisher resultPublisher; |
||||
@Mock MongoClient client; |
||||
@Mock CodecRegistry codecRegistry; |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
|
||||
when(client.getDatabase(anyString())).thenReturn(database); |
||||
when(codecRegistry.get(any(Class.class))).thenReturn(new BsonValueCodec()); |
||||
when(database.getCodecRegistry()).thenReturn(codecRegistry); |
||||
when(database.getCollection(anyString())).thenReturn(collection); |
||||
when(database.getCollection(anyString(), any())).thenReturn(collection); |
||||
when(database.listCollectionNames(any(ClientSession.class))).thenReturn(findPublisher); |
||||
when(database.createCollection(any(ClientSession.class), any(), any())).thenReturn(resultPublisher); |
||||
when(database.runCommand(any(ClientSession.class), any(), any(Class.class))).thenReturn(resultPublisher); |
||||
when(collection.find(any(ClientSession.class))).thenReturn(findPublisher); |
||||
when(collection.find(any(ClientSession.class), any(Document.class))).thenReturn(findPublisher); |
||||
when(collection.find(any(ClientSession.class), any(Class.class))).thenReturn(findPublisher); |
||||
when(collection.find(any(ClientSession.class), any(), any())).thenReturn(findPublisher); |
||||
when(collection.deleteMany(any(ClientSession.class), any(), any())).thenReturn(resultPublisher); |
||||
when(collection.insertOne(any(ClientSession.class), any(Document.class))).thenReturn(resultPublisher); |
||||
when(collection.aggregate(any(ClientSession.class), anyList(), any(Class.class))).thenReturn(aggregatePublisher); |
||||
when(collection.count(any(ClientSession.class), any(), any(CountOptions.class))).thenReturn(resultPublisher); |
||||
when(collection.drop(any(ClientSession.class))).thenReturn(resultPublisher); |
||||
when(collection.findOneAndUpdate(any(ClientSession.class), any(), any(), any())).thenReturn(resultPublisher); |
||||
when(collection.distinct(any(ClientSession.class), any(), any(), any())).thenReturn(distinctPublisher); |
||||
when(collection.updateOne(any(ClientSession.class), any(), any(), any(UpdateOptions.class))) |
||||
.thenReturn(resultPublisher); |
||||
when(collection.updateMany(any(ClientSession.class), any(), any(), any(UpdateOptions.class))) |
||||
.thenReturn(resultPublisher); |
||||
when(collection.dropIndex(any(ClientSession.class), anyString())).thenReturn(resultPublisher); |
||||
when(findPublisher.projection(any())).thenReturn(findPublisher); |
||||
when(findPublisher.limit(anyInt())).thenReturn(findPublisher); |
||||
when(findPublisher.collation(any())).thenReturn(findPublisher); |
||||
when(findPublisher.first()).thenReturn(resultPublisher); |
||||
when(aggregatePublisher.allowDiskUse(anyBoolean())).thenReturn(aggregatePublisher); |
||||
when(aggregatePublisher.useCursor(anyBoolean())).thenReturn(aggregatePublisher); |
||||
|
||||
factory = new SimpleReactiveMongoDatabaseFactory(client, "foo"); |
||||
|
||||
this.mappingContext = new MongoMappingContext(); |
||||
this.converter = new MappingMongoConverter(new NoOpDbRefResolver(), mappingContext); |
||||
this.template = new ReactiveSessionBoundMongoTemplate(clientSession, new ReactiveMongoTemplate(factory, converter)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void executeUsesProxiedCollectionInCallback() { |
||||
|
||||
template.execute("collection", MongoCollection::find).subscribe(); |
||||
|
||||
verify(collection, never()).find(); |
||||
verify(collection).find(eq(clientSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void executeUsesProxiedDatabaseInCallback() { |
||||
|
||||
template.execute(MongoDatabase::listCollectionNames).subscribe(); |
||||
|
||||
verify(database, never()).listCollectionNames(); |
||||
verify(database).listCollectionNames(eq(clientSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findOneUsesProxiedCollection() { |
||||
|
||||
template.findOne(new Query(), Person.class).subscribe(); |
||||
|
||||
verify(collection).find(eq(clientSession), any(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findShouldUseProxiedCollection() { |
||||
|
||||
template.find(new Query(), Person.class).subscribe(); |
||||
|
||||
verify(collection).find(eq(clientSession), any(Class.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findAllShouldUseProxiedCollection() { |
||||
|
||||
template.findAll(Person.class).subscribe(); |
||||
|
||||
verify(collection).find(eq(clientSession), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void executeCommandShouldUseProxiedDatabase() { |
||||
|
||||
template.executeCommand("{}").subscribe(); |
||||
|
||||
verify(database).runCommand(eq(clientSession), any(), any(Class.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void removeShouldUseProxiedCollection() { |
||||
|
||||
template.remove(new Query(), Person.class).subscribe(); |
||||
|
||||
verify(collection).deleteMany(eq(clientSession), any(), any(DeleteOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void insertShouldUseProxiedCollection() { |
||||
|
||||
template.insert(new Person()).subscribe(); |
||||
|
||||
verify(collection).insertOne(eq(clientSession), any(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void aggregateShouldUseProxiedCollection() { |
||||
|
||||
template.aggregate(Aggregation.newAggregation(Aggregation.project("foo")), COLLECTION_NAME, Person.class) |
||||
.subscribe(); |
||||
|
||||
verify(collection).aggregate(eq(clientSession), anyList(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void collectionExistsShouldUseProxiedDatabase() { |
||||
|
||||
template.collectionExists(Person.class).subscribe(); |
||||
|
||||
verify(database).listCollectionNames(eq(clientSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void countShouldUseProxiedCollection() { |
||||
|
||||
template.count(new Query(), Person.class).subscribe(); |
||||
|
||||
verify(collection).count(eq(clientSession), any(), any(CountOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void createCollectionShouldUseProxiedDatabase() { |
||||
|
||||
template.createCollection(Person.class).subscribe(); |
||||
|
||||
verify(database).createCollection(eq(clientSession), anyString(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void dropShouldUseProxiedCollection() { |
||||
|
||||
template.dropCollection(Person.class).subscribe(); |
||||
|
||||
verify(collection).drop(eq(clientSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findAndModifyShouldUseProxiedCollection() { |
||||
|
||||
template.findAndModify(new Query(), new Update().set("foo", "bar"), Person.class).subscribe(); |
||||
|
||||
verify(collection).findOneAndUpdate(eq(clientSession), any(), any(), any(FindOneAndUpdateOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findDistinctShouldUseProxiedCollection() { |
||||
|
||||
template.findDistinct(new Query(), "firstName", Person.class, String.class).subscribe(); |
||||
|
||||
verify(collection).distinct(eq(clientSession), anyString(), any(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void geoNearShouldUseProxiedDatabase() { |
||||
|
||||
template.geoNear(NearQuery.near(new Point(0, 0), Metrics.NEUTRAL), Person.class).subscribe(); |
||||
|
||||
verify(database).runCommand(eq(clientSession), any(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880, DATAMONGO-1889
|
||||
@Ignore("No group by yet - DATAMONGO-1889") |
||||
public void groupShouldUseProxiedDatabase() { |
||||
|
||||
// template.group(COLLECTION_NAME, GroupBy.key("firstName"), Person.class).subscribe();
|
||||
|
||||
verify(database).runCommand(eq(clientSession), any(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880, DATAMONGO-1890
|
||||
@Ignore("No map reduce yet on template - DATAMONGO-1890") |
||||
public void mapReduceShouldUseProxiedCollection() { |
||||
|
||||
// template.mapReduce(COLLECTION_NAME, "foo", "bar", Person.class);
|
||||
|
||||
verify(collection).mapReduce(eq(clientSession), anyString(), anyString(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void updateFirstShouldUseProxiedCollection() { |
||||
|
||||
template.updateFirst(new Query(), Update.update("foo", "bar"), Person.class).subscribe(); |
||||
|
||||
verify(collection).updateOne(eq(clientSession), any(), any(), any(UpdateOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void updateMultiShouldUseProxiedCollection() { |
||||
|
||||
template.updateMulti(new Query(), Update.update("foo", "bar"), Person.class).subscribe(); |
||||
|
||||
verify(collection).updateMany(eq(clientSession), any(), any(), any(UpdateOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void upsertShouldUseProxiedCollection() { |
||||
|
||||
template.upsert(new Query(), Update.update("foo", "bar"), Person.class).subscribe(); |
||||
|
||||
verify(collection).updateOne(eq(clientSession), any(), any(), any(UpdateOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void getCollectionShouldShouldJustReturnTheCollection/*No ClientSession binding*/() { |
||||
assertThat(template.getCollection(COLLECTION_NAME)).isNotInstanceOf(Proxy.class); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void getDbShouldJustReturnTheDatabase/*No ClientSession binding*/() { |
||||
assertThat(template.getMongoDatabase()).isNotInstanceOf(Proxy.class); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void indexOpsShouldUseProxiedCollection() { |
||||
|
||||
template.indexOps(COLLECTION_NAME).dropIndex("index-name").subscribe(); |
||||
|
||||
verify(collection).dropIndex(eq(clientSession), eq("index-name")); |
||||
} |
||||
} |
||||
@ -0,0 +1,298 @@
@@ -0,0 +1,298 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import static org.assertj.core.api.Assertions.*; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.Mockito.*; |
||||
|
||||
import lombok.Data; |
||||
|
||||
import java.lang.reflect.InvocationHandler; |
||||
import java.lang.reflect.Proxy; |
||||
import java.util.ArrayList; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
|
||||
import org.aopalliance.aop.Advice; |
||||
import org.bson.Document; |
||||
import org.junit.After; |
||||
import org.junit.Before; |
||||
import org.junit.ClassRule; |
||||
import org.junit.Rule; |
||||
import org.junit.Test; |
||||
import org.junit.rules.ExpectedException; |
||||
import org.mockito.Mockito; |
||||
import org.springframework.aop.Advisor; |
||||
import org.springframework.aop.framework.Advised; |
||||
import org.springframework.dao.DataAccessException; |
||||
import org.springframework.data.annotation.Id; |
||||
import org.springframework.data.mongodb.ClientSessionException; |
||||
import org.springframework.data.mongodb.LazyLoadingException; |
||||
import org.springframework.data.mongodb.MongoDbFactory; |
||||
import org.springframework.data.mongodb.SessionAwareMethodInterceptor; |
||||
import org.springframework.data.mongodb.core.MongoTemplate.SessionBoundMongoTemplate; |
||||
import org.springframework.data.mongodb.core.aggregation.Aggregation; |
||||
import org.springframework.data.mongodb.core.convert.DbRefResolver; |
||||
import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver; |
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter; |
||||
import org.springframework.data.mongodb.core.convert.MongoConverter; |
||||
import org.springframework.data.mongodb.core.convert.MongoCustomConversions; |
||||
import org.springframework.data.mongodb.core.mapping.DBRef; |
||||
import org.springframework.data.mongodb.core.mapping.MongoMappingContext; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.test.util.MongoVersionRule; |
||||
import org.springframework.data.util.Version; |
||||
import org.springframework.test.util.ReflectionTestUtils; |
||||
|
||||
import com.mongodb.ClientSessionOptions; |
||||
import com.mongodb.MongoClient; |
||||
import com.mongodb.client.MongoCollection; |
||||
import com.mongodb.client.MongoDatabase; |
||||
import com.mongodb.session.ClientSession; |
||||
|
||||
/** |
||||
* Integration tests for {@link SessionBoundMongoTemplate} operating up an active {@link ClientSession}. |
||||
* |
||||
* @author Christoph Strobl |
||||
*/ |
||||
public class SessionBoundMongoTemplateTests { |
||||
|
||||
public static @ClassRule MongoVersionRule REQUIRES_AT_LEAST_3_6_0 = MongoVersionRule.atLeast(Version.parse("3.6.0")); |
||||
|
||||
public @Rule ExpectedException exception = ExpectedException.none(); |
||||
|
||||
MongoTemplate template; |
||||
SessionBoundMongoTemplate sessionBoundTemplate; |
||||
ClientSession session; |
||||
volatile List<MongoCollection<Document>> spiedCollections = new ArrayList<>(); |
||||
volatile List<MongoDatabase> spiedDatabases = new ArrayList<>(); |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
|
||||
MongoClient client = new MongoClient(); |
||||
|
||||
MongoDbFactory factory = new SimpleMongoDbFactory(client, "session-bound-mongo-template-tests") { |
||||
|
||||
@Override |
||||
public MongoDatabase getDb() throws DataAccessException { |
||||
|
||||
MongoDatabase spiedDatabse = Mockito.spy(super.getDb()); |
||||
spiedDatabases.add(spiedDatabse); |
||||
return spiedDatabse; |
||||
} |
||||
}; |
||||
|
||||
session = client.startSession(ClientSessionOptions.builder().build()); |
||||
|
||||
this.template = new MongoTemplate(factory); |
||||
|
||||
this.sessionBoundTemplate = new SessionBoundMongoTemplate(session, |
||||
new MongoTemplate(factory, getDefaultMongoConverter(factory))) { |
||||
|
||||
@Override |
||||
protected MongoCollection<Document> prepareCollection(MongoCollection<Document> collection) { |
||||
|
||||
InvocationHandler handler = Proxy.getInvocationHandler(collection); |
||||
|
||||
Advised advised = (Advised) ReflectionTestUtils.getField(handler, "advised"); |
||||
|
||||
for (Advisor advisor : advised.getAdvisors()) { |
||||
Advice advice = advisor.getAdvice(); |
||||
if (advice instanceof SessionAwareMethodInterceptor) { |
||||
|
||||
MongoCollection<Document> spiedCollection = Mockito |
||||
.spy((MongoCollection<Document>) ReflectionTestUtils.getField(advice, "target")); |
||||
spiedCollections.add(spiedCollection); |
||||
|
||||
ReflectionTestUtils.setField(advice, "target", spiedCollection); |
||||
} |
||||
} |
||||
|
||||
return super.prepareCollection(collection); |
||||
} |
||||
}; |
||||
} |
||||
|
||||
@After |
||||
public void tearDown() { |
||||
session.close(); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findDelegatesToMethodWithSession() { |
||||
|
||||
sessionBoundTemplate.find(new Query(), Person.class); |
||||
|
||||
verify(operation(0)).find(eq(session), any(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void fluentFindDelegatesToMethodWithSession() { |
||||
|
||||
sessionBoundTemplate.query(Person.class).all(); |
||||
|
||||
verify(operation(0)).find(eq(session), any(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void aggregateDelegatesToMethoddWithSession() { |
||||
|
||||
sessionBoundTemplate.aggregate(Aggregation.newAggregation(Aggregation.project("firstName")), Person.class, |
||||
Person.class); |
||||
|
||||
verify(operation(0)).aggregate(eq(session), any(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void collectionExistsDelegatesToMethodWithSession() { |
||||
|
||||
sessionBoundTemplate.collectionExists(Person.class); |
||||
|
||||
verify(command(0)).listCollectionNames(eq(session)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void shouldLoadDbRefWhenSessionIsActive() { |
||||
|
||||
Person person = new Person("Kylar Stern"); |
||||
|
||||
template.save(person); |
||||
|
||||
WithDbRef wdr = new WithDbRef(); |
||||
wdr.id = "id-1"; |
||||
wdr.personRef = person; |
||||
|
||||
template.save(wdr); |
||||
|
||||
WithDbRef result = sessionBoundTemplate.findById(wdr.id, WithDbRef.class); |
||||
|
||||
assertThat(result.personRef).isEqualTo(person); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void shouldErrorOnLoadDbRefWhenSessionIsClosed() { |
||||
|
||||
exception.expect(ClientSessionException.class); |
||||
|
||||
Person person = new Person("Kylar Stern"); |
||||
|
||||
template.save(person); |
||||
|
||||
WithDbRef wdr = new WithDbRef(); |
||||
wdr.id = "id-1"; |
||||
wdr.personRef = person; |
||||
|
||||
template.save(wdr); |
||||
|
||||
session.close(); |
||||
|
||||
sessionBoundTemplate.findById(wdr.id, WithDbRef.class); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void shouldLoadLazyDbRefWhenSessionIsActive() { |
||||
|
||||
Person person = new Person("Kylar Stern"); |
||||
|
||||
template.save(person); |
||||
|
||||
WithLazyDbRef wdr = new WithLazyDbRef(); |
||||
wdr.id = "id-1"; |
||||
wdr.personRef = person; |
||||
|
||||
template.save(wdr); |
||||
|
||||
WithLazyDbRef result = sessionBoundTemplate.findById(wdr.id, WithLazyDbRef.class); |
||||
|
||||
assertThat(result.getPersonRef()).isEqualTo(person); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void shouldErrorOnLoadLazyDbRefWhenSessionIsClosed() { |
||||
|
||||
exception.expect(LazyLoadingException.class); |
||||
exception.expectMessage("Invalid session state"); |
||||
|
||||
Person person = new Person("Kylar Stern"); |
||||
|
||||
template.save(person); |
||||
|
||||
WithLazyDbRef wdr = new WithLazyDbRef(); |
||||
wdr.id = "id-1"; |
||||
wdr.personRef = person; |
||||
|
||||
template.save(wdr); |
||||
|
||||
WithLazyDbRef result = null; |
||||
try { |
||||
result = sessionBoundTemplate.findById(wdr.id, WithLazyDbRef.class); |
||||
} catch (Exception e) { |
||||
fail("Something went wrong. It seems the session was already closed when reading.", e); |
||||
} |
||||
|
||||
session.close(); // now close the session
|
||||
|
||||
assertThat(result.getPersonRef()).isEqualTo(person); // resolve the lazy loading proxy
|
||||
} |
||||
|
||||
@Data |
||||
static class WithDbRef { |
||||
|
||||
@Id String id; |
||||
@DBRef Person personRef; |
||||
} |
||||
|
||||
@Data |
||||
static class WithLazyDbRef { |
||||
|
||||
@Id String id; |
||||
@DBRef(lazy = true) Person personRef; |
||||
|
||||
public Person getPersonRef() { |
||||
return personRef; |
||||
} |
||||
} |
||||
|
||||
// --> Just some helpers for testing
|
||||
|
||||
MongoCollection<Document> operation(int index) { |
||||
return spiedCollections.get(index); |
||||
} |
||||
|
||||
MongoDatabase command(int index) { |
||||
return spiedDatabases.get(index); |
||||
} |
||||
|
||||
private MongoConverter getDefaultMongoConverter(MongoDbFactory factory) { |
||||
|
||||
DbRefResolver dbRefResolver = new DefaultDbRefResolver(factory); |
||||
MongoCustomConversions conversions = new MongoCustomConversions(Collections.emptyList()); |
||||
|
||||
MongoMappingContext mappingContext = new MongoMappingContext(); |
||||
mappingContext.setSimpleTypeHolder(conversions.getSimpleTypeHolder()); |
||||
mappingContext.afterPropertiesSet(); |
||||
|
||||
MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, mappingContext); |
||||
converter.setCustomConversions(conversions); |
||||
converter.afterPropertiesSet(); |
||||
|
||||
return converter; |
||||
} |
||||
|
||||
} |
||||
@@ -0,0 +1,358 @@
|
||||
/* |
||||
* Copyright 2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.data.mongodb.core; |
||||
|
||||
import static org.mockito.Mockito.*; |
||||
import static org.springframework.data.mongodb.test.util.Assertions.*; |
||||
|
||||
import java.lang.reflect.Proxy; |
||||
import java.util.Collections; |
||||
|
||||
import org.bson.Document; |
||||
import org.bson.codecs.BsonValueCodec; |
||||
import org.bson.codecs.configuration.CodecRegistry; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
import org.springframework.data.geo.Metrics; |
||||
import org.springframework.data.geo.Point; |
||||
import org.springframework.data.mongodb.MongoDbFactory; |
||||
import org.springframework.data.mongodb.core.BulkOperations.BulkMode; |
||||
import org.springframework.data.mongodb.core.MongoTemplate.SessionBoundMongoTemplate; |
||||
import org.springframework.data.mongodb.core.aggregation.Aggregation; |
||||
import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver; |
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter; |
||||
import org.springframework.data.mongodb.core.mapping.MongoMappingContext; |
||||
import org.springframework.data.mongodb.core.mapreduce.GroupBy; |
||||
import org.springframework.data.mongodb.core.query.NearQuery; |
||||
import org.springframework.data.mongodb.core.query.Query; |
||||
import org.springframework.data.mongodb.core.query.Update; |
||||
|
||||
import com.mongodb.MongoClient; |
||||
import com.mongodb.client.AggregateIterable; |
||||
import com.mongodb.client.DistinctIterable; |
||||
import com.mongodb.client.FindIterable; |
||||
import com.mongodb.client.MapReduceIterable; |
||||
import com.mongodb.client.MongoCollection; |
||||
import com.mongodb.client.MongoCursor; |
||||
import com.mongodb.client.MongoDatabase; |
||||
import com.mongodb.client.MongoIterable; |
||||
import com.mongodb.client.model.CountOptions; |
||||
import com.mongodb.client.model.DeleteOptions; |
||||
import com.mongodb.client.model.FindOneAndUpdateOptions; |
||||
import com.mongodb.client.model.UpdateOptions; |
||||
import com.mongodb.session.ClientSession; |
||||
|
||||
/** |
||||
* Unit tests for {@link SessionBoundMongoTemplate} making sure the proxied {@link MongoCollection} and |
* {@link MongoDatabase} are used for executing high-level commands, such as {@link MongoOperations#find(Query, Class)}, |
* provided by Spring Data. Commands that simply hand over MongoDB base types for interaction, such as obtaining a |
* {@link MongoCollection} via {@link MongoOperations#getCollection(String)}, shall not be proxied, as the user can |
* control the behavior by using the methods dedicated to {@link ClientSession} directly. |
||||
* |
||||
* @author Christoph Strobl |
||||
*/ |
||||
@RunWith(MockitoJUnitRunner.Silent.class) |
||||
public class SessionBoundMongoTemplateUnitTests { |
||||
|
||||
private static final String COLLECTION_NAME = "collection-1"; |
||||
|
||||
SessionBoundMongoTemplate template; |
||||
|
||||
MongoDbFactory factory; |
||||
|
||||
@Mock MongoCollection collection; |
||||
@Mock MongoDatabase database; |
||||
@Mock MongoClient client; |
||||
@Mock ClientSession clientSession; |
||||
@Mock FindIterable findIterable; |
||||
@Mock MongoIterable mongoIterable; |
||||
@Mock DistinctIterable distinctIterable; |
||||
@Mock AggregateIterable aggregateIterable; |
||||
@Mock MapReduceIterable mapReduceIterable; |
||||
@Mock MongoCursor cursor; |
||||
@Mock CodecRegistry codecRegistry; |
||||
|
||||
MappingMongoConverter converter; |
||||
MongoMappingContext mappingContext; |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
|
||||
when(client.getDatabase(anyString())).thenReturn(database); |
||||
when(codecRegistry.get(any(Class.class))).thenReturn(new BsonValueCodec()); |
||||
when(database.getCodecRegistry()).thenReturn(codecRegistry); |
||||
when(database.getCollection(anyString(), any())).thenReturn(collection); |
||||
when(database.listCollectionNames(any(ClientSession.class))).thenReturn(mongoIterable); |
||||
when(collection.find(any(ClientSession.class), any(), any())).thenReturn(findIterable); |
||||
when(collection.aggregate(any(ClientSession.class), anyList(), any())).thenReturn(aggregateIterable); |
||||
when(collection.distinct(any(ClientSession.class), any(), any(), any())).thenReturn(distinctIterable); |
||||
when(collection.mapReduce(any(ClientSession.class), any(), any(), any())).thenReturn(mapReduceIterable); |
||||
when(findIterable.iterator()).thenReturn(cursor); |
||||
when(aggregateIterable.collation(any())).thenReturn(aggregateIterable); |
||||
when(aggregateIterable.allowDiskUse(anyBoolean())).thenReturn(aggregateIterable); |
||||
when(aggregateIterable.batchSize(anyInt())).thenReturn(aggregateIterable); |
||||
when(aggregateIterable.map(any())).thenReturn(aggregateIterable); |
||||
when(aggregateIterable.useCursor(anyBoolean())).thenReturn(aggregateIterable); |
||||
when(aggregateIterable.into(any())).thenReturn(Collections.emptyList()); |
||||
when(mongoIterable.iterator()).thenReturn(cursor); |
||||
when(distinctIterable.map(any())).thenReturn(distinctIterable); |
||||
when(distinctIterable.into(any())).thenReturn(Collections.emptyList()); |
||||
when(mapReduceIterable.sort(any())).thenReturn(mapReduceIterable); |
||||
when(mapReduceIterable.filter(any())).thenReturn(mapReduceIterable); |
||||
when(mapReduceIterable.map(any())).thenReturn(mapReduceIterable); |
||||
when(mapReduceIterable.iterator()).thenReturn(cursor); |
||||
when(cursor.hasNext()).thenReturn(false); |
||||
when(findIterable.projection(any())).thenReturn(findIterable); |
||||
|
||||
factory = new SimpleMongoDbFactory(client, "foo"); |
||||
|
||||
this.mappingContext = new MongoMappingContext(); |
||||
this.converter = new MappingMongoConverter(new DefaultDbRefResolver(factory), mappingContext); |
||||
this.template = new SessionBoundMongoTemplate(clientSession, new MongoTemplate(factory, converter)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void executeUsesProxiedCollectionInCallback() { |
||||
|
||||
template.execute("collection", MongoCollection::find); |
||||
|
||||
verify(collection, never()).find(); |
||||
verify(collection).find(eq(clientSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void executeUsesProxiedDatabaseInCallback() { |
||||
|
||||
template.execute(MongoDatabase::listCollectionNames); |
||||
|
||||
verify(database, never()).listCollectionNames(); |
||||
verify(database).listCollectionNames(eq(clientSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findOneUsesProxiedCollection() { |
||||
|
||||
template.findOne(new Query(), Person.class); |
||||
|
||||
verify(collection).find(eq(clientSession), any(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findShouldUseProxiedCollection() { |
||||
|
||||
template.find(new Query(), Person.class); |
||||
|
||||
verify(collection).find(eq(clientSession), any(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findAllShouldUseProxiedCollection() { |
||||
|
||||
template.findAll(Person.class); |
||||
|
||||
verify(collection).find(eq(clientSession), any(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void executeCommandShouldUseProxiedDatabase() { |
||||
|
||||
template.executeCommand("{}"); |
||||
|
||||
verify(database).runCommand(eq(clientSession), any(), any(Class.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void removeShouldUseProxiedCollection() { |
||||
|
||||
template.remove(new Query(), Person.class); |
||||
|
||||
verify(collection).deleteMany(eq(clientSession), any(), any(DeleteOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void insertShouldUseProxiedCollection() { |
||||
|
||||
template.insert(new Person()); |
||||
|
||||
verify(collection).insertOne(eq(clientSession), any(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void aggregateShouldUseProxiedCollection() { |
||||
|
||||
template.aggregate(Aggregation.newAggregation(Aggregation.project("foo")), COLLECTION_NAME, Person.class); |
||||
|
||||
verify(collection).aggregate(eq(clientSession), anyList(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void aggregateStreamShouldUseProxiedCollection() { |
||||
|
||||
template.aggregateStream(Aggregation.newAggregation(Aggregation.project("foo")), COLLECTION_NAME, Person.class); |
||||
|
||||
verify(collection).aggregate(eq(clientSession), anyList(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void collectionExistsShouldUseProxiedDatabase() { |
||||
|
||||
template.collectionExists(Person.class); |
||||
|
||||
verify(database).listCollectionNames(eq(clientSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void countShouldUseProxiedCollection() { |
||||
|
||||
template.count(new Query(), Person.class); |
||||
|
||||
verify(collection).count(eq(clientSession), any(), any(CountOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void createCollectionShouldUseProxiedDatabase() { |
||||
|
||||
template.createCollection(Person.class); |
||||
|
||||
verify(database).createCollection(eq(clientSession), anyString(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void dropShouldUseProxiedCollection() { |
||||
|
||||
template.dropCollection(Person.class); |
||||
|
||||
verify(collection).drop(eq(clientSession)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findAndModifyShouldUseProxiedCollection() { |
||||
|
||||
template.findAndModify(new Query(), new Update().set("foo", "bar"), Person.class); |
||||
|
||||
verify(collection).findOneAndUpdate(eq(clientSession), any(), any(), any(FindOneAndUpdateOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void findDistinctShouldUseProxiedCollection() { |
||||
|
||||
template.findDistinct(new Query(), "firstName", Person.class, String.class); |
||||
|
||||
verify(collection).distinct(eq(clientSession), anyString(), any(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void geoNearShouldUseProxiedDatabase() { |
||||
|
||||
when(database.runCommand(any(ClientSession.class), any(), eq(Document.class))) |
||||
.thenReturn(new Document("results", Collections.emptyList())); |
||||
template.geoNear(NearQuery.near(new Point(0, 0), Metrics.NEUTRAL), Person.class); |
||||
|
||||
verify(database).runCommand(eq(clientSession), any(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void groupShouldUseProxiedDatabase() { |
||||
|
||||
when(database.runCommand(any(ClientSession.class), any(), eq(Document.class))) |
||||
.thenReturn(new Document("retval", Collections.emptyList())); |
||||
|
||||
template.group(COLLECTION_NAME, GroupBy.key("firstName"), Person.class); |
||||
|
||||
verify(database).runCommand(eq(clientSession), any(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void mapReduceShouldUseProxiedCollection() { |
||||
|
||||
template.mapReduce(COLLECTION_NAME, "foo", "bar", Person.class); |
||||
|
||||
verify(collection).mapReduce(eq(clientSession), anyString(), anyString(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void streamShouldUseProxiedCollection() { |
||||
|
||||
template.stream(new Query(), Person.class); |
||||
|
||||
verify(collection).find(eq(clientSession), any(), eq(Document.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void updateFirstShouldUseProxiedCollection() { |
||||
|
||||
template.updateFirst(new Query(), Update.update("foo", "bar"), Person.class); |
||||
|
||||
verify(collection).updateOne(eq(clientSession), any(), any(), any(UpdateOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void updateMultiShouldUseProxiedCollection() { |
||||
|
||||
template.updateMulti(new Query(), Update.update("foo", "bar"), Person.class); |
||||
|
||||
verify(collection).updateMany(eq(clientSession), any(), any(), any(UpdateOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void upsertShouldUseProxiedCollection() { |
||||
|
||||
template.upsert(new Query(), Update.update("foo", "bar"), Person.class); |
||||
|
||||
verify(collection).updateOne(eq(clientSession), any(), any(), any(UpdateOptions.class)); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void getCollectionShouldJustReturnTheCollection/*No ClientSession binding*/() { |
||||
assertThat(template.getCollection(COLLECTION_NAME)).isNotInstanceOf(Proxy.class); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void getDbShouldJustReturnTheDatabase/*No ClientSession binding*/() { |
||||
assertThat(template.getDb()).isNotInstanceOf(Proxy.class); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void indexOpsShouldUseProxiedCollection() { |
||||
|
||||
template.indexOps(COLLECTION_NAME).dropIndex("index-name"); |
||||
|
||||
verify(collection).dropIndex(eq(clientSession), eq("index-name")); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void bulkOpsShouldUseProxiedCollection() { |
||||
|
||||
BulkOperations bulkOps = template.bulkOps(BulkMode.ORDERED, COLLECTION_NAME); |
||||
bulkOps.insert(new Document()); |
||||
|
||||
bulkOps.execute(); |
||||
|
||||
verify(collection).bulkWrite(eq(clientSession), anyList(), any()); |
||||
} |
||||
|
||||
@Test // DATAMONGO-1880
|
||||
public void scriptOpsShouldUseProxiedDatabase() { |
||||
|
||||
when(database.runCommand(eq(clientSession), any())).thenReturn(new Document("retval", new Object())); |
||||
template.scriptOps().call("W-O-P-R"); |
||||
|
||||
verify(database).runCommand(eq(clientSession), any()); |
||||
} |
||||
} |
||||
@@ -0,0 +1,79 @@
|
[[mongo.sessions]]
= MongoDB Sessions

As of version 3.6, MongoDB supports the concept of sessions. Using sessions enables MongoDB's https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency[Causal Consistency] model, which guarantees that operations are executed in an order that respects their causal relationships. Sessions are split into ``ServerSession``s and ``ClientSession``s. In the following, the term session refers to `ClientSession`.

WARNING: Operations within a client session are not isolated from operations outside the session.

Both `MongoOperations` and `ReactiveMongoOperations` provide gateway methods for tying a `ClientSession` to the operations themselves. Within the callback, all operations on `MongoCollection` and `MongoDatabase` are invoked with the provided session through a `Proxy`, without the need to add it manually. This means that a potential call to `MongoCollection#find()` is delegated to `MongoCollection#find(ClientSession)`.
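
The following sketch illustrates the general idea behind that delegation. It is illustrative only and is not the interceptor that ships with Spring Data MongoDB; `bindSession` is a hypothetical helper that simply prefers a `ClientSession` overload of the invoked method when one exists.

[source,java]
----
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.bson.Document;

import com.mongodb.client.MongoCollection;
import com.mongodb.session.ClientSession;

class SessionBindingSketch {

    @SuppressWarnings("unchecked")
    static MongoCollection<Document> bindSession(MongoCollection<Document> target, ClientSession session) {

        return (MongoCollection<Document>) Proxy.newProxyInstance(MongoCollection.class.getClassLoader(),
                new Class<?>[] { MongoCollection.class }, (proxy, method, args) -> {

                    // look for an overload taking the ClientSession as its first argument
                    Class<?>[] plainTypes = method.getParameterTypes();
                    Class<?>[] sessionTypes = new Class<?>[plainTypes.length + 1];
                    sessionTypes[0] = ClientSession.class;
                    System.arraycopy(plainTypes, 0, sessionTypes, 1, plainTypes.length);

                    try {
                        Method sessionMethod = MongoCollection.class.getMethod(method.getName(), sessionTypes);

                        Object[] sessionArgs = new Object[(args == null ? 0 : args.length) + 1];
                        sessionArgs[0] = session;
                        if (args != null) {
                            System.arraycopy(args, 0, sessionArgs, 1, args.length);
                        }
                        return sessionMethod.invoke(target, sessionArgs);
                    } catch (NoSuchMethodException e) {
                        return method.invoke(target, args); // no session overload -> invoke the plain method
                    }
                });
    }
}
----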

NOTE: Methods such as `(Reactive)MongoOperations#getCollection` return native MongoDB Java driver gateway objects (such as `MongoCollection`) that themselves offer dedicated methods for `ClientSession`. Those objects are *NOT* wrapped by the `Proxy`, so make sure to provide the `ClientSession` where needed when interacting directly with a `MongoCollection` or `MongoDatabase` rather than through one of the `#execute` callbacks on `MongoOperations`.

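When you do interact with such a natively obtained object, pass the session explicitly through the driver API. The following sketch is for illustration only; the `person` collection name and the `name` field are placeholders.

[source,java]
----
MongoCollection<Document> collection = template.getCollection("person"); // not session-bound

Document durzo = collection
    .find(session, new Document("name", "Durzo Blint"))
    .first();
----
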
.ClientSession with MongoOperations.
====
[source,java]
----
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();

ClientSession session = client.startSession(sessionOptions); <1>

template.withSession(() -> session)
    .execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        Person durzo = action.findOne(query, Person.class); <2>

        Person azoth = new Person("Kylar Stern");
        azoth.setMaster(durzo);

        action.insert(azoth); <2>

        return azoth;
    });

session.close(); <3>
----
<1> Obtain a new session from the server.
<2> Use `MongoOperations` methods as before. The `ClientSession` gets applied automatically.
<3> Important! Do not forget to close the session.
====

WARNING: When dealing with ``DBRef``s, especially lazily loaded ones, it is essential to **not** close the `ClientSession` before all data is loaded.

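A sketch of that constraint, assuming hypothetical `Book` and `Author` documents where `Book` holds a lazily loaded `@DBRef` to its `Author`: resolve the reference while the session is still open, then close it.

[source,java]
----
Book book = template.withSession(() -> session)
    .execute(action -> action.findById("4472", Book.class));

Author author = book.getAuthor(); // resolve the lazy @DBRef before closing the session

session.close();
----
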

The reactive counterpart uses the very same building blocks as the imperative one.

.ClientSession with ReactiveMongoOperations.
====
[source,java]
----
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();

Publisher<ClientSession> session = client.startSession(sessionOptions); <1>

template.withSession(session)
    .execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        return action.findOne(query, Person.class)
            .flatMap(durzo -> {

                Person azoth = new Person("Kylar Stern");
                azoth.setMaster(durzo);

                return action.insert(azoth); <2>
            });
    }, ClientSession::close) <3>
    .subscribe();
----
<1> Obtain a `Publisher` for new session retrieval.
<2> Use `MongoOperations` methods as before. The `ClientSession` is obtained and applied automatically.
<3> Important! Do not forget to close the session.
====

By using a `Publisher` that provides the actual session, you can defer session acquisition to the point of actual subscription.
Still, you need to close the session when done, so that you do not pollute the server with stale sessions. Use the `doFinally` hook on `execute` to call `ClientSession#close()` when you no longer need the session.
If you prefer to have more control over the session itself, you can always obtain the `ClientSession` through the driver and provide it via a `Supplier`.
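
As a sketch, shown here with the imperative template for brevity: an application-managed session obtained from the driver can be reused across several callbacks before it is closed.

[source,java]
----
ClientSession session = client.startSession(ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build());

template.withSession(() -> session)
    .execute(action -> action.insert(new Person("Durzo Blint")));

template.withSession(() -> session)
    .execute(action -> action.findOne(query(where("name").is("Durzo Blint")), Person.class));

session.close();
----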