|
|
|
@ -15,9 +15,12 @@ |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
package org.springframework.data.r2dbc.core; |
|
|
|
package org.springframework.data.r2dbc.core; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import static io.netty.buffer.ByteBufUtil.*; |
|
|
|
import static org.assertj.core.api.Assertions.*; |
|
|
|
import static org.assertj.core.api.Assertions.*; |
|
|
|
import static org.springframework.data.relational.core.query.Criteria.*; |
|
|
|
import static org.springframework.data.relational.core.query.Criteria.*; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import io.netty.buffer.ByteBufUtil; |
|
|
|
|
|
|
|
import io.netty.buffer.Unpooled; |
|
|
|
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; |
|
|
|
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; |
|
|
|
import io.r2dbc.postgresql.PostgresqlConnectionFactory; |
|
|
|
import io.r2dbc.postgresql.PostgresqlConnectionFactory; |
|
|
|
import io.r2dbc.postgresql.codec.Box; |
|
|
|
import io.r2dbc.postgresql.codec.Box; |
|
|
|
@ -30,21 +33,26 @@ import io.r2dbc.postgresql.codec.Path; |
|
|
|
import io.r2dbc.postgresql.codec.Point; |
|
|
|
import io.r2dbc.postgresql.codec.Point; |
|
|
|
import io.r2dbc.postgresql.codec.Polygon; |
|
|
|
import io.r2dbc.postgresql.codec.Polygon; |
|
|
|
import io.r2dbc.postgresql.extension.CodecRegistrar; |
|
|
|
import io.r2dbc.postgresql.extension.CodecRegistrar; |
|
|
|
|
|
|
|
import io.r2dbc.spi.Blob; |
|
|
|
import io.r2dbc.spi.ConnectionFactory; |
|
|
|
import io.r2dbc.spi.ConnectionFactory; |
|
|
|
import lombok.AllArgsConstructor; |
|
|
|
import lombok.AllArgsConstructor; |
|
|
|
import lombok.Data; |
|
|
|
import lombok.Data; |
|
|
|
|
|
|
|
import reactor.core.publisher.Flux; |
|
|
|
|
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
import reactor.test.StepVerifier; |
|
|
|
import reactor.test.StepVerifier; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import java.nio.ByteBuffer; |
|
|
|
|
|
|
|
import java.nio.charset.StandardCharsets; |
|
|
|
import java.time.Duration; |
|
|
|
import java.time.Duration; |
|
|
|
import java.util.Collections; |
|
|
|
import java.util.Collections; |
|
|
|
import java.util.List; |
|
|
|
import java.util.List; |
|
|
|
|
|
|
|
import java.util.concurrent.CompletableFuture; |
|
|
|
|
|
|
|
|
|
|
|
import javax.sql.DataSource; |
|
|
|
import javax.sql.DataSource; |
|
|
|
|
|
|
|
|
|
|
|
import org.junit.jupiter.api.BeforeEach; |
|
|
|
import org.junit.jupiter.api.BeforeEach; |
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
import org.junit.jupiter.api.extension.RegisterExtension; |
|
|
|
import org.junit.jupiter.api.extension.RegisterExtension; |
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.dao.DataAccessException; |
|
|
|
import org.springframework.dao.DataAccessException; |
|
|
|
import org.springframework.data.annotation.Id; |
|
|
|
import org.springframework.data.annotation.Id; |
|
|
|
import org.springframework.data.r2dbc.convert.EnumWriteSupport; |
|
|
|
import org.springframework.data.r2dbc.convert.EnumWriteSupport; |
|
|
|
@ -81,6 +89,13 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport { |
|
|
|
+ "primitive_array INT[]," //
|
|
|
|
+ "primitive_array INT[]," //
|
|
|
|
+ "multidimensional_array INT[]," //
|
|
|
|
+ "multidimensional_array INT[]," //
|
|
|
|
+ "collection_array INT[][])"); |
|
|
|
+ "collection_array INT[][])"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template.execute("DROP TABLE IF EXISTS with_blobs"); |
|
|
|
|
|
|
|
template.execute("CREATE TABLE with_blobs (" //
|
|
|
|
|
|
|
|
+ "id serial PRIMARY KEY," //
|
|
|
|
|
|
|
|
+ "byte_array bytea," //
|
|
|
|
|
|
|
|
+ "byte_buffer bytea," //
|
|
|
|
|
|
|
|
+ "byte_blob bytea)"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test // gh-411
|
|
|
|
@Test // gh-411
|
|
|
|
@ -198,9 +213,9 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport { |
|
|
|
|
|
|
|
|
|
|
|
template.execute("DROP TABLE IF EXISTS with_interval"); |
|
|
|
template.execute("DROP TABLE IF EXISTS with_interval"); |
|
|
|
template.execute("CREATE TABLE with_interval (" //
|
|
|
|
template.execute("CREATE TABLE with_interval (" //
|
|
|
|
+ "id serial PRIMARY KEY," //
|
|
|
|
+ "id serial PRIMARY KEY," //
|
|
|
|
+ "interval INTERVAL" //
|
|
|
|
+ "interval INTERVAL" //
|
|
|
|
+ ")"); |
|
|
|
+ ")"); |
|
|
|
|
|
|
|
|
|
|
|
R2dbcEntityTemplate template = new R2dbcEntityTemplate(client, |
|
|
|
R2dbcEntityTemplate template = new R2dbcEntityTemplate(client, |
|
|
|
new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE)); |
|
|
|
new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE)); |
|
|
|
@ -213,6 +228,62 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport { |
|
|
|
}).verifyComplete(); |
|
|
|
}).verifyComplete(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Test // gh-1408
|
|
|
|
|
|
|
|
void shouldReadAndWriteBlobs() { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
R2dbcEntityTemplate template = new R2dbcEntityTemplate(client, |
|
|
|
|
|
|
|
new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
WithBlobs withBlobs = new WithBlobs(); |
|
|
|
|
|
|
|
byte[] content = "123ä".getBytes(StandardCharsets.UTF_8); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
withBlobs.byteArray = content; |
|
|
|
|
|
|
|
withBlobs.byteBuffer = ByteBuffer.wrap(content); |
|
|
|
|
|
|
|
withBlobs.byteBlob = Blob.from(Mono.just(ByteBuffer.wrap(content))); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template.insert(withBlobs) //
|
|
|
|
|
|
|
|
.as(StepVerifier::create) //
|
|
|
|
|
|
|
|
.expectNextCount(1) //
|
|
|
|
|
|
|
|
.verifyComplete(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template.selectOne(Query.empty(), WithBlobs.class) //
|
|
|
|
|
|
|
|
.flatMap(it -> { |
|
|
|
|
|
|
|
return Flux.from(it.byteBlob.stream()).last().map(blob -> { |
|
|
|
|
|
|
|
it.byteBlob = Blob.from(Mono.just(blob)); |
|
|
|
|
|
|
|
return it; |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
}).as(StepVerifier::create) //
|
|
|
|
|
|
|
|
.consumeNextWith(actual -> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CompletableFuture<byte[]> cf = Mono.from(actual.byteBlob.stream()).map(Unpooled::wrappedBuffer) |
|
|
|
|
|
|
|
.map(ByteBufUtil::getBytes).toFuture(); |
|
|
|
|
|
|
|
assertThat(actual.getByteArray()).isEqualTo(content); |
|
|
|
|
|
|
|
assertThat(getBytes(Unpooled.wrappedBuffer(actual.getByteBuffer()))).isEqualTo(content); |
|
|
|
|
|
|
|
assertThat(cf.join()).isEqualTo(content); |
|
|
|
|
|
|
|
}).verifyComplete(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template.selectOne(Query.empty(), WithBlobs.class) |
|
|
|
|
|
|
|
.doOnNext(it -> it.byteArray = "foo".getBytes(StandardCharsets.UTF_8)).flatMap(template::update) //
|
|
|
|
|
|
|
|
.as(StepVerifier::create) //
|
|
|
|
|
|
|
|
.expectNextCount(1).verifyComplete(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template.selectOne(Query.empty(), WithBlobs.class) //
|
|
|
|
|
|
|
|
.flatMap(it -> { |
|
|
|
|
|
|
|
return Flux.from(it.byteBlob.stream()).last().map(blob -> { |
|
|
|
|
|
|
|
it.byteBlob = Blob.from(Mono.just(blob)); |
|
|
|
|
|
|
|
return it; |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
}).as(StepVerifier::create) //
|
|
|
|
|
|
|
|
.consumeNextWith(actual -> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CompletableFuture<byte[]> cf = Mono.from(actual.byteBlob.stream()).map(Unpooled::wrappedBuffer) |
|
|
|
|
|
|
|
.map(ByteBufUtil::getBytes).toFuture(); |
|
|
|
|
|
|
|
assertThat(actual.getByteArray()).isEqualTo("foo".getBytes(StandardCharsets.UTF_8)); |
|
|
|
|
|
|
|
assertThat(getBytes(Unpooled.wrappedBuffer(actual.getByteBuffer()))).isEqualTo(content); |
|
|
|
|
|
|
|
assertThat(cf.join()).isEqualTo(content); |
|
|
|
|
|
|
|
}).verifyComplete(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Data |
|
|
|
@Data |
|
|
|
@AllArgsConstructor |
|
|
|
@AllArgsConstructor |
|
|
|
static class EntityWithEnum { |
|
|
|
static class EntityWithEnum { |
|
|
|
@ -260,4 +331,16 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport { |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Data |
|
|
|
|
|
|
|
@Table("with_blobs") |
|
|
|
|
|
|
|
static class WithBlobs { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Id Integer id; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
byte[] byteArray; |
|
|
|
|
|
|
|
ByteBuffer byteBuffer; |
|
|
|
|
|
|
|
Blob byteBlob; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|