Browse Source

DatabaseClient uses SQL Supplier more lazily

This commit modifies the `DefaultDatabaseClient` implementation in order
to ensure lazier usage of the `Supplier<String>` passed to the sql
method (`DatabaseClient#sql(Supplier)`).

Since technically `DatabaseClient` is an interface that could have 3rd
party implementations, the lazyness expectation is only hinted at in the
`DatabaseClient#sql` javadoc.

Possible caveat: some log statements attempt to reflect the now lazily
resolved SQL string. Similarly, some exceptions can capture the SQL that
caused the issue if known. We expect that these always occur after the
execution of the statement has been attempted (see `ResultFunction`).
At this point the SQL string will be accessible and logs and exceptions
should reflect it as before. Keep an eye out for such strings turning
into `null` after this change, which would indicate the opposite.

Closes gh-29367
pull/29890/head
Simon Baslé 3 years ago
parent
commit
d72df5ace4
  1. 33
      spring-r2dbc/src/main/java/org/springframework/r2dbc/core/ConnectionFunction.java
  2. 7
      spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DatabaseClient.java
  3. 24
      spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultDatabaseClient.java
  4. 18
      spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultFetchSpec.java
  5. 56
      spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DelegateConnectionFunction.java
  6. 74
      spring-r2dbc/src/main/java/org/springframework/r2dbc/core/ResultFunction.java
  7. 44
      spring-r2dbc/src/test/java/org/springframework/r2dbc/core/DefaultDatabaseClientUnitTests.java

33
spring-r2dbc/src/main/java/org/springframework/r2dbc/core/ConnectionFunction.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -20,34 +20,19 @@ import java.util.function.Function;
import io.r2dbc.spi.Connection; import io.r2dbc.spi.Connection;
/** /**
* Union type combining {@link Function} and {@link SqlProvider} to expose the SQL that is * Union type combining {@link Function} and {@link SqlProvider} to expose the SQL that is
* related to the underlying action. * related to the underlying action. The SqlProvider can support lazy / generate once semantics,
* in which case {@link #getSql()} can be {@code null} until the {@code #apply(Connection)}
* method is invoked.
* *
* @author Mark Paluch * @author Mark Paluch
* @author Simon Baslé
* @since 5.3 * @since 5.3
* @param <R> the type of the result of the function. * @param <R> the type of the result of the function.
*/ */
class ConnectionFunction<R> implements Function<Connection, R>, SqlProvider { sealed interface ConnectionFunction<R> extends Function<Connection, R>, SqlProvider
permits DelegateConnectionFunction, ResultFunction {
private final String sql;
private final Function<Connection, R> function;
ConnectionFunction(String sql, Function<Connection, R> function) {
this.sql = sql;
this.function = function;
}
@Override
public R apply(Connection t) {
return this.function.apply(t);
}
@Override
public String getSql() {
return this.sql;
}
} }

