From c097c0284a8aa2de782c5f1021fe440c23d3fc79 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 6 Dec 2016 16:09:24 -0500 Subject: [PATCH] Reactor Netty WebSocket server-side support Issue: SPR-14527 --- build.gradle | 2 +- .../web/reactive/socket/WebSocketSession.java | 6 -- .../ReactorNettyWebSocketHandlerAdapter.java | 64 +++++++++++++ .../adapter/ReactorNettyWebSocketSession.java | 96 +++++++++++++++++++ .../ReactorNettyRequestUpgradeStrategy.java | 58 +++++++++++ ...tractWebSocketHandlerIntegrationTests.java | 12 +++ ...BasicWebSocketHandlerIntegrationTests.java | 2 +- 7 files changed, 232 insertions(+), 8 deletions(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java diff --git a/build.gradle b/build.gradle index 74a0c69c0c5..a2bd13ec512 100644 --- a/build.gradle +++ b/build.gradle @@ -818,6 +818,7 @@ project("spring-web-reactive") { optional("org.freemarker:freemarker:${freemarkerVersion}") optional "org.apache.httpcomponents:httpclient:${httpclientVersion}" optional('org.webjars:webjars-locator:0.32') + optional("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}") optional("io.reactivex:rxnetty-http:${rxnettyVersion}") { exclude group: 'io.reactivex', module: 'rxjava' } @@ -830,7 +831,6 @@ project("spring-web-reactive") { testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") testCompile("org.eclipse.jetty:jetty-server:${jettyVersion}") testCompile("org.eclipse.jetty:jetty-servlet:${jettyVersion}") - testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}") testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}" testCompile("io.undertow:undertow-core:${undertowVersion}") testCompile("org.jboss.xnio:xnio-api:${xnioVersion}") diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index f094edafa82..6287832c914 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -21,8 +21,6 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import org.springframework.core.io.buffer.DataBuffer; - /** * Representation for a WebSocket session. * @@ -43,10 +41,6 @@ public interface WebSocketSession { /** * Get the flux of incoming messages. - *

Note: the caller of this method is responsible for - * releasing the DataBuffer payload of each message after consuming it - * on runtimes where a {@code PooledByteBuffer} is used such as Netty. - * @see org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) */ Flux receive(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java new file mode 100644 index 00000000000..faaf90c2b5f --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java @@ -0,0 +1,64 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.adapter; + +import java.util.function.BiFunction; + +import org.reactivestreams.Publisher; +import reactor.ipc.netty.http.HttpInbound; +import reactor.ipc.netty.http.HttpOutbound; + +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.WebSocketHandler; + +/** + * Reactor Netty {@code WebSocketHandler} implementation adapting and + * delegating to a Spring {@link WebSocketHandler}. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport + implements BiFunction> { + + + private final NettyDataBufferFactory bufferFactory; + + + public ReactorNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, + WebSocketHandler handler) { + + super(request, handler); + Assert.notNull("'response' is required"); + this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); + } + + + public NettyDataBufferFactory getBufferFactory() { + return this.bufferFactory; + } + + @Override + public Publisher apply(HttpInbound inbound, HttpOutbound outbound) { + ReactorNettyWebSocketSession session = + new ReactorNettyWebSocketSession(inbound, outbound, getUri(), getBufferFactory()); + return getDelegate().handle(session); + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java new file mode 100644 index 00000000000..b532a3be0f1 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -0,0 +1,96 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.adapter; + +import java.net.URI; + +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.ipc.netty.http.HttpInbound; +import reactor.ipc.netty.http.HttpOutbound; + +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketSession; + + +/** + * Spring {@link WebSocketSession} adapter for RxNetty's + * {@link io.reactivex.netty.protocol.http.ws.WebSocketConnection}. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class ReactorNettyWebSocketSession + extends NettyWebSocketSessionSupport { + + + protected ReactorNettyWebSocketSession(HttpInbound inbound, HttpOutbound outbound, + URI uri, NettyDataBufferFactory factory) { + + super(new WebSocketConnection(inbound, outbound), uri, factory); + } + + + @Override + public Flux receive() { + HttpInbound inbound = getDelegate().getHttpInbound(); + return toMessageFlux(inbound.receiveObject().cast(WebSocketFrame.class)); + } + + @Override + public Mono send(Publisher messages) { + HttpOutbound outbound = getDelegate().getHttpOutbound(); + Flux frameFlux = Flux.from(messages).map(this::toFrame); + return outbound.sendObject(frameFlux); + } + + @Override + protected Mono closeInternal(CloseStatus status) { + return Mono.error(new UnsupportedOperationException( + "Currently in Reactor Netty applications are expected to use the Cancellation" + + "returned from subscribing to the input Flux to close the WebSocket session.")); + } + + + /** + * Simple container for {@link HttpInbound} and {@link HttpOutbound}. + */ + public static class WebSocketConnection { + + private final HttpInbound inbound; + + private final HttpOutbound outbound; + + + public WebSocketConnection(HttpInbound inbound, HttpOutbound outbound) { + this.inbound = inbound; + this.outbound = outbound; + } + + public HttpInbound getHttpInbound() { + return this.inbound; + } + + public HttpOutbound getHttpOutbound() { + return this.outbound; + } + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java new file mode 100644 index 00000000000..7cd3fa03227 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java @@ -0,0 +1,58 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.server.upgrade; + +import java.util.List; + +import reactor.core.publisher.Mono; + +import org.springframework.http.server.reactive.ReactorServerHttpRequest; +import org.springframework.http.server.reactive.ReactorServerHttpResponse; +import org.springframework.util.StringUtils; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.server.ServerWebExchange; + +/** + * A {@link RequestUpgradeStrategy} for use with Reactor Netty. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { + + @Override + public Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) { + + ReactorServerHttpRequest request = (ReactorServerHttpRequest) exchange.getRequest(); + ReactorServerHttpResponse response = (ReactorServerHttpResponse) exchange.getResponse(); + + ReactorNettyWebSocketHandlerAdapter reactorHandler = + new ReactorNettyWebSocketHandlerAdapter(request, response, webSocketHandler); + + String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(webSocketHandler)); + protocols = (StringUtils.hasText(protocols) ? protocols : null); + + return response.getReactorResponse().upgradeToWebsocket(protocols, false, reactorHandler); + } + + private static String[] getSubProtocols(WebSocketHandler webSocketHandler) { + List subProtocols = webSocketHandler.getSubProtocols(); + return subProtocols.toArray(new String[subProtocols.size()]); + } + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java index ec73d6a3316..681864cfd6b 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java @@ -27,11 +27,13 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.bootstrap.HttpServer; +import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer; import org.springframework.util.SocketUtils; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy; /** @@ -58,6 +60,7 @@ public abstract class AbstractWebSocketHandlerIntegrationTests { @Parameters public static Object[][] arguments() { return new Object[][] { + {new ReactorHttpServer(), ReactorNettyConfig.class}, {new RxNettyHttpServer(), RxNettyConfig.class} }; } @@ -110,6 +113,15 @@ public abstract class AbstractWebSocketHandlerIntegrationTests { } + @Configuration + static class ReactorNettyConfig extends AbstractHandlerAdapterConfig { + + @Override + protected RequestUpgradeStrategy createUpgradeStrategy() { + return new ReactorNettyRequestUpgradeStrategy(); + } + } + @Configuration static class RxNettyConfig extends AbstractHandlerAdapterConfig { diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java index cd2dd26b002..80c7a54ea40 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java @@ -39,7 +39,7 @@ import org.springframework.web.reactive.socket.WebSocketSession; import static org.junit.Assert.assertEquals; /** - * Basic WebSocket integration + * Basic WebSocket integration tests. * @author Rossen Stoyanchev */ @SuppressWarnings({"unused", "WeakerAccess"})