diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java index e162dd9ed3d..dd324a3c6f2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java @@ -16,48 +16,94 @@ package org.springframework.reactive.codec.encoder; -import java.nio.ByteBuffer; - import org.reactivestreams.Publisher; -import reactor.rx.Promise; -import rx.Observable; -import rx.RxReactiveStreams; - +import org.reactivestreams.Subscriber; import org.springframework.core.ResolvableType; import org.springframework.http.MediaType; import org.springframework.reactive.codec.decoder.JsonObjectDecoder; +import org.springframework.util.ClassUtils; + +import reactor.core.subscriber.SubscriberBarrier; +import reactor.io.buffer.Buffer; +import reactor.rx.Promise; +import rx.Observable; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import static reactor.Publishers.*; /** * Encode a bye stream of individual JSON element to a byte stream representing a single * JSON array when {@code Hints.ENCODE_AS_ARRAY} is enabled. * * @author Sebastien Deleuze + * @author Stephane Maldini + * * @see JsonObjectDecoder */ public class JsonObjectEncoder implements MessageToByteEncoder { - private final ByteBuffer START_ARRAY = ByteBuffer.wrap("[".getBytes()); + private static final boolean rxJava1Present = + ClassUtils.isPresent("rx.Observable", JsonObjectEncoder.class.getClassLoader()); + + private static final boolean reactorPresent = + ClassUtils.isPresent("reactor.rx.Promise", JsonObjectEncoder.class.getClassLoader()); - private final ByteBuffer END_ARRAY = ByteBuffer.wrap("]".getBytes()); + final ByteBuffer START_ARRAY = ByteBuffer.wrap("[".getBytes()); - private final ByteBuffer COMMA = ByteBuffer.wrap(",".getBytes()); + final ByteBuffer END_ARRAY = ByteBuffer.wrap("]".getBytes()); + + final ByteBuffer COMMA = ByteBuffer.wrap(",".getBytes()); @Override public boolean canEncode(ResolvableType type, MediaType mediaType, Object... hints) { - return mediaType.isCompatibleWith(MediaType.APPLICATION_JSON) && !Promise.class.isAssignableFrom(type.getRawClass()) && - (Observable.class.isAssignableFrom(type.getRawClass()) || Publisher.class.isAssignableFrom(type.getRawClass())); + return mediaType.isCompatibleWith(MediaType.APPLICATION_JSON) && + !(reactorPresent && Promise.class.isAssignableFrom(type.getRawClass())) && + (rxJava1Present && Observable.class.isAssignableFrom(type.getRawClass()) + || Publisher.class.isAssignableFrom(type.getRawClass())); } @Override - public Publisher encode(Publisher messageStream, ResolvableType type, MediaType mediaType, Object... hints) { - // TODO We use RxJava Observable because there is no skipLast() operator in Reactor - // TODO Merge some chunks, there is no need to have chunks with only '[', ']' or ',' characters - return RxReactiveStreams.toPublisher( - Observable.concat( - Observable.just(START_ARRAY), - RxReactiveStreams.toObservable(messageStream).flatMap(b -> Observable.just(b, COMMA)).skipLast(1), - Observable.just(END_ARRAY))); + public Publisher encode(Publisher messageStream, ResolvableType type, MediaType + mediaType, Object... hints) { + //TODO Merge some chunks, there is no need to have chunks with only '[', ']' or ',' characters + return + concat( + from( + Arrays.>asList( + just(START_ARRAY), + lift( + flatMap(messageStream, (ByteBuffer b) -> from(Arrays.asList(b, COMMA))), + sub -> new SkipLastBarrier(sub) + ), + just(END_ARRAY) + ) + ) + ); } + private static class SkipLastBarrier extends SubscriberBarrier { + + public SkipLastBarrier(Subscriber subscriber) { + super(subscriber); + } + + ByteBuffer prev = null; + + @Override + protected void doNext(ByteBuffer next) { + if (prev == null) { + prev = next; + doRequest(1); + return; + } + + ByteBuffer tmp = prev; + prev = next; + subscriber.onNext(tmp); + } + + } }