Browse Source

#2 - Refactor to functional usage of resources.

DatabaseClient now creates functions. The actual invocation/execution takes place from DefaultFetchFunctions (FetchSpec) to keep stateful resources in the scope of the execution. execute()/executeMany() use Flux.usingWhen/Mono.usingWhen to release connections after their usage.
pull/1188/head
Mark Paluch 8 years ago
parent
commit
b8375f5a96
  1. 265
      src/main/java/org/springframework/data/jdbc/core/function/DefaultDatabaseClient.java
  2. 19
      src/test/java/org/springframework/data/jdbc/core/function/DatabaseClientIntegrationTests.java

265
src/main/java/org/springframework/data/jdbc/core/function/DefaultDatabaseClient.java

@ -102,25 +102,75 @@ class DefaultDatabaseClient implements DatabaseClient {
return new DefaultInsertIntoSpec(); return new DefaultInsertIntoSpec();
} }
public <T> Flux<T> execute(Function<Connection, Flux<T>> action) throws DataAccessException { /**
* Execute a callback {@link Function} within a {@link Connection} scope. The function is responsible for creating a
* {@link Flux}. The connection is released after the {@link Flux} terminates (or the subscription is cancelled).
* Connection resources must not be passed outside of the {@link Function} closure, otherwise resources may get
* defunct.
*
* @param action must not be {@literal null}.
* @return the resulting {@link Flux}.
* @throws DataAccessException
*/
public <T> Flux<T> executeMany(Function<Connection, Flux<T>> action) throws DataAccessException {
Assert.notNull(action, "Callback object must not be null"); Assert.notNull(action, "Callback object must not be null");
Mono<Connection> connectionMono = Mono.from(obtainConnectionFactory().create()); Mono<Connection> connectionMono = getConnection();
// Create close-suppressing Connection proxy, also preparing returned Statements. // Create close-suppressing Connection proxy, also preparing returned Statements.
return connectionMono.flatMapMany(connection -> { return Flux.usingWhen(connectionMono, it -> {
Connection connectionToUse = createConnectionProxy(connection); Connection connectionToUse = createConnectionProxy(it);
// TODO: Release connection return doInConnectionMany(connectionToUse, action);
return doInConnection(action, connectionToUse); }, this::closeConnection, this::closeConnection, this::closeConnection) //
.onErrorMap(SQLException.class, ex -> translateException("executeMany", getSql(action), ex));
}
}).onErrorMap(SQLException.class, ex -> { /**
* Execute a callback {@link Function} within a {@link Connection} scope. The function is responsible for creating a
* {@link Mono}. The connection is released after the {@link Mono} terminates (or the subscription is cancelled).
* Connection resources must not be passed outside of the {@link Function} closure, otherwise resources may get
* defunct.
*
* @param action must not be {@literal null}.
* @return the resulting {@link Mono}.
* @throws DataAccessException
*/
public <T> Mono<T> execute(Function<Connection, Mono<T>> action) throws DataAccessException {
String sql = getSql(action); Assert.notNull(action, "Callback object must not be null");
return translateException("ConnectionCallback", sql, ex);
}); Mono<Connection> connectionMono = getConnection();
// Create close-suppressing Connection proxy, also preparing returned Statements.
return Mono.usingWhen(connectionMono, it -> {
Connection connectionToUse = createConnectionProxy(it);
return doInConnection(connectionToUse, action);
}, this::closeConnection, this::closeConnection, this::closeConnection) //
.onErrorMap(SQLException.class, ex -> translateException("execute", getSql(action), ex));
}
/**
* Obtain a {@link Connection}.
*
* @return
*/
protected Mono<Connection> getConnection() {
return Mono.from(obtainConnectionFactory().create());
}
/**
* Release the {@link Connection}.
*
* @param connection
* @return
*/
protected Publisher<Void> closeConnection(Connection connection) {
return connection.close();
} }
/** /**
@ -153,11 +203,12 @@ class DefaultDatabaseClient implements DatabaseClient {
* @return a DataAccessException wrapping the {@code SQLException} (never {@code null}) * @return a DataAccessException wrapping the {@code SQLException} (never {@code null})
*/ */
protected DataAccessException translateException(String task, @Nullable String sql, SQLException ex) { protected DataAccessException translateException(String task, @Nullable String sql, SQLException ex) {
DataAccessException dae = exceptionTranslator.translate(task, sql, ex); DataAccessException dae = exceptionTranslator.translate(task, sql, ex);
return (dae != null ? dae : new UncategorizedSQLException(task, sql, ex)); return (dae != null ? dae : new UncategorizedSQLException(task, sql, ex));
} }
static void doBind(Statement statement, Map<String, Optional<Object>> byName, private static void doBind(Statement statement, Map<String, Optional<Object>> byName,
Map<Integer, Optional<Object>> byIndex) { Map<Integer, Optional<Object>> byIndex) {
byIndex.forEach((i, o) -> { byIndex.forEach((i, o) -> {
@ -225,16 +276,26 @@ class DefaultDatabaseClient implements DatabaseClient {
return sql; return sql;
} }
<T> Mono<SqlResult<T>> exchange(String sql, BiFunction<Row, RowMetadata, T> mappingFunction) { protected <T> SqlResult<T> exchange(String sql, BiFunction<Row, RowMetadata, T> mappingFunction) {
return execute(it -> { Function<Connection, Statement> executeFunction = it -> {
if (logger.isDebugEnabled()) {
logger.debug("Executing SQL statement [" + sql + "]");
}
Statement statement = it.createStatement(sql); Statement statement = it.createStatement(sql);
doBind(statement, byName, byIndex); doBind(statement, byName, byIndex);
return Flux return statement;
.just((SqlResult<T>) new DefaultSqlResult<>(sql, Flux.from(statement.add().execute()), mappingFunction)); };
}).next();
Function<Connection, Flux<Result>> resultFunction = it -> Flux.from(executeFunction.apply(it).execute());
return new DefaultSqlResultFunctions<>(sql, //
resultFunction, //
it -> resultFunction.apply(it).flatMap(Result::getRowsUpdated).next(), //
mappingFunction);
} }
public GenericExecuteSpecSupport bind(int index, Object value) { public GenericExecuteSpecSupport bind(int index, Object value) {
@ -310,15 +371,12 @@ class DefaultDatabaseClient implements DatabaseClient {
@Override @Override
public FetchSpec<Map<String, Object>> fetch() { public FetchSpec<Map<String, Object>> fetch() {
return exchange(getSql(), ColumnMapRowMapper.INSTANCE);
String sql = getSql();
return new DefaultFetchSpec<>(sql, exchange(sql, ColumnMapRowMapper.INSTANCE).flatMapMany(SqlResult::all),
exchange(sql, ColumnMapRowMapper.INSTANCE).flatMap(FetchSpec::rowsUpdated));
} }
@Override @Override
public Mono<SqlResult<Map<String, Object>>> exchange() { public Mono<SqlResult<Map<String, Object>>> exchange() {
return exchange(getSql(), ColumnMapRowMapper.INSTANCE); return Mono.just(exchange(getSql(), ColumnMapRowMapper.INSTANCE));
} }
@Override @Override
@ -381,14 +439,12 @@ class DefaultDatabaseClient implements DatabaseClient {
@Override @Override
public FetchSpec<T> fetch() { public FetchSpec<T> fetch() {
String sql = getSql(); return exchange(getSql(), mappingFunction);
return new DefaultFetchSpec<>(sql, exchange(sql, mappingFunction).flatMapMany(SqlResult::all),
exchange(sql, mappingFunction).flatMap(FetchSpec::rowsUpdated));
} }
@Override @Override
public Mono<SqlResult<T>> exchange() { public Mono<SqlResult<T>> exchange() {
return exchange(getSql(), mappingFunction); return Mono.just(exchange(getSql(), mappingFunction));
} }
@Override @Override
@ -472,11 +528,15 @@ class DefaultDatabaseClient implements DatabaseClient {
@Override @Override
public Mono<Void> then() { public Mono<Void> then() {
return exchange().flatMapMany(FetchSpec::all).then(); return exchange((row, md) -> row).all().then();
} }
@Override @Override
public Mono<SqlResult<Map<String, Object>>> exchange() { public Mono<SqlResult<Map<String, Object>>> exchange() {
return Mono.just(exchange(ColumnMapRowMapper.INSTANCE));
}
private <T> SqlResult<T> exchange(BiFunction<Row, RowMetadata, T> mappingFunction) {
if (byName.isEmpty()) { if (byName.isEmpty()) {
throw new IllegalStateException("Insert fields is empty!"); throw new IllegalStateException("Insert fields is empty!");
@ -490,26 +550,43 @@ class DefaultDatabaseClient implements DatabaseClient {
builder.append("INSERT INTO ").append(table).append(" (").append(fieldNames).append(") ").append(" VALUES(") builder.append("INSERT INTO ").append(table).append(" (").append(fieldNames).append(") ").append(" VALUES(")
.append(placeholders).append(")"); .append(placeholders).append(")");
return execute(it -> { String sql = builder.toString();
Function<Connection, Statement> insertFunction = it -> {
String sql = builder.toString(); if (logger.isDebugEnabled()) {
logger.debug("Executing SQL statement [" + sql + "]");
}
Statement statement = it.createStatement(sql); Statement statement = it.createStatement(sql);
doBind(statement);
return statement;
};
AtomicInteger index = new AtomicInteger(); Function<Connection, Flux<Result>> resultFunction = it -> Flux
for (Optional<Object> o : byName.values()) { .from(insertFunction.apply(it).executeReturningGeneratedKeys());
if (o.isPresent()) { return new DefaultSqlResultFunctions<>(sql, //
o.ifPresent(v -> statement.bind(index.getAndIncrement(), v)); resultFunction, //
} else { it -> resultFunction.apply(it).flatMap(Result::getRowsUpdated).next(), //
statement.bindNull("$" + (index.getAndIncrement() + 1), 0); // TODO: What is type? mappingFunction);
} }
}
/**
* PostgreSQL-specific bind.
*
* @param statement
*/
private void doBind(Statement statement) {
SqlResult<Map<String, Object>> result = new DefaultSqlResult<>(sql, AtomicInteger index = new AtomicInteger();
Flux.from(statement.executeReturningGeneratedKeys()), ColumnMapRowMapper.INSTANCE);
return Flux.just(result);
}).next(); for (Optional<Object> o : byName.values()) {
if (o.isPresent()) {
o.ifPresent(v -> statement.bind(index.getAndIncrement(), v));
} else {
statement.bindNull("$" + (index.getAndIncrement() + 1), 0); // TODO: What is type?
}
}
} }
} }
@ -523,7 +600,7 @@ class DefaultDatabaseClient implements DatabaseClient {
private final String table; private final String table;
private final Publisher<T> objectToInsert; private final Publisher<T> objectToInsert;
public DefaultTypedInsertSpec(Class<?> typeToInsert) { DefaultTypedInsertSpec(Class<?> typeToInsert) {
this.typeToInsert = typeToInsert; this.typeToInsert = typeToInsert;
this.table = dataAccessStrategy.getTableName(typeToInsert); this.table = dataAccessStrategy.getTableName(typeToInsert);
@ -556,69 +633,84 @@ class DefaultDatabaseClient implements DatabaseClient {
@Override @Override
public Mono<Void> then() { public Mono<Void> then() {
return exchange().flatMapMany(FetchSpec::all).then(); return Mono.from(objectToInsert).map(toInsert -> exchange(toInsert, (row, md) -> row).all()).then();
} }
@Override @Override
public Mono<SqlResult<Map<String, Object>>> exchange() { public Mono<SqlResult<Map<String, Object>>> exchange() {
return Mono.from(objectToInsert).map(toInsert -> exchange(toInsert, ColumnMapRowMapper.INSTANCE));
}
return Mono.from(objectToInsert).flatMap(toInsert -> { private <R> SqlResult<R> exchange(Object toInsert, BiFunction<Row, RowMetadata, R> mappingFunction) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
List<Pair<String, Object>> insertValues = dataAccessStrategy.getInsert(toInsert); List<Pair<String, Object>> insertValues = dataAccessStrategy.getInsert(toInsert);
String fieldNames = insertValues.stream().map(Pair::getFirst).collect(Collectors.joining(",")); String fieldNames = insertValues.stream().map(Pair::getFirst).collect(Collectors.joining(","));
String placeholders = IntStream.range(0, insertValues.size()).mapToObj(i -> "$" + (i + 1)) String placeholders = IntStream.range(0, insertValues.size()).mapToObj(i -> "$" + (i + 1))
.collect(Collectors.joining(",")); .collect(Collectors.joining(","));
builder.append("INSERT INTO ").append(table).append(" (").append(fieldNames).append(") ").append(" VALUES(") builder.append("INSERT INTO ").append(table).append(" (").append(fieldNames).append(") ").append(" VALUES(")
.append(placeholders).append(")"); .append(placeholders).append(")");
return execute(it -> { String sql = builder.toString();
String sql = builder.toString(); Function<Connection, Statement> insertFunction = it -> {
Statement statement = it.createStatement(sql);
AtomicInteger index = new AtomicInteger(); if (logger.isDebugEnabled()) {
logger.debug("Executing SQL statement [" + sql + "]");
}
for (Pair<String, Object> pair : insertValues) { Statement statement = it.createStatement(sql);
if (pair.getSecond() != null) { // TODO: Better type to transport null values. AtomicInteger index = new AtomicInteger();
statement.bind(index.getAndIncrement(), pair.getSecond());
} else { for (Pair<String, Object> pair : insertValues) {
statement.bindNull("$" + (index.getAndIncrement() + 1), 0); // TODO: What is type?
} if (pair.getSecond() != null) { // TODO: Better type to transport null values.
statement.bind(index.getAndIncrement(), pair.getSecond());
} else {
statement.bindNull("$" + (index.getAndIncrement() + 1), 0); // TODO: What is type?
} }
}
return statement;
};
SqlResult<Map<String, Object>> result = new DefaultSqlResult<>(sql, Function<Connection, Flux<Result>> resultFunction = it -> Flux
Flux.from(statement.executeReturningGeneratedKeys()), ColumnMapRowMapper.INSTANCE); .from(insertFunction.apply(it).executeReturningGeneratedKeys());
return Flux.just(result);
}).next(); return new DefaultSqlResultFunctions<>(sql, //
}); resultFunction, //
it -> resultFunction.apply(it).flatMap(Result::getRowsUpdated).next(), //
mappingFunction);
} }
} }
/** /**
* Default {@link org.springframework.data.jdbc.core.function.DatabaseClient.SqlResult} implementation. * Default {@link org.springframework.data.jdbc.core.function.SqlResult} implementation.
*/ */
static class DefaultSqlResult<T> implements SqlResult<T> { class DefaultSqlResultFunctions<T> implements SqlResult<T> {
private final String sql; private final String sql;
private final Flux<Result> result; private final Function<Connection, Flux<Result>> resultFunction;
private final Function<Connection, Mono<Integer>> updatedRowsFunction;
private final FetchSpec<T> fetchSpec; private final FetchSpec<T> fetchSpec;
DefaultSqlResult(String sql, Flux<Result> result, BiFunction<Row, RowMetadata, T> mappingFunction) { DefaultSqlResultFunctions(String sql, Function<Connection, Flux<Result>> resultFunction,
Function<Connection, Mono<Integer>> updatedRowsFunction, BiFunction<Row, RowMetadata, T> mappingFunction) {
this.sql = sql; this.sql = sql;
this.result = result; this.resultFunction = resultFunction;
this.fetchSpec = new DefaultFetchSpec<>(sql, result.flatMap(it -> it.map(mappingFunction)), this.updatedRowsFunction = updatedRowsFunction;
result.flatMap(Result::getRowsUpdated).next());
this.fetchSpec = new DefaultFetchFunctions<>(sql,
it -> resultFunction.apply(it).flatMap(result -> result.map(mappingFunction)), updatedRowsFunction);
} }
@Override @Override
public <R> SqlResult<R> extract(BiFunction<Row, RowMetadata, R> mappingFunction) { public <R> SqlResult<R> extract(BiFunction<Row, RowMetadata, R> mappingFunction) {
return new DefaultSqlResult<>(sql, result, mappingFunction); return new DefaultSqlResultFunctions<>(sql, resultFunction, updatedRowsFunction, mappingFunction);
} }
@Override @Override
@ -643,11 +735,11 @@ class DefaultDatabaseClient implements DatabaseClient {
} }
@RequiredArgsConstructor @RequiredArgsConstructor
static class DefaultFetchSpec<T> implements FetchSpec<T> { class DefaultFetchFunctions<T> implements FetchSpec<T> {
private final String sql; private final String sql;
private final Flux<T> result; private final Function<Connection, Flux<T>> resultFunction;
private final Mono<Integer> updatedRows; private final Function<Connection, Mono<Integer>> updatedRowsFunction;
@Override @Override
public Mono<T> one() { public Mono<T> one() {
@ -675,19 +767,19 @@ class DefaultDatabaseClient implements DatabaseClient {
@Override @Override
public Flux<T> all() { public Flux<T> all() {
return result; return executeMany(resultFunction);
} }
@Override @Override
public Mono<Integer> rowsUpdated() { public Mono<Integer> rowsUpdated() {
return updatedRows; return execute(updatedRowsFunction);
} }
} }
private static <T> Flux<T> doInConnection(Function<Connection, Flux<T>> action, Connection it) { private static <T> Flux<T> doInConnectionMany(Connection connection, Function<Connection, Flux<T>> action) {
try { try {
return action.apply(it); return action.apply(connection);
} catch (RuntimeException e) { } catch (RuntimeException e) {
String sql = getSql(action); String sql = getSql(action);
@ -695,6 +787,17 @@ class DefaultDatabaseClient implements DatabaseClient {
} }
} }
private static <T> Mono<T> doInConnection(Connection connection, Function<Connection, Mono<T>> action) {
try {
return action.apply(connection);
} catch (RuntimeException e) {
String sql = getSql(action);
return Mono.error(new DefaultDatabaseClient.UncategorizedSQLException("ConnectionCallback", sql, e) {});
}
}
/** /**
* Determine SQL from potential provider object. * Determine SQL from potential provider object.
* *
@ -764,7 +867,7 @@ class DefaultDatabaseClient implements DatabaseClient {
} }
} }
private static class UncategorizedSQLException extends UncategorizedDataAccessException { private static class UncategorizedSQLException extends UncategorizedDataAccessException implements SqlProvider {
/** SQL that led to the problem */ /** SQL that led to the problem */
@Nullable private final String sql; @Nullable private final String sql;

19
src/test/java/org/springframework/data/jdbc/core/function/DatabaseClientIntegrationTests.java

@ -121,12 +121,29 @@ public class DatabaseClientIntegrationTests {
.value("name", "SCHAUFELRADBAGGER") // .value("name", "SCHAUFELRADBAGGER") //
.nullValue("manual") // .nullValue("manual") //
.exchange() // .exchange() //
.flatMapMany(it -> it.extract((r, m) -> r.get("id", Integer.class)).all()).as(StepVerifier::create) // .flatMapMany(it -> it.extract((r, m) -> r.get("id", Integer.class)).all()) //
.as(StepVerifier::create) //
.expectNext(42055).verifyComplete(); .expectNext(42055).verifyComplete();
assertThat(jdbc.queryForMap("SELECT id, name, manual FROM legoset")).containsEntry("id", 42055); assertThat(jdbc.queryForMap("SELECT id, name, manual FROM legoset")).containsEntry("id", 42055);
} }
@Test
public void insertWithoutResult() {
DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);
databaseClient.insert().into("legoset")//
.value("id", 42055) //
.value("name", "SCHAUFELRADBAGGER") //
.nullValue("manual") //
.then() //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(jdbc.queryForMap("SELECT id, name, manual FROM legoset")).containsEntry("id", 42055);
}
@Test @Test
public void insertTypedObject() { public void insertTypedObject() {

Loading…
Cancel
Save