7
spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DatabaseClient.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2022 the original author or authors. * Copyright 2002-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -83,7 +83,10 @@ public interface DatabaseClient extends ConnectionAccessor {
* the execution. The SQL string can contain either native parameter * the execution. The SQL string can contain either native parameter
* bind markers or named parameters (e.g. {@literal :foo, :bar}) when * bind markers or named parameters (e.g. {@literal :foo, :bar}) when
* {@link NamedParameterExpander} is enabled. * {@link NamedParameterExpander} is enabled.
* <p>Accepts {@link PreparedOperation} as SQL and binding {@link Supplier} * <p>Accepts {@link PreparedOperation} as SQL and binding {@link Supplier}.
* <p>{code DatabaseClient} implementations should defer the resolution of
* the SQL string as much as possible, ideally up to the point where a
* {@code Subscription} happens. This is the case for the default implementation.
* @param sqlSupplier a supplier for the SQL statement * @param sqlSupplier a supplier for the SQL statement
* @return a new {@link GenericExecuteSpec} * @return a new {@link GenericExecuteSpec}
* @see NamedParameterExpander * @see NamedParameterExpander

24
spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultDatabaseClient.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2022 the original author or authors. * Copyright 2002-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -63,6 +63,7 @@ import org.springframework.util.StringUtils;
* @author Mark Paluch * @author Mark Paluch
* @author Mingyuan Wu * @author Mingyuan Wu
* @author Bogdan Ilchyshyn * @author Bogdan Ilchyshyn
* @author Simon Baslé
* @since 5.3 * @since 5.3
*/ */
class DefaultDatabaseClient implements DatabaseClient { class DefaultDatabaseClient implements DatabaseClient {
@ -346,8 +347,7 @@ class DefaultDatabaseClient implements DatabaseClient {
} }
private ResultFunction getResultFunction(Supplier<String> sqlSupplier) { private ResultFunction getResultFunction(Supplier<String> sqlSupplier) {
String sql = getRequiredSql(sqlSupplier); BiFunction<Connection, String, Statement> statementFunction = (connection, sql) -> {
Function<Connection, Statement> statementFunction = connection -> {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Executing SQL statement [" + sql + "]"); logger.debug("Executing SQL statement [" + sql + "]");
} }
@ -393,28 +393,22 @@ class DefaultDatabaseClient implements DatabaseClient {
return statement; return statement;
}; };
Function<Connection, Flux<Result>> resultFunction = connection -> { return new ResultFunction(sqlSupplier, statementFunction, this.filterFunction, DefaultDatabaseClient.this.executeFunction);
Statement statement = statementFunction.apply(connection);
return Flux.from(this.filterFunction.filter(statement, DefaultDatabaseClient.this.executeFunction))
.cast(Result.class).checkpoint("SQL \"" + sql + "\" [DatabaseClient]");
};
return new ResultFunction(resultFunction, sql);
} }
private <T> FetchSpec<T> execute(Supplier<String> sqlSupplier, Function<Result, Publisher<T>> resultAdapter) { private <T> FetchSpec<T> execute(Supplier<String> sqlSupplier, Function<Result, Publisher<T>> resultAdapter) {
ResultFunction resultHandler = getResultFunction(sqlSupplier); ResultFunction resultHandler = getResultFunction(sqlSupplier);
return new DefaultFetchSpec<>( return new DefaultFetchSpec<>(
DefaultDatabaseClient.this, resultHandler.sql(), DefaultDatabaseClient.this,
new ConnectionFunction<>(resultHandler.sql(), resultHandler.function()), resultHandler,
new ConnectionFunction<>(resultHandler.sql(), connection -> sumRowsUpdated(resultHandler.function(), connection)), connection -> sumRowsUpdated(resultHandler, connection),
resultAdapter); resultAdapter);
} }
private <T> Flux<T> flatMap(Supplier<String> sqlSupplier, Function<Result, Publisher<T>> mappingFunction) { private <T> Flux<T> flatMap(Supplier<String> sqlSupplier, Function<Result, Publisher<T>> mappingFunction) {
ResultFunction resultHandler = getResultFunction(sqlSupplier); ResultFunction resultHandler = getResultFunction(sqlSupplier);
ConnectionFunction<Flux<T>> connectionFunction = new ConnectionFunction<>(resultHandler.sql(), cx -> resultHandler.function() ConnectionFunction<Flux<T>> connectionFunction = new DelegateConnectionFunction<>(resultHandler, cx -> resultHandler
.apply(cx) .apply(cx)
.flatMap(mappingFunction)); .flatMap(mappingFunction));
return inConnectionMany(connectionFunction); return inConnectionMany(connectionFunction);
@ -473,8 +467,6 @@ class DefaultDatabaseClient implements DatabaseClient {
Assert.state(StringUtils.hasText(sql), "SQL returned by supplier must not be empty"); Assert.state(StringUtils.hasText(sql), "SQL returned by supplier must not be empty");
return sql; return sql;
} }
record ResultFunction(Function<Connection, Flux<Result>> function, String sql){}
} }

