diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractCharSequenceDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractCharSequenceDecoder.java new file mode 100644 index 00000000000..340792259bb --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractCharSequenceDecoder.java @@ -0,0 +1,205 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.LimitedDataBufferList; +import org.springframework.core.log.LogFormatUtils; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; + +/** + * Abstract base class that decodes from a data buffer stream to a + * {@code CharSequence} stream. + * + * @author Arjen Poutsma + * @since 6.1 + * @param the character sequence type + */ +public abstract class AbstractCharSequenceDecoder extends AbstractDataBufferDecoder { + + /** The default charset to use, i.e. "UTF-8". */ + public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; + + /** The default delimiter strings to use, i.e. {@code \r\n} and {@code \n}. */ + public static final List DEFAULT_DELIMITERS = List.of("\r\n", "\n"); + + + private final List delimiters; + + private final boolean stripDelimiter; + + private Charset defaultCharset = DEFAULT_CHARSET; + + private final ConcurrentMap delimitersCache = new ConcurrentHashMap<>(); + + + /** + * Create a new {@code AbstractCharSequenceDecoder} with the given parameters. + */ + protected AbstractCharSequenceDecoder(List delimiters, boolean stripDelimiter, MimeType... mimeTypes) { + super(mimeTypes); + Assert.notEmpty(delimiters, "'delimiters' must not be empty"); + this.delimiters = new ArrayList<>(delimiters); + this.stripDelimiter = stripDelimiter; + } + + + /** + * Set the default character set to fall back on if the MimeType does not specify any. + *

By default this is {@code UTF-8}. + * @param defaultCharset the charset to fall back on + */ + public void setDefaultCharset(Charset defaultCharset) { + this.defaultCharset = defaultCharset; + } + + /** + * Return the configured {@link #setDefaultCharset(Charset) defaultCharset}. + */ + public Charset getDefaultCharset() { + return this.defaultCharset; + } + + + @Override + public final Flux decode(Publisher input, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + byte[][] delimiterBytes = getDelimiterBytes(mimeType); + + LimitedDataBufferList chunks = new LimitedDataBufferList(getMaxInMemorySize()); + DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); + + return Flux.from(input) + .concatMapIterable(buffer -> processDataBuffer(buffer, matcher, chunks)) + .concatWith(Mono.defer(() -> { + if (chunks.isEmpty()) { + return Mono.empty(); + } + DataBuffer lastBuffer = chunks.get(0).factory().join(chunks); + chunks.clear(); + return Mono.just(lastBuffer); + })) + .doFinally(signalType -> chunks.releaseAndClear()) + .doOnDiscard(DataBuffer.class, DataBufferUtils::release) + .map(buffer -> decode(buffer, elementType, mimeType, hints)); + } + + private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) { + return this.delimitersCache.computeIfAbsent(getCharset(mimeType), charset -> { + byte[][] result = new byte[this.delimiters.size()][]; + for (int i = 0; i < this.delimiters.size(); i++) { + result[i] = this.delimiters.get(i).getBytes(charset); + } + return result; + }); + } + + private Collection processDataBuffer(DataBuffer buffer, DataBufferUtils.Matcher matcher, + LimitedDataBufferList chunks) { + + boolean release = true; + try { + List result = null; + do { + int endIndex = matcher.match(buffer); + if (endIndex == -1) { + chunks.add(buffer); + release = false; + break; + } + DataBuffer split = buffer.split(endIndex + 1); + if (result == null) { + result = new ArrayList<>(); + } + int delimiterLength = matcher.delimiter().length; + if (chunks.isEmpty()) { + if (this.stripDelimiter) { + split.writePosition(split.writePosition() - delimiterLength); + } + result.add(split); + } + else { + chunks.add(split); + DataBuffer joined = buffer.factory().join(chunks); + if (this.stripDelimiter) { + joined.writePosition(joined.writePosition() - delimiterLength); + } + result.add(joined); + chunks.clear(); + } + } + while (buffer.readableByteCount() > 0); + return (result != null ? result : Collections.emptyList()); + } + finally { + if (release) { + DataBufferUtils.release(buffer); + } + } + } + + @Override + public final T decode(DataBuffer dataBuffer, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + Charset charset = getCharset(mimeType); + T value = decodeInternal(dataBuffer, charset); + DataBufferUtils.release(dataBuffer); + LogFormatUtils.traceDebug(logger, traceOn -> { + String formatted = LogFormatUtils.formatValue(value, !traceOn); + return Hints.getLogPrefix(hints) + "Decoded " + formatted; + }); + return value; + } + + private Charset getCharset(@Nullable MimeType mimeType) { + if (mimeType != null) { + Charset charset = mimeType.getCharset(); + if (charset != null) { + return charset; + } + } + return getDefaultCharset(); + } + + + /** + * Template method that decodes the given data buffer into {@code T}, given + * the charset. + */ + protected abstract T decodeInternal(DataBuffer dataBuffer, Charset charset); + +} diff --git a/spring-core/src/main/java/org/springframework/core/codec/CharBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/CharBufferDecoder.java index 93a3f965eb7..7c87b5039dc 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/CharBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/CharBufferDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,28 +16,14 @@ package org.springframework.core.codec; +import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.LimitedDataBufferList; -import org.springframework.core.log.LogFormatUtils; import org.springframework.lang.Nullable; -import org.springframework.util.Assert; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -49,55 +35,16 @@ import org.springframework.util.MimeTypeUtils; * avoiding split-character issues. The default delimiters used by default are * {@code \n} and {@code \r\n} but that can be customized. * - * @author Sebastien Deleuze - * @author Brian Clozel + * @author Markus Heiden * @author Arjen Poutsma - * @author Mark Paluch + * @since 6.1 * @see CharSequenceEncoder */ -public final class CharBufferDecoder extends AbstractDataBufferDecoder { - - /** - * The default charset "UTF-8". - */ - public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; - - /** - * The default delimiter strings to use, i.e. {@code \r\n} and {@code \n}. - */ - public static final List DEFAULT_DELIMITERS = List.of("\r\n", "\n"); - - - private final List delimiters; +public final class CharBufferDecoder extends AbstractCharSequenceDecoder { - private final boolean stripDelimiter; - private Charset defaultCharset = DEFAULT_CHARSET; - - private final ConcurrentMap delimitersCache = new ConcurrentHashMap<>(); - - - private CharBufferDecoder(List delimiters, boolean stripDelimiter, MimeType... mimeTypes) { - super(mimeTypes); - Assert.notEmpty(delimiters, "'delimiters' must not be empty"); - this.delimiters = new ArrayList<>(delimiters); - this.stripDelimiter = stripDelimiter; - } - - /** - * Set the default character set to fall back on if the MimeType does not specify any. - *

By default this is {@code UTF-8}. - * @param defaultCharset the charset to fall back on - */ - public void setDefaultCharset(Charset defaultCharset) { - this.defaultCharset = defaultCharset; - } - - /** - * Return the configured {@link #setDefaultCharset(Charset) defaultCharset}. - */ - public Charset getDefaultCharset() { - return this.defaultCharset; + public CharBufferDecoder(List delimiters, boolean stripDelimiter, MimeType... mimeTypes) { + super(delimiters, stripDelimiter, mimeTypes); } @Override @@ -106,105 +53,12 @@ public final class CharBufferDecoder extends AbstractDataBufferDecoder decode(Publisher input, ResolvableType elementType, - @Nullable MimeType mimeType, @Nullable Map hints) { - - byte[][] delimiterBytes = getDelimiterBytes(mimeType); - - LimitedDataBufferList chunks = new LimitedDataBufferList(getMaxInMemorySize()); - DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); - - return Flux.from(input) - .concatMapIterable(buffer -> processDataBuffer(buffer, matcher, chunks)) - .concatWith(Mono.defer(() -> { - if (chunks.isEmpty()) { - return Mono.empty(); - } - DataBuffer lastBuffer = chunks.get(0).factory().join(chunks); - chunks.clear(); - return Mono.just(lastBuffer); - })) - .doOnTerminate(chunks::releaseAndClear) - .doOnDiscard(DataBuffer.class, DataBufferUtils::release) - .map(buffer -> decode(buffer, elementType, mimeType, hints)); - } - - private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) { - return this.delimitersCache.computeIfAbsent(getCharset(mimeType), charset -> { - byte[][] result = new byte[this.delimiters.size()][]; - for (int i = 0; i < this.delimiters.size(); i++) { - result[i] = this.delimiters.get(i).getBytes(charset); - } - return result; - }); - } - - private Collection processDataBuffer( - DataBuffer buffer, DataBufferUtils.Matcher matcher, LimitedDataBufferList chunks) { - - boolean release = true; - try { - List result = null; - do { - int endIndex = matcher.match(buffer); - if (endIndex == -1) { - chunks.add(buffer); - release = false; - break; - } - DataBuffer split = buffer.split(endIndex + 1); - if (result == null) { - result = new ArrayList<>(); - } - int delimiterLength = matcher.delimiter().length; - if (chunks.isEmpty()) { - if (this.stripDelimiter) { - split.writePosition(split.writePosition() - delimiterLength); - } - result.add(split); - } - else { - chunks.add(split); - DataBuffer joined = buffer.factory().join(chunks); - if (this.stripDelimiter) { - joined.writePosition(joined.writePosition() - delimiterLength); - } - result.add(joined); - chunks.clear(); - } - } - while (buffer.readableByteCount() > 0); - return (result != null ? result : Collections.emptyList()); - } - finally { - if (release) { - DataBufferUtils.release(buffer); - } - } - } - - @Override - public CharBuffer decode(DataBuffer dataBuffer, ResolvableType elementType, - @Nullable MimeType mimeType, @Nullable Map hints) { - - Charset charset = getCharset(mimeType); - CharBuffer charBuffer = charset.decode(dataBuffer.toByteBuffer()); - DataBufferUtils.release(dataBuffer); - LogFormatUtils.traceDebug(logger, traceOn -> { - String formatted = LogFormatUtils.formatValue(charBuffer, !traceOn); - return Hints.getLogPrefix(hints) + "Decoded " + formatted; - }); - return charBuffer; + protected CharBuffer decodeInternal(DataBuffer dataBuffer, Charset charset) { + ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount()); + dataBuffer.toByteBuffer(byteBuffer); + return charset.decode(byteBuffer); } - private Charset getCharset(@Nullable MimeType mimeType) { - if (mimeType != null && mimeType.getCharset() != null) { - return mimeType.getCharset(); - } - else { - return getDefaultCharset(); - } - } /** * Create a {@code CharBufferDecoder} for {@code "text/plain"}. diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index 602734f43b4..d68abc2a33e 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -17,26 +17,11 @@ package org.springframework.core.codec; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.LimitedDataBufferList; -import org.springframework.core.log.LogFormatUtils; import org.springframework.lang.Nullable; -import org.springframework.util.Assert; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -55,48 +40,10 @@ import org.springframework.util.MimeTypeUtils; * @since 5.0 * @see CharSequenceEncoder */ -public final class StringDecoder extends AbstractDataBufferDecoder { - - /** The default charset to use, i.e. "UTF-8". */ - public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; - - /** The default delimiter strings to use, i.e. {@code \r\n} and {@code \n}. */ - public static final List DEFAULT_DELIMITERS = List.of("\r\n", "\n"); - - - private final List delimiters; - - private final boolean stripDelimiter; - - private Charset defaultCharset = DEFAULT_CHARSET; - - private final ConcurrentMap delimitersCache = new ConcurrentHashMap<>(); - +public final class StringDecoder extends AbstractCharSequenceDecoder { private StringDecoder(List delimiters, boolean stripDelimiter, MimeType... mimeTypes) { - super(mimeTypes); - Assert.notEmpty(delimiters, "'delimiters' must not be empty"); - this.delimiters = new ArrayList<>(delimiters); - this.stripDelimiter = stripDelimiter; - } - - - /** - * Set the default character set to fall back on if the MimeType does not specify any. - *

By default this is {@code UTF-8}. - * @param defaultCharset the charset to fall back on - * @since 5.2.9 - */ - public void setDefaultCharset(Charset defaultCharset) { - this.defaultCharset = defaultCharset; - } - - /** - * Return the configured {@link #setDefaultCharset(Charset) defaultCharset}. - * @since 5.2.9 - */ - public Charset getDefaultCharset() { - return this.defaultCharset; + super(delimiters, stripDelimiter, mimeTypes); } @@ -105,106 +52,12 @@ public final class StringDecoder extends AbstractDataBufferDecoder { return (elementType.resolve() == String.class && super.canDecode(elementType, mimeType)); } - @Override - public Flux decode(Publisher input, ResolvableType elementType, - @Nullable MimeType mimeType, @Nullable Map hints) { - - byte[][] delimiterBytes = getDelimiterBytes(mimeType); - - LimitedDataBufferList chunks = new LimitedDataBufferList(getMaxInMemorySize()); - DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); - - return Flux.from(input) - .concatMapIterable(buffer -> processDataBuffer(buffer, matcher, chunks)) - .concatWith(Mono.defer(() -> { - if (chunks.isEmpty()) { - return Mono.empty(); - } - DataBuffer lastBuffer = chunks.get(0).factory().join(chunks); - chunks.clear(); - return Mono.just(lastBuffer); - })) - .doFinally(signalType -> chunks.releaseAndClear()) - .doOnDiscard(DataBuffer.class, DataBufferUtils::release) - .map(buffer -> decode(buffer, elementType, mimeType, hints)); - } - - private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) { - return this.delimitersCache.computeIfAbsent(getCharset(mimeType), charset -> { - byte[][] result = new byte[this.delimiters.size()][]; - for (int i = 0; i < this.delimiters.size(); i++) { - result[i] = this.delimiters.get(i).getBytes(charset); - } - return result; - }); - } - - private Collection processDataBuffer( - DataBuffer buffer, DataBufferUtils.Matcher matcher, LimitedDataBufferList chunks) { - - boolean release = true; - try { - List result = null; - do { - int endIndex = matcher.match(buffer); - if (endIndex == -1) { - chunks.add(buffer); - release = false; - break; - } - DataBuffer split = buffer.split(endIndex + 1); - if (result == null) { - result = new ArrayList<>(); - } - int delimiterLength = matcher.delimiter().length; - if (chunks.isEmpty()) { - if (this.stripDelimiter) { - split.writePosition(split.writePosition() - delimiterLength); - } - result.add(split); - } - else { - chunks.add(split); - DataBuffer joined = buffer.factory().join(chunks); - if (this.stripDelimiter) { - joined.writePosition(joined.writePosition() - delimiterLength); - } - result.add(joined); - chunks.clear(); - } - } - while (buffer.readableByteCount() > 0); - return (result != null ? result : Collections.emptyList()); - } - finally { - if (release) { - DataBufferUtils.release(buffer); - } - } - } @Override - public String decode(DataBuffer dataBuffer, ResolvableType elementType, - @Nullable MimeType mimeType, @Nullable Map hints) { - - Charset charset = getCharset(mimeType); - String value = dataBuffer.toString(charset); - DataBufferUtils.release(dataBuffer); - LogFormatUtils.traceDebug(logger, traceOn -> { - String formatted = LogFormatUtils.formatValue(value, !traceOn); - return Hints.getLogPrefix(hints) + "Decoded " + formatted; - }); - return value; + protected String decodeInternal(DataBuffer dataBuffer, Charset charset) { + return dataBuffer.toString(charset); } - private Charset getCharset(@Nullable MimeType mimeType) { - if (mimeType != null && mimeType.getCharset() != null) { - return mimeType.getCharset(); - } - else { - return getDefaultCharset(); - } - } /** * Create a {@code StringDecoder} for {@code "text/plain"}. diff --git a/spring-core/src/test/java/org/springframework/core/codec/CharBufferDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/CharBufferDecoderTests.java index f5070ab91ec..6f65b10f9ac 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/CharBufferDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/CharBufferDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.core.codec; import java.nio.CharBuffer; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -41,9 +42,8 @@ import static org.assertj.core.api.Assertions.assertThat; /** * Unit tests for {@link CharBufferDecoder}. * - * @author Sebastien Deleuze - * @author Brian Clozel - * @author Mark Paluch + * @author Markus Heiden + * @author Arjen Poutsma */ class CharBufferDecoderTests extends AbstractDecoderTests { @@ -71,14 +71,7 @@ class CharBufferDecoderTests extends AbstractDecoderTests { CharBuffer e = charBuffer("é"); CharBuffer o = charBuffer("ø"); String s = String.format("%s\n%s\n%s", u, e, o); - Flux input = buffers(s, 1, UTF_8); - - // TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty - // see https://github.com/reactor/reactor-core/issues/2041 - -// testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); -// testDecodeCancel(input, TYPE, null, null); -// testDecodeEmpty(TYPE, null, null); + Flux input = toDataBuffers(s, 1, UTF_8); testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); } @@ -89,33 +82,38 @@ class CharBufferDecoderTests extends AbstractDecoderTests { CharBuffer e = charBuffer("é"); CharBuffer o = charBuffer("ø"); String s = String.format("%s\n%s\n%s", u, e, o); - Flux source = buffers(s, 2, UTF_16BE); + Flux source = toDataBuffers(s, 2, UTF_16BE); MimeType mimeType = MimeTypeUtils.parseMimeType("text/plain;charset=utf-16be"); testDecode(source, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), mimeType, null); } - private Flux buffers(String s, int length, Charset charset) { + private Flux toDataBuffers(String s, int length, Charset charset) { byte[] bytes = s.getBytes(charset); List chunks = new ArrayList<>(); for (int i = 0; i < bytes.length; i += length) { chunks.add(Arrays.copyOfRange(bytes, i, i + length)); } return Flux.fromIterable(chunks) - .map(this::buffer); + .map(chunk -> { + DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(length); + dataBuffer.write(chunk, 0, chunk.length); + return dataBuffer; + }); } @Test void decodeNewLine() { - Flux input = buffers( - "\r\nabc\n", - "def", - "ghi\r\n\n", - "jkl", - "mno\npqr\n", - "stu", - "vw", - "xyz"); + Flux input = Flux.just( + stringBuffer("\r\nabc\n"), + stringBuffer("def"), + stringBuffer("ghi\r\n\n"), + stringBuffer("jkl"), + stringBuffer("mno\npqr\n"), + stringBuffer("stu"), + stringBuffer("vw"), + stringBuffer("xyz") + ); testDecode(input, CharBuffer.class, step -> step .expectNext(charBuffer("")).as("1st") @@ -130,11 +128,12 @@ class CharBufferDecoderTests extends AbstractDecoderTests { } @Test - void decodeNewlinesAcrossBuffers() { - Flux input = buffers( - "\r", - "\n", - "xyz"); + void decodeNewlinesAcrossBuffers() { + Flux input = Flux.just( + stringBuffer("\r"), + stringBuffer("\n"), + stringBuffer("xyz") + ); testDecode(input, CharBuffer.class, step -> step .expectNext(charBuffer("")) @@ -145,9 +144,9 @@ class CharBufferDecoderTests extends AbstractDecoderTests { @Test void maxInMemoryLimit() { - Flux input = buffers( - "abc\n", "defg\n", - "hi", "jkl", "mnop"); + Flux input = Flux.just( + stringBuffer("abc\n"), stringBuffer("defg\n"), + stringBuffer("hi"), stringBuffer("jkl"), stringBuffer("mnop")); this.decoder.setMaxInMemorySize(5); @@ -159,8 +158,8 @@ class CharBufferDecoderTests extends AbstractDecoderTests { @Test void maxInMemoryLimitDoesNotApplyToParsedItemsThatDontRequireBuffering() { - Flux input = buffers( - "TOO MUCH DATA\nanother line\n\nand another\n"); + Flux input = Flux.just( + stringBuffer("TOO MUCH DATA\nanother line\n\nand another\n")); this.decoder.setMaxInMemorySize(5); @@ -176,10 +175,9 @@ class CharBufferDecoderTests extends AbstractDecoderTests { @Test // gh-24339 void maxInMemoryLimitReleaseUnprocessedLinesWhenUnlimited() { - Flux input = buffers("Line 1\nLine 2\nLine 3\n"); + Flux input = Flux.just(stringBuffer("Line 1\nLine 2\nLine 3\n")); this.decoder.setMaxInMemorySize(-1); - testDecodeCancel(input, ResolvableType.forClass(String.class), null, Collections.emptyMap()); } @@ -187,15 +185,16 @@ class CharBufferDecoderTests extends AbstractDecoderTests { void decodeNewLineIncludeDelimiters() { this.decoder = CharBufferDecoder.allMimeTypes(CharBufferDecoder.DEFAULT_DELIMITERS, false); - Flux input = buffers( - "\r\nabc\n", - "def", - "ghi\r\n\n", - "jkl", - "mno\npqr\n", - "stu", - "vw", - "xyz"); + Flux input = Flux.just( + stringBuffer("\r\nabc\n"), + stringBuffer("def"), + stringBuffer("ghi\r\n\n"), + stringBuffer("jkl"), + stringBuffer("mno\npqr\n"), + stringBuffer("stu"), + stringBuffer("vw"), + stringBuffer("xyz") + ); testDecode(input, CharBuffer.class, step -> step .expectNext(charBuffer("\r\n")) @@ -220,9 +219,9 @@ class CharBufferDecoderTests extends AbstractDecoderTests { @Test void decodeEmptyDataBuffer() { - Flux input = buffers(""); - - Flux output = this.decoder.decode(input, TYPE, null, Collections.emptyMap()); + Flux input = Flux.just(stringBuffer("")); + Flux output = this.decoder.decode(input, + TYPE, null, Collections.emptyMap()); StepVerifier.create(output) .expectNext(charBuffer("")) @@ -232,10 +231,10 @@ class CharBufferDecoderTests extends AbstractDecoderTests { @Override @Test public void decodeToMono() { - Flux input = buffers( - "foo", - "bar", - "baz"); + Flux input = Flux.just( + stringBuffer("foo"), + stringBuffer("bar"), + stringBuffer("baz")); testDecodeToMonoAll(input, CharBuffer.class, step -> step .expectNext(charBuffer("foobarbaz")) @@ -245,24 +244,17 @@ class CharBufferDecoderTests extends AbstractDecoderTests { @Test void decodeToMonoWithEmptyFlux() { - Flux input = buffers(); + Flux input = Flux.empty(); testDecodeToMono(input, String.class, step -> step .expectComplete() .verify()); } - private Flux buffers(String... value) { - return Flux.just(value).map(this::buffer); - } - - private DataBuffer buffer(String value) { - return buffer(value.getBytes(UTF_8)); - } - - private DataBuffer buffer(byte[] value) { - DataBuffer buffer = this.bufferFactory.allocateBuffer(value.length); - buffer.write(value); + private DataBuffer stringBuffer(String value) { + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length); + buffer.write(bytes); return buffer; }