|
|
|
|
@ -91,10 +91,10 @@ class PayloadInterceptorRSocket extends RSocketProxy {
@@ -91,10 +91,10 @@ class PayloadInterceptorRSocket extends RSocketProxy {
|
|
|
|
|
public Flux<Payload> requestChannel(Publisher<Payload> payloads) { |
|
|
|
|
return Flux.from(payloads).switchOnFirst((signal, innerFlux) -> { |
|
|
|
|
Payload firstPayload = signal.get(); |
|
|
|
|
return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload).flatMapMany((context) -> innerFlux |
|
|
|
|
.index().concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2())) |
|
|
|
|
.transform((securedPayloads) -> this.source.requestChannel(securedPayloads)) |
|
|
|
|
.subscriberContext(context)); |
|
|
|
|
return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload).flatMapMany( |
|
|
|
|
(context) -> innerFlux.index().concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2())) |
|
|
|
|
.transform((securedPayloads) -> this.source.requestChannel(securedPayloads)) |
|
|
|
|
.subscriberContext(context)); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|