@ -19,8 +19,10 @@ package org.springframework.web.reactive.socket.client;
@@ -19,8 +19,10 @@ package org.springframework.web.reactive.socket.client;
import java.net.URI ;
import org.eclipse.jetty.websocket.api.Session ;
import org.eclipse.jetty.websocket.api.UpgradeRequest ;
import org.eclipse.jetty.websocket.api.UpgradeResponse ;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest ;
import org.eclipse.jetty.websocket.client.io.UpgradeListener ;
import reactor.core.publisher.Mono ;
import reactor.core.publisher.MonoProcessor ;
@ -118,9 +120,11 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
@@ -118,9 +120,11 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
return Mono . fromCallable (
( ) - > {
String [ ] protocols = beforeHandshake ( url , headers , handler ) ;
ClientUpgradeRequest upgradeRequest = createRequest ( headers , protocols ) ;
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest ( ) ;
upgradeRequest . setSubProtocols ( protocols ) ;
Object jettyHandler = createJettyHandler ( url , handler , completionMono ) ;
return this . jettyClient . connect ( jettyHandler , url , upgradeRequest ) ;
UpgradeListener upgradeListener = new DefaultUpgradeListener ( headers ) ;
return this . jettyClient . connect ( jettyHandler , url , upgradeRequest , upgradeListener ) ;
} )
. then ( completionMono ) ;
}
@ -138,11 +142,24 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
@@ -138,11 +142,24 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
return new JettyWebSocketSession ( session , info , this . bufferFactory , completion ) ;
}
private ClientUpgradeRequest createRequest ( HttpHeaders headers , String [ ] protocols ) {
ClientUpgradeRequest request = new ClientUpgradeRequest ( ) ;
request . setSubProtocols ( protocols ) ;
headers . forEach ( request : : setHeader ) ;
return request ;
private static class DefaultUpgradeListener implements UpgradeListener {
private final HttpHeaders headers ;
public DefaultUpgradeListener ( HttpHeaders headers ) {
this . headers = headers ;
}
@Override
public void onHandshakeRequest ( UpgradeRequest request ) {
this . headers . forEach ( request : : setHeader ) ;
}
@Override
public void onHandshakeResponse ( UpgradeResponse response ) {
}
}
}