Browse Source

Fix BLOB reading and writing.

We now correctly consider ByteBuffer, Clob, and Blob types as additional types to read and write blob data.

Closes #1408
pull/1486/head
Mark Paluch 3 years ago
parent
commit
d6e00a8734
No known key found for this signature in database
GPG Key ID: 4406B84C1661DCD1
  1. 11
      spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/convert/MappingR2dbcConverter.java
  2. 8
      spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/mapping/R2dbcSimpleTypeHolder.java
  3. 91
      spring-data-r2dbc/src/test/java/org/springframework/data/r2dbc/core/PostgresIntegrationTests.java

11
spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/convert/MappingR2dbcConverter.java

@ -15,6 +15,8 @@
*/ */
package org.springframework.data.r2dbc.convert; package org.springframework.data.r2dbc.convert;
import io.r2dbc.spi.Blob;
import io.r2dbc.spi.Clob;
import io.r2dbc.spi.ColumnMetadata; import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.Row; import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata; import io.r2dbc.spi.RowMetadata;
@ -158,7 +160,14 @@ public class MappingR2dbcConverter extends BasicRelationalConverter implements R
Object value = null; Object value = null;
if (metadata == null || RowMetadataUtils.containsColumn(metadata, identifier)) { if (metadata == null || RowMetadataUtils.containsColumn(metadata, identifier)) {
value = row.get(identifier);
if (property.getType().equals(Clob.class)) {
value = row.get(identifier, Clob.class);
} else if (property.getType().equals(Blob.class)) {
value = row.get(identifier, Blob.class);
} else {
value = row.get(identifier);
}
} }
if (value == null) { if (value == null) {

8
spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/mapping/R2dbcSimpleTypeHolder.java

@ -15,10 +15,13 @@
*/ */
package org.springframework.data.r2dbc.mapping; package org.springframework.data.r2dbc.mapping;
import io.r2dbc.spi.Blob;
import io.r2dbc.spi.Clob;
import io.r2dbc.spi.Row; import io.r2dbc.spi.Row;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -37,8 +40,9 @@ public class R2dbcSimpleTypeHolder extends SimpleTypeHolder {
/** /**
* Set of R2DBC simple types. * Set of R2DBC simple types.
*/ */
public static final Set<Class<?>> R2DBC_SIMPLE_TYPES = Collections.unmodifiableSet( public static final Set<Class<?>> R2DBC_SIMPLE_TYPES = Collections
new HashSet<>(Arrays.asList(OutboundRow.class, Row.class, BigInteger.class, BigDecimal.class, UUID.class))); .unmodifiableSet(new HashSet<>(Arrays.asList(OutboundRow.class, Row.class, BigInteger.class, BigDecimal.class,
UUID.class, Blob.class, Clob.class, ByteBuffer.class)));
public static final SimpleTypeHolder HOLDER = new R2dbcSimpleTypeHolder(); public static final SimpleTypeHolder HOLDER = new R2dbcSimpleTypeHolder();

91
spring-data-r2dbc/src/test/java/org/springframework/data/r2dbc/core/PostgresIntegrationTests.java

@ -15,9 +15,12 @@
*/ */
package org.springframework.data.r2dbc.core; package org.springframework.data.r2dbc.core;
import static io.netty.buffer.ByteBufUtil.*;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import static org.springframework.data.relational.core.query.Criteria.*; import static org.springframework.data.relational.core.query.Criteria.*;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory; import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.codec.Box; import io.r2dbc.postgresql.codec.Box;
@ -30,21 +33,26 @@ import io.r2dbc.postgresql.codec.Path;
import io.r2dbc.postgresql.codec.Point; import io.r2dbc.postgresql.codec.Point;
import io.r2dbc.postgresql.codec.Polygon; import io.r2dbc.postgresql.codec.Polygon;
import io.r2dbc.postgresql.extension.CodecRegistrar; import io.r2dbc.postgresql.extension.CodecRegistrar;
import io.r2dbc.spi.Blob;
import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.ConnectionFactory;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.sql.DataSource; import javax.sql.DataSource;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.dao.DataAccessException; import org.springframework.dao.DataAccessException;
import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Id;
import org.springframework.data.r2dbc.convert.EnumWriteSupport; import org.springframework.data.r2dbc.convert.EnumWriteSupport;
@ -81,6 +89,13 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport {
+ "primitive_array INT[]," // + "primitive_array INT[]," //
+ "multidimensional_array INT[]," // + "multidimensional_array INT[]," //
+ "collection_array INT[][])"); + "collection_array INT[][])");
template.execute("DROP TABLE IF EXISTS with_blobs");
template.execute("CREATE TABLE with_blobs (" //
+ "id serial PRIMARY KEY," //
+ "byte_array bytea," //
+ "byte_buffer bytea," //
+ "byte_blob bytea)");
} }
@Test // gh-411 @Test // gh-411
@ -198,9 +213,9 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport {
template.execute("DROP TABLE IF EXISTS with_interval"); template.execute("DROP TABLE IF EXISTS with_interval");
template.execute("CREATE TABLE with_interval (" // template.execute("CREATE TABLE with_interval (" //
+ "id serial PRIMARY KEY," // + "id serial PRIMARY KEY," //
+ "interval INTERVAL" // + "interval INTERVAL" //
+ ")"); + ")");
R2dbcEntityTemplate template = new R2dbcEntityTemplate(client, R2dbcEntityTemplate template = new R2dbcEntityTemplate(client,
new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE)); new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE));
@ -213,6 +228,62 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport {
}).verifyComplete(); }).verifyComplete();
} }
@Test // gh-1408
void shouldReadAndWriteBlobs() {
R2dbcEntityTemplate template = new R2dbcEntityTemplate(client,
new DefaultReactiveDataAccessStrategy(PostgresDialect.INSTANCE));
WithBlobs withBlobs = new WithBlobs();
byte[] content = "123ä".getBytes(StandardCharsets.UTF_8);
withBlobs.byteArray = content;
withBlobs.byteBuffer = ByteBuffer.wrap(content);
withBlobs.byteBlob = Blob.from(Mono.just(ByteBuffer.wrap(content)));
template.insert(withBlobs) //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
template.selectOne(Query.empty(), WithBlobs.class) //
.flatMap(it -> {
return Flux.from(it.byteBlob.stream()).last().map(blob -> {
it.byteBlob = Blob.from(Mono.just(blob));
return it;
});
}).as(StepVerifier::create) //
.consumeNextWith(actual -> {
CompletableFuture<byte[]> cf = Mono.from(actual.byteBlob.stream()).map(Unpooled::wrappedBuffer)
.map(ByteBufUtil::getBytes).toFuture();
assertThat(actual.getByteArray()).isEqualTo(content);
assertThat(getBytes(Unpooled.wrappedBuffer(actual.getByteBuffer()))).isEqualTo(content);
assertThat(cf.join()).isEqualTo(content);
}).verifyComplete();
template.selectOne(Query.empty(), WithBlobs.class)
.doOnNext(it -> it.byteArray = "foo".getBytes(StandardCharsets.UTF_8)).flatMap(template::update) //
.as(StepVerifier::create) //
.expectNextCount(1).verifyComplete();
template.selectOne(Query.empty(), WithBlobs.class) //
.flatMap(it -> {
return Flux.from(it.byteBlob.stream()).last().map(blob -> {
it.byteBlob = Blob.from(Mono.just(blob));
return it;
});
}).as(StepVerifier::create) //
.consumeNextWith(actual -> {
CompletableFuture<byte[]> cf = Mono.from(actual.byteBlob.stream()).map(Unpooled::wrappedBuffer)
.map(ByteBufUtil::getBytes).toFuture();
assertThat(actual.getByteArray()).isEqualTo("foo".getBytes(StandardCharsets.UTF_8));
assertThat(getBytes(Unpooled.wrappedBuffer(actual.getByteBuffer()))).isEqualTo(content);
assertThat(cf.join()).isEqualTo(content);
}).verifyComplete();
}
@Data @Data
@AllArgsConstructor @AllArgsConstructor
static class EntityWithEnum { static class EntityWithEnum {
@ -260,4 +331,16 @@ public class PostgresIntegrationTests extends R2dbcIntegrationTestSupport {
} }
@Data
@Table("with_blobs")
static class WithBlobs {
@Id Integer id;
byte[] byteArray;
ByteBuffer byteBuffer;
Blob byteBlob;
}
} }

Loading…
Cancel
Save