Browse Source

DATACMNS-836 - Add reactive repository support.

We now expose reactive interfaces to facilitate reactive repository support in store-specific modules. Spring Data modules are free to implement their reactive support using either RxJava 1 or Project Reactor (Reactive Streams). We expose a set of base interfaces:

* `ReactiveCrudRepository`
* `ReactiveSortingRepository`
* `RxJava1CrudRepository`
* `RxJava1SortingRepository`

Reactive repositories provide a similar feature coverage to blocking repositories. Reactive paging support is limited to a `Mono<Page>`/`Single<Page>`. Data is fetched in a deferred way to provide a paging experience similar to blocking paging.

A store module can choose either Project Reactor or RxJava 1 to implement reactive repository support. Project Reactor and RxJava types are converted in both directions allowing repositories to be composed of Project Reactor and RxJava 1 query methods. Reactive wrapper type conversion handles wrapper type conversion at repository level. Query/implementation method selection uses multi-pass candidate selection to invoke the most appropriate method (exact arguments, convertible wrappers, assignable arguments).

We also provide ReactiveWrappers to expose metadata about reactive types and their value multiplicity.
pull/185/head
Mark Paluch 10 years ago committed by Oliver Gierke
parent
commit
2e4d63fddd
  1. 9
      pom.xml
  2. 4
      src/main/java/org/springframework/data/repository/core/support/DefaultRepositoryInformation.java
  3. 29
      src/main/java/org/springframework/data/repository/core/support/QueryExecutionResultHandler.java
  4. 259
      src/main/java/org/springframework/data/repository/core/support/ReactiveRepositoryInformation.java
  5. 181
      src/main/java/org/springframework/data/repository/core/support/RepositoryFactorySupport.java
  6. 33
      src/main/java/org/springframework/data/repository/query/Parameter.java
  7. 267
      src/main/java/org/springframework/data/repository/query/ReactiveWrapperConverters.java
  8. 29
      src/main/java/org/springframework/data/repository/query/ResultProcessor.java
  9. 168
      src/main/java/org/springframework/data/repository/reactive/ReactiveCrudRepository.java
  10. 78
      src/main/java/org/springframework/data/repository/reactive/ReactivePagingAndSortingRepository.java
  11. 167
      src/main/java/org/springframework/data/repository/reactive/RxJavaCrudRepository.java
  12. 79
      src/main/java/org/springframework/data/repository/reactive/RxJavaPagingAndSortingRepository.java
  13. 339
      src/main/java/org/springframework/data/repository/util/QueryExecutionConverters.java
  14. 312
      src/test/java/org/springframework/data/repository/core/support/QueryExecutionResultHandlerUnitTests.java
  15. 6
      src/test/java/org/springframework/data/repository/core/support/QueryExecutorMethodInterceptorUnitTests.java
  16. 149
      src/test/java/org/springframework/data/repository/core/support/ReactiveRepositoryInformationUnitTests.java
  17. 133
      src/test/java/org/springframework/data/repository/core/support/ReactiveWrapperRepositoryFactorySupportUnitTests.java
  18. 29
      src/test/java/org/springframework/data/repository/query/ParametersUnitTests.java
  19. 158
      src/test/java/org/springframework/data/repository/query/ReactiveWrapperConvertersUnitTests.java
  20. 109
      src/test/java/org/springframework/data/repository/query/ResultProcessorUnitTests.java
  21. 47
      src/test/java/org/springframework/data/repository/util/QueryExecutionConvertersUnitTests.java

9
pom.xml

@ -105,6 +105,15 @@ @@ -105,6 +105,15 @@
<optional>true</optional>
</dependency>
<!-- RxJava -->
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava}</version>
<optional>true</optional>
</dependency>
<!-- Querydsl -->
<dependency>

4
src/main/java/org/springframework/data/repository/core/support/DefaultRepositoryInformation.java

