From a6c56925866ea3a4ae2518a1290e1ccf87d7139c Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 5 Jul 2023 15:20:34 +0200 Subject: [PATCH] Make OutputStreamPublisher more generic This commit improves the OutputStreamPublisher so that it is capable of publishing other types that ByteBuffers. --- .../http/client/JdkClientHttpRequest.java | 24 +++++ .../http/client/OutputStreamPublisher.java | 90 ++++++++++++------- .../client/OutputStreamPublisherTests.java | 43 ++++++--- 3 files changed, 113 insertions(+), 44 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/JdkClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/JdkClientHttpRequest.java index 3743bb294aa..5fb3257de23 100644 --- a/spring-web/src/main/java/org/springframework/http/client/JdkClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/JdkClientHttpRequest.java @@ -47,6 +47,8 @@ import org.springframework.util.StringUtils; */ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest { + private static final OutputStreamPublisher.ByteMapper BYTE_MAPPER = new ByteBufferMapper(); + private static final Set DISALLOWED_HEADERS = disallowedHeaders(); /** @@ -142,6 +144,7 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest { if (body != null) { Flow.Publisher outputStreamPublisher = OutputStreamPublisher.create( outputStream -> body.writeTo(StreamUtils.nonClosing(outputStream)), + BYTE_MAPPER, this.executor); long contentLength = headers.getContentLength(); @@ -157,4 +160,25 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest { } } + + private static final class ByteBufferMapper implements OutputStreamPublisher.ByteMapper { + + @Override + public ByteBuffer map(int b) { + ByteBuffer byteBuffer = ByteBuffer.allocate(1); + byteBuffer.put((byte) b); + byteBuffer.flip(); + return byteBuffer; + } + + @Override + public ByteBuffer map(byte[] b, int off, int len) { + ByteBuffer byteBuffer = ByteBuffer.allocate(len); + byteBuffer.put(b, off, len); + byteBuffer.flip(); + return byteBuffer; + } + + } + } diff --git a/spring-web/src/main/java/org/springframework/http/client/OutputStreamPublisher.java b/spring-web/src/main/java/org/springframework/http/client/OutputStreamPublisher.java index 73652a632e6..5869de45d60 100644 --- a/spring-web/src/main/java/org/springframework/http/client/OutputStreamPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/client/OutputStreamPublisher.java @@ -19,7 +19,6 @@ package org.springframework.http.client; import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.Flow; @@ -32,37 +31,42 @@ import org.springframework.util.Assert; /** * Bridges between {@link OutputStream} and - * {@link Flow.Publisher Flow.Publisher<ByteBuffer>}. - * + * {@link Flow.Publisher Flow.Publisher<T>}. + * @author Oleh Dokuka * @author Arjen Poutsma * @since 6.1 - * @see #create(OutputStreamHandler, Executor) + * @param the published item type + * @see #create(OutputStreamHandler, ByteMapper, Executor) */ -final class OutputStreamPublisher implements Flow.Publisher { +final class OutputStreamPublisher implements Flow.Publisher { private final OutputStreamHandler outputStreamHandler; + private final ByteMapper byteMapper; + private final Executor executor; - private OutputStreamPublisher(OutputStreamHandler outputStreamHandler, Executor executor) { + private OutputStreamPublisher(OutputStreamHandler outputStreamHandler, ByteMapper byteMapper, Executor executor) { this.outputStreamHandler = outputStreamHandler; + this.byteMapper = byteMapper; this.executor = executor; } /** - * Creates a new {@code Publisher} based on bytes written to a - * {@code OutputStream}. + * Creates a new {@code Publisher} based on bytes written to a + * {@code OutputStream}. The parameter {@code byteMapper} is used to map + * from written bytes to the published type. *
    *
  • The parameter {@code outputStreamHandler} is invoked once per * subscription of the returned {@code Publisher}, when the first - * {@code ByteBuffer} is + * item is * {@linkplain Flow.Subscription#request(long) requested}.
  • *
  • Each {@link OutputStream#write(byte[], int, int) OutputStream.write()} * invocation that {@code outputStreamHandler} makes will result in a - * {@linkplain Flow.Subscriber#onNext(Object) published} {@code ByteBuffer} + * {@linkplain Flow.Subscriber#onNext(Object) published} item * if there is {@linkplain Flow.Subscription#request(long) demand}.
  • *
  • If there is no demand, {@code OutputStream.write()} will block * until there is.
  • @@ -74,23 +78,28 @@ final class OutputStreamPublisher implements Flow.Publisher { * be dispatched to the {@linkplain Flow.Subscriber#onError(Throwable) Subscriber}. *
* @param outputStreamHandler invoked when the first buffer is requested + * @param byteMapper maps written bytes to {@code T} * @param executor used to invoke the {@code outputStreamHandler} - * @return a {@code Publisher} based on bytes written by - * {@code outputStreamHandler} + * @param the publisher type + * @return a {@code Publisher} based on bytes written by + * {@code outputStreamHandler} mapped by {@code byteMapper} */ - public static Flow.Publisher create(OutputStreamHandler outputStreamHandler, Executor executor) { + public static Flow.Publisher create(OutputStreamHandler outputStreamHandler, ByteMapper byteMapper, + Executor executor) { + Assert.notNull(outputStreamHandler, "OutputStreamHandler must not be null"); + Assert.notNull(byteMapper, "ByteMapper must not be null"); Assert.notNull(executor, "Executor must not be null"); - return new OutputStreamPublisher(outputStreamHandler, executor); + return new OutputStreamPublisher<>(outputStreamHandler, byteMapper, executor); } - @Override - public void subscribe(Flow.Subscriber subscriber) { + public void subscribe(Flow.Subscriber subscriber) { Objects.requireNonNull(subscriber, "Subscriber must not be null"); - OutputStreamSubscription subscription = new OutputStreamSubscription(subscriber, this.outputStreamHandler); + OutputStreamSubscription subscription = new OutputStreamSubscription<>(subscriber, this.outputStreamHandler, + this.byteMapper); subscriber.onSubscribe(subscription); this.executor.execute(subscription::invokeHandler); } @@ -109,7 +118,8 @@ final class OutputStreamPublisher implements Flow.Publisher { *
  • If the linked subscription has * {@linkplain Flow.Subscription#request(long) demand}, any * {@linkplain OutputStream#write(byte[], int, int) written} bytes - * will be {@linkplain Flow.Subscriber#onNext(Object) published} to the + * will be {@linkplain ByteMapper#map(byte[], int, int) mapped} + * and {@linkplain Flow.Subscriber#onNext(Object) published} to the * {@link Flow.Subscriber Subscriber}.
  • *
  • If there is no demand, any * {@link OutputStream#write(byte[], int, int) write()} invocations will @@ -128,14 +138,37 @@ final class OutputStreamPublisher implements Flow.Publisher { } - private static final class OutputStreamSubscription extends OutputStream implements Flow.Subscription { + /** + * Maps bytes written to in {@link OutputStreamHandler#handle(OutputStream)} + * to published items. + * @param the type to map to + */ + public interface ByteMapper { + + /** + * Maps a single byte to {@code T}. + */ + T map(int b); + + /** + * Maps a byte array to {@code T}. + */ + T map(byte[] b, int off, int len); + + } + + + private static final class OutputStreamSubscription extends OutputStream implements Flow.Subscription { static final Object READY = new Object(); - private final Flow.Subscriber actual; + private final Flow.Subscriber actual; + private final OutputStreamHandler outputStreamHandler; + private final ByteMapper byteMapper; + private final AtomicLong requested = new AtomicLong(); private final AtomicReference parkedThreadAtomic = new AtomicReference<>(); @@ -146,9 +179,10 @@ final class OutputStreamPublisher implements Flow.Publisher { private long produced; - public OutputStreamSubscription(Flow.Subscriber actual, - OutputStreamHandler outputStreamHandler) { + public OutputStreamSubscription(Flow.Subscriber actual, OutputStreamHandler outputStreamHandler, + ByteMapper byteMapper) { this.actual = actual; + this.byteMapper = byteMapper; this.outputStreamHandler = outputStreamHandler; } @@ -156,11 +190,9 @@ final class OutputStreamPublisher implements Flow.Publisher { public void write(int b) throws IOException { checkDemandAndAwaitIfNeeded(); - ByteBuffer byteBuffer = ByteBuffer.allocate(1); - byteBuffer.put((byte) b); - byteBuffer.flip(); + T next = this.byteMapper.map(b); - this.actual.onNext(byteBuffer); + this.actual.onNext(next); this.produced++; } @@ -174,11 +206,9 @@ final class OutputStreamPublisher implements Flow.Publisher { public void write(byte[] b, int off, int len) throws IOException { checkDemandAndAwaitIfNeeded(); - ByteBuffer byteBuffer = ByteBuffer.allocate(len); - byteBuffer.put(b, off, len); - byteBuffer.flip(); + T next = this.byteMapper.map(b, off, len); - this.actual.onNext(byteBuffer); + this.actual.onNext(next); this.produced++; } diff --git a/spring-web/src/test/java/org/springframework/http/client/OutputStreamPublisherTests.java b/spring-web/src/test/java/org/springframework/http/client/OutputStreamPublisherTests.java index fc792ec55cd..574ba987f0d 100644 --- a/spring-web/src/test/java/org/springframework/http/client/OutputStreamPublisherTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/OutputStreamPublisherTests.java @@ -18,7 +18,6 @@ package org.springframework.http.client; import java.io.OutputStreamWriter; import java.io.Writer; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -41,15 +40,31 @@ class OutputStreamPublisherTests { private final Executor executor = Executors.newSingleThreadExecutor(); + private final OutputStreamPublisher.ByteMapper byteMapper = + new OutputStreamPublisher.ByteMapper<>() { + @Override + public byte[] map(int b) { + return new byte[]{(byte) b}; + } + + @Override + public byte[] map(byte[] b, int off, int len) { + byte[] result = new byte[len]; + System.arraycopy(b, off, result, 0, len); + return result; + } + }; + + @Test void basic() { - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { writer.write("foo"); writer.write("bar"); writer.write("baz"); } - }, this.executor); + }, this.byteMapper, this.executor); Flux flux = toString(flowPublisher); StepVerifier.create(flux) @@ -59,7 +74,7 @@ class OutputStreamPublisherTests { @Test void flush() { - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { writer.write("foo"); writer.flush(); @@ -68,7 +83,7 @@ class OutputStreamPublisherTests { writer.write("baz"); writer.flush(); } - }, this.executor); + }, this.byteMapper, this.executor); Flux flux = toString(flowPublisher); StepVerifier.create(flux) @@ -82,7 +97,7 @@ class OutputStreamPublisherTests { void cancel() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { try (Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { assertThatIOException() .isThrownBy(() -> { @@ -94,7 +109,7 @@ class OutputStreamPublisherTests { .withMessage("Subscription has been terminated"); latch.countDown(); } - }, this.executor); + }, this.byteMapper, this.executor); Flux flux = toString(flowPublisher); StepVerifier.create(flux, 1) @@ -109,14 +124,14 @@ class OutputStreamPublisherTests { void closed() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8); writer.write("foo"); writer.close(); assertThatIOException().isThrownBy(() -> writer.write("bar")) .withMessage("Stream closed"); latch.countDown(); - }, this.executor); + }, this.byteMapper, this.executor); Flux flux = toString(flowPublisher); StepVerifier.create(flux) @@ -130,7 +145,7 @@ class OutputStreamPublisherTests { void negativeRequestN() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { + Flow.Publisher flowPublisher = OutputStreamPublisher.create(outputStream -> { try(Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { writer.write("foo"); writer.flush(); @@ -140,7 +155,7 @@ class OutputStreamPublisherTests { finally { latch.countDown(); } - }, this.executor); + }, this.byteMapper, this.executor); Flow.Subscription[] subscriptions = new Flow.Subscription[1]; Flux flux = toString(a-> flowPublisher.subscribe(new Flow.Subscriber<>() { @Override @@ -150,7 +165,7 @@ class OutputStreamPublisherTests { } @Override - public void onNext(ByteBuffer item) { + public void onNext(byte[] item) { a.onNext(item); } @@ -174,9 +189,9 @@ class OutputStreamPublisherTests { latch.await(); } - private static Flux toString(Flow.Publisher flowPublisher) { + private static Flux toString(Flow.Publisher flowPublisher) { return Flux.from(FlowAdapters.toPublisher(flowPublisher)) - .map(bb -> StandardCharsets.UTF_8.decode(bb).toString()); + .map(bytes -> new String(bytes, StandardCharsets.UTF_8)); } }