Browse Source

Write CharSequence instances to DataBuffers

Prior to this commit, one could write a `CharSequence` to an existing
`DataBuffer` instance by turning it into a byte array or `ByteBuffer`
first. This had the following disadvantages:

1. Memory allocation was not efficient (not leveraging pooled memory
when available)
2. Dealing with `CharsetEncoder` is not always easy
3. `DataBuffer` implementations, like `NettyDataBuffer` can use
optimized implementations in some cases

This commit adds a new `DataBuffer#write(CharSequence, Charset)` method
for those cases and also an `ensureCapacity` method useful for checking
that the current buffer has enough capacity to write to it..

Issue: SPR-17558
pull/2061/head
Brian Clozel 7 years ago
parent
commit
6361b0cb23
  1. 58
      spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java
  2. 16
      spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java
  3. 29
      spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java
  4. 65
      spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java
  5. 11
      spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java
  6. 11
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java
  7. 17
      spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java

58
spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java

@ -19,8 +19,15 @@ package org.springframework.core.io.buffer; @@ -19,8 +19,15 @@ package org.springframework.core.io.buffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.function.IntPredicate;
import org.springframework.util.Assert;
/**
* Basic abstraction over byte buffers.
*
@ -45,6 +52,7 @@ import java.util.function.IntPredicate; @@ -45,6 +52,7 @@ import java.util.function.IntPredicate;
* can also be used on non-Netty platforms (i.e. Servlet containers).
*
* @author Arjen Poutsma
* @author Brian Clozel
* @since 5.0
* @see DataBufferFactory
*/
@ -106,6 +114,16 @@ public interface DataBuffer { @@ -106,6 +114,16 @@ public interface DataBuffer {
*/
DataBuffer capacity(int capacity);
/**
* Ensure that the current buffer has enough {@link #writableByteCount()}
* to write the amount of data given as an argument. If not, the missing
* capacity will be added to the buffer.
* @param capacity the writable capacity to check for
* @return this buffer
* @since 5.1.4
*/
DataBuffer ensureCapacity(int capacity);
/**
* Return the position from which this buffer will read.
* @return the read position
@ -181,7 +199,7 @@ public interface DataBuffer { @@ -181,7 +199,7 @@ public interface DataBuffer {
DataBuffer write(byte b);
/**
* Write the given source into this buffer, startin at the current writing position
* Write the given source into this buffer, starting at the current writing position
* of this buffer.
* @param source the bytes to be written into this buffer
* @return this buffer
@ -215,6 +233,44 @@ public interface DataBuffer { @@ -215,6 +233,44 @@ public interface DataBuffer {
*/
DataBuffer write(ByteBuffer... buffers);
/**
* Write the given {@code CharSequence} using the given {@code Charset},
* starting at the current writing position.
* @param charSequence the char sequence to write into this buffer
* @param charset the charset to encode the char sequence with
* @return this buffer
* @since 5.1.4
*/
default DataBuffer write(CharSequence charSequence, Charset charset) {
Assert.notNull(charSequence, "'charSequence' must not be null");
Assert.notNull(charset, "'charset' must not be null");
CharsetEncoder charsetEncoder = charset.newEncoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE);
CharBuffer inBuffer = CharBuffer.wrap(charSequence);
int estimatedSize = (int) (inBuffer.remaining() * charsetEncoder.averageBytesPerChar());
ByteBuffer outBuffer = ensureCapacity(estimatedSize)
.asByteBuffer(writePosition(), writableByteCount());
for (; ; ) {
CoderResult cr = inBuffer.hasRemaining() ?
charsetEncoder.encode(inBuffer, outBuffer, true) : CoderResult.UNDERFLOW;
if (cr.isUnderflow()) {
cr = charsetEncoder.flush(outBuffer);
}
if (cr.isUnderflow()) {
break;
}
if (cr.isOverflow()) {
writePosition(outBuffer.position());
int maximumSize = (int) (inBuffer.remaining() * charsetEncoder.maxBytesPerChar());
ensureCapacity(maximumSize);
outBuffer = asByteBuffer(writePosition(), writableByteCount());
}
}
writePosition(outBuffer.position());
return this;
}
/**
* Create a new {@code DataBuffer} whose contents is a shared subsequence of this
* data buffer's content. Data between this data buffer and the returned buffer is

16
spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java

@ -215,6 +215,15 @@ public class DefaultDataBuffer implements DataBuffer { @@ -215,6 +215,15 @@ public class DefaultDataBuffer implements DataBuffer {
return this;
}
@Override
public DataBuffer ensureCapacity(int length) {
if (length > writableByteCount()) {
int newCapacity = calculateCapacity(this.writePosition + length);
capacity(newCapacity);
}
return this;
}
private static ByteBuffer allocate(int capacity, boolean direct) {
return direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
}
@ -369,13 +378,6 @@ public class DefaultDataBuffer implements DataBuffer { @@ -369,13 +378,6 @@ public class DefaultDataBuffer implements DataBuffer {
return new DefaultDataBufferOutputStream();
}
private void ensureCapacity(int length) {
if (length <= writableByteCount()) {
return;
}
int newCapacity = calculateCapacity(this.writePosition + length);
capacity(newCapacity);
}
/**
* Calculate the capacity of the buffer.

29
spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java

@ -19,11 +19,14 @@ package org.springframework.core.io.buffer; @@ -19,11 +19,14 @@ package org.springframework.core.io.buffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.function.IntPredicate;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.ByteBufUtil;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
@ -138,6 +141,12 @@ public class NettyDataBuffer implements PooledDataBuffer { @@ -138,6 +141,12 @@ public class NettyDataBuffer implements PooledDataBuffer {
return this;
}
@Override
public DataBuffer ensureCapacity(int capacity) {
this.byteBuf.ensureWritable(capacity);
return this;
}
@Override
public byte read() {
return this.byteBuf.readByte();
@ -178,14 +187,14 @@ public class NettyDataBuffer implements PooledDataBuffer { @@ -178,14 +187,14 @@ public class NettyDataBuffer implements PooledDataBuffer {
if (!ObjectUtils.isEmpty(buffers)) {
if (hasNettyDataBuffers(buffers)) {
ByteBuf[] nativeBuffers = new ByteBuf[buffers.length];
for (int i = 0 ; i < buffers.length; i++) {
for (int i = 0; i < buffers.length; i++) {
nativeBuffers[i] = ((NettyDataBuffer) buffers[i]).getNativeBuffer();
}
write(nativeBuffers);
}
else {
ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length];
for (int i = 0 ; i < buffers.length; i++) {
for (int i = 0; i < buffers.length; i++) {
byteBuffers[i] = buffers[i].asByteBuffer();
}
@ -229,6 +238,22 @@ public class NettyDataBuffer implements PooledDataBuffer { @@ -229,6 +238,22 @@ public class NettyDataBuffer implements PooledDataBuffer {
return this;
}
@Override
public DataBuffer write(CharSequence charSequence, Charset charset) {
Assert.notNull(charSequence, "'charSequence' must not be null");
Assert.notNull(charset, "'charset' must not be null");
if (StandardCharsets.UTF_8.equals(charset)) {
ByteBufUtil.writeUtf8(this.byteBuf, charSequence);
}
else if (StandardCharsets.US_ASCII.equals(charset)) {
ByteBufUtil.writeAscii(this.byteBuf, charSequence);
}
else {
return PooledDataBuffer.super.write(charSequence, charset);
}
return this;
}
@Override
public NettyDataBuffer slice(int index, int length) {
ByteBuf slice = this.byteBuf.slice(index, length);

65
spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java

@ -20,6 +20,7 @@ import java.io.IOException; @@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.junit.Test;
@ -150,6 +151,70 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase { @@ -150,6 +151,70 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
release(buffer);
}
@Test
public void writeNullString() {
DataBuffer buffer = createDataBuffer(1);
try {
buffer.write(null, StandardCharsets.UTF_8);
fail("IllegalArgumentException expected");
}
catch (IllegalArgumentException exc) {
}
finally {
release(buffer);
}
}
@Test
public void writeNullCharset() {
DataBuffer buffer = createDataBuffer(1);
try {
buffer.write("test", null);
fail("IllegalArgumentException expected");
}
catch (IllegalArgumentException exc) {
}
finally {
release(buffer);
}
}
@Test
public void writeUtf8String() {
DataBuffer buffer = createDataBuffer(6);
buffer.write("Spring", StandardCharsets.UTF_8);
byte[] result = new byte[6];
buffer.read(result);
assertArrayEquals("Spring".getBytes(StandardCharsets.UTF_8), result);
release(buffer);
}
@Test
public void writeUtf8StringOutGrowsCapacity() {
DataBuffer buffer = createDataBuffer(5);
buffer.write("Spring €", StandardCharsets.UTF_8);
byte[] result = new byte[10];
buffer.read(result);
assertArrayEquals("Spring €".getBytes(StandardCharsets.UTF_8), result);
release(buffer);
}
@Test
public void writeIsoString() {
DataBuffer buffer = createDataBuffer(3);
buffer.write("\u00A3", StandardCharsets.ISO_8859_1);
byte[] result = new byte[1];
buffer.read(result);
assertArrayEquals("\u00A3".getBytes(StandardCharsets.ISO_8859_1), result);
release(buffer);
}
@Test
public void inputStream() throws IOException {
DataBuffer buffer = createDataBuffer(4);

11
spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java

@ -19,6 +19,7 @@ package org.springframework.core.io.buffer; @@ -19,6 +19,7 @@ package org.springframework.core.io.buffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.function.IntPredicate;
import org.springframework.util.Assert;
@ -139,6 +140,11 @@ class LeakAwareDataBuffer implements PooledDataBuffer { @@ -139,6 +140,11 @@ class LeakAwareDataBuffer implements PooledDataBuffer {
return this.delegate.capacity(newCapacity);
}
@Override
public DataBuffer ensureCapacity(int capacity) {
return this.delegate.ensureCapacity(capacity);
}
@Override
public byte getByte(int index) {
return this.delegate.getByte(index);
@ -184,6 +190,11 @@ class LeakAwareDataBuffer implements PooledDataBuffer { @@ -184,6 +190,11 @@ class LeakAwareDataBuffer implements PooledDataBuffer {
return this.delegate.write(byteBuffers);
}
@Override
public DataBuffer write(CharSequence charSequence, Charset charset) {
return this.delegate.write(charSequence, charset);
}
@Override
public DataBuffer slice(int index, int length) {
return this.delegate.slice(index, length);

11
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

@ -23,6 +23,7 @@ import java.net.InetSocketAddress; @@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.function.IntPredicate;
import javax.net.ssl.SSLSession;
@ -294,6 +295,11 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @@ -294,6 +295,11 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return this.dataBuffer.capacity(newCapacity);
}
@Override
public DataBuffer ensureCapacity(int capacity) {
return this.dataBuffer.ensureCapacity(capacity);
}
@Override
public byte getByte(int index) {
return this.dataBuffer.getByte(index);
@ -343,6 +349,11 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @@ -343,6 +349,11 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return this.dataBuffer.write(byteBuffers);
}
@Override
public DataBuffer write(CharSequence charSequence, Charset charset) {
return this.dataBuffer.write(charSequence, charset);
}
@Override
public DataBuffer slice(int index, int length) {
return this.dataBuffer.slice(index, length);

17
spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java

@ -30,6 +30,7 @@ import java.util.Map; @@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Optional;
import com.fasterxml.jackson.annotation.JsonView;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
@ -44,6 +45,7 @@ import org.springframework.core.io.buffer.DataBuffer; @@ -44,6 +45,7 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpRange;
import org.springframework.http.ReactiveHttpOutputMessage;
@ -121,10 +123,11 @@ public class BodyInsertersTests { @@ -121,10 +123,11 @@ public class BodyInsertersTests {
MockServerHttpResponse response = new MockServerHttpResponse();
Mono<Void> result = inserter.insert(response, this.context);
StepVerifier.create(result).expectComplete().verify();
DataBuffer buffer = new DefaultDataBufferFactory().wrap(body.getBytes(UTF_8));
StepVerifier.create(response.getBody())
.expectNext(buffer)
.consumeNextWith(buf -> {
String actual = DataBufferTestUtils.dumpString(buf, UTF_8);
Assert.assertEquals("foo", actual);
})
.expectComplete()
.verify();
}
@ -166,11 +169,11 @@ public class BodyInsertersTests { @@ -166,11 +169,11 @@ public class BodyInsertersTests {
MockServerHttpResponse response = new MockServerHttpResponse();
Mono<Void> result = inserter.insert(response, this.context);
StepVerifier.create(result).expectComplete().verify();
ByteBuffer byteBuffer = ByteBuffer.wrap("foo".getBytes(UTF_8));
DataBuffer buffer = new DefaultDataBufferFactory().wrap(byteBuffer);
StepVerifier.create(response.getBody())
.expectNext(buffer)
.consumeNextWith(buf -> {
String actual = DataBufferTestUtils.dumpString(buf, UTF_8);
Assert.assertEquals("foo", actual);
})
.expectComplete()
.verify();
}

Loading…
Cancel
Save