diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java index 08e61c902..d0e065384 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractReactiveMongoQuery.java @@ -123,7 +123,7 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery { ReactiveMongoParameterAccessor parameterAccessor = new ReactiveMongoParameterAccessor(method, parameters); - return execute(parameterAccessor); + return parameterAccessor.resolveParameters().flatMapMany(this::execute); } private Publisher execute(MongoParameterAccessor parameterAccessor) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoParametersParameterAccessor.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoParametersParameterAccessor.java index 5d46866be..0807e1695 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoParametersParameterAccessor.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoParametersParameterAccessor.java @@ -38,7 +38,7 @@ import org.springframework.util.ClassUtils; */ public class MongoParametersParameterAccessor extends ParametersParameterAccessor implements MongoParameterAccessor { - private final MongoQueryMethod method; + final MongoQueryMethod method; /** * Creates a new {@link MongoParametersParameterAccessor}. diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveMongoParameterAccessor.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveMongoParameterAccessor.java index 4b3a715e0..92b16333e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveMongoParameterAccessor.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ReactiveMongoParameterAccessor.java @@ -20,13 +20,18 @@ import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import org.reactivestreams.Publisher; import org.springframework.data.repository.util.ReactiveWrapperConverters; import org.springframework.data.repository.util.ReactiveWrappers; /** * Reactive {@link org.springframework.data.repository.query.ParametersParameterAccessor} implementation that subscribes - * to reactive parameter wrapper types upon creation. This class performs synchronization when acessing parameters. + * to reactive parameter wrapper types upon creation. This class performs synchronization when accessing parameters. * * @author Mark Paluch * @author Christoph Strobl @@ -34,43 +39,13 @@ import org.springframework.data.repository.util.ReactiveWrappers; */ class ReactiveMongoParameterAccessor extends MongoParametersParameterAccessor { - private final List> subscriptions; + private final Object[] values; public ReactiveMongoParameterAccessor(MongoQueryMethod method, Object[] values) { super(method, values); + this.values = values; - this.subscriptions = new ArrayList<>(values.length); - - for (int i = 0; i < values.length; i++) { - - Object value = values[i]; - - if (value == null || !ReactiveWrappers.supports(value.getClass())) { - subscriptions.add(null); - continue; - } - - if (ReactiveWrappers.isSingleValueType(value.getClass())) { - subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Mono.class).share()); - } else { - subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Flux.class).collectList().share()); - } - } - } - - /* (non-Javadoc) - * @see org.springframework.data.repository.query.ParametersParameterAccessor#getValue(int) - */ - @SuppressWarnings("unchecked") - @Override - protected T getValue(int index) { - - if (subscriptions.get(index) != null) { - return (T) subscriptions.get(index).block(); - } - - return super.getValue(index); } /* (non-Javadoc) @@ -86,10 +61,64 @@ class ReactiveMongoParameterAccessor extends MongoParametersParameterAccessor { return result; } - /* (non-Javadoc) - * @see org.springframework.data.repository.query.ParametersParameterAccessor#getBindableValue(int) - */ public Object getBindableValue(int index) { return getValue(getParameters().getBindableParameter(index).getIndex()); } + + /** + * Resolve parameters that were provided through reactive wrapper types. Flux is collected into a list, values from + * Mono's are used directly. + * + * @return + */ + @SuppressWarnings("unchecked") + public Mono resolveParameters() { + + boolean hasReactiveWrapper = false; + + for (Object value : values) { + if (value == null || !ReactiveWrappers.supports(value.getClass())) { + continue; + } + + hasReactiveWrapper = true; + break; + } + + if (!hasReactiveWrapper) { + return Mono.just(this); + } + + Object[] resolved = new Object[values.length]; + Map> holder = new ConcurrentHashMap<>(); + List> publishers = new ArrayList<>(); + + for (int i = 0; i < values.length; i++) { + + Object value = resolved[i] = values[i]; + if (value == null || !ReactiveWrappers.supports(value.getClass())) { + continue; + } + + if (ReactiveWrappers.isSingleValueType(value.getClass())) { + + int index = i; + publishers.add(ReactiveWrapperConverters.toWrapper(value, Mono.class) // + .map(Optional::of) // + .defaultIfEmpty(Optional.empty()) // + .doOnNext(it -> holder.put(index, (Optional) it))); + } else { + + int index = i; + publishers.add(ReactiveWrapperConverters.toWrapper(value, Flux.class) // + .collectList() // + .doOnNext(it -> holder.put(index, Optional.of(it)))); + } + } + + return Flux.merge(publishers).then().thenReturn(resolved).map(values -> { + holder.forEach((index, v) -> values[index] = v.orElse(null)); + return new ReactiveMongoParameterAccessor(method, values); + }); + } }