From eed7c9dcf76a5b7cda1fe22bdca0402010e99e08 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 10 Jan 2023 15:09:50 +0100 Subject: [PATCH] Fix BLOB reading and writing. We now correctly consider ByteBuffer, Clob, and Blob types as additional types to read and write blob data. Closes #1408 --- .../r2dbc/convert/MappingR2dbcConverter.java | 11 ++- .../r2dbc/mapping/R2dbcSimpleTypeHolder.java | 8 +- .../r2dbc/core/PostgresIntegrationTests.java | 91 ++++++++++++++++++- 3 files changed, 103 insertions(+), 7 deletions(-) diff --git a/spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/convert/MappingR2dbcConverter.java b/spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/convert/MappingR2dbcConverter.java index a00b627a7..cb092b037 100644 --- a/spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/convert/MappingR2dbcConverter.java +++ b/spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/convert/MappingR2dbcConverter.java @@ -15,6 +15,8 @@ */ package org.springframework.data.r2dbc.convert; +import io.r2dbc.spi.Blob; +import io.r2dbc.spi.Clob; import io.r2dbc.spi.ColumnMetadata; import io.r2dbc.spi.Row; import io.r2dbc.spi.RowMetadata; @@ -158,7 +160,14 @@ public class MappingR2dbcConverter extends BasicRelationalConverter implements R Object value = null; if (metadata == null || RowMetadataUtils.containsColumn(metadata, identifier)) { - value = row.get(identifier); + + if (property.getType().equals(Clob.class)) { + value = row.get(identifier, Clob.class); + } else if (property.getType().equals(Blob.class)) { + value = row.get(identifier, Blob.class); + } else { + value = row.get(identifier); + } } if (value == null) { diff --git a/spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/mapping/R2dbcSimpleTypeHolder.java b/spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/mapping/R2dbcSimpleTypeHolder.java index e5768cd73..23e69a408 100644 --- a/spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/mapping/R2dbcSimpleTypeHolder.java +++ b/spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/mapping/R2dbcSimpleTypeHolder.java @@ -15,10 +15,13 @@ */ package org.springframework.data.r2dbc.mapping; +import io.r2dbc.spi.Blob; +import io.r2dbc.spi.Clob; import io.r2dbc.spi.Row; import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -37,8 +40,9 @@ public class R2dbcSimpleTypeHolder extends SimpleTypeHolder { /** * Set of R2DBC simple types. */ - public static final Set> R2DBC_SIMPLE_TYPES = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(OutboundRow.class, Row.class, BigInteger.class, BigDecimal.class, UUID.class))); + public static final Set> R2DBC_SIMPLE_TYPES = Collections + .unmodifiableSet(new HashSet<>(Arrays.asList(OutboundRow.class, Row.class, BigInteger.class, BigDecimal.class, + UUID.class, Blob.class, Clob.class, ByteBuffer.class))); public static final SimpleTypeHolder HOLDER = new R2dbcSimpleTypeHolder(); diff --git a/spring-data-r2dbc/src/test/java/org/springframework/data/r2dbc/core/PostgresIntegrationTests.java b/spring-data-r2dbc/src/test/java/org/springframework/data/r2dbc/core/PostgresIntegrationTests.java index 0668f935f..eec02e523 100644 --- a/spring-data-r2dbc/src/test/java/org/springframework/data/r2dbc/core/PostgresIntegrationTests.java +++ b/spring-data-r2dbc/src/test/java/org/springframework/data/r2dbc/core/PostgresIntegrationTests.java @@ -15,9 +15,12 @@ */ package org.springframework.data.r2dbc.core; +import static io.netty.buffer.ByteBufUtil.*; import static org.assertj.core.api.Assertions.*; import static org.springframework.data.relational.core.query.Criteria.*; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; import io.r2dbc.postgresql.PostgresqlConnectionFactory; import io.r2dbc.postgresql.codec.Box; @@ -30,21 +33,26 @@ import io.r2dbc.postgresql.codec.Path; import io.r2dbc.postgresql.codec.Point; import io.r2dbc.postgresql.codec.Polygon; import io.r2dbc.postgresql.extension.CodecRegistrar; +import io.r2dbc.spi.Blob; import io.r2dbc.spi.ConnectionFactory; import lombok.AllArgsConstructor; import lombok.Data; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import javax.sql.DataSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; - import org.springframework.dao.DataAccessException; import org.springframework.data.annotation.Id; import org.springframework.data.r2dbc.convert.EnumWriteSupport; @@ -81,6 +89,13 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport { + "primitive_array INT[]," // + "multidimensional_array INT[]," // + "collection_array INT[][])"); + + template.execute("DROP TABLE IF EXISTS with_blobs"); + template.execute("CREATE TABLE with_blobs (" // + + "id serial PRIMARY KEY," // + + "byte_array bytea," // + + "byte_buffer bytea," // + + "byte_blob bytea)"); } @Test // gh-411 @@ -198,9 +213,9 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport { template.execute("DROP TABLE IF EXISTS with_interval"); template.execute("CREATE TABLE with_interval (" // - + "id serial PRIMARY KEY," // - + "interval INTERVAL" // - + ")"); + + "id serial PRIMARY KEY," // + + "interval INTERVAL" // + + ")"); R2dbcEntityTemplate template = new R2dbcEntityTemplate(client, new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE)); @@ -213,6 +228,62 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport { }).verifyComplete(); } + @Test // gh-1408 + void shouldReadAndWriteBlobs() { + + R2dbcEntityTemplate template = new R2dbcEntityTemplate(client, + new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE)); + + WithBlobs withBlobs = new WithBlobs(); + byte[] content = "123รค".getBytes(StandardCharsets.UTF_8); + + withBlobs.byteArray = content; + withBlobs.byteBuffer = ByteBuffer.wrap(content); + withBlobs.byteBlob = Blob.from(Mono.just(ByteBuffer.wrap(content))); + + template.insert(withBlobs) // + .as(StepVerifier::create) // + .expectNextCount(1) // + .verifyComplete(); + + template.selectOne(Query.empty(), WithBlobs.class) // + .flatMap(it -> { + return Flux.from(it.byteBlob.stream()).last().map(blob -> { + it.byteBlob = Blob.from(Mono.just(blob)); + return it; + }); + }).as(StepVerifier::create) // + .consumeNextWith(actual -> { + + CompletableFuture cf = Mono.from(actual.byteBlob.stream()).map(Unpooled::wrappedBuffer) + .map(ByteBufUtil::getBytes).toFuture(); + assertThat(actual.getByteArray()).isEqualTo(content); + assertThat(getBytes(Unpooled.wrappedBuffer(actual.getByteBuffer()))).isEqualTo(content); + assertThat(cf.join()).isEqualTo(content); + }).verifyComplete(); + + template.selectOne(Query.empty(), WithBlobs.class) + .doOnNext(it -> it.byteArray = "foo".getBytes(StandardCharsets.UTF_8)).flatMap(template::update) // + .as(StepVerifier::create) // + .expectNextCount(1).verifyComplete(); + + template.selectOne(Query.empty(), WithBlobs.class) // + .flatMap(it -> { + return Flux.from(it.byteBlob.stream()).last().map(blob -> { + it.byteBlob = Blob.from(Mono.just(blob)); + return it; + }); + }).as(StepVerifier::create) // + .consumeNextWith(actual -> { + + CompletableFuture cf = Mono.from(actual.byteBlob.stream()).map(Unpooled::wrappedBuffer) + .map(ByteBufUtil::getBytes).toFuture(); + assertThat(actual.getByteArray()).isEqualTo("foo".getBytes(StandardCharsets.UTF_8)); + assertThat(getBytes(Unpooled.wrappedBuffer(actual.getByteBuffer()))).isEqualTo(content); + assertThat(cf.join()).isEqualTo(content); + }).verifyComplete(); + } + @Data @AllArgsConstructor static class EntityWithEnum { @@ -260,4 +331,16 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport { } + @Data + @Table("with_blobs") + static class WithBlobs { + + @Id Integer id; + + byte[] byteArray; + ByteBuffer byteBuffer; + Blob byteBlob; + + } + }