diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java
index 21e8f10f372..5deade308f4 100644
--- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java
+++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
+import java.util.List;
/**
* A factory for {@link DataBuffer}s, allowing for allocation and wrapping of
@@ -61,4 +62,14 @@ public interface DataBufferFactory {
*/
DataBuffer wrap(byte[] bytes);
+ /**
+ * Create a composite data buffer from the list of provided data buffers. Depending on the
+ * implementation, the returned buffer may be a single buffer containing all data of the
+ * provided buffers, or it may be a true composite that contains references to the buffers.
+ *
Note that the given data buffers do not have to be released, as they are
+ * released as part of the returned composite.
+ * @param dataBuffers the data buffers to be composed
+ * @return a buffer that composes {@code dataBuffers} into one
+ */
+ DataBuffer compose(List dataBuffers);
}
diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java
index f89375c125d..5c942573e37 100644
--- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java
+++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java
@@ -39,6 +39,7 @@ import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import org.springframework.core.io.Resource;
@@ -434,6 +435,27 @@ public abstract class DataBufferUtils {
return WRITE_AGGREGATOR;
}
+ /**
+ * Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on
+ * the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing
+ * all data of the provided buffers, or it may be a true composite that contains references to
+ * the buffers.
+ * @param publisher the data buffers that are to be composed
+ * @return the composed data buffer
+ */
+ public static Mono compose(Publisher publisher) {
+ Assert.notNull(publisher, "'publisher' must not be null");
+
+ Flux source = Flux.from(publisher);
+
+ return source.collectList()
+ .filter(dataBuffers -> !dataBuffers.isEmpty())
+ .map(dataBuffers -> {
+ DataBufferFactory bufferFactory = dataBuffers.get(0).factory();
+ return bufferFactory.compose(dataBuffers);
+ });
+ }
+
private static class ReadableByteChannelGenerator
implements BiFunction, ReadableByteChannel> {
diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java
index 600b952046b..c04829c5a14 100644
--- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java
+++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
+import java.util.List;
import org.springframework.util.Assert;
@@ -102,6 +103,23 @@ public class DefaultDataBufferFactory implements DataBufferFactory {
return DefaultDataBuffer.fromFilledByteBuffer(this, wrapper);
}
+ /**
+ * {@inheritDoc}
+ * This implementation creates a single {@link DefaultDataBuffer} to contain the data
+ * in {@code dataBuffers}.
+ */
+ @Override
+ public DataBuffer compose(List dataBuffers) {
+ Assert.notEmpty(dataBuffers, "'dataBuffers' must not be empty");
+
+ int capacity = dataBuffers.stream()
+ .mapToInt(DataBuffer::readableByteCount)
+ .sum();
+ DefaultDataBuffer dataBuffer = allocateBuffer(capacity);
+ return dataBuffers.stream()
+ .reduce(dataBuffer, DataBuffer::write);
+ }
+
@Override
public String toString() {
return "DefaultDataBufferFactory (preferDirect=" + this.preferDirect + ")";
diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java
index ba1ad13d70b..2b2eaba0aec 100644
--- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java
+++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2017 the original author or authors.
+ * Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,9 +17,11 @@
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
+import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.springframework.util.Assert;
@@ -80,6 +82,22 @@ public class NettyDataBufferFactory implements DataBufferFactory {
return new NettyDataBuffer(byteBuf, this);
}
+ /**
+ * {@inheritDoc}
+ * This implementation uses Netty's {@link CompositeByteBuf}.
+ */
+ @Override
+ public DataBuffer compose(List dataBuffers) {
+ Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
+ CompositeByteBuf composite = this.byteBufAllocator.compositeBuffer(dataBuffers.size());
+ for (DataBuffer dataBuffer : dataBuffers) {
+ Assert.isInstanceOf(NettyDataBuffer.class, dataBuffer);
+ NettyDataBuffer nettyDataBuffer = (NettyDataBuffer) dataBuffer;
+ composite.addComponent(true, nettyDataBuffer.getNativeBuffer());
+ }
+ return new NettyDataBuffer(composite, this);
+ }
+
/**
* Wrap the given Netty {@link ByteBuf} in a {@code NettyDataBuffer}.
* @param byteBuf the Netty byte buffer to wrap
diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java
index cb3c24d25a6..89f3f111a26 100644
--- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java
+++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java
@@ -479,5 +479,18 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
release(buffer);
}
+ @Test
+ public void composite() {
+ DataBuffer composite = this.bufferFactory.compose(Arrays.asList(stringBuffer("a"),
+ stringBuffer("b"), stringBuffer("c")));
+ assertEquals(3, composite.readableByteCount());
+ byte[] bytes = new byte[3];
+ composite.read(bytes);
+
+ assertArrayEquals(new byte[] {'a','b','c'}, bytes);
+
+ release(composite);
+ }
+
}
\ No newline at end of file
diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java
index 03df86d7249..afa451a0a06 100644
--- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java
+++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java
@@ -351,5 +351,19 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
};
}
+ @Test
+ public void compose() {
+ DataBuffer foo = stringBuffer("foo");
+ DataBuffer bar = stringBuffer("bar");
+ DataBuffer baz = stringBuffer("baz");
+ Flux flux = Flux.just(foo, bar, baz);
+
+ DataBuffer result = DataBufferUtils.compose(flux).block(Duration.ofSeconds(5));
+
+ assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
+
+ release(result);
+ }
+
}