From 384e851bd1e2e348ea54fe057cf87349192a83a7 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 27 Dec 2016 13:44:39 -0500 Subject: [PATCH] Polish reactive WebSocketClient implementations --- .../socket/client/JettyWebSocketClient.java | 73 ++++++++---- .../client/ReactorNettyWebSocketClient.java | 40 ++++--- .../socket/client/RxNettyWebSocketClient.java | 107 +++++++++++------- .../client/StandardWebSocketClient.java | 27 +++-- .../client/UndertowWebSocketClient.java | 30 ++--- 5 files changed, 170 insertions(+), 107 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java index 13775062e6b..1ac6a8b3743 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java @@ -18,7 +18,6 @@ package org.springframework.web.reactive.socket.client; import java.net.URI; -import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; @@ -36,8 +35,14 @@ import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdap import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; /** - * Jetty based implementation of {@link WebSocketClient}. - * + * A {@link WebSocketClient} implementation for use with Jetty + * {@link org.eclipse.jetty.websocket.client.WebSocketClient}. + * + *

Note: the Jetty {@code WebSocketClient} requires + * lifecycle management and must be started and stopped. This is automatically + * managed when this class is declared as a Spring bean and created with the + * default constructor. See constructor notes for more details. + * * @author Violeta Georgieva * @author Rossen Stoyanchev * @since 5.0 @@ -46,34 +51,60 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient; - private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + private final boolean externallyManaged; + + private boolean running = false; private final Object lifecycleMonitor = new Object(); + private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + /** - * Default constructor that creates an instance of - * {@link org.eclipse.jetty.websocket.client.WebSocketClient}. + * Default constructor that creates and manages an instance of a Jetty + * {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}. + * The instance can be obtained with {@link #getJettyClient()} for further + * configuration. + * + *

Note: When this constructor is used {@link Lifecycle} + * methods of this class are delegated to the Jetty {@code WebSocketClient}. */ public JettyWebSocketClient() { - this(new org.eclipse.jetty.websocket.client.WebSocketClient()); + this.jettyClient = new org.eclipse.jetty.websocket.client.WebSocketClient(); + this.externallyManaged = false; } /** - * Constructor that accepts an existing - * {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance. - * @param jettyClient a web socket client + * Constructor that accepts an existing instance of a Jetty + * {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}. + * + *

Note: Use of this constructor implies the Jetty + * {@code WebSocketClient} is externally managed and hence {@link Lifecycle} + * methods of this class are not delegated to it. */ public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient) { this.jettyClient = jettyClient; + this.externallyManaged = true; + } + + + /** + * Return the underlying Jetty {@code WebSocketClient}. + */ + public org.eclipse.jetty.websocket.client.WebSocketClient getJettyClient() { + return this.jettyClient; } @Override public void start() { + if (this.externallyManaged) { + return; + } synchronized (this.lifecycleMonitor) { if (!isRunning()) { try { + this.running = true; this.jettyClient.start(); } catch (Exception ex) { @@ -85,9 +116,13 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS @Override public void stop() { + if (this.externallyManaged) { + return; + } synchronized (this.lifecycleMonitor) { if (isRunning()) { try { + this.running = false; this.jettyClient.stop(); } catch (Exception ex) { @@ -100,7 +135,7 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS @Override public boolean isRunning() { synchronized (this.lifecycleMonitor) { - return this.jettyClient.isStarted(); + return this.running; } } @@ -131,15 +166,13 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS private Object createJettyHandler(URI url, WebSocketHandler handler, MonoProcessor completion) { return new JettyWebSocketHandlerAdapter(handler, - session -> createJettySession(url, completion, session)); - } - - private JettyWebSocketSession createJettySession(URI url, MonoProcessor completion, Session session) { - UpgradeResponse response = session.getUpgradeResponse(); - HttpHeaders responseHeaders = new HttpHeaders(); - response.getHeaders().forEach(responseHeaders::put); - HandshakeInfo info = afterHandshake(url, responseHeaders); - return new JettyWebSocketSession(session, info, this.bufferFactory, completion); + session -> { + UpgradeResponse response = session.getUpgradeResponse(); + HttpHeaders responseHeaders = new HttpHeaders(); + response.getHeaders().forEach(responseHeaders::put); + HandshakeInfo info = afterHandshake(url, responseHeaders); + return new JettyWebSocketSession(session, info, this.bufferFactory, completion); + }); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java index b08da13ff71..b386b5e380a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java @@ -33,7 +33,7 @@ import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession; /** - * A Reactor Netty based implementation of {@link WebSocketClient}. + * {@link WebSocketClient} implementation for use with Reactor Netty. * * @author Rossen Stoyanchev * @since 5.0 @@ -43,15 +43,30 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen private final HttpClient httpClient; + /** + * Default constructor. + */ public ReactorNettyWebSocketClient() { - this.httpClient = HttpClient.create(); + this(options -> {}); } + /** + * Constructor that accepts an {@link HttpClientOptions} consumer to supply + * to {@link HttpClient#create(Consumer)}. + */ public ReactorNettyWebSocketClient(Consumer clientOptions) { this.httpClient = HttpClient.create(clientOptions); } + /** + * Return the configured {@link HttpClient}. + */ + public HttpClient getHttpClient() { + return this.httpClient; + } + + @Override public Mono execute(URI url, WebSocketHandler handler) { return execute(url, new HttpHeaders(), handler); @@ -63,18 +78,12 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen String[] protocols = beforeHandshake(url, headers, handler); // TODO: https://github.com/reactor/reactor-netty/issues/20 - return this.httpClient - .get(url.toString(), request -> { - addRequestHeaders(request, headers); - return request.sendWebsocket(); - }) + return getHttpClient() + .get(url.toString(), request -> addHeaders(request, headers).sendWebsocket()) .then(response -> { - HttpHeaders responseHeaders = getResponseHeaders(response); - HandshakeInfo info = afterHandshake(url, responseHeaders); - + HandshakeInfo info = afterHandshake(url, toHttpHeaders(response)); ByteBufAllocator allocator = response.channel().alloc(); NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator); - return response.receiveWebsocket((in, out) -> { WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory); return handler.handle(session); @@ -82,13 +91,12 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen }); } - private void addRequestHeaders(HttpClientRequest request, HttpHeaders headers) { - headers.keySet().stream() - .forEach(key -> headers.get(key).stream() - .forEach(value -> request.addHeader(key, value))); + private HttpClientRequest addHeaders(HttpClientRequest request, HttpHeaders headers) { + headers.keySet().stream().forEach(key -> request.requestHeaders().set(key, headers.get(key))); + return request; } - private HttpHeaders getResponseHeaders(HttpClientResponse response) { + private HttpHeaders toHttpHeaders(HttpClientResponse response) { HttpHeaders headers = new HttpHeaders(); response.responseHeaders().forEach(entry -> { String name = entry.getKey(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java index a55b8f94e87..50b7fa401b5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java @@ -18,7 +18,6 @@ package org.springframework.web.reactive.socket.client; import java.net.URI; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,11 +27,12 @@ import javax.net.ssl.SSLEngine; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.reactivex.netty.protocol.http.HttpHandlerNames; import io.reactivex.netty.protocol.http.client.HttpClient; +import io.reactivex.netty.protocol.http.client.HttpClientRequest; import io.reactivex.netty.protocol.http.ws.WebSocketConnection; import io.reactivex.netty.protocol.http.ws.client.WebSocketRequest; import io.reactivex.netty.protocol.http.ws.client.WebSocketResponse; +import io.reactivex.netty.threads.RxEventLoopProvider; import reactor.core.publisher.Mono; import reactor.util.function.Tuples; import rx.Observable; @@ -45,49 +45,83 @@ import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession; +import static io.reactivex.netty.protocol.http.HttpHandlerNames.WsClientDecoder; + /** - * An RxNetty based implementation of {@link WebSocketClient}. + * {@link WebSocketClient} implementation for use with RxNetty. + * + *

Note: RxNetty {@link HttpClient} instances require a host + * and port in order to be created. Hence it is not possible to configure a + * single {@code HttpClient} instance to use upfront. Instead the constructors + * accept a function for obtaining client instances when establishing a + * connection to a specific URI. By default new instances are created per + * connection with a shared Netty {@code EventLoopGroup}. See constructors for + * more details. * * @author Rossen Stoyanchev * @since 5.0 */ public class RxNettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient { - private final Function> httpClientFactory; + private final Function> httpClientProvider; /** - * Default constructor that uses {@link HttpClient#newClient(String, int)} - * to create HTTP client instances when connecting. + * Default constructor that creates {@code HttpClient} instances via + * {@link HttpClient#newClient(String, int)} using port 80 or 443 depending + * on the target URL scheme. + * + *

Note: By default a new {@link HttpClient} instance + * is created per WebSocket connection. Those instances will share a global + * {@code EventLoopGroup} that RxNetty obtains via + * {@link RxEventLoopProvider#globalClientEventLoop(boolean)}. */ public RxNettyWebSocketClient() { - this(RxNettyWebSocketClient::createDefaultHttpClient); + this(RxNettyWebSocketClient::getDefaultHttpClientProvider); } /** - * Constructor with a function to create {@link HttpClient} instances. - * @param httpClientFactory factory to create clients + * Constructor with a function to use to obtain {@link HttpClient} instances. */ - public RxNettyWebSocketClient(Function> httpClientFactory) { - this.httpClientFactory = httpClientFactory; + public RxNettyWebSocketClient(Function> httpClientProvider) { + this.httpClientProvider = httpClientProvider; } - private static HttpClient createDefaultHttpClient(URI url) { + private static HttpClient getDefaultHttpClientProvider(URI url) { boolean secure = "wss".equals(url.getScheme()); int port = url.getPort() > 0 ? url.getPort() : secure ? 443 : 80; - HttpClient httpClient = HttpClient.newClient(url.getHost(), port); + HttpClient client = HttpClient.newClient(url.getHost(), port); if (secure) { try { SSLContext context = SSLContext.getDefault(); SSLEngine engine = context.createSSLEngine(url.getHost(), port); engine.setUseClientMode(true); - httpClient.secure(engine); + client.secure(engine); } catch (NoSuchAlgorithmException ex) { throw new IllegalStateException("Failed to create HttpClient for " + url, ex); } } - return httpClient; + return client; + } + + + /** + * Return the configured {@link HttpClient} provider depending on which + * constructor was used. + */ + public Function> getHttpClientProvider() { + return this.httpClientProvider; + } + + /** + * Return an {@link HttpClient} instance to use to connect to the given URI. + * The default implementation invokes the {@link #getHttpClientProvider()} + * provider} function created or supplied at construction time. + * @param url the full URL of the WebSocket endpoint. + */ + public HttpClient getHttpClient(URI url) { + return this.httpClientProvider.apply(url); } @@ -98,14 +132,12 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We @Override public Mono execute(URI url, HttpHeaders headers, WebSocketHandler handler) { - Observable completion = connectInternal(url, headers, handler); + Observable completion = executeInternal(url, headers, handler); return Mono.from(RxReactiveStreams.toPublisher(completion)); } - private Observable connectInternal(URI url, HttpHeaders headers, WebSocketHandler handler) { - + private Observable executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) { String[] protocols = beforeHandshake(url, headers, handler); - return createRequest(url, headers, protocols) .flatMap(response -> { Observable conn = response.getWebSocketConnection(); @@ -113,48 +145,35 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We }) .flatMap(tuple -> { WebSocketResponse response = tuple.getT1(); - HttpHeaders responseHeaders = getResponseHeaders(response); - HandshakeInfo info = afterHandshake(url, responseHeaders); + WebSocketConnection conn = tuple.getT2(); + HandshakeInfo info = afterHandshake(url, toHttpHeaders(response)); ByteBufAllocator allocator = response.unsafeNettyChannel().alloc(); NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator); - - WebSocketConnection conn = tuple.getT2(); RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, info, factory); - String name = HttpHandlerNames.WsClientDecoder.getName(); - session.aggregateFrames(response.unsafeNettyChannel(), name); + session.aggregateFrames(response.unsafeNettyChannel(), WsClientDecoder.getName()); return RxReactiveStreams.toObservable(handler.handle(session)); }); } private WebSocketRequest createRequest(URI url, HttpHeaders headers, String[] protocols) { - String query = url.getRawQuery(); String requestUrl = url.getRawPath() + (query != null ? "?" + query : ""); + HttpClientRequest request = getHttpClient(url).createGet(requestUrl); - WebSocketRequest request = this.httpClientFactory.apply(url) - .createGet(requestUrl) - .setHeaders(toObjectValueMap(headers)) - .requestWebSocketUpgrade(); - - if (!ObjectUtils.isEmpty(protocols)) { - request = request.requestSubProtocols(protocols); + if (!headers.isEmpty()) { + Map> map = new HashMap<>(headers.size()); + headers.forEach((key, values) -> map.put(key, new ArrayList<>(headers.get(key)))); + request = request.setHeaders(map); } - return request; - } - - private Map> toObjectValueMap(HttpHeaders headers) { - if (headers.isEmpty()) { - return Collections.emptyMap(); - } - Map> map = new HashMap<>(headers.size()); - headers.keySet().stream().forEach(key -> map.put(key, new ArrayList<>(headers.get(key)))); - return map; + return (ObjectUtils.isEmpty(protocols) ? + request.requestWebSocketUpgrade() : + request.requestWebSocketUpgrade().requestSubProtocols(protocols)); } - private HttpHeaders getResponseHeaders(WebSocketResponse response) { + private HttpHeaders toHttpHeaders(WebSocketResponse response) { HttpHeaders headers = new HttpHeaders(); response.headerIterator().forEachRemaining(entry -> { String name = entry.getKey().toString(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java index 7605632eae6..61509c33c5c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java @@ -40,11 +40,12 @@ import org.springframework.web.reactive.socket.adapter.StandardWebSocketHandlerA import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession; /** - * Java WebSocket API (JSR-356) implementation of {@link WebSocketClient}. - * + * {@link WebSocketClient} implementation for use with the Java WebSocket API. + * * @author Violeta Georgieva * @author Rossen Stoyanchev * @since 5.0 + * @see https://www.jcp.org/en/jsr/detail?id=356 */ public class StandardWebSocketClient extends WebSocketClientSupport implements WebSocketClient { @@ -71,6 +72,14 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W } + /** + * Return the configured {@link WebSocketContainer} to use. + */ + public WebSocketContainer getWebSocketContainer() { + return this.webSocketContainer; + } + + @Override public Mono execute(URI url, WebSocketHandler handler) { return execute(url, new HttpHeaders(), handler); @@ -95,13 +104,6 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W .then(completionMono); } - private ClientEndpointConfig createEndpointConfig(Configurator configurator, String[] subProtocols) { - return ClientEndpointConfig.Builder.create() - .configurator(configurator) - .preferredSubprotocols(Arrays.asList(subProtocols)) - .build(); - } - private StandardWebSocketHandlerAdapter createEndpoint(URI url, WebSocketHandler handler, MonoProcessor completion, DefaultConfigurator configurator) { @@ -112,6 +114,13 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W }); } + private ClientEndpointConfig createEndpointConfig(Configurator configurator, String[] subProtocols) { + return ClientEndpointConfig.Builder.create() + .configurator(configurator) + .preferredSubprotocols(Arrays.asList(subProtocols)) + .build(); + } + private static final class DefaultConfigurator extends Configurator { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java index a48b2c71a31..448e0ea8009 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CancellationException; import java.util.function.Function; import javax.net.ssl.SSLContext; @@ -33,7 +32,7 @@ import io.undertow.server.DefaultByteBufferPool; import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder; import io.undertow.websockets.client.WebSocketClientNegotiation; import io.undertow.websockets.core.WebSocketChannel; -import org.xnio.IoFuture.Status; +import org.xnio.IoFuture; import org.xnio.OptionMap; import org.xnio.Options; import org.xnio.Xnio; @@ -138,27 +137,22 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W MonoProcessor completion = MonoProcessor.create(); return Mono.fromCallable( () -> { - String[] subProtocols = beforeHandshake(url, headers, handler); - DefaultNegotiation negotiation = new DefaultNegotiation(subProtocols, headers); + String[] protocols = beforeHandshake(url, headers, handler); + DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers); return this.builder.apply(url) .setClientNegotiation(negotiation) .connect() - .addNotifier((future, attachment) -> { - if (Status.DONE.equals(future.getStatus())) { - try { - handleChannel(url, handler, completion, negotiation, future.get()); - } - catch (CancellationException | IOException ex) { - completion.onError(ex); - } - } - else if (Status.FAILED.equals(future.getStatus())) { - completion.onError(future.getException()); + .addNotifier(new IoFuture.HandlingNotifier() { + + @Override + public void handleDone(WebSocketChannel channel, Object attachment) { + handleChannel(url, handler, completion, negotiation, channel); } - else { - String message = "Failed to connect" + future.getStatus(); - completion.onError(new IllegalStateException(message)); + + @Override + public void handleFailed(IOException ex, Object attachment) { + completion.onError(new IllegalStateException("Failed to connect", ex)); } }, null); })