Browse Source

Polish reactive WebSocketClient implementations

pull/1277/head
Rossen Stoyanchev 9 years ago
parent
commit
384e851bd1
  1. 73
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java
  2. 40
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java
  3. 107
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java
  4. 27
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java
  5. 30
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java

73
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java

@ -18,7 +18,6 @@ package org.springframework.web.reactive.socket.client; @@ -18,7 +18,6 @@ package org.springframework.web.reactive.socket.client;
import java.net.URI;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
@ -36,8 +35,14 @@ import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdap @@ -36,8 +35,14 @@ import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdap
import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
/**
* Jetty based implementation of {@link WebSocketClient}.
*
* A {@link WebSocketClient} implementation for use with Jetty
* {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
*
* <p><strong>Note: </strong> the Jetty {@code WebSocketClient} requires
* lifecycle management and must be started and stopped. This is automatically
* managed when this class is declared as a Spring bean and created with the
* default constructor. See constructor notes for more details.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
@ -46,34 +51,60 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS @@ -46,34 +51,60 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
private final org.eclipse.jetty.websocket.client.WebSocketClient jettyClient;
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
private final boolean externallyManaged;
private boolean running = false;
private final Object lifecycleMonitor = new Object();
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
/**
* Default constructor that creates an instance of
* {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
* Default constructor that creates and manages an instance of a Jetty
* {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}.
* The instance can be obtained with {@link #getJettyClient()} for further
* configuration.
*
* <p><strong>Note: </strong> When this constructor is used {@link Lifecycle}
* methods of this class are delegated to the Jetty {@code WebSocketClient}.
*/
public JettyWebSocketClient() {
this(new org.eclipse.jetty.websocket.client.WebSocketClient());
this.jettyClient = new org.eclipse.jetty.websocket.client.WebSocketClient();
this.externallyManaged = false;
}
/**
* Constructor that accepts an existing
* {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
* @param jettyClient a web socket client
* Constructor that accepts an existing instance of a Jetty
* {@link org.eclipse.jetty.websocket.client.WebSocketClient WebSocketClient}.
*
* <p><strong>Note: </strong> Use of this constructor implies the Jetty
* {@code WebSocketClient} is externally managed and hence {@link Lifecycle}
* methods of this class are not delegated to it.
*/
public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient) {
this.jettyClient = jettyClient;
this.externallyManaged = true;
}
/**
* Return the underlying Jetty {@code WebSocketClient}.
*/
public org.eclipse.jetty.websocket.client.WebSocketClient getJettyClient() {
return this.jettyClient;
}
@Override
public void start() {
if (this.externallyManaged) {
return;
}
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
try {
this.running = true;
this.jettyClient.start();
}
catch (Exception ex) {
@ -85,9 +116,13 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS @@ -85,9 +116,13 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
@Override
public void stop() {
if (this.externallyManaged) {
return;
}
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
try {
this.running = false;
this.jettyClient.stop();
}
catch (Exception ex) {
@ -100,7 +135,7 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS @@ -100,7 +135,7 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
@Override
public boolean isRunning() {
synchronized (this.lifecycleMonitor) {
return this.jettyClient.isStarted();
return this.running;
}
}
@ -131,15 +166,13 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS @@ -131,15 +166,13 @@ public class JettyWebSocketClient extends WebSocketClientSupport implements WebS
private Object createJettyHandler(URI url, WebSocketHandler handler, MonoProcessor<Void> completion) {
return new JettyWebSocketHandlerAdapter(handler,
session -> createJettySession(url, completion, session));
}
private JettyWebSocketSession createJettySession(URI url, MonoProcessor<Void> completion, Session session) {
UpgradeResponse response = session.getUpgradeResponse();
HttpHeaders responseHeaders = new HttpHeaders();
response.getHeaders().forEach(responseHeaders::put);
HandshakeInfo info = afterHandshake(url, responseHeaders);
return new JettyWebSocketSession(session, info, this.bufferFactory, completion);
session -> {
UpgradeResponse response = session.getUpgradeResponse();
HttpHeaders responseHeaders = new HttpHeaders();
response.getHeaders().forEach(responseHeaders::put);
HandshakeInfo info = afterHandshake(url, responseHeaders);
return new JettyWebSocketSession(session, info, this.bufferFactory, completion);
});
}

40
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java

@ -33,7 +33,7 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -33,7 +33,7 @@ import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
/**
* A Reactor Netty based implementation of {@link WebSocketClient}.
* {@link WebSocketClient} implementation for use with Reactor Netty.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -43,15 +43,30 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen @@ -43,15 +43,30 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
private final HttpClient httpClient;
/**
* Default constructor.
*/
public ReactorNettyWebSocketClient() {
this.httpClient = HttpClient.create();
this(options -> {});
}
/**
* Constructor that accepts an {@link HttpClientOptions} consumer to supply
* to {@link HttpClient#create(Consumer)}.
*/
public ReactorNettyWebSocketClient(Consumer<? super HttpClientOptions> clientOptions) {
this.httpClient = HttpClient.create(clientOptions);
}
/**
* Return the configured {@link HttpClient}.
*/
public HttpClient getHttpClient() {
return this.httpClient;
}
@Override
public Mono<Void> execute(URI url, WebSocketHandler handler) {
return execute(url, new HttpHeaders(), handler);
@ -63,18 +78,12 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen @@ -63,18 +78,12 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
String[] protocols = beforeHandshake(url, headers, handler);
// TODO: https://github.com/reactor/reactor-netty/issues/20
return this.httpClient
.get(url.toString(), request -> {
addRequestHeaders(request, headers);
return request.sendWebsocket();
})
return getHttpClient()
.get(url.toString(), request -> addHeaders(request, headers).sendWebsocket())
.then(response -> {
HttpHeaders responseHeaders = getResponseHeaders(response);
HandshakeInfo info = afterHandshake(url, responseHeaders);
HandshakeInfo info = afterHandshake(url, toHttpHeaders(response));
ByteBufAllocator allocator = response.channel().alloc();
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
return response.receiveWebsocket((in, out) -> {
WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory);
return handler.handle(session);
@ -82,13 +91,12 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen @@ -82,13 +91,12 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen
});
}
private void addRequestHeaders(HttpClientRequest request, HttpHeaders headers) {
headers.keySet().stream()
.forEach(key -> headers.get(key).stream()
.forEach(value -> request.addHeader(key, value)));
private HttpClientRequest addHeaders(HttpClientRequest request, HttpHeaders headers) {
headers.keySet().stream().forEach(key -> request.requestHeaders().set(key, headers.get(key)));
return request;
}
private HttpHeaders getResponseHeaders(HttpClientResponse response) {
private HttpHeaders toHttpHeaders(HttpClientResponse response) {
HttpHeaders headers = new HttpHeaders();
response.responseHeaders().forEach(entry -> {
String name = entry.getKey();

107
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java

@ -18,7 +18,6 @@ package org.springframework.web.reactive.socket.client; @@ -18,7 +18,6 @@ package org.springframework.web.reactive.socket.client;
import java.net.URI;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -28,11 +27,12 @@ import javax.net.ssl.SSLEngine; @@ -28,11 +27,12 @@ import javax.net.ssl.SSLEngine;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.reactivex.netty.protocol.http.HttpHandlerNames;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import io.reactivex.netty.protocol.http.ws.client.WebSocketRequest;
import io.reactivex.netty.protocol.http.ws.client.WebSocketResponse;
import io.reactivex.netty.threads.RxEventLoopProvider;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
import rx.Observable;
@ -45,49 +45,83 @@ import org.springframework.web.reactive.socket.HandshakeInfo; @@ -45,49 +45,83 @@ import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession;
import static io.reactivex.netty.protocol.http.HttpHandlerNames.WsClientDecoder;
/**
* An RxNetty based implementation of {@link WebSocketClient}.
* {@link WebSocketClient} implementation for use with RxNetty.
*
* <p><strong>Note: </strong> RxNetty {@link HttpClient} instances require a host
* and port in order to be created. Hence it is not possible to configure a
* single {@code HttpClient} instance to use upfront. Instead the constructors
* accept a function for obtaining client instances when establishing a
* connection to a specific URI. By default new instances are created per
* connection with a shared Netty {@code EventLoopGroup}. See constructors for
* more details.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient {
private final Function<URI, HttpClient<ByteBuf, ByteBuf>> httpClientFactory;
private final Function<URI, HttpClient<ByteBuf, ByteBuf>> httpClientProvider;
/**
* Default constructor that uses {@link HttpClient#newClient(String, int)}
* to create HTTP client instances when connecting.
* Default constructor that creates {@code HttpClient} instances via
* {@link HttpClient#newClient(String, int)} using port 80 or 443 depending
* on the target URL scheme.
*
* <p><strong>Note: </strong> By default a new {@link HttpClient} instance
* is created per WebSocket connection. Those instances will share a global
* {@code EventLoopGroup} that RxNetty obtains via
* {@link RxEventLoopProvider#globalClientEventLoop(boolean)}.
*/
public RxNettyWebSocketClient() {
this(RxNettyWebSocketClient::createDefaultHttpClient);
this(RxNettyWebSocketClient::getDefaultHttpClientProvider);
}
/**
* Constructor with a function to create {@link HttpClient} instances.
* @param httpClientFactory factory to create clients
* Constructor with a function to use to obtain {@link HttpClient} instances.
*/
public RxNettyWebSocketClient(Function<URI, HttpClient<ByteBuf, ByteBuf>> httpClientFactory) {
this.httpClientFactory = httpClientFactory;
public RxNettyWebSocketClient(Function<URI, HttpClient<ByteBuf, ByteBuf>> httpClientProvider) {
this.httpClientProvider = httpClientProvider;
}
private static HttpClient<ByteBuf, ByteBuf> createDefaultHttpClient(URI url) {
private static HttpClient<ByteBuf, ByteBuf> getDefaultHttpClientProvider(URI url) {
boolean secure = "wss".equals(url.getScheme());
int port = url.getPort() > 0 ? url.getPort() : secure ? 443 : 80;
HttpClient<ByteBuf, ByteBuf> httpClient = HttpClient.newClient(url.getHost(), port);
HttpClient<ByteBuf, ByteBuf> client = HttpClient.newClient(url.getHost(), port);
if (secure) {
try {
SSLContext context = SSLContext.getDefault();
SSLEngine engine = context.createSSLEngine(url.getHost(), port);
engine.setUseClientMode(true);
httpClient.secure(engine);
client.secure(engine);
}
catch (NoSuchAlgorithmException ex) {
throw new IllegalStateException("Failed to create HttpClient for " + url, ex);
}
}
return httpClient;
return client;
}
/**
* Return the configured {@link HttpClient} provider depending on which
* constructor was used.
*/
public Function<URI, HttpClient<ByteBuf, ByteBuf>> getHttpClientProvider() {
return this.httpClientProvider;
}
/**
* Return an {@link HttpClient} instance to use to connect to the given URI.
* The default implementation invokes the {@link #getHttpClientProvider()}
* provider} function created or supplied at construction time.
* @param url the full URL of the WebSocket endpoint.
*/
public HttpClient<ByteBuf, ByteBuf> getHttpClient(URI url) {
return this.httpClientProvider.apply(url);
}
@ -98,14 +132,12 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We @@ -98,14 +132,12 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We
@Override
public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
Observable<Void> completion = connectInternal(url, headers, handler);
Observable<Void> completion = executeInternal(url, headers, handler);
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
private Observable<Void> connectInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
private Observable<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
String[] protocols = beforeHandshake(url, headers, handler);
return createRequest(url, headers, protocols)
.flatMap(response -> {
Observable<WebSocketConnection> conn = response.getWebSocketConnection();
@ -113,48 +145,35 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We @@ -113,48 +145,35 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We
})
.flatMap(tuple -> {
WebSocketResponse<ByteBuf> response = tuple.getT1();
HttpHeaders responseHeaders = getResponseHeaders(response);
HandshakeInfo info = afterHandshake(url, responseHeaders);
WebSocketConnection conn = tuple.getT2();
HandshakeInfo info = afterHandshake(url, toHttpHeaders(response));
ByteBufAllocator allocator = response.unsafeNettyChannel().alloc();
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
WebSocketConnection conn = tuple.getT2();
RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, info, factory);
String name = HttpHandlerNames.WsClientDecoder.getName();
session.aggregateFrames(response.unsafeNettyChannel(), name);
session.aggregateFrames(response.unsafeNettyChannel(), WsClientDecoder.getName());
return RxReactiveStreams.toObservable(handler.handle(session));
});
}
private WebSocketRequest<ByteBuf> createRequest(URI url, HttpHeaders headers, String[] protocols) {
String query = url.getRawQuery();
String requestUrl = url.getRawPath() + (query != null ? "?" + query : "");
HttpClientRequest<ByteBuf, ByteBuf> request = getHttpClient(url).createGet(requestUrl);
WebSocketRequest<ByteBuf> request = this.httpClientFactory.apply(url)
.createGet(requestUrl)
.setHeaders(toObjectValueMap(headers))
.requestWebSocketUpgrade();
if (!ObjectUtils.isEmpty(protocols)) {
request = request.requestSubProtocols(protocols);
if (!headers.isEmpty()) {
Map<String, List<Object>> map = new HashMap<>(headers.size());
headers.forEach((key, values) -> map.put(key, new ArrayList<>(headers.get(key))));
request = request.setHeaders(map);
}
return request;
}
private Map<String, List<Object>> toObjectValueMap(HttpHeaders headers) {
if (headers.isEmpty()) {
return Collections.emptyMap();
}
Map<String, List<Object>> map = new HashMap<>(headers.size());
headers.keySet().stream().forEach(key -> map.put(key, new ArrayList<>(headers.get(key))));
return map;
return (ObjectUtils.isEmpty(protocols) ?
request.requestWebSocketUpgrade() :
request.requestWebSocketUpgrade().requestSubProtocols(protocols));
}
private HttpHeaders getResponseHeaders(WebSocketResponse<ByteBuf> response) {
private HttpHeaders toHttpHeaders(WebSocketResponse<ByteBuf> response) {
HttpHeaders headers = new HttpHeaders();
response.headerIterator().forEachRemaining(entry -> {
String name = entry.getKey().toString();

27
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/StandardWebSocketClient.java

@ -40,11 +40,12 @@ import org.springframework.web.reactive.socket.adapter.StandardWebSocketHandlerA @@ -40,11 +40,12 @@ import org.springframework.web.reactive.socket.adapter.StandardWebSocketHandlerA
import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession;
/**
* Java WebSocket API (JSR-356) implementation of {@link WebSocketClient}.
*
* {@link WebSocketClient} implementation for use with the Java WebSocket API.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
* @see <a href="https://www.jcp.org/en/jsr/detail?id=356">https://www.jcp.org/en/jsr/detail?id=356</a>
*/
public class StandardWebSocketClient extends WebSocketClientSupport implements WebSocketClient {
@ -71,6 +72,14 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W @@ -71,6 +72,14 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
}
/**
* Return the configured {@link WebSocketContainer} to use.
*/
public WebSocketContainer getWebSocketContainer() {
return this.webSocketContainer;
}
@Override
public Mono<Void> execute(URI url, WebSocketHandler handler) {
return execute(url, new HttpHeaders(), handler);
@ -95,13 +104,6 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W @@ -95,13 +104,6 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
.then(completionMono);
}
private ClientEndpointConfig createEndpointConfig(Configurator configurator, String[] subProtocols) {
return ClientEndpointConfig.Builder.create()
.configurator(configurator)
.preferredSubprotocols(Arrays.asList(subProtocols))
.build();
}
private StandardWebSocketHandlerAdapter createEndpoint(URI url, WebSocketHandler handler,
MonoProcessor<Void> completion, DefaultConfigurator configurator) {
@ -112,6 +114,13 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W @@ -112,6 +114,13 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
});
}
private ClientEndpointConfig createEndpointConfig(Configurator configurator, String[] subProtocols) {
return ClientEndpointConfig.Builder.create()
.configurator(configurator)
.preferredSubprotocols(Arrays.asList(subProtocols))
.build();
}
private static final class DefaultConfigurator extends Configurator {

30
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java

@ -23,7 +23,6 @@ import java.util.Arrays; @@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
@ -33,7 +32,7 @@ import io.undertow.server.DefaultByteBufferPool; @@ -33,7 +32,7 @@ import io.undertow.server.DefaultByteBufferPool;
import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder;
import io.undertow.websockets.client.WebSocketClientNegotiation;
import io.undertow.websockets.core.WebSocketChannel;
import org.xnio.IoFuture.Status;
import org.xnio.IoFuture;
import org.xnio.OptionMap;
import org.xnio.Options;
import org.xnio.Xnio;
@ -138,27 +137,22 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W @@ -138,27 +137,22 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W
MonoProcessor<Void> completion = MonoProcessor.create();
return Mono.fromCallable(
() -> {
String[] subProtocols = beforeHandshake(url, headers, handler);
DefaultNegotiation negotiation = new DefaultNegotiation(subProtocols, headers);
String[] protocols = beforeHandshake(url, headers, handler);
DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers);
return this.builder.apply(url)
.setClientNegotiation(negotiation)
.connect()
.addNotifier((future, attachment) -> {
if (Status.DONE.equals(future.getStatus())) {
try {
handleChannel(url, handler, completion, negotiation, future.get());
}
catch (CancellationException | IOException ex) {
completion.onError(ex);
}
}
else if (Status.FAILED.equals(future.getStatus())) {
completion.onError(future.getException());
.addNotifier(new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
@Override
public void handleDone(WebSocketChannel channel, Object attachment) {
handleChannel(url, handler, completion, negotiation, channel);
}
else {
String message = "Failed to connect" + future.getStatus();
completion.onError(new IllegalStateException(message));
@Override
public void handleFailed(IOException ex, Object attachment) {
completion.onError(new IllegalStateException("Failed to connect", ex));
}
}, null);
})

Loading…
Cancel
Save