From b4b7b163d1197caa338ea17c026ee862526b0ef5 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 23 Dec 2016 09:45:21 +0200 Subject: [PATCH] Add WebSocketClient for Undertow Issue: SPR-14527 --- .../UndertowWebSocketHandlerAdapter.java | 20 +- .../client/UndertowWebSocketClient.java | 234 ++++++++++++++++++ .../UndertowRequestUpgradeStrategy.java | 1 - .../server/WebSocketIntegrationTests.java | 32 +++ 4 files changed, 285 insertions(+), 2 deletions(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java index 1239fd018da..57c61edd01c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java @@ -26,9 +26,12 @@ import io.undertow.websockets.core.BufferedTextMessage; import io.undertow.websockets.core.CloseMessage; import io.undertow.websockets.core.WebSocketChannel; import io.undertow.websockets.spi.WebSocketHttpExchange; + import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.publisher.MonoProcessor; + import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; @@ -50,11 +53,20 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp private UndertowWebSocketSession delegateSession; + private final MonoProcessor completionMono; + public UndertowWebSocketHandlerAdapter(WebSocketHandler delegate, HandshakeInfo info, DataBufferFactory bufferFactory) { + this(delegate, info, bufferFactory, null); + } + + public UndertowWebSocketHandlerAdapter(WebSocketHandler delegate, HandshakeInfo info, + DataBufferFactory bufferFactory, MonoProcessor completionMono) { + super(delegate, info, bufferFactory); + this.completionMono = completionMono; } @@ -117,9 +129,9 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp throw new IllegalArgumentException("Unexpected message type: " + message); } } - } + private final class HandlerResultSubscriber implements Subscriber { @Override @@ -134,12 +146,18 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp @Override public void onError(Throwable ex) { + if (completionMono != null) { + completionMono.onError(ex); + } int code = CloseStatus.SERVER_ERROR.getCode(); delegateSession.close(new CloseStatus(code, ex.getMessage())); } @Override public void onComplete() { + if (completionMono != null) { + completionMono.onComplete(); + } delegateSession.close(); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java new file mode 100644 index 00000000000..0ee60771071 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java @@ -0,0 +1,234 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.client; + +import java.io.IOException; +import java.net.URI; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CancellationException; +import java.util.function.Function; + +import javax.net.ssl.SSLContext; + +import io.undertow.protocols.ssl.UndertowXnioSsl; +import io.undertow.server.DefaultByteBufferPool; +import io.undertow.websockets.WebSocketExtension; +import io.undertow.websockets.client.WebSocketClientNegotiation; +import io.undertow.websockets.core.WebSocketChannel; + +import org.xnio.IoFuture; +import org.xnio.IoFuture.Notifier; +import org.xnio.IoFuture.Status; +import org.xnio.OptionMap; +import org.xnio.Options; +import org.xnio.Xnio; +import org.xnio.XnioWorker; + +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; + +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter; + +/** + * An Undertow based implementation of {@link WebSocketClient}. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class UndertowWebSocketClient extends WebSocketClientSupport implements WebSocketClient { + + private static final int DEFAULT_BUFFER_SIZE = 8192; + + private static XnioWorker worker; + + static { + try { + worker = Xnio.getInstance().createWorker(OptionMap.builder() + .set(Options.WORKER_IO_THREADS, 2) + .set(Options.CONNECTION_HIGH_WATER, 1000000) + .set(Options.CONNECTION_LOW_WATER, 1000000) + .set(Options.WORKER_TASK_CORE_THREADS, 30) + .set(Options.WORKER_TASK_MAX_THREADS, 30) + .set(Options.TCP_NODELAY, true) + .set(Options.CORK, true) + .getMap()); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + private final Function builder; + + + /** + * Default constructor that uses + * {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder(XnioWorker, ByteBufferPool, URI)} + * to create a web socket connection. + */ + public UndertowWebSocketClient() { + this(UndertowWebSocketClient::createDefaultConnectionBuilder); + } + + /** + * Constructor that accepts an existing + * {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder(XnioWorker, ByteBufferPool, URI)} + * instance. + * @param builder a connection builder that can be used to create a web socket connection. + */ + public UndertowWebSocketClient(Function builder) { + this.builder = builder; + } + + private static io.undertow.websockets.client.WebSocketClient.ConnectionBuilder createDefaultConnectionBuilder( + URI url) { + + io.undertow.websockets.client.WebSocketClient.ConnectionBuilder builder = + io.undertow.websockets.client.WebSocketClient.connectionBuilder( + worker, new DefaultByteBufferPool(false, DEFAULT_BUFFER_SIZE), url); + + boolean secure = "wss".equals(url.getScheme()); + if (secure) { + try { + UndertowXnioSsl ssl = new UndertowXnioSsl(Xnio.getInstance(), + OptionMap.EMPTY, SSLContext.getDefault()); + builder.setSsl(ssl); + } + catch (NoSuchAlgorithmException ex) { + throw new RuntimeException("Failed to create Undertow ConnectionBuilder for " + url, ex); + } + } + + return builder; + } + + + @Override + public Mono execute(URI url, WebSocketHandler handler) { + return execute(url, new HttpHeaders(), handler); + } + + @Override + public Mono execute(URI url, HttpHeaders headers, WebSocketHandler handler) { + return connectInternal(url, headers, handler); + } + + private Mono connectInternal(URI url, HttpHeaders headers, WebSocketHandler handler) { + MonoProcessor processor = MonoProcessor.create(); + return Mono.fromCallable( + () -> { + WSClientNegotiation clientNegotiation = + new WSClientNegotiation(beforeHandshake(url, headers, handler), + Collections.emptyList(), headers); + + io.undertow.websockets.client.WebSocketClient.ConnectionBuilder builder = + this.builder.apply(url).setClientNegotiation(clientNegotiation); + + IoFuture future = builder.connect(); + future.addNotifier(new ResultNotifier(url, handler, clientNegotiation, processor), new Object()); + return future; + }) + .then(processor); + } + + + private static final class ResultNotifier implements Notifier { + + private final URI url; + + private final WebSocketHandler handler; + + private final WSClientNegotiation clientNegotiation; + + private final MonoProcessor processor; + + public ResultNotifier(URI url, WebSocketHandler handler, + WSClientNegotiation clientNegotiation, MonoProcessor processor) { + this.url = url; + this.handler = handler; + this.clientNegotiation = clientNegotiation; + this.processor = processor; + } + + @Override + public void notify(IoFuture ioFuture, + Object attachment) { + if (Status.CANCELLED.equals(ioFuture.getStatus())) { + processor.onError(null); + } + else if (Status.FAILED.equals(ioFuture.getStatus())) { + processor.onError(ioFuture.getException()); + } + else if (Status.DONE.equals(ioFuture.getStatus())) { + try { + WebSocketChannel channel = ioFuture.get(); + DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + HandshakeInfo info = new HandshakeInfo(url, clientNegotiation.getResponseHeaders(), + Mono.empty(), Optional.ofNullable(channel.getSubProtocol())); + + UndertowWebSocketHandlerAdapter adapter = + new UndertowWebSocketHandlerAdapter(handler, + info, bufferFactory, processor); + adapter.onConnect(null, channel); + } + catch (CancellationException | IOException ex) { + processor.onError(ex); + } + } + } + } + + + private static final class WSClientNegotiation extends WebSocketClientNegotiation { + + private final HttpHeaders requestHeaders; + + private HttpHeaders responseHeaders = new HttpHeaders(); + + public WSClientNegotiation(String[] subProtocols, + List extensions, HttpHeaders requestHeaders) { + super(Arrays.asList(subProtocols), extensions); + this.requestHeaders = requestHeaders; + } + + @Override + public void beforeRequest(Map> headers) { + requestHeaders.forEach((k, v) -> headers.put(k, v)); + } + + @Override + public void afterRequest(Map> headers) { + headers.forEach((k, v) -> responseHeaders.put(k, v)); + } + + public HttpHeaders getResponseHeaders() { + return responseHeaders; + } + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java index 25d22960ee7..959a7d75172 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.server.upgrade; import java.security.Principal; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java index 41017303713..51f71fe5e26 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java @@ -32,6 +32,7 @@ import reactor.core.publisher.ReplayProcessor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpHeaders; +import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer; import org.springframework.web.reactive.HandlerMapping; import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; import org.springframework.web.reactive.socket.HandshakeInfo; @@ -42,6 +43,7 @@ import org.springframework.web.reactive.socket.client.JettyWebSocketClient; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient; import org.springframework.web.reactive.socket.client.StandardWebSocketClient; +import org.springframework.web.reactive.socket.client.UndertowWebSocketClient; import org.springframework.web.reactive.socket.client.WebSocketClient; import static org.junit.Assert.assertEquals; @@ -84,6 +86,21 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests testEcho(new StandardWebSocketClient()); } + @Test + public void echoUndertowClient() throws Exception { + if (server instanceof RxNettyHttpServer) { + // Caused by: java.io.IOException: Upgrade responses cannot have a transfer coding + // at org.xnio.http.HttpUpgrade$HttpUpgradeState.handleUpgrade(HttpUpgrade.java:490) + // at org.xnio.http.HttpUpgrade$HttpUpgradeState.access$1200(HttpUpgrade.java:165) + // at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:461) + // at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:400) + // at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92) + + return; + } + testEcho(new UndertowWebSocketClient()); + } + private void testEcho(WebSocketClient client) throws URISyntaxException { int count = 100; Flux input = Flux.range(1, count).map(index -> "msg-" + index); @@ -124,6 +141,21 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests testSubProtocol(new StandardWebSocketClient()); } + @Test + public void subProtocolUndertowClient() throws Exception { + if (server instanceof RxNettyHttpServer) { + // Caused by: java.io.IOException: Upgrade responses cannot have a transfer coding + // at org.xnio.http.HttpUpgrade$HttpUpgradeState.handleUpgrade(HttpUpgrade.java:490) + // at org.xnio.http.HttpUpgrade$HttpUpgradeState.access$1200(HttpUpgrade.java:165) + // at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:461) + // at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:400) + // at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92) + + return; + } + testSubProtocol(new UndertowWebSocketClient()); + } + private void testSubProtocol(WebSocketClient client) throws URISyntaxException { String protocol = "echo-v1"; AtomicReference infoRef = new AtomicReference<>();