From 7be2943c031c4fb1e325fbd9b292c8524d317d2e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 5 Jul 2018 21:28:29 -0400 Subject: [PATCH] Simplify WebSocket client implementationss in WebFlux 1. Eliminate WebSocketClientSupport base class whose main value was to provide logging but those methods get in the way of inserting a log prefix. 2. Remove checks and synchronization in lifecycle methods of Jetty client since underlying Jetty client already has that. --- .../socket/client/JettyWebSocketClient.java | 73 +++++++++---------- .../client/ReactorNettyWebSocketClient.java | 26 +++++-- .../client/StandardWebSocketClient.java | 18 ++++- .../client/UndertowWebSocketClient.java | 20 ++++- .../socket/client/WebSocketClientSupport.java | 59 --------------- 5 files changed, 82 insertions(+), 114 deletions(-) delete mode 100644 spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/WebSocketClientSupport.java diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java index a30550c56a4..b60d8fcb07d 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java @@ -17,8 +17,9 @@ package org.springframework.web.reactive.socket.client; import java.net.URI; -import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; @@ -48,15 +49,14 @@ import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; * @author Rossen Stoyanchev * @since 5.0 */ -public class JettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient, Lifecycle { +public class JettyWebSocketClient implements WebSocketClient, Lifecycle { - private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient; + private static final Log logger = LogFactory.getLog(JettyWebSocketClient.class); - private final boolean externallyManaged; - private volatile boolean running = false; + private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient; - private final Object lifecycleMonitor = new Object(); + private final boolean externallyManaged; private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); @@ -99,43 +99,31 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS @Override public void start() { - if (this.externallyManaged) { - return; - } - synchronized (this.lifecycleMonitor) { - if (!isRunning()) { - try { - this.running = true; - this.jettyClient.start(); - } - catch (Exception ex) { - throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex); - } + if (!this.externallyManaged) { + try { + this.jettyClient.start(); + } + catch (Exception ex) { + throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex); } } } @Override public void stop() { - if (this.externallyManaged) { - return; - } - synchronized (this.lifecycleMonitor) { - if (isRunning()) { - try { - this.running = false; - this.jettyClient.stop(); - } - catch (Exception ex) { - throw new IllegalStateException("Error stopping Jetty WebSocketClient", ex); - } + if (!this.externallyManaged) { + try { + this.jettyClient.stop(); + } + catch (Exception ex) { + throw new IllegalStateException("Error stopping Jetty WebSocketClient", ex); } } } @Override public boolean isRunning() { - return this.running; + return this.jettyClient.isRunning(); } @@ -153,23 +141,28 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS MonoProcessor completionMono = MonoProcessor.create(); return Mono.fromCallable( () -> { - List protocols = beforeHandshake(url, headers, handler); - ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); - upgradeRequest.setSubProtocols(protocols); - Object jettyHandler = createJettyHandler(url, handler, completionMono); + if (logger.isDebugEnabled()) { + logger.debug("Connecting to " + url); + } + Object jettyHandler = createHandler(url, handler, completionMono); + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols(handler.getSubProtocols()); UpgradeListener upgradeListener = new DefaultUpgradeListener(headers); - return this.jettyClient.connect(jettyHandler, url, upgradeRequest, upgradeListener); + return this.jettyClient.connect(jettyHandler, url, request, upgradeListener); }) .then(completionMono); } - private Object createJettyHandler(URI url, WebSocketHandler handler, MonoProcessor completion) { + private Object createHandler(URI url, WebSocketHandler handler, MonoProcessor completion) { return new JettyWebSocketHandlerAdapter(handler, session -> { - UpgradeResponse response = session.getUpgradeResponse(); + if (logger.isDebugEnabled()) { + logger.debug("Connected to " + url); + } HttpHeaders responseHeaders = new HttpHeaders(); - response.getHeaders().forEach(responseHeaders::put); - HandshakeInfo info = afterHandshake(url, responseHeaders); + session.getUpgradeResponse().getHeaders().forEach(responseHeaders::put); + String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol"); + HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol); return new JettyWebSocketSession(session, info, this.bufferFactory, completion); }); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java index 13e557e47e7..0f8215e1ec4 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java @@ -16,8 +16,9 @@ package org.springframework.web.reactive.socket.client; import java.net.URI; -import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; import reactor.netty.http.websocket.WebsocketInbound; @@ -36,7 +37,10 @@ import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSess * @author Rossen Stoyanchev * @since 5.0 */ -public class ReactorNettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient { +public class ReactorNettyWebSocketClient implements WebSocketClient { + + private static final Log logger = LogFactory.getLog(ReactorNettyWebSocketClient.class); + private final HttpClient httpClient; @@ -71,15 +75,21 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen } @Override - public Mono execute(URI url, HttpHeaders httpHeaders, WebSocketHandler handler) { - List protocols = beforeHandshake(url, httpHeaders, handler); - + public Mono execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) { + if (logger.isDebugEnabled()) { + logger.debug("Connecting to " + url); + } return getHttpClient() - .headers(nettyHeaders -> setNettyHeaders(httpHeaders, nettyHeaders)) - .websocket(StringUtils.collectionToCommaDelimitedString(protocols)) + .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders)) + .websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols())) .uri(url.toString()) .handle((inbound, outbound) -> { - HandshakeInfo info = afterHandshake(url, toHttpHeaders(inbound)); + if (logger.isDebugEnabled()) { + logger.debug("Connected to " + url); + } + HttpHeaders responseHeaders = toHttpHeaders(inbound); + String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol"); + HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol); NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc()); WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory); return handler.handle(session); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java index dde278b0c6d..15b45d7db68 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java @@ -27,6 +27,8 @@ import javax.websocket.HandshakeResponse; import javax.websocket.Session; import javax.websocket.WebSocketContainer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Schedulers; @@ -47,7 +49,10 @@ import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession; * @since 5.0 * @see https://www.jcp.org/en/jsr/detail?id=356 */ -public class StandardWebSocketClient extends WebSocketClientSupport implements WebSocketClient { +public class StandardWebSocketClient implements WebSocketClient { + + private static final Log logger = LogFactory.getLog(StandardWebSocketClient.class); + private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); @@ -94,7 +99,10 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W MonoProcessor completionMono = MonoProcessor.create(); return Mono.fromCallable( () -> { - List protocols = beforeHandshake(url, requestHeaders, handler); + if (logger.isDebugEnabled()) { + logger.debug("Connecting to " + url); + } + List protocols = handler.getSubProtocols(); DefaultConfigurator configurator = new DefaultConfigurator(requestHeaders); Endpoint endpoint = createEndpoint(url, handler, completionMono, configurator); ClientEndpointConfig config = createEndpointConfig(configurator, protocols); @@ -108,8 +116,12 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W MonoProcessor completion, DefaultConfigurator configurator) { return new StandardWebSocketHandlerAdapter(handler, session -> { + if (logger.isDebugEnabled()) { + logger.debug("Connected to " + url); + } HttpHeaders responseHeaders = configurator.getResponseHeaders(); - HandshakeInfo info = afterHandshake(url, responseHeaders); + String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol"); + HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol); return createWebSocketSession(session, info, completion); }); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java index bdfa01c8770..f9ca84d5321 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java @@ -28,6 +28,8 @@ import io.undertow.server.DefaultByteBufferPool; import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder; import io.undertow.websockets.client.WebSocketClientNegotiation; import io.undertow.websockets.core.WebSocketChannel; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.xnio.IoFuture; import org.xnio.XnioWorker; import reactor.core.publisher.Mono; @@ -50,7 +52,9 @@ import org.springframework.web.reactive.socket.adapter.UndertowWebSocketSession; * @author Rossen Stoyanchev * @since 5.0 */ -public class UndertowWebSocketClient extends WebSocketClientSupport implements WebSocketClient { +public class UndertowWebSocketClient implements WebSocketClient { + + private static final Log logger = LogFactory.getLog(UndertowWebSocketClient.class); private static final int DEFAULT_POOL_BUFFER_SIZE = 8192; @@ -153,8 +157,11 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W MonoProcessor completion = MonoProcessor.create(); return Mono.fromCallable( () -> { + if (logger.isDebugEnabled()) { + logger.debug("Connecting to " + url); + } + List protocols = handler.getSubProtocols(); ConnectionBuilder builder = createConnectionBuilder(url); - List protocols = beforeHandshake(url, headers, handler); DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers, builder); builder.setClientNegotiation(negotiation); return builder.connect().addNotifier( @@ -165,7 +172,7 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W } @Override public void handleFailed(IOException ex, Object attachment) { - completion.onError(new IllegalStateException("Failed to connect", ex)); + completion.onError(new IllegalStateException("Failed to connect to " + url, ex)); } }, null); }) @@ -189,7 +196,12 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor completion, DefaultNegotiation negotiation, WebSocketChannel channel) { - HandshakeInfo info = afterHandshake(url, negotiation.getResponseHeaders()); + if (logger.isDebugEnabled()) { + logger.debug("Connected to " + url); + } + HttpHeaders responseHeaders = negotiation.getResponseHeaders(); + String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol"); + HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol); UndertowWebSocketSession session = new UndertowWebSocketSession(channel, info, this.bufferFactory, completion); UndertowWebSocketHandlerAdapter adapter = new UndertowWebSocketHandlerAdapter(session); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/WebSocketClientSupport.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/WebSocketClientSupport.java deleted file mode 100644 index fa3c8f5d3b8..00000000000 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/WebSocketClientSupport.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2002-2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.web.reactive.socket.client; - -import java.net.URI; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import reactor.core.publisher.Mono; - -import org.springframework.http.HttpHeaders; -import org.springframework.web.reactive.socket.HandshakeInfo; -import org.springframework.web.reactive.socket.WebSocketHandler; - -/** - * Base class for {@link WebSocketClient} implementations. - * - * @author Rossen Stoyanchev - * @since 5.0 - */ -public class WebSocketClientSupport { - - private static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol"; - - - protected final Log logger = LogFactory.getLog(getClass()); - - - protected List beforeHandshake(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) { - if (logger.isDebugEnabled()) { - logger.debug("Connecting to " + url); - } - return handler.getSubProtocols(); - } - - protected HandshakeInfo afterHandshake(URI url, HttpHeaders responseHeaders) { - if (logger.isDebugEnabled()) { - logger.debug("Connected to " + url + ", " + responseHeaders); - } - String protocol = responseHeaders.getFirst(SEC_WEBSOCKET_PROTOCOL); - return new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol); - } - -}