Browse Source

Synchronize Reactor Flux#create / Flux#generate changes

pull/1111/head
Stephane Maldini 10 years ago
parent
commit
04f47da15e
  1. 4
      spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java
  2. 20
      spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java
  3. 7
      spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java

4
spring-web-reactive/src/main/java/org/springframework/core/codec/support/JacksonJsonEncoder.java

@ -73,8 +73,8 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> { @@ -73,8 +73,8 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> {
else {
// array
Mono<DataBuffer> startArray = Mono.just(allocator.wrap(START_ARRAY_BUFFER));
Flux<DataBuffer> arraySeparators =
Flux.create(sub -> sub.onNext(allocator.wrap(SEPARATOR_BUFFER)));
Flux<DataBuffer> arraySeparators = Mono.just(allocator.wrap(SEPARATOR_BUFFER))
.repeat();
Mono<DataBuffer> endArray = Mono.just(allocator.wrap(END_ARRAY_BUFFER));
Flux<DataBuffer> serializedObjects =

20
spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java

@ -22,6 +22,7 @@ import java.nio.ByteBuffer; @@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
@ -29,7 +30,7 @@ import org.reactivestreams.Subscriber; @@ -29,7 +30,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSource;
import reactor.core.subscriber.SubscriberWithContext;
import reactor.core.publisher.GenerateOutput;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
@ -83,8 +84,9 @@ public abstract class DataBufferUtils { @@ -83,8 +84,9 @@ public abstract class DataBufferUtils {
Assert.notNull(channel, "'channel' must not be null");
Assert.notNull(allocator, "'allocator' must not be null");
return Flux.create(new ReadableByteChannelConsumer(allocator, bufferSize),
subscriber -> channel, CLOSE_CONSUMER);
return Flux.generate(() -> channel,
new ReadableByteChannelGenerator(allocator, bufferSize),
CLOSE_CONSUMER);
}
/**
@ -172,24 +174,25 @@ public abstract class DataBufferUtils { @@ -172,24 +174,25 @@ public abstract class DataBufferUtils {
}
}
private static class ReadableByteChannelConsumer
implements Consumer<SubscriberWithContext<DataBuffer, ReadableByteChannel>> {
private static class ReadableByteChannelGenerator
implements BiFunction<ReadableByteChannel, GenerateOutput<DataBuffer>,
ReadableByteChannel> {
private final DataBufferAllocator allocator;
private final int chunkSize;
public ReadableByteChannelConsumer(DataBufferAllocator allocator, int chunkSize) {
public ReadableByteChannelGenerator(DataBufferAllocator allocator, int chunkSize) {
this.allocator = allocator;
this.chunkSize = chunkSize;
}
@Override
public void accept(SubscriberWithContext<DataBuffer, ReadableByteChannel> sub) {
public ReadableByteChannel apply(ReadableByteChannel
channel, GenerateOutput<DataBuffer> sub) {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(chunkSize);
int read;
ReadableByteChannel channel = sub.context();
if ((read = channel.read(byteBuffer)) > 0) {
byteBuffer.flip();
boolean release = true;
@ -212,6 +215,7 @@ public abstract class DataBufferUtils { @@ -212,6 +215,7 @@ public abstract class DataBufferUtils {
catch (IOException ex) {
sub.onError(ex);
}
return channel;
}
}

7
spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java

@ -108,13 +108,14 @@ public class ChannelSendOperatorTests { @@ -108,13 +108,14 @@ public class ChannelSendOperatorTests {
@Test
public void errorAfterMultipleItems() throws Exception {
IllegalStateException error = new IllegalStateException("boo");
Flux<String> publisher = Flux.create(subscriber -> {
int i = subscriber.context().incrementAndGet();
Flux<String> publisher = Flux.generate(() -> 0, (idx , subscriber) -> {
int i = ++idx;
subscriber.onNext(String.valueOf(i));
if (i == 3) {
subscriber.onError(error);
}
}, subscriber -> new AtomicInteger());
return i;
});
Mono<Void> completion = publisher.as(this::sendOperator);
Signal<Void> signal = completion.materialize().get();

Loading…
Cancel
Save