@ -31,6 +31,7 @@ import io.reactivex.netty.protocol.http.ws.client.WebSocketRequest;
@@ -31,6 +31,7 @@ import io.reactivex.netty.protocol.http.ws.client.WebSocketRequest;
import io.reactivex.netty.protocol.http.ws.client.WebSocketResponse ;
import io.reactivex.netty.threads.RxEventLoopProvider ;
import reactor.core.publisher.Mono ;
import reactor.util.function.Tuple2 ;
import reactor.util.function.Tuples ;
import rx.Observable ;
import rx.RxReactiveStreams ;
@ -121,12 +122,15 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We
@@ -121,12 +122,15 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We
return Mono . from ( RxReactiveStreams . toPublisher ( completion ) ) ;
}
@SuppressWarnings ( "cast" )
private Observable < Void > executeInternal ( URI url , HttpHeaders headers , WebSocketHandler handler ) {
String [ ] protocols = beforeHandshake ( url , headers , handler ) ;
return createRequest ( url , headers , protocols )
. flatMap ( response - > {
Observable < WebSocketConnection > conn = response . getWebSocketConnection ( ) ;
return Observable . zip ( Observable . just ( response ) , conn , Tuples : : of ) ;
// following cast is necessary to enable compilation on Eclipse 4.6
return ( Observable < Tuple2 < WebSocketResponse < ByteBuf > , WebSocketConnection > > )
Observable . zip ( Observable . just ( response ) , conn , Tuples : : of ) ;
} )
. flatMap ( tuple - > {
WebSocketResponse < ByteBuf > response = tuple . getT1 ( ) ;