Browse Source

Add DataBufferUtils.compose

Added a utility method that composes data buffers into a single buffer.
Depending on the `DataBuffer` implementation, the returned buffer may be
a single buffer containing all data of the provided buffers, or it may
be a true composite that contains references to the buffers.

Issue: SPR-16365
pull/1503/merge
Arjen Poutsma 8 years ago
parent
commit
384a399fd2
  1. 13
      spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java
  2. 22
      spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java
  3. 20
      spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java
  4. 20
      spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java
  5. 13
      spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java
  6. 14
      spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

13
spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.util.List;
/**
* A factory for {@link DataBuffer}s, allowing for allocation and wrapping of
@ -61,4 +62,14 @@ public interface DataBufferFactory { @@ -61,4 +62,14 @@ public interface DataBufferFactory {
*/
DataBuffer wrap(byte[] bytes);
/**
* Create a composite data buffer from the list of provided data buffers. Depending on the
* implementation, the returned buffer may be a single buffer containing all data of the
* provided buffers, or it may be a true composite that contains references to the buffers.
* <p>Note that the given data buffers do <strong>not</strong> have to be released, as they are
* released as part of the returned composite.
* @param dataBuffers the data buffers to be composed
* @return a buffer that composes {@code dataBuffers} into one
*/
DataBuffer compose(List<DataBuffer> dataBuffers);
}

22
spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

@ -39,6 +39,7 @@ import org.reactivestreams.Subscription; @@ -39,6 +39,7 @@ import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import org.springframework.core.io.Resource;
@ -434,6 +435,27 @@ public abstract class DataBufferUtils { @@ -434,6 +435,27 @@ public abstract class DataBufferUtils {
return WRITE_AGGREGATOR;
}
/**
* Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on
* the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing
* all data of the provided buffers, or it may be a true composite that contains references to
* the buffers.
* @param publisher the data buffers that are to be composed
* @return the composed data buffer
*/
public static Mono<DataBuffer> compose(Publisher<DataBuffer> publisher) {
Assert.notNull(publisher, "'publisher' must not be null");
Flux<DataBuffer> source = Flux.from(publisher);
return source.collectList()
.filter(dataBuffers -> !dataBuffers.isEmpty())
.map(dataBuffers -> {
DataBufferFactory bufferFactory = dataBuffers.get(0).factory();
return bufferFactory.compose(dataBuffers);
});
}
private static class ReadableByteChannelGenerator
implements BiFunction<ReadableByteChannel, SynchronousSink<DataBuffer>, ReadableByteChannel> {

20
spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.util.List;
import org.springframework.util.Assert;
@ -102,6 +103,23 @@ public class DefaultDataBufferFactory implements DataBufferFactory { @@ -102,6 +103,23 @@ public class DefaultDataBufferFactory implements DataBufferFactory {
return DefaultDataBuffer.fromFilledByteBuffer(this, wrapper);
}
/**
* {@inheritDoc}
* <p>This implementation creates a single {@link DefaultDataBuffer} to contain the data
* in {@code dataBuffers}.
*/
@Override
public DataBuffer compose(List<DataBuffer> dataBuffers) {
Assert.notEmpty(dataBuffers, "'dataBuffers' must not be empty");
int capacity = dataBuffers.stream()
.mapToInt(DataBuffer::readableByteCount)
.sum();
DefaultDataBuffer dataBuffer = allocateBuffer(capacity);
return dataBuffers.stream()
.reduce(dataBuffer, DataBuffer::write);
}
@Override
public String toString() {
return "DefaultDataBufferFactory (preferDirect=" + this.preferDirect + ")";

20
spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,9 +17,11 @@ @@ -17,9 +17,11 @@
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.springframework.util.Assert;
@ -80,6 +82,22 @@ public class NettyDataBufferFactory implements DataBufferFactory { @@ -80,6 +82,22 @@ public class NettyDataBufferFactory implements DataBufferFactory {
return new NettyDataBuffer(byteBuf, this);
}
/**
* {@inheritDoc}
* <p>This implementation uses Netty's {@link CompositeByteBuf}.
*/
@Override
public DataBuffer compose(List<DataBuffer> dataBuffers) {
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
CompositeByteBuf composite = this.byteBufAllocator.compositeBuffer(dataBuffers.size());
for (DataBuffer dataBuffer : dataBuffers) {
Assert.isInstanceOf(NettyDataBuffer.class, dataBuffer);
NettyDataBuffer nettyDataBuffer = (NettyDataBuffer) dataBuffer;
composite.addComponent(true, nettyDataBuffer.getNativeBuffer());
}
return new NettyDataBuffer(composite, this);
}
/**
* Wrap the given Netty {@link ByteBuf} in a {@code NettyDataBuffer}.
* @param byteBuf the Netty byte buffer to wrap

13
spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java

@ -479,5 +479,18 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase { @@ -479,5 +479,18 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
release(buffer);
}
@Test
public void composite() {
DataBuffer composite = this.bufferFactory.compose(Arrays.asList(stringBuffer("a"),
stringBuffer("b"), stringBuffer("c")));
assertEquals(3, composite.readableByteCount());
byte[] bytes = new byte[3];
composite.read(bytes);
assertArrayEquals(new byte[] {'a','b','c'}, bytes);
release(composite);
}
}

14
spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

@ -351,5 +351,19 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { @@ -351,5 +351,19 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
};
}
@Test
public void compose() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
DataBuffer result = DataBufferUtils.compose(flux).block(Duration.ofSeconds(5));
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
release(result);
}
}

Loading…
Cancel
Save