18
spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DefaultFetchSpec.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -30,6 +30,7 @@ import org.springframework.dao.IncorrectResultSizeDataAccessException;
* Default {@link FetchSpec} implementation. * Default {@link FetchSpec} implementation.
* *
* @author Mark Paluch * @author Mark Paluch
* @author Simon Baslé
* @since 5.3 * @since 5.3
* @param <T> the row result type * @param <T> the row result type
*/ */
@ -37,24 +38,21 @@ class DefaultFetchSpec<T> implements FetchSpec<T> {
private final ConnectionAccessor connectionAccessor; private final ConnectionAccessor connectionAccessor;
private final String sql; private final ResultFunction resultFunction;
private final Function<Connection, Flux<Result>> resultFunction;
private final Function<Connection, Mono<Long>> updatedRowsFunction; private final Function<Connection, Mono<Long>> updatedRowsFunction;
private final Function<Result, Publisher<T>> resultAdapter; private final Function<Result, Publisher<T>> resultAdapter;
DefaultFetchSpec(ConnectionAccessor connectionAccessor, String sql, DefaultFetchSpec(ConnectionAccessor connectionAccessor,
Function<Connection, Flux<Result>> resultFunction, ResultFunction resultFunction,
Function<Connection, Mono<Long>> updatedRowsFunction, Function<Connection, Mono<Long>> updatedRowsFunction,
Function<Result, Publisher<T>> resultAdapter) { Function<Result, Publisher<T>> resultAdapter) {
this.sql = sql;
this.connectionAccessor = connectionAccessor; this.connectionAccessor = connectionAccessor;
this.resultFunction = resultFunction; this.resultFunction = resultFunction;
this.updatedRowsFunction = updatedRowsFunction; this.updatedRowsFunction = new DelegateConnectionFunction<>(resultFunction, updatedRowsFunction);
this.resultAdapter = resultAdapter; this.resultAdapter = resultAdapter;
} }
@ -68,7 +66,7 @@ class DefaultFetchSpec<T> implements FetchSpec<T> {
} }
if (list.size() > 1) { if (list.size() > 1) {
return Mono.error(new IncorrectResultSizeDataAccessException( return Mono.error(new IncorrectResultSizeDataAccessException(
String.format("Query [%s] returned non unique result.", this.sql), String.format("Query [%s] returned non unique result.", this.resultFunction.getSql()),
1)); 1));
} }
return Mono.just(list.get(0)); return Mono.just(list.get(0));
@ -82,7 +80,7 @@ class DefaultFetchSpec<T> implements FetchSpec<T> {
@Override @Override
public Flux<T> all() { public Flux<T> all() {
return this.connectionAccessor.inConnectionMany(new ConnectionFunction<>(this.sql, return this.connectionAccessor.inConnectionMany(new DelegateConnectionFunction<>(this.resultFunction,
connection -> this.resultFunction.apply(connection) connection -> this.resultFunction.apply(connection)
.flatMap(this.resultAdapter))); .flatMap(this.resultAdapter)));
} }

56
spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DelegateConnectionFunction.java

@ -0,0 +1,56 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.r2dbc.core;
import java.util.function.Function;
import io.r2dbc.spi.Connection;
import org.springframework.lang.Nullable;
/**
* A {@link ConnectionFunction} that delegates to a {@code SqlProvider} and a plain
* {@code Function}.
*
* @author Simon Baslé
* @since 5.3.26
* @param <R> the type of the result of the function.
*/
final class DelegateConnectionFunction<R> implements ConnectionFunction<R> {
private final SqlProvider sql;
private final Function<Connection, R> function;
DelegateConnectionFunction(SqlProvider sql, Function<Connection, R> function) {
this.sql = sql;
this.function = function;
}
@Override
public R apply(Connection t) {
return this.function.apply(t);
}
@Nullable
@Override
public String getSql() {
return this.sql.getSql();
}
}

74
spring-r2dbc/src/main/java/org/springframework/r2dbc/core/ResultFunction.java

@ -0,0 +1,74 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.r2dbc.core;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import reactor.core.publisher.Flux;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* A {@link ConnectionFunction} that produces a {@code Flux} of {@link Result} and that
* defers generation of the SQL until the function has been applied.
* Beforehand, the {@code getSql()} method simply returns {@code null}. The sql String is
* also memoized during application, so that subsequent calls to {@link #getSql()} return
* the same {@code String} without further calls to the {@code Supplier}.
*
* @author Mark Paluch
* @author Simon Baslé
* @since 5.3.26
*/
final class ResultFunction implements ConnectionFunction<Flux<Result>> {
final Supplier<String> sqlSupplier;
final BiFunction<Connection, String, Statement> statementFunction;
final StatementFilterFunction filterFunction;
final ExecuteFunction executeFunction;
@Nullable
String resolvedSql = null;
ResultFunction(Supplier<String> sqlSupplier, BiFunction<Connection, String, Statement> statementFunction, StatementFilterFunction filterFunction, ExecuteFunction executeFunction) {
this.sqlSupplier = sqlSupplier;
this.statementFunction = statementFunction;
this.filterFunction = filterFunction;
this.executeFunction = executeFunction;
}
@Override
public Flux<Result> apply(Connection connection) {
String sql = this.sqlSupplier.get();
Assert.state(StringUtils.hasText(sql), "SQL returned by supplier must not be empty");
this.resolvedSql = sql;
Statement statement = this.statementFunction.apply(connection, sql);
return Flux.from(this.filterFunction.filter(statement, this.executeFunction))
.cast(Result.class).checkpoint("SQL \"" + sql + "\" [DatabaseClient]");
}
@Nullable
@Override
public String getSql() {
return this.resolvedSql;
}
}

44
spring-r2dbc/src/test/java/org/springframework/r2dbc/core/DefaultDatabaseClientUnitTests.java

@ -17,6 +17,8 @@
package org.springframework.r2dbc.core; package org.springframework.r2dbc.core;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import io.r2dbc.spi.Connection; import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.ConnectionFactory;
@ -63,6 +65,7 @@ import static org.mockito.BDDMockito.when;
* @author Mark Paluch * @author Mark Paluch
* @author Ferdinand Jacobs * @author Ferdinand Jacobs
* @author Jens Schauder * @author Jens Schauder
* @author Simon Baslé
*/ */
@MockitoSettings(strictness = Strictness.LENIENT) @MockitoSettings(strictness = Strictness.LENIENT)
class DefaultDatabaseClientUnitTests { class DefaultDatabaseClientUnitTests {
@ -385,6 +388,47 @@ class DefaultDatabaseClientUnitTests {
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
} }
@Test
void sqlSupplierInvocationIsDeferredUntilSubscription() {
// We'll have either 2 or 3 rows, depending on the subscription and the generated SQL
MockRowMetadata metadata = MockRowMetadata.builder().columnMetadata(
MockColumnMetadata.builder().name("id").javaType(Integer.class).build()).build();
final MockRow row1 = MockRow.builder().metadata(metadata)
.identified("id", Integer.class, 1).build();
final MockRow row2 = MockRow.builder().metadata(metadata)
.identified("id", Integer.class, 2).build();
final MockRow row3 = MockRow.builder().metadata(metadata)
.identified("id", Integer.class, 3).build();
// Set up 2 mock statements
mockStatementFor("SELECT id FROM test WHERE id < '3'", MockResult.builder()
.row(row1, row2).build());
mockStatementFor("SELECT id FROM test WHERE id < '4'", MockResult.builder()
.row(row1, row2, row3).build());
// Create the client
DatabaseClient databaseClient = this.databaseClientBuilder.build();
AtomicInteger invoked = new AtomicInteger();
// Assemble a publisher, but don't subscribe yet
Mono<List<Integer>> operation = databaseClient
.sql(() -> {
int idMax = 2 + invoked.incrementAndGet();
return String.format("SELECT id FROM test WHERE id < '%s'", idMax);
})
.map(r -> r.get("id", Integer.class))
.all()
.collectList();
assertThat(invoked).as("invoked (before subscription)").hasValue(0);
List<Integer> rows = operation.block();
assertThat(invoked).as("invoked (after 1st subscription)").hasValue(1);
assertThat(rows).containsExactly(1, 2);
rows = operation.block();
assertThat(invoked).as("invoked (after 2nd subscription)").hasValue(2);
assertThat(rows).containsExactly(1, 2, 3);
}
private Statement mockStatement() { private Statement mockStatement() {
return mockStatementFor(null, null); return mockStatementFor(null, null);

Loading…
Cancel
Save