|
|
|
|
@ -24,11 +24,13 @@ import java.util.function.Function;
@@ -24,11 +24,13 @@ import java.util.function.Function;
|
|
|
|
|
import io.r2dbc.pool.ConnectionPool; |
|
|
|
|
import io.r2dbc.pool.ConnectionPoolConfiguration; |
|
|
|
|
import io.r2dbc.pool.PoolingConnectionFactoryProvider; |
|
|
|
|
import io.r2dbc.spi.Connection; |
|
|
|
|
import io.r2dbc.spi.ConnectionFactories; |
|
|
|
|
import io.r2dbc.spi.ConnectionFactory; |
|
|
|
|
import io.r2dbc.spi.ConnectionFactoryOptions; |
|
|
|
|
import io.r2dbc.spi.ConnectionFactoryOptions.Builder; |
|
|
|
|
import io.r2dbc.spi.ValidationDepth; |
|
|
|
|
import org.reactivestreams.Publisher; |
|
|
|
|
|
|
|
|
|
import org.springframework.boot.context.properties.PropertyMapper; |
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
|
@ -209,7 +211,7 @@ public final class ConnectionFactoryBuilder {
@@ -209,7 +211,7 @@ public final class ConnectionFactoryBuilder {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private ConnectionFactoryOptions delegateFactoryOptions(ConnectionFactoryOptions options) { |
|
|
|
|
String protocol = options.getRequiredValue(ConnectionFactoryOptions.PROTOCOL); |
|
|
|
|
String protocol = toString(options.getRequiredValue(ConnectionFactoryOptions.PROTOCOL)); |
|
|
|
|
if (protocol.trim().length() == 0) { |
|
|
|
|
throw new IllegalArgumentException(String.format("Protocol %s is not valid.", protocol)); |
|
|
|
|
} |
|
|
|
|
@ -221,35 +223,45 @@ public final class ConnectionFactoryBuilder {
@@ -221,35 +223,45 @@ public final class ConnectionFactoryBuilder {
|
|
|
|
|
.option(ConnectionFactoryOptions.PROTOCOL, protocolDelegate).build(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
ConnectionPoolConfiguration connectionPoolConfiguration(ConnectionFactoryOptions options, |
|
|
|
|
ConnectionFactory connectionFactory) { |
|
|
|
|
ConnectionPoolConfiguration.Builder builder = ConnectionPoolConfiguration.builder(connectionFactory); |
|
|
|
|
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.BACKGROUND_EVICTION_INTERVAL)) |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.BACKGROUND_EVICTION_INTERVAL)) |
|
|
|
|
.as(this::toDuration).to(builder::backgroundEvictionInterval); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.INITIAL_SIZE)).as(this::toInteger) |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.INITIAL_SIZE)).as(this::toInteger) |
|
|
|
|
.to(builder::initialSize); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.MAX_SIZE)).as(this::toInteger) |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.MAX_SIZE)).as(this::toInteger) |
|
|
|
|
.to(builder::maxSize); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.ACQUIRE_RETRY)).as(this::toInteger) |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.ACQUIRE_RETRY)).as(this::toInteger) |
|
|
|
|
.to(builder::acquireRetry); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.MAX_LIFE_TIME)).as(this::toDuration) |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.MAX_LIFE_TIME)).as(this::toDuration) |
|
|
|
|
.to(builder::maxLifeTime); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.MAX_ACQUIRE_TIME)).as(this::toDuration) |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.MAX_ACQUIRE_TIME)).as(this::toDuration) |
|
|
|
|
.to(builder::maxAcquireTime); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.MAX_IDLE_TIME)).as(this::toDuration) |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.MAX_IDLE_TIME)).as(this::toDuration) |
|
|
|
|
.to(builder::maxIdleTime); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.MAX_CREATE_CONNECTION_TIME)) |
|
|
|
|
.as(this::toDuration).to(builder::maxCreateConnectionTime); |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.POOL_NAME)).to(builder::name); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.REGISTER_JMX)).as(this::toBoolean) |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.MAX_CREATE_CONNECTION_TIME)).as(this::toDuration) |
|
|
|
|
.to(builder::maxCreateConnectionTime); |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.POOL_NAME)).as(this::toString).to(builder::name); |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.PRE_RELEASE)).to((function) -> builder |
|
|
|
|
.preRelease((Function<? super Connection, ? extends Publisher<Void>>) function)); |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.POST_ALLOCATE)).to((function) -> builder |
|
|
|
|
.postAllocate((Function<? super Connection, ? extends Publisher<Void>>) function)); |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.REGISTER_JMX)).as(this::toBoolean) |
|
|
|
|
.to(builder::registerJmx); |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.VALIDATION_QUERY)).to(builder::validationQuery); |
|
|
|
|
map.from((Object) options.getValue(PoolingConnectionFactoryProvider.VALIDATION_DEPTH)) |
|
|
|
|
.as(this::toValidationDepth).to(builder::validationDepth); |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.VALIDATION_QUERY)).as(this::toString) |
|
|
|
|
.to(builder::validationQuery); |
|
|
|
|
map.from(options.getValue(PoolingConnectionFactoryProvider.VALIDATION_DEPTH)).as(this::toValidationDepth) |
|
|
|
|
.to(builder::validationDepth); |
|
|
|
|
return builder.build(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private String toString(Object object) { |
|
|
|
|
return toType(String.class, object, String::valueOf); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private Integer toInteger(Object object) { |
|
|
|
|
return toType(Integer.class, object, Integer::valueOf); |
|
|
|
|
} |
|
|
|
|
|