diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java
index 13775062e6b..1ac6a8b3743 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java
@@ -18,7 +18,6 @@ package org.springframework.web.reactive.socket.client;
import java.net.URI;
-import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
@@ -36,8 +35,14 @@ import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdap
import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
/**
- * Jetty based implementation of {@link WebSocketClient}.
- *
+ * A {@link WebSocketClient} implementation for use with Jetty
+ * {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
+ *
+ *
Note: the Jetty {@code WebSocketClient} requires
+ * lifecycle management and must be started and stopped. This is automatically
+ * managed when this class is declared as a Spring bean and created with the
+ * default constructor. See constructor notes for more details.
+ *
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
@@ -46,34 +51,60 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient;
- private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
+ private final boolean externallyManaged;
+
+ private boolean running = false;
private final Object lifecycleMonitor = new Object();
+ private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
+
/**
- * Default constructor that creates an instance of
- * {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
+ * Default constructor that creates and manages an instance of a Jetty
+ * {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}.
+ * The instance can be obtained with {@link #getJettyClient()} for further
+ * configuration.
+ *
+ *
Note: When this constructor is used {@link Lifecycle}
+ * methods of this class are delegated to the Jetty {@code WebSocketClient}.
*/
public JettyWebSocketClient() {
- this(new org.eclipse.jetty.websocket.client.WebSocketClient());
+ this.jettyClient = new org.eclipse.jetty.websocket.client.WebSocketClient();
+ this.externallyManaged = false;
}
/**
- * Constructor that accepts an existing
- * {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
- * @param jettyClient a web socket client
+ * Constructor that accepts an existing instance of a Jetty
+ * {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}.
+ *
+ *
Note: Use of this constructor implies the Jetty
+ * {@code WebSocketClient} is externally managed and hence {@link Lifecycle}
+ * methods of this class are not delegated to it.
*/
public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient) {
this.jettyClient = jettyClient;
+ this.externallyManaged = true;
+ }
+
+
+ /**
+ * Return the underlying Jetty {@code WebSocketClient}.
+ */
+ public org.eclipse.jetty.websocket.client.WebSocketClient getJettyClient() {
+ return this.jettyClient;
}
@Override
public void start() {
+ if (this.externallyManaged) {
+ return;
+ }
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
try {
+ this.running = true;
this.jettyClient.start();
}
catch (Exception ex) {
@@ -85,9 +116,13 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
@Override
public void stop() {
+ if (this.externallyManaged) {
+ return;
+ }
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
try {
+ this.running = false;
this.jettyClient.stop();
}
catch (Exception ex) {
@@ -100,7 +135,7 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
@Override
public boolean isRunning() {
synchronized (this.lifecycleMonitor) {
- return this.jettyClient.isStarted();
+ return this.running;
}
}
@@ -131,15 +166,13 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
private Object createJettyHandler(URI url, WebSocketHandler handler, MonoProcessor completion) {
return new JettyWebSocketHandlerAdapter(handler,
- session -> createJettySession(url, completion, session));
- }
-
- private JettyWebSocketSession createJettySession(URI url, MonoProcessor completion, Session session) {
- UpgradeResponse response = session.getUpgradeResponse();
- HttpHeaders responseHeaders = new HttpHeaders();
- response.getHeaders().forEach(responseHeaders::put);
- HandshakeInfo info = afterHandshake(url, responseHeaders);
- return new JettyWebSocketSession(session, info, this.bufferFactory, completion);
+ session -> {
+ UpgradeResponse response = session.getUpgradeResponse();
+ HttpHeaders responseHeaders = new HttpHeaders();
+ response.getHeaders().forEach(responseHeaders::put);
+ HandshakeInfo info = afterHandshake(url, responseHeaders);
+ return new JettyWebSocketSession(session, info, this.bufferFactory, completion);
+ });
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java
index b08da13ff71..b386b5e380a 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java
@@ -33,7 +33,7 @@ import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
/**
- * A Reactor Netty based implementation of {@link WebSocketClient}.
+ * {@link WebSocketClient} implementation for use with Reactor Netty.
*
* @author Rossen Stoyanchev
* @since 5.0
@@ -43,15 +43,30 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
private final HttpClient httpClient;
+ /**
+ * Default constructor.
+ */
public ReactorNettyWebSocketClient() {
- this.httpClient = HttpClient.create();
+ this(options -> {});
}
+ /**
+ * Constructor that accepts an {@link HttpClientOptions} consumer to supply
+ * to {@link HttpClient#create(Consumer)}.
+ */
public ReactorNettyWebSocketClient(Consumer super HttpClientOptions> clientOptions) {
this.httpClient = HttpClient.create(clientOptions);
}
+ /**
+ * Return the configured {@link HttpClient}.
+ */
+ public HttpClient getHttpClient() {
+ return this.httpClient;
+ }
+
+
@Override
public Mono execute(URI url, WebSocketHandler handler) {
return execute(url, new HttpHeaders(), handler);
@@ -63,18 +78,12 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
String[] protocols = beforeHandshake(url, headers, handler);
// TODO: https://github.com/reactor/reactor-netty/issues/20
- return this.httpClient
- .get(url.toString(), request -> {
- addRequestHeaders(request, headers);
- return request.sendWebsocket();
- })
+ return getHttpClient()
+ .get(url.toString(), request -> addHeaders(request, headers).sendWebsocket())
.then(response -> {
- HttpHeaders responseHeaders = getResponseHeaders(response);
- HandshakeInfo info = afterHandshake(url, responseHeaders);
-
+ HandshakeInfo info = afterHandshake(url, toHttpHeaders(response));
ByteBufAllocator allocator = response.channel().alloc();
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
-
return response.receiveWebsocket((in, out) -> {
WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory);
return handler.handle(session);
@@ -82,13 +91,12 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
});
}
- private void addRequestHeaders(HttpClientRequest request, HttpHeaders headers) {
- headers.keySet().stream()
- .forEach(key -> headers.get(key).stream()
- .forEach(value -> request.addHeader(key, value)));
+ private HttpClientRequest addHeaders(HttpClientRequest request, HttpHeaders headers) {
+ headers.keySet().stream().forEach(key -> request.requestHeaders().set(key, headers.get(key)));
+ return request;
}
- private HttpHeaders getResponseHeaders(HttpClientResponse response) {
+ private HttpHeaders toHttpHeaders(HttpClientResponse response) {
HttpHeaders headers = new HttpHeaders();
response.responseHeaders().forEach(entry -> {
String name = entry.getKey();
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java
index a55b8f94e87..50b7fa401b5 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java
@@ -18,7 +18,6 @@ package org.springframework.web.reactive.socket.client;
import java.net.URI;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,11 +27,12 @@ import javax.net.ssl.SSLEngine;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.reactivex.netty.protocol.http.HttpHandlerNames;
import io.reactivex.netty.protocol.http.client.HttpClient;
+import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import io.reactivex.netty.protocol.http.ws.client.WebSocketRequest;
import io.reactivex.netty.protocol.http.ws.client.WebSocketResponse;
+import io.reactivex.netty.threads.RxEventLoopProvider;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
import rx.Observable;
@@ -45,49 +45,83 @@ import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession;
+import static io.reactivex.netty.protocol.http.HttpHandlerNames.WsClientDecoder;
+
/**
- * An RxNetty based implementation of {@link WebSocketClient}.
+ * {@link WebSocketClient} implementation for use with RxNetty.
+ *
+ * Note: RxNetty {@link HttpClient} instances require a host
+ * and port in order to be created. Hence it is not possible to configure a
+ * single {@code HttpClient} instance to use upfront. Instead the constructors
+ * accept a function for obtaining client instances when establishing a
+ * connection to a specific URI. By default new instances are created per
+ * connection with a shared Netty {@code EventLoopGroup}. See constructors for
+ * more details.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient {
- private final Function> httpClientFactory;
+ private final Function> httpClientProvider;
/**
- * Default constructor that uses {@link HttpClient#newClient(String, int)}
- * to create HTTP client instances when connecting.
+ * Default constructor that creates {@code HttpClient} instances via
+ * {@link HttpClient#newClient(String, int)} using port 80 or 443 depending
+ * on the target URL scheme.
+ *
+ * Note: By default a new {@link HttpClient} instance
+ * is created per WebSocket connection. Those instances will share a global
+ * {@code EventLoopGroup} that RxNetty obtains via
+ * {@link RxEventLoopProvider#globalClientEventLoop(boolean)}.
*/
public RxNettyWebSocketClient() {
- this(RxNettyWebSocketClient::createDefaultHttpClient);
+ this(RxNettyWebSocketClient::getDefaultHttpClientProvider);
}
/**
- * Constructor with a function to create {@link HttpClient} instances.
- * @param httpClientFactory factory to create clients
+ * Constructor with a function to use to obtain {@link HttpClient} instances.
*/
- public RxNettyWebSocketClient(Function> httpClientFactory) {
- this.httpClientFactory = httpClientFactory;
+ public RxNettyWebSocketClient(Function> httpClientProvider) {
+ this.httpClientProvider = httpClientProvider;
}
- private static HttpClient createDefaultHttpClient(URI url) {
+ private static HttpClient getDefaultHttpClientProvider(URI url) {
boolean secure = "wss".equals(url.getScheme());
int port = url.getPort() > 0 ? url.getPort() : secure ? 443 : 80;
- HttpClient httpClient = HttpClient.newClient(url.getHost(), port);
+ HttpClient client = HttpClient.newClient(url.getHost(), port);
if (secure) {
try {
SSLContext context = SSLContext.getDefault();
SSLEngine engine = context.createSSLEngine(url.getHost(), port);
engine.setUseClientMode(true);
- httpClient.secure(engine);
+ client.secure(engine);
}
catch (NoSuchAlgorithmException ex) {
throw new IllegalStateException("Failed to create HttpClient for " + url, ex);
}
}
- return httpClient;
+ return client;
+ }
+
+
+ /**
+ * Return the configured {@link HttpClient} provider depending on which
+ * constructor was used.
+ */
+ public Function> getHttpClientProvider() {
+ return this.httpClientProvider;
+ }
+
+ /**
+ * Return an {@link HttpClient} instance to use to connect to the given URI.
+ * The default implementation invokes the {@link #getHttpClientProvider()}
+ * provider} function created or supplied at construction time.
+ * @param url the full URL of the WebSocket endpoint.
+ */
+ public HttpClient getHttpClient(URI url) {
+ return this.httpClientProvider.apply(url);
}
@@ -98,14 +132,12 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We
@Override
public Mono execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
- Observable completion = connectInternal(url, headers, handler);
+ Observable completion = executeInternal(url, headers, handler);
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
- private Observable connectInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
-
+ private Observable executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
String[] protocols = beforeHandshake(url, headers, handler);
-
return createRequest(url, headers, protocols)
.flatMap(response -> {
Observable conn = response.getWebSocketConnection();
@@ -113,48 +145,35 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We
})
.flatMap(tuple -> {
WebSocketResponse response = tuple.getT1();
- HttpHeaders responseHeaders = getResponseHeaders(response);
- HandshakeInfo info = afterHandshake(url, responseHeaders);
+ WebSocketConnection conn = tuple.getT2();
+ HandshakeInfo info = afterHandshake(url, toHttpHeaders(response));
ByteBufAllocator allocator = response.unsafeNettyChannel().alloc();
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
-
- WebSocketConnection conn = tuple.getT2();
RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, info, factory);
- String name = HttpHandlerNames.WsClientDecoder.getName();
- session.aggregateFrames(response.unsafeNettyChannel(), name);
+ session.aggregateFrames(response.unsafeNettyChannel(), WsClientDecoder.getName());
return RxReactiveStreams.toObservable(handler.handle(session));
});
}
private WebSocketRequest createRequest(URI url, HttpHeaders headers, String[] protocols) {
-
String query = url.getRawQuery();
String requestUrl = url.getRawPath() + (query != null ? "?" + query : "");
+ HttpClientRequest request = getHttpClient(url).createGet(requestUrl);
- WebSocketRequest request = this.httpClientFactory.apply(url)
- .createGet(requestUrl)
- .setHeaders(toObjectValueMap(headers))
- .requestWebSocketUpgrade();
-
- if (!ObjectUtils.isEmpty(protocols)) {
- request = request.requestSubProtocols(protocols);
+ if (!headers.isEmpty()) {
+ Map> map = new HashMap<>(headers.size());
+ headers.forEach((key, values) -> map.put(key, new ArrayList<>(headers.get(key))));
+ request = request.setHeaders(map);
}
- return request;
- }
-
- private Map> toObjectValueMap(HttpHeaders headers) {
- if (headers.isEmpty()) {
- return Collections.emptyMap();
- }
- Map> map = new HashMap<>(headers.size());
- headers.keySet().stream().forEach(key -> map.put(key, new ArrayList<>(headers.get(key))));
- return map;
+ return (ObjectUtils.isEmpty(protocols) ?
+ request.requestWebSocketUpgrade() :
+ request.requestWebSocketUpgrade().requestSubProtocols(protocols));
}
- private HttpHeaders getResponseHeaders(WebSocketResponse response) {
+ private HttpHeaders toHttpHeaders(WebSocketResponse response) {
HttpHeaders headers = new HttpHeaders();
response.headerIterator().forEachRemaining(entry -> {
String name = entry.getKey().toString();
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java
index 7605632eae6..61509c33c5c 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java
@@ -40,11 +40,12 @@ import org.springframework.web.reactive.socket.adapter.StandardWebSocketHandlerA
import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession;
/**
- * Java WebSocket API (JSR-356) implementation of {@link WebSocketClient}.
- *
+ * {@link WebSocketClient} implementation for use with the Java WebSocket API.
+ *
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
+ * @see https://www.jcp.org/en/jsr/detail?id=356
*/
public class StandardWebSocketClient extends WebSocketClientSupport implements WebSocketClient {
@@ -71,6 +72,14 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
}
+ /**
+ * Return the configured {@link WebSocketContainer} to use.
+ */
+ public WebSocketContainer getWebSocketContainer() {
+ return this.webSocketContainer;
+ }
+
+
@Override
public Mono execute(URI url, WebSocketHandler handler) {
return execute(url, new HttpHeaders(), handler);
@@ -95,13 +104,6 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
.then(completionMono);
}
- private ClientEndpointConfig createEndpointConfig(Configurator configurator, String[] subProtocols) {
- return ClientEndpointConfig.Builder.create()
- .configurator(configurator)
- .preferredSubprotocols(Arrays.asList(subProtocols))
- .build();
- }
-
private StandardWebSocketHandlerAdapter createEndpoint(URI url, WebSocketHandler handler,
MonoProcessor completion, DefaultConfigurator configurator) {
@@ -112,6 +114,13 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
});
}
+ private ClientEndpointConfig createEndpointConfig(Configurator configurator, String[] subProtocols) {
+ return ClientEndpointConfig.Builder.create()
+ .configurator(configurator)
+ .preferredSubprotocols(Arrays.asList(subProtocols))
+ .build();
+ }
+
private static final class DefaultConfigurator extends Configurator {
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java
index a48b2c71a31..448e0ea8009 100644
--- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CancellationException;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
@@ -33,7 +32,7 @@ import io.undertow.server.DefaultByteBufferPool;
import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder;
import io.undertow.websockets.client.WebSocketClientNegotiation;
import io.undertow.websockets.core.WebSocketChannel;
-import org.xnio.IoFuture.Status;
+import org.xnio.IoFuture;
import org.xnio.OptionMap;
import org.xnio.Options;
import org.xnio.Xnio;
@@ -138,27 +137,22 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W
MonoProcessor completion = MonoProcessor.create();
return Mono.fromCallable(
() -> {
- String[] subProtocols = beforeHandshake(url, headers, handler);
- DefaultNegotiation negotiation = new DefaultNegotiation(subProtocols, headers);
+ String[] protocols = beforeHandshake(url, headers, handler);
+ DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers);
return this.builder.apply(url)
.setClientNegotiation(negotiation)
.connect()
- .addNotifier((future, attachment) -> {
- if (Status.DONE.equals(future.getStatus())) {
- try {
- handleChannel(url, handler, completion, negotiation, future.get());
- }
- catch (CancellationException | IOException ex) {
- completion.onError(ex);
- }
- }
- else if (Status.FAILED.equals(future.getStatus())) {
- completion.onError(future.getException());
+ .addNotifier(new IoFuture.HandlingNotifier() {
+
+ @Override
+ public void handleDone(WebSocketChannel channel, Object attachment) {
+ handleChannel(url, handler, completion, negotiation, channel);
}
- else {
- String message = "Failed to connect" + future.getStatus();
- completion.onError(new IllegalStateException(message));
+
+ @Override
+ public void handleFailed(IOException ex, Object attachment) {
+ completion.onError(new IllegalStateException("Failed to connect", ex));
}
}, null);
})