diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java index 00985426d15..2161e6be793 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java @@ -186,6 +186,28 @@ public interface DataBuffer { */ byte getByte(int index); + /** + * Process a range of bytes from the current buffer using a + * {@link ByteProcessor}. + * @param index the index at which the processing will start + * @param length the maximum number of bytes to be processed + * @param processor the processor that consumes bytes + * @return the position that was reached when processing was stopped, + * or {@code -1} if the entire byte range was processed. + * @throws IndexOutOfBoundsException when {@code index} is out of bounds + * @since 6.2.12 + */ + default int forEachByte(int index, int length, ByteProcessor processor) { + Assert.isTrue(length >= 0, "Length must be >= 0"); + for (int position = index; position < index + length; position++) { + byte b = getByte(position); + if(!processor.process(b)) { + return position; + } + } + return -1; + } + /** * Read a single byte from the current reading position from this data buffer. * @return the byte at this buffer's current reading position @@ -531,4 +553,22 @@ public interface DataBuffer { void close(); } + /** + * Process a range of bytes one by one. + * @since 6.2.12 + */ + @FunctionalInterface + interface ByteProcessor { + + /** + * Process the given {@code byte} and indicate whether processing + * should continue further. + * @param b a byte from the {@link DataBuffer} + * @return {@code true} if processing should continue, + * or {@code false} if processing should stop at this element. + */ + boolean process(byte b); + + } + } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 568c7be82c5..9b9d4b9f79c 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -832,13 +832,9 @@ public abstract class DataBufferUtils { @Override public int match(DataBuffer dataBuffer) { - for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) { - byte b = dataBuffer.getByte(pos); - if (match(b)) { - return pos; - } - } - return -1; + int start = dataBuffer.readPosition(); + int end = dataBuffer.writePosition(); + return dataBuffer.forEachByte(start, end - start, b -> !this.match(b)); } @Override @@ -881,14 +877,13 @@ public abstract class DataBufferUtils { @Override public int match(DataBuffer dataBuffer) { - for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) { - byte b = dataBuffer.getByte(pos); - if (match(b)) { - reset(); - return pos; - } + int start = dataBuffer.readPosition(); + int end = dataBuffer.writePosition(); + int matchPosition = dataBuffer.forEachByte(start, end - start, b -> !this.match(b)); + if (matchPosition != -1) { + reset(); } - return -1; + return matchPosition; } @Override diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java index a769f5e2af3..7ba28200d68 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java @@ -129,6 +129,11 @@ public class NettyDataBuffer implements PooledDataBuffer { return this.byteBuf.getByte(index); } + @Override + public int forEachByte(int index, int length, ByteProcessor processor) { + return this.byteBuf.forEachByte(index, length, processor::process); + } + @Override public int capacity() { return this.byteBuf.capacity(); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index 6fe20c291ae..55f82ce23a0 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -21,7 +21,9 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.springframework.core.testfixture.io.buffer.AbstractDataBufferAllocatingTests; @@ -1045,4 +1047,35 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { release(buffer); } + @ParameterizedDataBufferAllocatingTest + void forEachByteProcessAll(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + List result = new ArrayList<>(); + DataBuffer buffer = byteBuffer(new byte[]{'a', 'b', 'c', 'd'}); + int index = buffer.forEachByte(0, 4, b -> { + result.add(b); + return true; + }); + assertThat(index).isEqualTo(-1); + assertThat(result).containsExactly((byte) 'a', (byte) 'b', (byte) 'c', (byte) 'd'); + release(buffer); + } + + + @ParameterizedDataBufferAllocatingTest + void forEachByteProcessSome(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + List result = new ArrayList<>(); + DataBuffer buffer = byteBuffer(new byte[]{'a', 'b', 'c', 'd'}); + int index = buffer.forEachByte(0, 4, b -> { + result.add(b); + return (b != 'c'); + }); + assertThat(index).isEqualTo(2); + assertThat(result).containsExactly((byte) 'a', (byte) 'b', (byte) 'c'); + release(buffer); + } + }