diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java index fc08cfd8786..81e317b1141 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java @@ -73,8 +73,8 @@ public class JacksonJsonEncoder extends AbstractEncoder { else { // array Mono startArray = Mono.just(allocator.wrap(START_ARRAY_BUFFER)); - Flux arraySeparators = - Flux.create(sub -> sub.onNext(allocator.wrap(SEPARATOR_BUFFER))); + Flux arraySeparators = Mono.just(allocator.wrap(SEPARATOR_BUFFER)) + .repeat(); Mono endArray = Mono.just(allocator.wrap(END_ARRAY_BUFFER)); Flux serializedObjects = diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java index 28ce6781c09..2b0293f8eb2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.function.Consumer; import org.reactivestreams.Publisher; @@ -29,7 +30,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSource; -import reactor.core.subscriber.SubscriberWithContext; +import reactor.core.publisher.GenerateOutput; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferAllocator; @@ -83,8 +84,9 @@ public abstract class DataBufferUtils { Assert.notNull(channel, "'channel' must not be null"); Assert.notNull(allocator, "'allocator' must not be null"); - return Flux.create(new ReadableByteChannelConsumer(allocator, bufferSize), - subscriber -> channel, CLOSE_CONSUMER); + return Flux.generate(() -> channel, + new ReadableByteChannelGenerator(allocator, bufferSize), + CLOSE_CONSUMER); } /** @@ -172,24 +174,25 @@ public abstract class DataBufferUtils { } } - private static class ReadableByteChannelConsumer - implements Consumer> { + private static class ReadableByteChannelGenerator + implements BiFunction, + ReadableByteChannel> { private final DataBufferAllocator allocator; private final int chunkSize; - public ReadableByteChannelConsumer(DataBufferAllocator allocator, int chunkSize) { + public ReadableByteChannelGenerator(DataBufferAllocator allocator, int chunkSize) { this.allocator = allocator; this.chunkSize = chunkSize; } @Override - public void accept(SubscriberWithContext sub) { + public ReadableByteChannel apply(ReadableByteChannel + channel, GenerateOutput sub) { try { ByteBuffer byteBuffer = ByteBuffer.allocate(chunkSize); int read; - ReadableByteChannel channel = sub.context(); if ((read = channel.read(byteBuffer)) > 0) { byteBuffer.flip(); boolean release = true; @@ -212,6 +215,7 @@ public abstract class DataBufferUtils { catch (IOException ex) { sub.onError(ex); } + return channel; } } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java index 70fdc8b20f6..d5ce4dc018f 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java @@ -108,13 +108,14 @@ public class ChannelSendOperatorTests { @Test public void errorAfterMultipleItems() throws Exception { IllegalStateException error = new IllegalStateException("boo"); - Flux publisher = Flux.create(subscriber -> { - int i = subscriber.context().incrementAndGet(); + Flux publisher = Flux.generate(() -> 0, (idx , subscriber) -> { + int i = ++idx; subscriber.onNext(String.valueOf(i)); if (i == 3) { subscriber.onError(error); } - }, subscriber -> new AtomicInteger()); + return i; + }); Mono completion = publisher.as(this::sendOperator); Signal signal = completion.materialize().get();