|
|
|
|
@ -16,48 +16,94 @@
@@ -16,48 +16,94 @@
|
|
|
|
|
|
|
|
|
|
package org.springframework.reactive.codec.encoder; |
|
|
|
|
|
|
|
|
|
import java.nio.ByteBuffer; |
|
|
|
|
|
|
|
|
|
import org.reactivestreams.Publisher; |
|
|
|
|
import reactor.rx.Promise; |
|
|
|
|
import rx.Observable; |
|
|
|
|
import rx.RxReactiveStreams; |
|
|
|
|
|
|
|
|
|
import org.reactivestreams.Subscriber; |
|
|
|
|
import org.springframework.core.ResolvableType; |
|
|
|
|
import org.springframework.http.MediaType; |
|
|
|
|
import org.springframework.reactive.codec.decoder.JsonObjectDecoder; |
|
|
|
|
import org.springframework.util.ClassUtils; |
|
|
|
|
|
|
|
|
|
import reactor.core.subscriber.SubscriberBarrier; |
|
|
|
|
import reactor.io.buffer.Buffer; |
|
|
|
|
import reactor.rx.Promise; |
|
|
|
|
import rx.Observable; |
|
|
|
|
|
|
|
|
|
import java.nio.ByteBuffer; |
|
|
|
|
import java.util.Arrays; |
|
|
|
|
|
|
|
|
|
import static reactor.Publishers.*; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Encode a bye stream of individual JSON element to a byte stream representing a single |
|
|
|
|
* JSON array when {@code Hints.ENCODE_AS_ARRAY} is enabled. |
|
|
|
|
* |
|
|
|
|
* @author Sebastien Deleuze |
|
|
|
|
* @author Stephane Maldini |
|
|
|
|
* |
|
|
|
|
* @see JsonObjectDecoder |
|
|
|
|
*/ |
|
|
|
|
public class JsonObjectEncoder implements MessageToByteEncoder<ByteBuffer> { |
|
|
|
|
|
|
|
|
|
private final ByteBuffer START_ARRAY = ByteBuffer.wrap("[".getBytes()); |
|
|
|
|
private static final boolean rxJava1Present = |
|
|
|
|
ClassUtils.isPresent("rx.Observable", JsonObjectEncoder.class.getClassLoader()); |
|
|
|
|
|
|
|
|
|
private static final boolean reactorPresent = |
|
|
|
|
ClassUtils.isPresent("reactor.rx.Promise", JsonObjectEncoder.class.getClassLoader()); |
|
|
|
|
|
|
|
|
|
private final ByteBuffer END_ARRAY = ByteBuffer.wrap("]".getBytes()); |
|
|
|
|
final ByteBuffer START_ARRAY = ByteBuffer.wrap("[".getBytes()); |
|
|
|
|
|
|
|
|
|
private final ByteBuffer COMMA = ByteBuffer.wrap(",".getBytes()); |
|
|
|
|
final ByteBuffer END_ARRAY = ByteBuffer.wrap("]".getBytes()); |
|
|
|
|
|
|
|
|
|
final ByteBuffer COMMA = ByteBuffer.wrap(",".getBytes()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public boolean canEncode(ResolvableType type, MediaType mediaType, Object... hints) { |
|
|
|
|
return mediaType.isCompatibleWith(MediaType.APPLICATION_JSON) && !Promise.class.isAssignableFrom(type.getRawClass()) && |
|
|
|
|
(Observable.class.isAssignableFrom(type.getRawClass()) || Publisher.class.isAssignableFrom(type.getRawClass())); |
|
|
|
|
return mediaType.isCompatibleWith(MediaType.APPLICATION_JSON) && |
|
|
|
|
!(reactorPresent && Promise.class.isAssignableFrom(type.getRawClass())) && |
|
|
|
|
(rxJava1Present && Observable.class.isAssignableFrom(type.getRawClass()) |
|
|
|
|
|| Publisher.class.isAssignableFrom(type.getRawClass())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public Publisher<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream, ResolvableType type, MediaType mediaType, Object... hints) { |
|
|
|
|
// TODO We use RxJava Observable because there is no skipLast() operator in Reactor
|
|
|
|
|
// TODO Merge some chunks, there is no need to have chunks with only '[', ']' or ',' characters
|
|
|
|
|
return RxReactiveStreams.toPublisher( |
|
|
|
|
Observable.concat( |
|
|
|
|
Observable.just(START_ARRAY), |
|
|
|
|
RxReactiveStreams.toObservable(messageStream).flatMap(b -> Observable.just(b, COMMA)).skipLast(1), |
|
|
|
|
Observable.just(END_ARRAY))); |
|
|
|
|
public Publisher<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream, ResolvableType type, MediaType |
|
|
|
|
mediaType, Object... hints) { |
|
|
|
|
//TODO Merge some chunks, there is no need to have chunks with only '[', ']' or ',' characters
|
|
|
|
|
return |
|
|
|
|
concat( |
|
|
|
|
from( |
|
|
|
|
Arrays.<Publisher<ByteBuffer>>asList( |
|
|
|
|
just(START_ARRAY), |
|
|
|
|
lift( |
|
|
|
|
flatMap(messageStream, (ByteBuffer b) -> from(Arrays.asList(b, COMMA))), |
|
|
|
|
sub -> new SkipLastBarrier(sub) |
|
|
|
|
), |
|
|
|
|
just(END_ARRAY) |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private static class SkipLastBarrier extends SubscriberBarrier<ByteBuffer, ByteBuffer> { |
|
|
|
|
|
|
|
|
|
public SkipLastBarrier(Subscriber<? super ByteBuffer> subscriber) { |
|
|
|
|
super(subscriber); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ByteBuffer prev = null; |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
protected void doNext(ByteBuffer next) { |
|
|
|
|
if (prev == null) { |
|
|
|
|
prev = next; |
|
|
|
|
doRequest(1); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ByteBuffer tmp = prev; |
|
|
|
|
prev = next; |
|
|
|
|
subscriber.onNext(tmp); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|