@ -336,7 +336,7 @@ class DefaultRepositoryInformation implements RepositoryInformation { @@ -336,7 +336,7 @@ class DefaultRepositoryInformation implements RepositoryInformation {
/**
* Checks the given method's parameters to match the ones of the given base class method. Matches generic arguments
* agains the ones bound in the given repository interface.
* against the ones bound in the given repository interface.
*
* @param method
* @param baseClassMethod
@ -381,7 +381,7 @@ class DefaultRepositoryInformation implements RepositoryInformation { @@ -381,7 +381,7 @@ class DefaultRepositoryInformation implements RepositoryInformation {
* @param parameterType must not be {@literal null}.
* @return
*/
private boolean matchesGenericType(TypeVariable<?> variable, ResolvableType parameterType) {
protected boolean matchesGenericType(TypeVariable<?> variable, ResolvableType parameterType) {
GenericDeclaration declaration = variable.getGenericDeclaration();

29
src/main/java/org/springframework/data/repository/core/support/QueryExecutionResultHandler.java

@ -28,6 +28,7 @@ import org.springframework.data.repository.util.QueryExecutionConverters; @@ -28,6 +28,7 @@ import org.springframework.data.repository.util.QueryExecutionConverters;
* Simple domain service to convert query results into a dedicated type.
*
* @author Oliver Gierke
* @author Mark Paluch
*/
class QueryExecutionResultHandler {
@ -50,25 +51,36 @@ class QueryExecutionResultHandler { @@ -50,25 +51,36 @@ class QueryExecutionResultHandler {
* Post-processes the given result of a query invocation to the given type.
*
* @param result can be {@literal null}.
* @param returnTypeDesciptor can be {@literal null}, if so, no conversion is performed.
* @param returnTypeDescriptor can be {@literal null}, if so, no conversion is performed.
* @return
*/
public Object postProcessInvocationResult(Object result, TypeDescriptor returnTypeDesciptor) {
public Object postProcessInvocationResult(Object result, TypeDescriptor returnTypeDescriptor) {
if (returnTypeDesciptor == null) {
if (returnTypeDescriptor == null) {
return result;
}
Class<?> expectedReturnType = returnTypeDesciptor.getType();
Class<?> expectedReturnType = returnTypeDescriptor.getType();
if (result != null && expectedReturnType.isInstance(result)) {
return result;
}
if (QueryExecutionConverters.supports(expectedReturnType)
&& conversionService.canConvert(WRAPPER_TYPE, returnTypeDesciptor)
&& !conversionService.canBypassConvert(WRAPPER_TYPE, TypeDescriptor.valueOf(expectedReturnType))) {
return conversionService.convert(new NullableWrapper(result), expectedReturnType);
if (QueryExecutionConverters.supports(expectedReturnType)) {
TypeDescriptor targetType = TypeDescriptor.valueOf(expectedReturnType);
if(conversionService.canConvert(WRAPPER_TYPE, returnTypeDescriptor)
&& !conversionService.canBypassConvert(WRAPPER_TYPE, targetType)) {
return conversionService.convert(new NullableWrapper(result), expectedReturnType);
}
if(result != null && conversionService.canConvert(TypeDescriptor.valueOf(result.getClass()), returnTypeDescriptor)
&& !conversionService.canBypassConvert(TypeDescriptor.valueOf(result.getClass()), targetType)) {
return conversionService.convert(result, expectedReturnType);
}
}
if (result != null) {
@ -82,4 +94,5 @@ class QueryExecutionResultHandler { @@ -82,4 +94,5 @@ class QueryExecutionResultHandler {
return null;
}
}

259
src/main/java/org/springframework/data/repository/core/support/ReactiveRepositoryInformation.java

@ -0,0 +1,259 @@ @@ -0,0 +1,259 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.repository.core.support;
import static org.springframework.core.GenericTypeResolver.*;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.function.BiPredicate;
import org.springframework.core.MethodParameter;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.repository.core.RepositoryInformation;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.springframework.data.repository.util.QueryExecutionConverters;
import org.springframework.util.Assert;
/**
* This {@link RepositoryInformation} uses a {@link ConversionService} to check whether method arguments can be
* converted for invocation of implementation methods.
*
* @author Mark Paluch
*/
public class ReactiveRepositoryInformation extends DefaultRepositoryInformation {
private final ConversionService conversionService;
/**
* Creates a new {@link ReactiveRepositoryInformation} for the given repository interface and repository base class
* using a {@link ConversionService}.
*
* @param metadata must not be {@literal null}.
* @param repositoryBaseClass must not be {@literal null}.
* @param customImplementationClass
* @param conversionService must not be {@literal null}.
*/
public ReactiveRepositoryInformation(RepositoryMetadata metadata, Class<?> repositoryBaseClass,
Class<?> customImplementationClass, ConversionService conversionService) {
super(metadata, repositoryBaseClass, customImplementationClass);
Assert.notNull(conversionService, "Conversion service must not be null!");
this.conversionService = conversionService;
}
/**
* Returns the given target class' method if the given method (declared in the repository interface) was also declared
* at the target class. Returns the given method if the given base class does not declare the method given. Takes
* generics into account.
*
* @param method must not be {@literal null}
* @param baseClass
* @return
*/
Method getTargetClassMethod(Method method, Class<?> baseClass) {
if (baseClass == null) {
return method;
}
boolean wantsWrappers = wantsMethodUsingReactiveWrapperParameters(method);
if (wantsWrappers) {
Method candidate = getMethodCandidate(method, baseClass, new ExactWrapperMatch(method));
if (candidate != null) {
return candidate;
}
candidate = getMethodCandidate(method, baseClass, new WrapperConversionMatch(method, conversionService));
if (candidate != null) {
return candidate;
}
}
Method candidate = getMethodCandidate(method, baseClass,
new MatchParameterOrComponentType(method, getRepositoryInterface()));
if (candidate != null) {
return candidate;
}
return method;
}
private boolean wantsMethodUsingReactiveWrapperParameters(Method method) {
boolean wantsWrappers = false;
for (Class<?> parameterType : method.getParameterTypes()) {
if (isNonunwrappingWrapper(parameterType)) {
wantsWrappers = true;
break;
}
}
return wantsWrappers;
}
private Method getMethodCandidate(Method method, Class<?> baseClass, BiPredicate<Class<?>, Integer> predicate) {
for (Method baseClassMethod : baseClass.getMethods()) {
// Wrong name
if (!method.getName().equals(baseClassMethod.getName())) {
continue;
}
// Wrong number of arguments
if (!(method.getParameterTypes().length == baseClassMethod.getParameterTypes().length)) {
continue;
}
// Check whether all parameters match
if (!parametersMatch(method, baseClassMethod, predicate)) {
continue;
}
return baseClassMethod;
}
return null;
}
/**
* Checks the given method's parameters to match the ones of the given base class method. Matches generic arguments
* against the ones bound in the given repository interface.
*
* @param method
* @param baseClassMethod
* @param predicate
* @return
*/
private boolean parametersMatch(Method method, Method baseClassMethod, BiPredicate<Class<?>, Integer> predicate) {
Type[] genericTypes = baseClassMethod.getGenericParameterTypes();
Class<?>[] types = baseClassMethod.getParameterTypes();
for (int i = 0; i < genericTypes.length; i++) {
if (!predicate.test(types[i], i)) {
return false;
}
}
return true;
}
/**
* Checks whether the type is a wrapper without unwrapping support. Reactive wrappers don't like to be unwrapped.
*
* @param parameterType
* @return
*/
static boolean isNonunwrappingWrapper(Class<?> parameterType) {
return QueryExecutionConverters.supports(parameterType)
&& !QueryExecutionConverters.supportsUnwrapping(parameterType);
}
static class WrapperConversionMatch implements BiPredicate<Class<?>, Integer> {
final Method declaredMethod;
final Class<?>[] declaredParameterTypes;
final ConversionService conversionService;
public WrapperConversionMatch(Method declaredMethod, ConversionService conversionService) {
this.declaredMethod = declaredMethod;
this.declaredParameterTypes = declaredMethod.getParameterTypes();
this.conversionService = conversionService;
}
@Override
public boolean test(Class<?> candidateParameterType, Integer index) {
// TODO: should check for component type
if (isNonunwrappingWrapper(candidateParameterType) && isNonunwrappingWrapper(declaredParameterTypes[index])) {
if (conversionService.canConvert(declaredParameterTypes[index], candidateParameterType)) {
return true;
}
}
return false;
}
}
static class ExactWrapperMatch implements BiPredicate<Class<?>, Integer> {
final Method declaredMethod;
final Class<?>[] declaredParameterTypes;
public ExactWrapperMatch(Method declaredMethod) {
this.declaredMethod = declaredMethod;
this.declaredParameterTypes = declaredMethod.getParameterTypes();
}
@Override
public boolean test(Class<?> candidateParameterType, Integer index) {
// TODO: should check for component type
if (isNonunwrappingWrapper(candidateParameterType) && isNonunwrappingWrapper(declaredParameterTypes[index])) {
if (declaredParameterTypes[index].isAssignableFrom(candidateParameterType)) {
return true;
}
}
return false;
}
}
static class MatchParameterOrComponentType implements BiPredicate<Class<?>, Integer> {
final Method declaredMethod;
final Class<?>[] declaredParameterTypes;
final Class<?> repositoryInterface;
public MatchParameterOrComponentType(Method declaredMethod, Class<?> repositoryInterface) {
this.declaredMethod = declaredMethod;
this.declaredParameterTypes = declaredMethod.getParameterTypes();
this.repositoryInterface = repositoryInterface;
}
@Override
public boolean test(Class<?> candidateParameterType, Integer index) {
MethodParameter parameter = new MethodParameter(declaredMethod, index);
Class<?> parameterType = resolveParameterType(parameter, repositoryInterface);
if (!candidateParameterType.isAssignableFrom(parameterType)
|| !candidateParameterType.equals(declaredParameterTypes[index])) {
return false;
}
return true;
}
}
}

181
src/main/java/org/springframework/data/repository/core/support/RepositoryFactorySupport.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2008-2015 the original author or authors.
* Copyright 2008-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,6 +35,7 @@ import org.springframework.beans.factory.BeanFactory; @@ -35,6 +35,7 @@ import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.core.GenericTypeResolver;
import org.springframework.core.MethodParameter;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor;
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
@ -60,6 +61,7 @@ import org.springframework.util.ObjectUtils; @@ -60,6 +61,7 @@ import org.springframework.util.ObjectUtils;
* detection strategy can be configured by setting {@link QueryLookupStrategy.Key}.
*
* @author Oliver Gierke
* @author Mark Paluch
*/
public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, BeanFactoryAware {
@ -77,6 +79,7 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, @@ -77,6 +79,7 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware,
private ClassLoader classLoader = org.springframework.util.ClassUtils.getDefaultClassLoader();
private EvaluationContextProvider evaluationContextProvider = DefaultEvaluationContextProvider.INSTANCE;
private BeanFactory beanFactory;
private ConversionService conversionService;
private QueryCollectingQueryCreationListener collectingListener = new QueryCollectingQueryCreationListener();
@ -102,6 +105,16 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, @@ -102,6 +105,16 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware,
this.namedQueries = namedQueries == null ? PropertiesBasedNamedQueries.EMPTY : namedQueries;
}
/**
* Configures a {@link ConversionService} instance to convert method parameters when calling implementation methods on
* the base class or a custom implementation.
*
* @param conversionService the conversionService to set.
*/
public void setConversionService(ConversionService conversionService) {
this.conversionService = conversionService;
}
/*
* (non-Javadoc)
* @see org.springframework.beans.factory.BeanClassLoaderAware#setBeanClassLoader(java.lang.ClassLoader)
@ -217,7 +230,14 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, @@ -217,7 +230,14 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware,
result.addAdvice(new DefaultMethodInvokingMethodInterceptor());
}
result.addAdvice(new QueryExecutorMethodInterceptor(information, customImplementation, target));
result.addAdvice(new QueryExecutorMethodInterceptor(information));
if (conversionService == null) {
result.addAdvice(new ImplementationMethodExecutionInterceptor(information, customImplementation, target));
} else {
result.addAdvice(new ConvertingImplementationMethodExecutionInterceptor(information, customImplementation, target,
conversionService));
}
return (T) result.getProxy(classLoader);
}
@ -252,7 +272,17 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, @@ -252,7 +272,17 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware,
Class<?> repositoryBaseClass = this.repositoryBaseClass == null ? getRepositoryBaseClass(metadata)
: this.repositoryBaseClass;
repositoryInformation = new DefaultRepositoryInformation(metadata, repositoryBaseClass, customImplementationClass);
if (conversionService == null) {
repositoryInformation = new DefaultRepositoryInformation(metadata, repositoryBaseClass,
customImplementationClass);
} else {
// TODO: Not sure this is the best idea but at some point we need to distinguish between
// methods that want to get unwrapped data from wrapped parameters and those which are
// just fine with wrappers because at some point a converting interceptor kicks in
repositoryInformation = new ReactiveRepositoryInformation(metadata, repositoryBaseClass,
customImplementationClass, conversionService);
}
repositoryInformationCache.put(cacheKey, repositoryInformation);
return repositoryInformation;
}
@ -390,25 +420,15 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, @@ -390,25 +420,15 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware,
private final Map<Method, RepositoryQuery> queries = new ConcurrentHashMap<Method, RepositoryQuery>();
private final Object customImplementation;
private final RepositoryInformation repositoryInformation;
private final QueryExecutionResultHandler resultHandler;
private final Object target;
/**
* Creates a new {@link QueryExecutorMethodInterceptor}. Builds a model of {@link QueryMethod}s to be invoked on
* execution of repository interface methods.
*/
public QueryExecutorMethodInterceptor(RepositoryInformation repositoryInformation, Object customImplementation,
Object target) {
Assert.notNull(repositoryInformation, "RepositoryInformation must not be null!");
Assert.notNull(target, "Target must not be null!");
public QueryExecutorMethodInterceptor(RepositoryInformation repositoryInformation) {
this.resultHandler = new QueryExecutionResultHandler();
this.repositoryInformation = repositoryInformation;
this.customImplementation = customImplementation;
this.target = target;
QueryLookupStrategy lookupStrategy = getQueryLookupStrategy(queryLookupStrategyKey,
RepositoryFactorySupport.this.evaluationContextProvider);
@ -472,16 +492,66 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, @@ -472,16 +492,66 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware,
Method method = invocation.getMethod();
Object[] arguments = invocation.getArguments();
if (hasQueryFor(method)) {
return queries.get(method).execute(arguments);
}
return invocation.proceed();
}
/**
* Returns whether we know of a query to execute for the given {@link Method};
*
* @param method
* @return
*/
private boolean hasQueryFor(Method method) {
return queries.containsKey(method);
}
}
/**
* Method interceptor that calls methods on either the base implementation or the custom repository implementation.
*
* @author Mark Paluch
*/
public class ImplementationMethodExecutionInterceptor implements MethodInterceptor {
private final Object customImplementation;
private final RepositoryInformation repositoryInformation;
private final Object target;
/**
* Creates a new {@link QueryExecutorMethodInterceptor}. Builds a model of {@link QueryMethod}s to be invoked on
* execution of repository interface methods.
*/
public ImplementationMethodExecutionInterceptor(RepositoryInformation repositoryInformation,
Object customImplementation, Object target) {
Assert.notNull(repositoryInformation, "RepositoryInformation must not be null!");
Assert.notNull(target, "Target must not be null!");
this.repositoryInformation = repositoryInformation;
this.customImplementation = customImplementation;
this.target = target;
}
/* (non-Javadoc)
* @see org.aopalliance.intercept.MethodInterceptor#invoke(org.aopalliance.intercept.MethodInvocation)
*/
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Method method = invocation.getMethod();
Object[] arguments = invocation.getArguments();
if (isCustomMethodInvocation(invocation)) {
Method actualMethod = repositoryInformation.getTargetClassMethod(method);
return executeMethodOn(customImplementation, actualMethod, arguments);
}
if (hasQueryFor(method)) {
return queries.get(method).execute(arguments);
}
// Lookup actual method as it might be redeclared in the interface
// and we have to use the repository instance nevertheless
Method actualMethod = repositoryInformation.getTargetClassMethod(method);
@ -497,7 +567,7 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, @@ -497,7 +567,7 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware,
* @return
* @throws Throwable
*/
private Object executeMethodOn(Object target, Method method, Object[] parameters) throws Throwable {
protected Object executeMethodOn(Object target, Method method, Object[] parameters) throws Throwable {
try {
return method.invoke(target, parameters);
@ -508,16 +578,6 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, @@ -508,16 +578,6 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware,
throw new IllegalStateException("Should not occur!");
}
/**
* Returns whether we know of a query to execute for the given {@link Method};
*
* @param method
* @return
*/
private boolean hasQueryFor(Method method) {
return queries.containsKey(method);
}
/**
* Returns whether the given {@link MethodInvocation} is considered to be targeted as an invocation of a custom
* method.
@ -535,6 +595,69 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware, @@ -535,6 +595,69 @@ public abstract class RepositoryFactorySupport implements BeanClassLoaderAware,
}
}
/**
* Method interceptor that converts parameters before invoking a method.
*
* @author Mark Paluch
*/
public class ConvertingImplementationMethodExecutionInterceptor extends ImplementationMethodExecutionInterceptor {
private final ConversionService conversionService;
/**
* @param repositoryInformation
* @param customImplementation
* @param target
* @param conversionService
*/
public ConvertingImplementationMethodExecutionInterceptor(RepositoryInformation repositoryInformation,
Object customImplementation, Object target, ConversionService conversionService) {
super(repositoryInformation, customImplementation, target);
this.conversionService = conversionService;
}
/* (non-Javadoc)
* @see org.springframework.data.repository.core.support.RepositoryFactorySupport.ImplementationMethodExecutionInterceptor#executeMethodOn(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
*/
@Override
protected Object executeMethodOn(Object target, Method method, Object[] parameters) throws Throwable {
return super.executeMethodOn(target, method, convertParameters(method.getParameterTypes(), parameters));
}
/**
* @param parameterTypes
* @param parameters
* @return
*/
private Object[] convertParameters(Class<?>[] parameterTypes, Object[] parameters) {
if (parameters.length == 0) {
return parameters;
}
Object[] result = new Object[parameters.length];
for (int i = 0; i < parameters.length; i++) {
if (parameters[i] == null) {
continue;
}
if (parameterTypes[i].isAssignableFrom(parameters[i].getClass())
|| !conversionService.canConvert(parameters[i].getClass(), parameterTypes[i])) {
result[i] = parameters[i];
} else {
result[i] = conversionService.convert(parameters[i], parameterTypes[i]);
}
}
return result;
}
}
/**
* {@link QueryCreationListener} collecting the {@link QueryMethod}s created for all query methods of the repository
* interface.

33
src/main/java/org/springframework/data/repository/query/Parameter.java

@ -34,6 +34,7 @@ import org.springframework.util.Assert; @@ -34,6 +34,7 @@ import org.springframework.util.Assert;
* Class to abstract a single parameter of a query method. It is held in the context of a {@link Parameters} instance.
*
* @author Oliver Gierke
* @author Mark Paluch
*/
public class Parameter {
@ -205,7 +206,30 @@ public class Parameter { @@ -205,7 +206,30 @@ public class Parameter {
}
/**
* Returns the component type if the given {@link MethodParameter} is a wrapper type.
* Returns whether the {@link MethodParameter} is wrapped in a wrapper type.
*
* @param parameter must not be {@literal null}.
* @return
* @see QueryExecutionConverters
*/
private static boolean isWrapped(MethodParameter parameter) {
return QueryExecutionConverters.supports(parameter.getParameterType());
}
/**
* Returns whether the {@link MethodParameter} should be unwrapped.
*
* @param parameter must not be {@literal null}.
* @return
* @see QueryExecutionConverters
*/
private static boolean shouldUnwrap(MethodParameter parameter) {
return QueryExecutionConverters.supportsUnwrapping(parameter.getParameterType());
}
/**
* Returns the component type if the given {@link MethodParameter} is a wrapper type and the wrapper should be
* unwrapped.
*
* @param parameter must not be {@literal null}.
* @return
@ -214,7 +238,10 @@ public class Parameter { @@ -214,7 +238,10 @@ public class Parameter {
Class<?> originalType = parameter.getParameterType();
return QueryExecutionConverters.supports(originalType)
? ResolvableType.forMethodParameter(parameter).getGeneric(0).getRawClass() : originalType;
if (isWrapped(parameter) && shouldUnwrap(parameter)) {
return ResolvableType.forMethodParameter(parameter).getGeneric(0).getRawClass();
}
return originalType;
}
}

267
src/main/java/org/springframework/data/repository/query/ReactiveWrapperConverters.java

@ -0,0 +1,267 @@ @@ -0,0 +1,267 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.repository.query;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.data.repository.util.QueryExecutionConverters;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
/**
* Conversion support for reactive wrapper types.
*
* @author Mark Paluch
* @since 2.0
*/
public abstract class ReactiveWrapperConverters {
private static final boolean PROJECT_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.converter.DependencyUtils",
QueryExecutionConverters.class.getClassLoader());
private static final boolean RXJAVA_SINGLE_PRESENT = ClassUtils.isPresent("rx.Single",
QueryExecutionConverters.class.getClassLoader());
private static final boolean RXJAVA_OBSERVABLE_PRESENT = ClassUtils.isPresent("rx.Observable",
QueryExecutionConverters.class.getClassLoader());
private static final List<AbstractReactiveWrapper<?>> REACTIVE_WRAPPERS = new ArrayList<>();
private static final GenericConversionService GENERIC_CONVERSION_SERVICE = new GenericConversionService();
static {
if (PROJECT_REACTOR_PRESENT) {
REACTIVE_WRAPPERS.add(FluxWrapper.INSTANCE);
REACTIVE_WRAPPERS.add(MonoWrapper.INSTANCE);
REACTIVE_WRAPPERS.add(PublisherWrapper.INSTANCE);
}
if (RXJAVA_SINGLE_PRESENT) {
REACTIVE_WRAPPERS.add(SingleWrapper.INSTANCE);
}
if (RXJAVA_OBSERVABLE_PRESENT) {
REACTIVE_WRAPPERS.add(ObservableWrapper.INSTANCE);
}
QueryExecutionConverters.registerConvertersIn(GENERIC_CONVERSION_SERVICE);
}
private ReactiveWrapperConverters() {
}
/**
* Returns whether the given type is a supported wrapper type.
*
* @param type must not be {@literal null}.
* @return
*/
public static boolean supports(Class<?> type) {
return assignableStream(type).isPresent();
}
/**
* Returns whether the type is a single-like wrapper.
*
* @param type must not be {@literal null}.
* @return
* @see Single
* @see Mono
*/
public static boolean isSingleLike(Class<?> type) {
return assignableStream(type).map(wrapper -> wrapper.getMultiplicity() == Multiplicity.ONE).orElse(false);
}
/**
* Returns whether the type is a collection/multi-element-like wrapper.
*
* @param type must not be {@literal null}.
* @return
* @see Observable
* @see Flux
* @see Publisher
*/
public static boolean isCollectionLike(Class<?> type) {
return assignableStream(type).map(wrapper -> wrapper.getMultiplicity() == Multiplicity.MANY).orElse(false);
}
/**
* Casts or converts the given wrapper type into a different wrapper type.
*
* @param stream the stream, must not be {@literal null}.
* @param expectedWrapperType must not be {@literal null}.
* @return
*/
public static <T> T toWrapper(Object stream, Class<? extends T> expectedWrapperType) {
Assert.notNull(stream, "Stream must not be null!");
Assert.notNull(expectedWrapperType, "Converter must not be null!");
if (expectedWrapperType.isAssignableFrom(stream.getClass())) {
return (T) stream;
}
return GENERIC_CONVERSION_SERVICE.convert(stream, expectedWrapperType);
}
/**
* Maps elements of a reactive element stream to other elements.
*
* @param stream must not be {@literal null}.
* @param converter must not be {@literal null}.
* @return
*/
public static <T> T map(Object stream, Converter<Object, Object> converter) {
Assert.notNull(stream, "Stream must not be null!");
Assert.notNull(converter, "Converter must not be null!");
for (AbstractReactiveWrapper<?> reactiveWrapper : REACTIVE_WRAPPERS) {
if (ClassUtils.isAssignable(reactiveWrapper.getWrapperClass(), stream.getClass())) {
return (T) reactiveWrapper.map(stream, converter);
}
}
throw new IllegalStateException(String.format("Cannot apply converter to %s", stream));
}
private static Optional<AbstractReactiveWrapper<?>> assignableStream(Class<?> type) {
Assert.notNull(type, "Type must not be null!");
return findWrapper(wrapper -> ClassUtils.isAssignable(wrapper.getWrapperClass(), type));
}
private static Optional<AbstractReactiveWrapper<?>> findWrapper(
Predicate<? super AbstractReactiveWrapper<?>> predicate) {
return REACTIVE_WRAPPERS.stream().filter(predicate).findFirst();
}
private abstract static class AbstractReactiveWrapper<T> {
private final Class<? super T> wrapperClass;
private final Multiplicity multiplicity;
public AbstractReactiveWrapper(Class<? super T> wrapperClass, Multiplicity multiplicity) {
this.wrapperClass = wrapperClass;
this.multiplicity = multiplicity;
}
public Class<? super T> getWrapperClass() {
return wrapperClass;
}
public Multiplicity getMultiplicity() {
return multiplicity;
}
public abstract Object map(Object wrapper, Converter<Object, Object> converter);
}
private static class MonoWrapper extends AbstractReactiveWrapper<Mono<?>> {
static final MonoWrapper INSTANCE = new MonoWrapper();
private MonoWrapper() {
super(Mono.class, Multiplicity.ONE);
}
public Mono<?> map(Object wrapper, Converter<Object, Object> converter) {
return ((Mono<?>) wrapper).map(converter::convert);
}
}
private static class FluxWrapper extends AbstractReactiveWrapper<Flux<?>> {
static final FluxWrapper INSTANCE = new FluxWrapper();
private FluxWrapper() {
super(Flux.class, Multiplicity.MANY);
}
public Flux<?> map(Object wrapper, Converter<Object, Object> converter) {
return ((Flux<?>) wrapper).map(converter::convert);
}
}
private static class PublisherWrapper extends AbstractReactiveWrapper<Publisher<?>> {
static final PublisherWrapper INSTANCE = new PublisherWrapper();
public PublisherWrapper() {
super(Publisher.class, Multiplicity.MANY);
}
@Override
public Publisher<?> map(Object wrapper, Converter<Object, Object> converter) {
if (wrapper instanceof Mono) {
return MonoWrapper.INSTANCE.map((Mono<?>) wrapper, converter);
}
if (wrapper instanceof Flux) {
return FluxWrapper.INSTANCE.map((Flux<?>) wrapper, converter);
}
return FluxWrapper.INSTANCE.map(Flux.from((Publisher<?>) wrapper), converter);
}
}
private static class SingleWrapper extends AbstractReactiveWrapper<Single<?>> {
static final SingleWrapper INSTANCE = new SingleWrapper();
private SingleWrapper() {
super(Single.class, Multiplicity.ONE);
}
@Override
public Single<?> map(Object wrapper, Converter<Object, Object> converter) {
return ((Single<?>) wrapper).map(converter::convert);
}
}
private static class ObservableWrapper extends AbstractReactiveWrapper<Observable<?>> {
static final ObservableWrapper INSTANCE = new ObservableWrapper();
private ObservableWrapper() {
super(Observable.class, Multiplicity.MANY);
}
@Override
public Observable<?> map(Object wrapper, Converter<Object, Object> converter) {
return ((Observable<?>) wrapper).map(converter::convert);
}
}
private enum Multiplicity {
ONE, MANY,
}
}

29
src/main/java/org/springframework/data/repository/query/ResultProcessor.java

@ -36,11 +36,12 @@ import org.springframework.data.util.ReflectionUtils; @@ -36,11 +36,12 @@ import org.springframework.data.util.ReflectionUtils;
import org.springframework.util.Assert;
/**
* A {@link ResultProcessor} to expose metadata about query result element projection and eventually post prcessing raw
* A {@link ResultProcessor} to expose metadata about query result element projection and eventually post processing raw
* query results into projections and data transfer objects.
*
*
* @author Oliver Gierke
* @author John Blum
* @author Mark Paluch
* @since 1.12
*/
public class ResultProcessor {
@ -53,7 +54,7 @@ public class ResultProcessor { @@ -53,7 +54,7 @@ public class ResultProcessor {
/**
* Creates a new {@link ResultProcessor} from the given {@link QueryMethod} and {@link ProjectionFactory}.
*
*
* @param method must not be {@literal null}.
* @param factory must not be {@literal null}.
*/
@ -63,7 +64,7 @@ public class ResultProcessor { @@ -63,7 +64,7 @@ public class ResultProcessor {
/**
* Creates a new {@link ResultProcessor} for the given {@link QueryMethod}, {@link ProjectionFactory} and type.
*
*
* @param method must not be {@literal null}.
* @param factory must not be {@literal null}.
* @param type must not be {@literal null}.
@ -82,7 +83,7 @@ public class ResultProcessor { @@ -82,7 +83,7 @@ public class ResultProcessor {
/**
* Returns a new {@link ResultProcessor} with a new projection type obtained from the given {@link ParameterAccessor}.
*
*
* @param accessor can be {@literal null}.
* @return
*/
@ -99,7 +100,7 @@ public class ResultProcessor { @@ -99,7 +100,7 @@ public class ResultProcessor {
/**
* Returns the {@link ReturnedType}.
*
*
* @return
*/
public ReturnedType getReturnedType() {
@ -108,7 +109,7 @@ public class ResultProcessor { @@ -108,7 +109,7 @@ public class ResultProcessor {
/**
* Post-processes the given query result.
*
*
* @param source can be {@literal null}.
* @return
*/
@ -119,7 +120,7 @@ public class ResultProcessor { @@ -119,7 +120,7 @@ public class ResultProcessor {
/**
* Post-processes the given query result using the given preparing {@link Converter} to potentially prepare collection
* elements.
*
*
* @param source can be {@literal null}.
* @param preparingConverter must not be {@literal null}.
* @return
@ -156,6 +157,10 @@ public class ResultProcessor { @@ -156,6 +157,10 @@ public class ResultProcessor {
return (T) new StreamQueryResultHandler(type, converter).handle(source);
}
if(ReactiveWrapperConverters.supports(source.getClass())){
return (T) ReactiveWrapperConverters.map(source, o -> type.isInstance(o) ? o : converter.convert(o));
}
return (T) converter.convert(source);
}
@ -184,7 +189,7 @@ public class ResultProcessor { @@ -184,7 +189,7 @@ public class ResultProcessor {
/**
* Returns a new {@link ChainingConverter} that hands the elements resulting from the current conversion to the
* given {@link Converter}.
*
*
* @param converter must not be {@literal null}.
* @return
*/
@ -203,7 +208,7 @@ public class ResultProcessor { @@ -203,7 +208,7 @@ public class ResultProcessor {
});
}
/*
/*
* (non-Javadoc)
* @see org.springframework.core.convert.converter.Converter#convert(java.lang.Object)
*/
@ -223,7 +228,7 @@ public class ResultProcessor { @@ -223,7 +228,7 @@ public class ResultProcessor {
INSTANCE;
/*
/*
* (non-Javadoc)
* @see org.springframework.core.convert.converter.Converter#convert(java.lang.Object)
*/
@ -240,7 +245,7 @@ public class ResultProcessor { @@ -240,7 +245,7 @@ public class ResultProcessor {
private final @NonNull ProjectionFactory factory;
private final ConversionService conversionService = new DefaultConversionService();
/*
/*
* (non-Javadoc)
* @see org.springframework.core.convert.converter.Converter#convert(java.lang.Object)
*/

168
src/main/java/org/springframework/data/repository/reactive/ReactiveCrudRepository.java

@ -0,0 +1,168 @@ @@ -0,0 +1,168 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.repository.reactive;
import java.io.Serializable;
import org.reactivestreams.Publisher;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Interface for generic CRUD operations on a repository for a specific type. This repository follows reactive paradigms
* and uses Project Reactor types which are built on top of Reactive Streams.
*
* @author Mark Paluch
* @see Mono
* @see Flux
* @see ReactiveStreamsCrudRepository
*/
@NoRepositoryBean
public interface ReactiveCrudRepository<T, ID extends Serializable> extends Repository<T, ID> {
/**
* Saves a given entity. Use the returned instance for further operations as the save operation might have changed the
* entity instance completely.
*
* @param entity
* @return the saved entity
*/
<S extends T> Mono<S> save(S entity);
/**
* Saves all given entities.
*
* @param entities must not be {@literal null}.
* @return the saved entities
* @throws IllegalArgumentException in case the given entity is {@literal null}.
*/
<S extends T> Flux<S> save(Iterable<S> entities);
/**
* Saves all given entities.
*
* @param entityStream must not be {@literal null}.
* @return the saved entities
* @throws IllegalArgumentException in case the given {@code Publisher} is {@literal null}.
*/
<S extends T> Flux<S> save(Publisher<S> entityStream);
/**
* Retrieves an entity by its id.
*
* @param id must not be {@literal null}.
* @return the entity with the given id or {@literal null} if none found
* @throws IllegalArgumentException if {@code id} is {@literal null}
*/
Mono<T> findOne(ID id);
/**
* Retrieves an entity by its id supplied by a {@link Mono}.
*
* @param id must not be {@literal null}.
* @return the entity with the given id or {@literal null} if none found
* @throws IllegalArgumentException if {@code id} is {@literal null}
*/
Mono<T> findOne(Mono<ID> id);
/**
* Returns whether an entity with the given id exists.
*
* @param id must not be {@literal null}.
* @return true if an entity with the given id exists, {@literal false} otherwise
* @throws IllegalArgumentException if {@code id} is {@literal null}
*/
Mono<Boolean> exists(ID id);
/**
* Returns whether an entity with the given id exists supplied by a {@link Mono}.
*
* @param id must not be {@literal null}.
* @return true if an entity with the given id exists, {@literal false} otherwise
* @throws IllegalArgumentException if {@code id} is {@literal null}
*/
Mono<Boolean> exists(Mono<ID> id);
/**
* Returns all instances of the type.
*
* @return all entities
*/
Flux<T> findAll();
/**
* Returns all instances of the type with the given IDs.
*
* @param ids
* @return
*/
Flux<T> findAll(Iterable<ID> ids);
/**
* Returns all instances of the type with the given IDs.
*
* @param idStream
* @return
*/
Flux<T> findAll(Publisher<ID> idStream);
/**
* Returns the number of entities available.
*
* @return the number of entities
*/
Mono<Long> count();
/**
* Deletes the entity with the given id.
*
* @param id must not be {@literal null}.
* @throws IllegalArgumentException in case the given {@code id} is {@literal null}
*/
Mono<Void> delete(ID id);
/**
* Deletes a given entity.
*
* @param entity
* @throws IllegalArgumentException in case the given entity is {@literal null}.
*/
Mono<Void> delete(T entity);
/**
* Deletes the given entities.
*
* @param entities
* @throws IllegalArgumentException in case the given {@link Iterable} is {@literal null}.
*/
Mono<Void> delete(Iterable<? extends T> entities);
/**
* Deletes the given entities.
*
* @param entityStream
* @throws IllegalArgumentException in case the given {@link Publisher} is {@literal null}.
*/
Mono<Void> delete(Publisher<? extends T> entityStream);
/**
* Deletes all entities managed by the repository.
*/
Mono<Void> deleteAll();
}

78
src/main/java/org/springframework/data/repository/reactive/ReactivePagingAndSortingRepository.java

@ -0,0 +1,78 @@ @@ -0,0 +1,78 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.repository.reactive;
import java.io.Serializable;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.repository.NoRepositoryBean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Extension of {@link ReactiveCrudRepository} to provide additional methods to retrieve entities using the pagination
* and sorting abstraction.
*
* @author Mark Paluch
* @see Sort
* @see Pageable
* @see Mono
* @see Flux
*/
@NoRepositoryBean
public interface ReactivePagingAndSortingRepository<T, ID extends Serializable> extends ReactiveCrudRepository<T, ID> {
/*
* (non-Javadoc)
* @see org.springframework.data.repository.reactive.ReactiveCrudRepository#findAll()
*/
@Override
Flux<T> findAll();
/*
* (non-Javadoc)
* @see org.springframework.data.repository.reactive.ReactiveCrudRepository#findAll(java.lang.Iterable)
*/
@Override
Flux<T> findAll(Iterable<ID> ids);
/*
* (non-Javadoc)
* @see org.springframework.data.repository.reactive.ReactiveCrudRepository#findAll(org.reactivestreams.Publisher)
*/
@Override
Flux<T> findAll(Publisher<ID> idStream);
/**
* Returns all entities sorted by the given options.
*
* @param sort
* @return all entities sorted by the given options
*/
Flux<T> findAll(Sort sort);
/**
* Returns a {@link Page} of entities meeting the paging restriction provided in the {@code Pageable} object.
*
* @param pageable
* @return a page of entities
*/
Mono<Page<T>> findAll(Pageable pageable);
}

167
src/main/java/org/springframework/data/repository/reactive/RxJavaCrudRepository.java

@ -0,0 +1,167 @@ @@ -0,0 +1,167 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.repository.reactive;
import java.io.Serializable;
import org.reactivestreams.Publisher;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.Repository;
import rx.Observable;
import rx.Single;
/**
* Interface for generic CRUD operations on a repository for a specific type. This repository follows reactive paradigms
* and uses RxJava types.
*
* @author Mark Paluch
* @see Single
* @see Observable
*/
@NoRepositoryBean
public interface RxJavaCrudRepository<T, ID extends Serializable> extends Repository<T, ID> {
/**
* Saves a given entity. Use the returned instance for further operations as the save operation might have changed the
* entity instance completely.
*
* @param entity
* @return the saved entity
*/
<S extends T> Single<S> save(S entity);
/**
* Saves all given entities.
*
* @param entities must not be {@literal null}.
* @return the saved entities
* @throws IllegalArgumentException in case the given entity is {@literal null}.
*/
<S extends T> Observable<S> save(Iterable<S> entities);
/**
* Saves all given entities.
*
* @param entityStream must not be {@literal null}.
* @return the saved entities
* @throws IllegalArgumentException in case the given {@code Publisher} is {@literal null}.
*/
<S extends T> Observable<S> save(Observable<S> entityStream);
/**
* Retrieves an entity by its id.
*
* @param id must not be {@literal null}.
* @return the entity with the given id or {@literal null} if none found
* @throws IllegalArgumentException if {@code id} is {@literal null}
*/
Single<T> findOne(ID id);
/**
* Retrieves an entity by its id supplied by a {@link Single}.
*
* @param id must not be {@literal null}.
* @return the entity with the given id or {@literal null} if none found
* @throws IllegalArgumentException if {@code id} is {@literal null}
*/
Single<T> findOne(Single<ID> id);
/**
* Returns whether an entity with the given id exists.
*
* @param id must not be {@literal null}.
* @return true if an entity with the given id exists, {@literal false} otherwise
* @throws IllegalArgumentException if {@code id} is {@literal null}
*/
Single<Boolean> exists(ID id);
/**
* Returns whether an entity with the given id exists supplied by a {@link Single}.
*
* @param id must not be {@literal null}.
* @return true if an entity with the given id exists, {@literal false} otherwise
* @throws IllegalArgumentException if {@code id} is {@literal null}
*/
Single<Boolean> exists(Single<ID> id);
/**
* Returns all instances of the type.
*
* @return all entities
*/
Observable<T> findAll();
/**
* Returns all instances of the type with the given IDs.
*
* @param ids
* @return
*/
Observable<T> findAll(Iterable<ID> ids);
/**
* Returns all instances of the type with the given IDs.
*
* @param idStream
* @return
*/
Observable<T> findAll(Observable<ID> idStream);
/**
* Returns the number of entities available.
*
* @return the number of entities
*/
Single<Long> count();
/**
* Deletes the entity with the given id.
*
* @param id must not be {@literal null}.
* @throws IllegalArgumentException in case the given {@code id} is {@literal null}
*/
Single<Void> delete(ID id);
/**
* Deletes a given entity.
*
* @param entity
* @throws IllegalArgumentException in case the given entity is {@literal null}.
*/
Single<Void> delete(T entity);
/**
* Deletes the given entities.
*
* @param entities
* @throws IllegalArgumentException in case the given {@link Iterable} is {@literal null}.
*/
Single<Void> delete(Iterable<? extends T> entities);
/**
* Deletes the given entities.
*
* @param entityStream
* @throws IllegalArgumentException in case the given {@link Publisher} is {@literal null}.
*/
Single<Void> delete(Observable<? extends T> entityStream);
/**
* Deletes all entities managed by the repository.
*/
Single<Void> deleteAll();
}

79
src/main/java/org/springframework/data/repository/reactive/RxJavaPagingAndSortingRepository.java

@ -0,0 +1,79 @@ @@ -0,0 +1,79 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.repository.reactive;
import java.io.Serializable;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.repository.NoRepositoryBean;
import rx.Observable;
import rx.Single;
/**
* Extension of {@link RxJavaCrudRepository} to provide additional methods to retrieve entities using the pagination and sorting
* abstraction.
*
* @author Mark Paluch
* @see Sort
* @see Pageable
* @see Single
* @see Observable
* @see RxJavaCrudRepository
*/
@NoRepositoryBean
public interface RxJavaPagingAndSortingRepository<T, ID extends Serializable> extends RxJavaCrudRepository<T, ID> {
/*
* (non-Javadoc)
* @see org.springframework.data.repository.reactive.ReactiveCrudRepository#findAll()
*/
@Override
Observable<T> findAll();
/*
* (non-Javadoc)
* @see org.springframework.data.repository.reactive.ReactiveCrudRepository#findAll(java.lang.Iterable)
*/
@Override
Observable<T> findAll(Iterable<ID> ids);
/*
* (non-Javadoc)
* @see org.springframework.data.repository.reactive.ReactiveCrudRepository#findAll(org.reactivestreams.Publisher)
*/
@Override
Observable<T> findAll(Observable<ID> idStream);
/**
* Returns all entities sorted by the given options.
*
* @param sort
* @return all entities sorted by the given options
*/
Observable<T> findAll(Sort sort);
/**
* Returns a {@link Page} of entities meeting the paging restriction provided in the {@code Pageable} object.
*
* @param pageable
* @return a page of entities
*/
Single<Page<T>> findAll(Pageable pageable);
}

339
src/main/java/org/springframework/data/repository/util/QueryExecutionConverters.java

@ -25,6 +25,7 @@ import java.util.Set; @@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.reactivestreams.Publisher;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.Converter;
@ -37,6 +38,14 @@ import org.springframework.util.concurrent.ListenableFuture; @@ -37,6 +38,14 @@ import org.springframework.util.concurrent.ListenableFuture;
import com.google.common.base.Optional;
import reactor.core.converter.DependencyUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.Single;
import scala.Option;
/**
* Converters to potentially wrap the execution of a repository method into a variety of wrapper types potentially being
* available on the classpath. Currently supported:
@ -47,6 +56,12 @@ import com.google.common.base.Optional; @@ -47,6 +56,12 @@ import com.google.common.base.Optional;
* <li>{@code java.util.concurrent.Future}</li>
* <li>{@code java.util.concurrent.CompletableFuture}</li>
* <li>{@code org.springframework.util.concurrent.ListenableFuture<}</li>
* <li>{@code rx.Single}</li>
* <li>{@code rx.Observable}</li>
* <li>{@code rx.Completable}</li>
* <li>{@code reactor.core.publisher.Mono}</li>
* <li>{@code reactor.core.publisher.Flux}</li>
* <li>{@code org.reactivestreams.Publisher}</li>
* </ul>
*
* @author Oliver Gierke
@ -66,32 +81,66 @@ public abstract class QueryExecutionConverters { @@ -66,32 +81,66 @@ public abstract class QueryExecutionConverters {
private static final boolean SCALA_PRESENT = ClassUtils.isPresent("scala.Option",
QueryExecutionConverters.class.getClassLoader());
private static final boolean PROJECT_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.converter.DependencyUtils",
QueryExecutionConverters.class.getClassLoader());
private static final boolean RXJAVA_SINGLE_PRESENT = ClassUtils.isPresent("rx.Single",
QueryExecutionConverters.class.getClassLoader());
private static final boolean RXJAVA_OBSERVABLE_PRESENT = ClassUtils.isPresent("rx.Observable",
QueryExecutionConverters.class.getClassLoader());
private static final boolean RXJAVA_COMPLETABLE_PRESENT = ClassUtils.isPresent("rx.Completable",
QueryExecutionConverters.class.getClassLoader());
private static final Set<Class<?>> WRAPPER_TYPES = new HashSet<Class<?>>();
private static final Set<Class<?>> UNWRAPPER_TYPES = new HashSet<Class<?>>();
private static final Set<Converter<Object, Object>> UNWRAPPERS = new HashSet<Converter<Object, Object>>();
static {
WRAPPER_TYPES.add(Future.class);
UNWRAPPER_TYPES.add(Future.class);
WRAPPER_TYPES.add(ListenableFuture.class);
UNWRAPPER_TYPES.add(ListenableFuture.class);
if (GUAVA_PRESENT) {
WRAPPER_TYPES.add(NullableWrapperToGuavaOptionalConverter.getWrapperType());
UNWRAPPER_TYPES.add(NullableWrapperToGuavaOptionalConverter.getWrapperType());
UNWRAPPERS.add(GuavaOptionalUnwrapper.INSTANCE);
}
if (JDK_8_PRESENT) {
WRAPPER_TYPES.add(NullableWrapperToJdk8OptionalConverter.getWrapperType());
UNWRAPPER_TYPES.add(NullableWrapperToJdk8OptionalConverter.getWrapperType());
UNWRAPPERS.add(Jdk8OptionalUnwrapper.INSTANCE);
}
if (JDK_8_PRESENT && SPRING_4_2_PRESENT) {
WRAPPER_TYPES.add(NullableWrapperToCompletableFutureConverter.getWrapperType());
UNWRAPPER_TYPES.add(NullableWrapperToCompletableFutureConverter.getWrapperType());
}
if (SCALA_PRESENT) {
WRAPPER_TYPES.add(NullableWrapperToScalaOptionConverter.getWrapperType());
UNWRAPPER_TYPES.add(NullableWrapperToScalaOptionConverter.getWrapperType());
UNWRAPPERS.add(ScalOptionUnwrapper.INSTANCE);
}
if (PROJECT_REACTOR_PRESENT) {
WRAPPER_TYPES.add(Publisher.class);
WRAPPER_TYPES.add(Mono.class);
WRAPPER_TYPES.add(Flux.class);
}
if (RXJAVA_SINGLE_PRESENT) {
WRAPPER_TYPES.add(Single.class);
}
if (RXJAVA_COMPLETABLE_PRESENT) {
WRAPPER_TYPES.add(Completable.class);
}
if (RXJAVA_OBSERVABLE_PRESENT) {
WRAPPER_TYPES.add(Observable.class);
}
}
private QueryExecutionConverters() {}
@ -115,6 +164,25 @@ public abstract class QueryExecutionConverters { @@ -115,6 +164,25 @@ public abstract class QueryExecutionConverters {
return false;
}
/**
* Returns whether the given wrapper type supports unwrapping.
*
* @param type must not be {@literal null}.
* @return
*/
public static boolean supportsUnwrapping(Class<?> type) {
Assert.notNull(type, "Type must not be null!");
for (Class<?> candidate : UNWRAPPER_TYPES) {
if (candidate.isAssignableFrom(type)) {
return true;
}
}
return false;
}
/**
* Registers converters for wrapper types found on the classpath.
*
@ -138,6 +206,37 @@ public abstract class QueryExecutionConverters { @@ -138,6 +206,37 @@ public abstract class QueryExecutionConverters {
}
conversionService.addConverter(new NullableWrapperToFutureConverter(conversionService));
if (PROJECT_REACTOR_PRESENT) {
if (RXJAVA_COMPLETABLE_PRESENT) {
conversionService.addConverter(PublisherToCompletableConverter.INSTANCE);
conversionService.addConverter(CompletableToPublisherConverter.INSTANCE);
conversionService.addConverter(CompletableToMonoConverter.INSTANCE);
}
if (RXJAVA_SINGLE_PRESENT) {
conversionService.addConverter(PublisherToSingleConverter.INSTANCE);
conversionService.addConverter(SingleToPublisherConverter.INSTANCE);
conversionService.addConverter(SingleToMonoConverter.INSTANCE);
conversionService.addConverter(SingleToFluxConverter.INSTANCE);
}
if (RXJAVA_OBSERVABLE_PRESENT) {
conversionService.addConverter(PublisherToObservableConverter.INSTANCE);
conversionService.addConverter(ObservableToPublisherConverter.INSTANCE);
conversionService.addConverter(ObservableToMonoConverter.INSTANCE);
conversionService.addConverter(ObservableToFluxConverter.INSTANCE);
}
conversionService.addConverter(PublisherToMonoConverter.INSTANCE);
conversionService.addConverter(PublisherToFluxConverter.INSTANCE);
}
if (RXJAVA_SINGLE_PRESENT && RXJAVA_OBSERVABLE_PRESENT) {
conversionService.addConverter(SingleToObservableConverter.INSTANCE);
conversionService.addConverter(ObservableToSingleConverter.INSTANCE);
}
}
/**
@ -447,4 +546,244 @@ public abstract class QueryExecutionConverters { @@ -447,4 +546,244 @@ public abstract class QueryExecutionConverters {
return source instanceof Option ? ((Option<?>) source).getOrElse(alternative) : source;
}
}
/**
* A {@link Converter} to convert a {@link Publisher} to {@link Flux}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum PublisherToFluxConverter implements Converter<Publisher<?>, Flux<?>> {
INSTANCE;
@Override
public Flux<?> convert(Publisher<?> source) {
return Flux.from(source);
}
}
/**
* A {@link Converter} to convert a {@link Publisher} to {@link Mono}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum PublisherToMonoConverter implements Converter<Publisher<?>, Mono<?>> {
INSTANCE;
@Override
public Mono<?> convert(Publisher<?> source) {
return Mono.from(source);
}
}
/**
* A {@link Converter} to convert a {@link Publisher} to {@link Single}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum PublisherToSingleConverter implements Converter<Publisher<?>, Single<?>> {
INSTANCE;
@Override
public Single<?> convert(Publisher<?> source) {
return DependencyUtils.convertFromPublisher(source, Single.class);
}
}
/**
* A {@link Converter} to convert a {@link Publisher} to {@link Completable}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum PublisherToCompletableConverter implements Converter<Publisher<?>, Completable> {
INSTANCE;
@Override
public Completable convert(Publisher<?> source) {
return DependencyUtils.convertFromPublisher(source, Completable.class);
}
}
/**
* A {@link Converter} to convert a {@link Publisher} to {@link Observable}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum PublisherToObservableConverter implements Converter<Publisher<?>, Observable<?>> {
INSTANCE;
@Override
public Observable<?> convert(Publisher<?> source) {
return DependencyUtils.convertFromPublisher(source, Observable.class);
}
}
/**
* A {@link Converter} to convert a {@link Single} to {@link Publisher}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum SingleToPublisherConverter implements Converter<Single<?>, Publisher<?>> {
INSTANCE;
@Override
public Publisher<?> convert(Single<?> source) {
return DependencyUtils.convertToPublisher(source);
}
}
/**
* A {@link Converter} to convert a {@link Single} to {@link Mono}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum SingleToMonoConverter implements Converter<Single<?>, Mono<?>> {
INSTANCE;
@Override
public Mono<?> convert(Single<?> source) {
return PublisherToMonoConverter.INSTANCE.convert(DependencyUtils.convertToPublisher(source));
}
}
/**
* A {@link Converter} to convert a {@link Single} to {@link Publisher}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum SingleToFluxConverter implements Converter<Single<?>, Flux<?>> {
INSTANCE;
@Override
public Flux<?> convert(Single<?> source) {
return PublisherToFluxConverter.INSTANCE.convert(DependencyUtils.convertToPublisher(source));
}
}
/**
* A {@link Converter} to convert a {@link Completable} to {@link Publisher}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum CompletableToPublisherConverter implements Converter<Completable, Publisher<?>> {
INSTANCE;
@Override
public Publisher<?> convert(Completable source) {
return DependencyUtils.convertToPublisher(source);
}
}
/**
* A {@link Converter} to convert a {@link Completable} to {@link Mono}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum CompletableToMonoConverter implements Converter<Completable, Mono<?>> {
INSTANCE;
@Override
public Mono<?> convert(Completable source) {
return Mono.from(CompletableToPublisherConverter.INSTANCE.convert(source));
}
}
/**
* A {@link Converter} to convert an {@link Observable} to {@link Publisher}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum ObservableToPublisherConverter implements Converter<Observable<?>, Publisher<?>> {
INSTANCE;
@Override
public Publisher<?> convert(Observable<?> source) {
return DependencyUtils.convertToPublisher(source);
}
}
/**
* A {@link Converter} to convert a {@link Observable} to {@link Mono}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum ObservableToMonoConverter implements Converter<Observable<?>, Mono<?>> {
INSTANCE;
@Override
public Mono<?> convert(Observable<?> source) {
return PublisherToMonoConverter.INSTANCE.convert(DependencyUtils.convertToPublisher(source));
}
}
/**
* A {@link Converter} to convert a {@link Observable} to {@link Flux}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum ObservableToFluxConverter implements Converter<Observable<?>, Flux<?>> {
INSTANCE;
@Override
public Flux<?> convert(Observable<?> source) {
return PublisherToFluxConverter.INSTANCE.convert(DependencyUtils.convertToPublisher(source));
}
}
/**
* A {@link Converter} to convert a {@link Observable} to {@link Single}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum ObservableToSingleConverter implements Converter<Observable<?>, Single<?>> {
INSTANCE;
@Override
public Single<?> convert(Observable<?> source) {
return source.toSingle();
}
}
/**
* A {@link Converter} to convert a {@link Single} to {@link Single}.
*
* @author Mark Paluch
* @author 2.0
*/
public enum SingleToObservableConverter implements Converter<Single<?>, Observable<?>> {
INSTANCE;
@Override
public Observable<?> convert(Single<?> source) {
return source.toObservable();
}
}
}

312
src/test/java/org/springframework/data/repository/core/support/QueryExecutionResultHandlerUnitTests.java

@ -18,6 +18,12 @@ package org.springframework.data.repository.core.support; @@ -18,6 +18,12 @@ package org.springframework.data.repository.core.support;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.Single;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
@ -26,14 +32,17 @@ import java.util.Optional; @@ -26,14 +32,17 @@ import java.util.Optional;
import java.util.Set;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.springframework.core.MethodParameter;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.repository.Repository;
/**
* Unit tests for {@link QueryExecutionResultHandler}.
*
*
* @author Oliver Gierke
* @author Mark Paluch
*/
public class QueryExecutionResultHandlerUnitTests {
@ -111,6 +120,295 @@ public class QueryExecutionResultHandlerUnitTests { @@ -111,6 +120,295 @@ public class QueryExecutionResultHandlerUnitTests {
assertThat(handler.postProcessInvocationResult(null, getTypeDescriptorFor("map")), is(instanceOf(Map.class)));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsRxJavaSingleIntoPublisher() throws Exception {
Single<Entity> entity = Single.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("publisher"));
assertThat(result, is(instanceOf(Publisher.class)));
Mono<Entity> mono = Mono.from((Publisher<Entity>) result);
assertThat(mono.block(), is(entity.toBlocking().value()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsRxJavaSingleIntoMono() throws Exception {
Single<Entity> entity = Single.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("mono"));
assertThat(result, is(instanceOf(Mono.class)));
Mono<Entity> mono = (Mono<Entity>) result;
assertThat(mono.block(), is(entity.toBlocking().value()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsRxJavaSingleIntoFlux() throws Exception {
Single<Entity> entity = Single.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("flux"));
assertThat(result, is(instanceOf(Flux.class)));
Flux<Entity> flux = (Flux<Entity>) result;
assertThat(flux.next().block(), is(entity.toBlocking().value()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsRxJavaObservableIntoPublisher() throws Exception {
Observable<Entity> entity = Observable.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("publisher"));
assertThat(result, is(instanceOf(Publisher.class)));
Mono<Entity> mono = Mono.from((Publisher<Entity>) result);
assertThat(mono.block(), is(entity.toBlocking().first()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsRxJavaObservableIntoMono() throws Exception {
Observable<Entity> entity = Observable.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("mono"));
assertThat(result, is(instanceOf(Mono.class)));
Mono<Entity> mono = (Mono<Entity>) result;
assertThat(mono.block(), is(entity.toBlocking().first()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsRxJavaObservableIntoFlux() throws Exception {
Observable<Entity> entity = Observable.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("flux"));
assertThat(result, is(instanceOf(Flux.class)));
Flux<Entity> flux = (Flux<Entity>) result;
assertThat(flux.next().block(), is(entity.toBlocking().first()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsRxJavaObservableIntoSingle() throws Exception {
Observable<Entity> entity = Observable.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("single"));
assertThat(result, is(instanceOf(Single.class)));
Single<Entity> single = (Single<Entity>) result;
assertThat(single.toBlocking().value(), is(entity.toBlocking().first()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsRxJavaSingleIntoObservable() throws Exception {
Single<Entity> entity = Single.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("observable"));
assertThat(result, is(instanceOf(Observable.class)));
Observable<Entity> observable = (Observable<Entity>) result;
assertThat(observable.toBlocking().first(), is(entity.toBlocking().value()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsReactorMonoIntoSingle() throws Exception {
Mono<Entity> entity = Mono.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("single"));
assertThat(result, is(instanceOf(Single.class)));
Single<Entity> single = (Single<Entity>) result;
assertThat(single.toBlocking().value(), is(entity.block()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsReactorMonoIntoCompletable() throws Exception {
Mono<Entity> entity = Mono.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("completable"));
assertThat(result, is(instanceOf(Completable.class)));
Completable completable = (Completable) result;
assertThat(completable.get(), is(nullValue()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsReactorMonoIntoCompletableWithException() throws Exception {
Mono<Entity> entity = Mono.error(new InvalidDataAccessApiUsageException("err"));
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("completable"));
assertThat(result, is(instanceOf(Completable.class)));
Completable completable = (Completable) result;
assertThat(completable.get(), is(instanceOf(InvalidDataAccessApiUsageException.class)));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsRxJavaCompletableIntoMono() throws Exception {
Completable entity = Completable.complete();
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("mono"));
assertThat(result, is(instanceOf(Mono.class)));
Mono mono = (Mono) result;
assertThat(mono.block(), is(nullValue()));
}
/**
* @see DATACMNS-836
*/
@Test(expected = InvalidDataAccessApiUsageException.class)
@SuppressWarnings("unchecked")
public void convertsRxJavaCompletableIntoMonoWithException() throws Exception {
Completable entity = Completable.error(new InvalidDataAccessApiUsageException("err"));
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("mono"));
assertThat(result, is(instanceOf(Mono.class)));
Mono mono = (Mono) result;
mono.block();
fail("Missing InvalidDataAccessApiUsageException");
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsReactorMonoIntoObservable() throws Exception {
Mono<Entity> entity = Mono.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("observable"));
assertThat(result, is(instanceOf(Observable.class)));
Observable<Entity> observable = (Observable<Entity>) result;
assertThat(observable.toBlocking().first(), is(entity.block()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsReactorFluxIntoSingle() throws Exception {
Flux<Entity> entity = Flux.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("single"));
assertThat(result, is(instanceOf(Single.class)));
Single<Entity> single = (Single<Entity>) result;
assertThat(single.toBlocking().value(), is(entity.next().block()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsReactorFluxIntoObservable() throws Exception {
Flux<Entity> entity = Flux.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("observable"));
assertThat(result, is(instanceOf(Observable.class)));
Observable<Entity> observable = (Observable<Entity>) result;
assertThat(observable.toBlocking().first(), is(entity.next().block()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsReactorFluxIntoMono() throws Exception {
Flux<Entity> entity = Flux.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("mono"));
assertThat(result, is(instanceOf(Mono.class)));
Mono<Entity> mono = (Mono<Entity>) result;
assertThat(mono.block(), is(entity.next().block()));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void convertsReactorMonoIntoFlux() throws Exception {
Mono<Entity> entity = Mono.just(new Entity());
Object result = handler.postProcessInvocationResult(entity, getTypeDescriptorFor("flux"));
assertThat(result, is(instanceOf(Flux.class)));
Flux<Entity> flux = (Flux<Entity>) result;
assertThat(flux.next().block(), is(entity.block()));
}
private static TypeDescriptor getTypeDescriptorFor(String methodName) throws Exception {
Method method = Sample.class.getMethod(methodName);
@ -128,6 +426,18 @@ public class QueryExecutionResultHandlerUnitTests { @@ -128,6 +426,18 @@ public class QueryExecutionResultHandlerUnitTests {
com.google.common.base.Optional<Entity> guavaOptional();
Map<Integer, Entity> map();
Publisher<Entity> publisher();
Mono<Entity> mono();
Flux<Entity> flux();
Observable<Entity> observable();
Single<Entity> single();
Completable completable();
}
static class Entity {}

6
src/test/java/org/springframework/data/repository/core/support/QueryExecutorMethodInterceptorUnitTests.java

@ -51,16 +51,16 @@ public class QueryExecutorMethodInterceptorUnitTests { @@ -51,16 +51,16 @@ public class QueryExecutorMethodInterceptorUnitTests {
when(information.getQueryMethods()).thenReturn(Arrays.asList(Object.class.getMethod("toString")));
when(factory.getQueryLookupStrategy(any(Key.class))).thenReturn(null);
factory.new QueryExecutorMethodInterceptor(information, null, new Object());
factory.new QueryExecutorMethodInterceptor(information);
}
@Test
public void skipsQueryLookupsIfQueryLookupStrategyIsNull() {
when(information.getQueryMethods()).thenReturn(Collections.<Method> emptySet());
when(information.getQueryMethods()).thenReturn(Collections.<Method>emptySet());
when(factory.getQueryLookupStrategy(any(Key.class))).thenReturn(strategy);
factory.new QueryExecutorMethodInterceptor(information, null, new Object());
factory.new QueryExecutorMethodInterceptor(information);
verify(strategy, times(0)).resolveQuery(any(Method.class), any(RepositoryMetadata.class),
any(ProjectionFactory.class), any(NamedQueries.class));
}

149
src/test/java/org/springframework/data/repository/core/support/ReactiveRepositoryInformationUnitTests.java

@ -0,0 +1,149 @@ @@ -0,0 +1,149 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.repository.core.support;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import rx.Observable;
import java.io.Serializable;
import java.lang.reflect.Method;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.reactivestreams.Publisher;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.repository.core.RepositoryMetadata;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.repository.reactive.ReactivePagingAndSortingRepository;
import org.springframework.data.repository.reactive.RxJavaCrudRepository;
import org.springframework.data.repository.util.QueryExecutionConverters;
/**
* Unit tests for {@link ConvertingMethodParameterRepositoryInformation}.
*
* @author Mark Paluch
*/
@RunWith(MockitoJUnitRunner.class)
public class ReactiveRepositoryInformationUnitTests {
static final Class<ReactiveJavaInterfaceWithGenerics> REPOSITORY = ReactiveJavaInterfaceWithGenerics.class;
@Test
public void discoversMethodWithoutComparingReturnType() throws Exception {
Method method = RxJavaInterfaceWithGenerics.class.getMethod("deleteAll");
RepositoryMetadata metadata = new DefaultRepositoryMetadata(RxJavaInterfaceWithGenerics.class);
DefaultRepositoryInformation information = new DefaultRepositoryInformation(metadata, REPOSITORY, null);
Method reference = information.getTargetClassMethod(method);
assertEquals(ReactiveCrudRepository.class, reference.getDeclaringClass());
assertThat(reference.getName(), is("deleteAll"));
}
@Test
public void discoversMethodWithConvertibleArguments() throws Exception {
DefaultConversionService conversionService = new DefaultConversionService();
QueryExecutionConverters.registerConvertersIn(conversionService);
Method method = RxJavaInterfaceWithGenerics.class.getMethod("save", Observable.class);
RepositoryMetadata metadata = new DefaultRepositoryMetadata(RxJavaInterfaceWithGenerics.class);
DefaultRepositoryInformation information = new ReactiveRepositoryInformation(metadata, REPOSITORY, null,
conversionService);
Method reference = information.getTargetClassMethod(method);
assertEquals(ReactiveCrudRepository.class, reference.getDeclaringClass());
assertThat(reference.getName(), is("save"));
assertThat(reference.getParameterTypes()[0], is(equalTo(Publisher.class)));
}
@Test
public void discoversMethodAssignableArguments() throws Exception {
DefaultConversionService conversionService = new DefaultConversionService();
QueryExecutionConverters.registerConvertersIn(conversionService);
Method method = ReactivePagingAndSortingRepository.class.getMethod("save", Publisher.class);
RepositoryMetadata metadata = new DefaultRepositoryMetadata(ReactiveJavaInterfaceWithGenerics.class);
DefaultRepositoryInformation information = new ReactiveRepositoryInformation(metadata, REPOSITORY, null,
conversionService);
Method reference = information.getTargetClassMethod(method);
assertEquals(ReactiveCrudRepository.class, reference.getDeclaringClass());
assertThat(reference.getName(), is("save"));
assertThat(reference.getParameterTypes()[0], is(equalTo(Publisher.class)));
}
@Test
public void discoversMethodExactIterableArguments() throws Exception {
DefaultConversionService conversionService = new DefaultConversionService();
QueryExecutionConverters.registerConvertersIn(conversionService);
Method method = ReactiveJavaInterfaceWithGenerics.class.getMethod("save", Iterable.class);
RepositoryMetadata metadata = new DefaultRepositoryMetadata(ReactiveJavaInterfaceWithGenerics.class);
DefaultRepositoryInformation information = new ReactiveRepositoryInformation(metadata, REPOSITORY, null,
conversionService);
Method reference = information.getTargetClassMethod(method);
assertEquals(ReactiveCrudRepository.class, reference.getDeclaringClass());
assertThat(reference.getName(), is("save"));
assertThat(reference.getParameterTypes()[0], is(equalTo(Iterable.class)));
}
@Test
public void discoversMethodExactObjectArguments() throws Exception {
DefaultConversionService conversionService = new DefaultConversionService();
QueryExecutionConverters.registerConvertersIn(conversionService);
Method method = ReactiveJavaInterfaceWithGenerics.class.getMethod("save", Object.class);
RepositoryMetadata metadata = new DefaultRepositoryMetadata(ReactiveJavaInterfaceWithGenerics.class);
DefaultRepositoryInformation information = new ReactiveRepositoryInformation(metadata, REPOSITORY, null,
conversionService);
Method reference = information.getTargetClassMethod(method);
assertEquals(ReactiveCrudRepository.class, reference.getDeclaringClass());
assertThat(reference.getName(), is("save"));
assertThat(reference.getParameterTypes()[0], is(equalTo(Object.class)));
}
interface RxJavaInterfaceWithGenerics extends RxJavaCrudRepository<User, String> {}
interface ReactiveJavaInterfaceWithGenerics extends ReactiveCrudRepository<User, String> {}
static abstract class DummyGenericReactiveRepositorySupport<T, ID extends Serializable>
implements ReactiveCrudRepository<T, ID> {
}
static class User {
String id;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
}

133
src/test/java/org/springframework/data/repository/core/support/ReactiveWrapperRepositoryFactorySupportUnitTests.java

@ -0,0 +1,133 @@ @@ -0,0 +1,133 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.repository.core.support;
import static org.mockito.Mockito.*;
import java.io.Serializable;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.repository.Repository;
import org.springframework.data.repository.reactive.ReactivePagingAndSortingRepository;
import org.springframework.data.repository.util.QueryExecutionConverters;
import reactor.core.publisher.Mono;
import rx.Single;
/**
* Unit tests for {@link RepositoryFactorySupport} using reactive wrapper types.
*
* @author Mark Paluch
*/
@RunWith(MockitoJUnitRunner.class)
public class ReactiveWrapperRepositoryFactorySupportUnitTests {
public @Rule ExpectedException exception = ExpectedException.none();
DummyRepositoryFactory factory;
@Mock ReactivePagingAndSortingRepository<Object, Serializable> backingRepo;
@Mock ObjectRepositoryCustom customImplementation;
@Before
public void setUp() {
DefaultConversionService defaultConversionService = new DefaultConversionService();
QueryExecutionConverters.registerConvertersIn(defaultConversionService);
factory = new DummyRepositoryFactory(backingRepo);
factory.setConversionService(defaultConversionService);
}
/**
* @see DATACMNS-836
*/
@Test
public void invokesCustomMethodIfItRedeclaresACRUDOne() {
ObjectRepository repository = factory.getRepository(ObjectRepository.class, customImplementation);
repository.findOne(1);
verify(customImplementation, times(1)).findOne(1);
verify(backingRepo, times(0)).findOne(1);
}
/**
* @see DATACMNS-836
*/
@Test
public void callsMethodOnBaseImplementationWithExactArguments() {
Serializable id = 1L;
ConvertingRepository repository = factory.getRepository(ConvertingRepository.class);
repository.exists(id);
verify(backingRepo, times(1)).exists(id);
}
/**
* @see DATACMNS-836
*/
@Test
public void doesNotCallMethodOnBaseEvenIfDeclaredTypeCouldBeConverted() {
Long id = 1L;
ConvertingRepository repository = factory.getRepository(ConvertingRepository.class);
repository.exists(id);
verifyZeroInteractions(backingRepo);
}
/**
* @see DATACMNS-836
*/
@Test
public void callsMethodOnBaseImplementationWithTypeConversion() {
Single<Long> ids = Single.just(1L);
ConvertingRepository repository = factory.getRepository(ConvertingRepository.class);
repository.exists(ids);
verify(backingRepo, times(1)).exists(any(Mono.class));
}
interface ConvertingRepository extends Repository<Object, Long> {
Single<Boolean> exists(Single<Long> id);
Single<Boolean> exists(Serializable id);
Single<Boolean> exists(Long id);
}
interface ObjectRepository
extends Repository<Object, Serializable>, RepositoryFactorySupportUnitTests.ObjectRepositoryCustom {
}
interface ObjectRepositoryCustom {
Object findOne(Serializable id);
}
}

29
src/test/java/org/springframework/data/repository/query/ParametersUnitTests.java

@ -17,12 +17,14 @@ package org.springframework.data.repository.query; @@ -17,12 +17,14 @@ package org.springframework.data.repository.query;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import rx.Single;
import java.lang.reflect.Method;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.test.util.ReflectionTestUtils;
@ -31,6 +33,7 @@ import org.springframework.test.util.ReflectionTestUtils; @@ -31,6 +33,7 @@ import org.springframework.test.util.ReflectionTestUtils;
* Unit test for {@link Parameters}.
*
* @author Oliver Gierke
* @author Mark Paluch
*/
public class ParametersUnitTests {
@ -176,6 +179,28 @@ public class ParametersUnitTests { @@ -176,6 +179,28 @@ public class ParametersUnitTests {
assertThat(parameters.getParameter(0).getType(), is(typeCompatibleWith(String.class)));
}
/**
* @see DATACMNS-836
*/
@Test
public void keepsReactiveStreamsWrapper() throws Exception {
Parameters<?, Parameter> parameters = getParametersFor("methodWithPublisher", Publisher.class);
assertThat(parameters.getParameter(0).getType(), is(typeCompatibleWith(Publisher.class)));
}
/**
* @see DATACMNS-836
*/
@Test
public void keepsRxJavaWrapper() throws Exception {
Parameters<?, Parameter> parameters = getParametersFor("methodWithSingle", Single.class);
assertThat(parameters.getParameter(0).getType(), is(typeCompatibleWith(Single.class)));
}
private Parameters<?, Parameter> getParametersFor(String methodName, Class<?>... parameterTypes)
throws SecurityException, NoSuchMethodException {
@ -209,5 +234,9 @@ public class ParametersUnitTests { @@ -209,5 +234,9 @@ public class ParametersUnitTests {
<T> T dynamicBind(Class<T> type, Class<?> one, Class<Object> two);
void methodWithOptional(Optional<String> optional);
void methodWithPublisher(Publisher<String> publisher);
void methodWithSingle(Single<String> single);
}
}

158
src/test/java/org/springframework/data/repository/query/ReactiveWrapperConvertersUnitTests.java

@ -0,0 +1,158 @@ @@ -0,0 +1,158 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.repository.query;
import static org.assertj.core.api.AssertionsForClassTypes.*;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.Single;
/**
* Unit tests for {@link ReactiveWrapperConverters}.
*
* @author Mark Paluch
*/
public class ReactiveWrapperConvertersUnitTests {
/**
* @see DATACMNS-836
*/
@Test
public void shouldSupportReactorTypes() {
assertThat(ReactiveWrapperConverters.supports(Mono.class)).isTrue();
assertThat(ReactiveWrapperConverters.supports(Flux.class)).isTrue();
assertThat(ReactiveWrapperConverters.supports(Publisher.class)).isTrue();
assertThat(ReactiveWrapperConverters.supports(Object.class)).isFalse();
}
/**
* @see DATACMNS-836
*/
@Test
public void shouldSupportRxJavaTypes() {
assertThat(ReactiveWrapperConverters.supports(Single.class)).isTrue();
assertThat(ReactiveWrapperConverters.supports(Observable.class)).isTrue();
assertThat(ReactiveWrapperConverters.supports(Completable.class)).isFalse();
}
/**
* @see DATACMNS-836
*/
@Test
public void isSingleLikeShouldReportCorrectSingleTypes() {
assertThat(ReactiveWrapperConverters.isSingleLike(Mono.class)).isTrue();
assertThat(ReactiveWrapperConverters.isSingleLike(Flux.class)).isFalse();
assertThat(ReactiveWrapperConverters.isSingleLike(Single.class)).isTrue();
assertThat(ReactiveWrapperConverters.isSingleLike(Observable.class)).isFalse();
assertThat(ReactiveWrapperConverters.isSingleLike(Publisher.class)).isFalse();
}
/**
* @see DATACMNS-836
*/
@Test
public void isCollectionLikeShouldReportCorrectCollectionTypes() {
assertThat(ReactiveWrapperConverters.isCollectionLike(Mono.class)).isFalse();
assertThat(ReactiveWrapperConverters.isCollectionLike(Flux.class)).isTrue();
assertThat(ReactiveWrapperConverters.isCollectionLike(Single.class)).isFalse();
assertThat(ReactiveWrapperConverters.isCollectionLike(Observable.class)).isTrue();
assertThat(ReactiveWrapperConverters.isCollectionLike(Publisher.class)).isTrue();
}
/**
* @see DATACMNS-836
*/
@Test
public void toWrapperShouldCastMonoToMono() {
Mono<String> foo = Mono.just("foo");
assertThat(ReactiveWrapperConverters.toWrapper(foo, Mono.class)).isSameAs(foo);
}
/**
* @see DATACMNS-836
*/
@Test
public void toWrapperShouldConvertMonoToSingle() {
Mono<String> foo = Mono.just("foo");
assertThat(ReactiveWrapperConverters.toWrapper(foo, Single.class)).isInstanceOf(Single.class);
}
/**
* @see DATACMNS-836
*/
@Test
public void toWrapperShouldConvertMonoToFlux() {
Mono<String> foo = Mono.just("foo");
assertThat(ReactiveWrapperConverters.toWrapper(foo, Flux.class)).isInstanceOf(Flux.class);
}
/**
* @see DATACMNS-836
*/
@Test
public void shouldMapMono() {
Mono<String> foo = Mono.just("foo");
Mono<Long> map = ReactiveWrapperConverters.map(foo, source -> 1L);
assertThat(map.block()).isEqualTo(1L);
}
/**
* @see DATACMNS-836
*/
@Test
public void shouldMapFlux() {
Flux<String> foo = Flux.just("foo");
Flux<Long> map = ReactiveWrapperConverters.map(foo, source -> 1L);
assertThat(map.next().block()).isEqualTo(1L);
}
/**
* @see DATACMNS-836
*/
@Test
public void shouldMapSingle() {
Single<String> foo = Single.just("foo");
Single<Long> map = ReactiveWrapperConverters.map(foo, source -> 1L);
assertThat(map.toBlocking().value()).isEqualTo(1L);
}
/**
* @see DATACMNS-836
*/
@Test
public void shouldMapObservable() {
Observable<String> foo = Observable.just("foo");
Observable<Long> map = ReactiveWrapperConverters.map(foo, source -> 1L);
assertThat(map.toBlocking().first()).isEqualTo(1L);
}
}

109
src/test/java/org/springframework/data/repository/query/ResultProcessorUnitTests.java

@ -40,10 +40,16 @@ import org.springframework.data.projection.SpelAwareProxyProjectionFactory; @@ -40,10 +40,16 @@ import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
import org.springframework.data.repository.Repository;
import org.springframework.data.repository.core.support.DefaultRepositoryMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
/**
* Unit tests for {@link ResultProcessor}.
*
* @author Oliver Gierke
* @author Mark Paluch
*/
public class ResultProcessorUnitTests {
@ -260,6 +266,99 @@ public class ResultProcessorUnitTests { @@ -260,6 +266,99 @@ public class ResultProcessorUnitTests {
processor.processResult(specialList);
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void supportsMonoWrapper() throws Exception {
Mono<Sample> samples = Mono.just(new Sample("Dave", "Matthews"));
Object result = getProcessor("findMonoSample").processResult(samples);
assertThat(result, is(instanceOf(Mono.class)));
Object content = ((Mono<Object>) result).block();
assertThat(content, is(instanceOf(Sample.class)));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void supportsSingleWrapper() throws Exception {
Single<Sample> samples = Single.just(new Sample("Dave", "Matthews"));
Object result = getProcessor("findSingleSample").processResult(samples);
assertThat(result, is(instanceOf(Single.class)));
Object content = ((Single<Object>) result).toBlocking().value();
assertThat(content, is(instanceOf(Sample.class)));
}
/**
* @see DATACMNS-836
*/
@Test
public void refrainsFromProjectingUsingReactiveWrappersIfThePreparingConverterReturnsACompatibleInstance()
throws Exception {
ResultProcessor processor = getProcessor("findMonoSampleDto");
Object result = processor.processResult(Mono.just(new Sample("Dave", "Matthews")), new Converter<Object, Object>() {
@Override
public Object convert(Object source) {
return new SampleDto();
}
});
assertThat(result, is(instanceOf(Mono.class)));
Object content = ((Mono<Object>) result).block();
assertThat(content, is(instanceOf(SampleDto.class)));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void supportsFluxProjections() throws Exception {
Flux<Sample> samples = Flux.just(new Sample("Dave", "Matthews"));
Object result = getProcessor("findFluxProjection").processResult(samples);
assertThat(result, is(instanceOf(Flux.class)));
List<Object> content = ((Flux<Object>) result).collectList().block();
assertThat(content, is(not(empty())));
assertThat(content.get(0), is(instanceOf(SampleProjection.class)));
}
/**
* @see DATACMNS-836
*/
@Test
@SuppressWarnings("unchecked")
public void supportsObservableProjections() throws Exception {
Observable<Sample> samples = Observable.just(new Sample("Dave", "Matthews"));
Object result = getProcessor("findObservableProjection").processResult(samples);
assertThat(result, is(instanceOf(Observable.class)));
List<Object> content = ((Observable<Object>) result).toList().toBlocking().single();
assertThat(content, is(not(empty())));
assertThat(content.get(0), is(instanceOf(SampleProjection.class)));
}
private static ResultProcessor getProcessor(String methodName, Class<?>... parameters) throws Exception {
return getQueryMethod(methodName, parameters).getResultProcessor();
}
@ -296,6 +395,16 @@ public class ResultProcessorUnitTests { @@ -296,6 +395,16 @@ public class ResultProcessorUnitTests {
<T> T findOneDynamic(Class<T> type);
Stream<SampleProjection> findStreamProjection();
Mono<Sample> findMonoSample();
Mono<SampleDto> findMonoSampleDto();
Single<Sample> findSingleSample();
Flux<SampleProjection> findFluxProjection();
Observable<SampleProjection> findObservableProjection();
}
static class Sample {

47
src/test/java/org/springframework/data/repository/util/QueryExecutionConvertersUnitTests.java

@ -19,6 +19,11 @@ import static org.hamcrest.CoreMatchers.*; @@ -19,6 +19,11 @@ import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Completable;
import rx.Observable;
import rx.Single;
import scala.Option;
import java.util.concurrent.CompletableFuture;
@ -26,6 +31,7 @@ import java.util.concurrent.Future; @@ -26,6 +31,7 @@ import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.springframework.core.SpringVersion;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.util.Version;
@ -66,6 +72,47 @@ public class QueryExecutionConvertersUnitTests { @@ -66,6 +72,47 @@ public class QueryExecutionConvertersUnitTests {
assertThat(QueryExecutionConverters.supports(Option.class), is(true));
}
/**
* @see DATACMNS-836
*/
@Test
public void registersReactiveWrapperTypes() {
assertThat(QueryExecutionConverters.supports(Publisher.class), is(true));
assertThat(QueryExecutionConverters.supports(Mono.class), is(true));
assertThat(QueryExecutionConverters.supports(Flux.class), is(true));
assertThat(QueryExecutionConverters.supports(Single.class), is(true));
assertThat(QueryExecutionConverters.supports(Completable.class), is(true));
assertThat(QueryExecutionConverters.supports(Observable.class), is(true));
}
/**
* @see DATACMNS-836
*/
@Test
public void registersUnwrapperTypes() {
assertThat(QueryExecutionConverters.supportsUnwrapping(Optional.class), is(true));
assertThat(QueryExecutionConverters.supportsUnwrapping(java.util.Optional.class), is(true));
assertThat(QueryExecutionConverters.supportsUnwrapping(Future.class), is(true));
assertThat(QueryExecutionConverters.supportsUnwrapping(ListenableFuture.class), is(true));
assertThat(QueryExecutionConverters.supportsUnwrapping(Option.class), is(true));
}
/**
* @see DATACMNS-836
*/
@Test
public void doesNotRegisterReactiveUnwrapperTypes() {
assertThat(QueryExecutionConverters.supportsUnwrapping(Publisher.class), is(false));
assertThat(QueryExecutionConverters.supportsUnwrapping(Mono.class), is(false));
assertThat(QueryExecutionConverters.supportsUnwrapping(Flux.class), is(false));
assertThat(QueryExecutionConverters.supportsUnwrapping(Single.class), is(false));
assertThat(QueryExecutionConverters.supportsUnwrapping(Completable.class), is(false));
assertThat(QueryExecutionConverters.supportsUnwrapping(Observable.class), is(false));
}
/**
* @see DATACMNS-714
*/

Loading…
Cancel
Save