From 384a399fd25bf7a9bd7dff239d31e9e7fbbbad24 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Thu, 11 Jan 2018 17:19:32 +0100 Subject: [PATCH] Add DataBufferUtils.compose Added a utility method that composes data buffers into a single buffer. Depending on the `DataBuffer` implementation, the returned buffer may be a single buffer containing all data of the provided buffers, or it may be a true composite that contains references to the buffers. Issue: SPR-16365 --- .../core/io/buffer/DataBufferFactory.java | 13 ++++++++++- .../core/io/buffer/DataBufferUtils.java | 22 +++++++++++++++++++ .../io/buffer/DefaultDataBufferFactory.java | 20 ++++++++++++++++- .../io/buffer/NettyDataBufferFactory.java | 20 ++++++++++++++++- .../core/io/buffer/DataBufferTests.java | 13 +++++++++++ .../core/io/buffer/DataBufferUtilsTests.java | 14 ++++++++++++ 6 files changed, 99 insertions(+), 3 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java index 21e8f10f372..5deade308f4 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.core.io.buffer; import java.nio.ByteBuffer; +import java.util.List; /** * A factory for {@link DataBuffer}s, allowing for allocation and wrapping of @@ -61,4 +62,14 @@ public interface DataBufferFactory { */ DataBuffer wrap(byte[] bytes); + /** + * Create a composite data buffer from the list of provided data buffers. Depending on the + * implementation, the returned buffer may be a single buffer containing all data of the + * provided buffers, or it may be a true composite that contains references to the buffers. + *

Note that the given data buffers do not have to be released, as they are + * released as part of the returned composite. + * @param dataBuffers the data buffers to be composed + * @return a buffer that composes {@code dataBuffers} into one + */ + DataBuffer compose(List dataBuffers); } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index f89375c125d..5c942573e37 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -39,6 +39,7 @@ import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; import reactor.core.publisher.SynchronousSink; import org.springframework.core.io.Resource; @@ -434,6 +435,27 @@ public abstract class DataBufferUtils { return WRITE_AGGREGATOR; } + /** + * Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on + * the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing + * all data of the provided buffers, or it may be a true composite that contains references to + * the buffers. + * @param publisher the data buffers that are to be composed + * @return the composed data buffer + */ + public static Mono compose(Publisher publisher) { + Assert.notNull(publisher, "'publisher' must not be null"); + + Flux source = Flux.from(publisher); + + return source.collectList() + .filter(dataBuffers -> !dataBuffers.isEmpty()) + .map(dataBuffers -> { + DataBufferFactory bufferFactory = dataBuffers.get(0).factory(); + return bufferFactory.compose(dataBuffers); + }); + } + private static class ReadableByteChannelGenerator implements BiFunction, ReadableByteChannel> { diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java index 600b952046b..c04829c5a14 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.core.io.buffer; import java.nio.ByteBuffer; +import java.util.List; import org.springframework.util.Assert; @@ -102,6 +103,23 @@ public class DefaultDataBufferFactory implements DataBufferFactory { return DefaultDataBuffer.fromFilledByteBuffer(this, wrapper); } + /** + * {@inheritDoc} + *

This implementation creates a single {@link DefaultDataBuffer} to contain the data + * in {@code dataBuffers}. + */ + @Override + public DataBuffer compose(List dataBuffers) { + Assert.notEmpty(dataBuffers, "'dataBuffers' must not be empty"); + + int capacity = dataBuffers.stream() + .mapToInt(DataBuffer::readableByteCount) + .sum(); + DefaultDataBuffer dataBuffer = allocateBuffer(capacity); + return dataBuffers.stream() + .reduce(dataBuffer, DataBuffer::write); + } + @Override public String toString() { return "DefaultDataBufferFactory (preferDirect=" + this.preferDirect + ")"; diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java index ba1ad13d70b..2b2eaba0aec 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,9 +17,11 @@ package org.springframework.core.io.buffer; import java.nio.ByteBuffer; +import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import org.springframework.util.Assert; @@ -80,6 +82,22 @@ public class NettyDataBufferFactory implements DataBufferFactory { return new NettyDataBuffer(byteBuf, this); } + /** + * {@inheritDoc} + *

This implementation uses Netty's {@link CompositeByteBuf}. + */ + @Override + public DataBuffer compose(List dataBuffers) { + Assert.notNull(dataBuffers, "'dataBuffers' must not be null"); + CompositeByteBuf composite = this.byteBufAllocator.compositeBuffer(dataBuffers.size()); + for (DataBuffer dataBuffer : dataBuffers) { + Assert.isInstanceOf(NettyDataBuffer.class, dataBuffer); + NettyDataBuffer nettyDataBuffer = (NettyDataBuffer) dataBuffer; + composite.addComponent(true, nettyDataBuffer.getNativeBuffer()); + } + return new NettyDataBuffer(composite, this); + } + /** * Wrap the given Netty {@link ByteBuf} in a {@code NettyDataBuffer}. * @param byteBuf the Netty byte buffer to wrap diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index cb3c24d25a6..89f3f111a26 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -479,5 +479,18 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase { release(buffer); } + @Test + public void composite() { + DataBuffer composite = this.bufferFactory.compose(Arrays.asList(stringBuffer("a"), + stringBuffer("b"), stringBuffer("c"))); + assertEquals(3, composite.readableByteCount()); + byte[] bytes = new byte[3]; + composite.read(bytes); + + assertArrayEquals(new byte[] {'a','b','c'}, bytes); + + release(composite); + } + } \ No newline at end of file diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 03df86d7249..afa451a0a06 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -351,5 +351,19 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { }; } + @Test + public void compose() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); + + DataBuffer result = DataBufferUtils.compose(flux).block(Duration.ofSeconds(5)); + + assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8)); + + release(result); + } + }