From a2053a516e7d28e0bc924fd29a6822a2a9d70675 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 8 Dec 2016 19:47:41 +0200 Subject: [PATCH] Initial reactive, WebSocket Jetty support Issue: SPR-14527 --- build.gradle | 3 + .../adapter/JettyWebSocketHandlerAdapter.java | 158 ++++++++++++++++++ .../socket/adapter/JettyWebSocketSession.java | 92 ++++++++++ .../upgrade/JettyRequestUpgradeStrategy.java | 146 ++++++++++++++++ ...tractWebSocketHandlerIntegrationTests.java | 14 +- 5 files changed, 412 insertions(+), 1 deletion(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java diff --git a/build.gradle b/build.gradle index 7c7f576b7fb..8ace9a7022e 100644 --- a/build.gradle +++ b/build.gradle @@ -829,6 +829,9 @@ project("spring-web-reactive") { exclude group: "org.apache.tomcat", module: "tomcat-websocket-api" exclude group: "org.apache.tomcat", module: "tomcat-servlet-api" } + optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") { + exclude group: "javax.servlet", module: "javax.servlet" + } optional("io.undertow:undertow-websockets-jsr:${undertowVersion}") { exclude group: "org.jboss.spec.javax.websocket", module: "jboss-websocket-api_1.1_spec" } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java new file mode 100644 index 00000000000..815bd2da79e --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -0,0 +1,158 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.adapter; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.common.OpCode; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketMessage.Type; + +/** + * Jetty {@code WebSocketHandler} implementation adapting and + * delegating to a Spring {@link WebSocketHandler}. + * + * @author Violeta Georgieva + * @since 5.0 + */ +@WebSocket +public class JettyWebSocketHandlerAdapter { + + private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]); + + private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false); + + private final WebSocketHandler handler; + + private JettyWebSocketSession wsSession; + + public JettyWebSocketHandlerAdapter(WebSocketHandler handler) { + Assert.notNull("'handler' is required"); + this.handler = handler; + } + + @OnWebSocketConnect + public void onWebSocketConnect(Session session) { + this.wsSession = new JettyWebSocketSession(session); + + HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(); + this.handler.handle(this.wsSession).subscribe(resultSubscriber); + } + + @OnWebSocketMessage + public void onWebSocketText(String message) { + if (this.wsSession != null) { + WebSocketMessage wsMessage = toMessage(Type.TEXT, message); + this.wsSession.handleMessage(wsMessage.getType(), wsMessage); + } + } + + @OnWebSocketMessage + public void onWebSocketBinary(byte[] message, int offset, int length) { + if (this.wsSession != null) { + WebSocketMessage wsMessage = toMessage(Type.BINARY, ByteBuffer.wrap(message, offset, length)); + wsSession.handleMessage(wsMessage.getType(), wsMessage); + } + } + + @OnWebSocketFrame + public void onWebSocketFrame(Frame frame) { + if (this.wsSession != null) { + if (OpCode.PONG == frame.getOpCode()) { + ByteBuffer message = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD; + WebSocketMessage wsMessage = toMessage(Type.PONG, message); + wsSession.handleMessage(wsMessage.getType(), wsMessage); + } + } + } + + @OnWebSocketClose + public void onWebSocketClose(int statusCode, String reason) { + if (this.wsSession != null) { + this.wsSession.handleClose(new CloseStatus(statusCode, reason)); + } + } + + @OnWebSocketError + public void onWebSocketError(Throwable cause) { + if (this.wsSession != null) { + this.wsSession.handleError(cause); + } + } + + private WebSocketMessage toMessage(Type type, T message) { + if (Type.TEXT.equals(type)) { + return WebSocketMessage.create(Type.TEXT, + bufferFactory.wrap(((String) message).getBytes(StandardCharsets.UTF_8))); + } + else if (Type.BINARY.equals(type)) { + return WebSocketMessage.create(Type.BINARY, + bufferFactory.wrap((ByteBuffer) message)); + } + else if (Type.PONG.equals(type)) { + return WebSocketMessage.create(Type.PONG, + bufferFactory.wrap((ByteBuffer) message)); + } + else { + throw new IllegalArgumentException("Unexpected message type: " + message); + } + } + + private final class HandlerResultSubscriber implements Subscriber { + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Void aVoid) { + // no op + } + + @Override + public void onError(Throwable ex) { + if (wsSession != null) { + wsSession.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage())); + } + } + + @Override + public void onComplete() { + if (wsSession != null) { + wsSession.close(); + } + } + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java new file mode 100644 index 00000000000..0e7e330ef7f --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -0,0 +1,92 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.adapter; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WriteCallback; +import org.springframework.util.ObjectUtils; +import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketSession; + +import reactor.core.publisher.Mono; + +/** + * Spring {@link WebSocketSession} adapter for Jetty's + * {@link org.eclipse.jetty.websocket.api.Session}. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class JettyWebSocketSession extends AbstractListenerWebSocketSessionSupport { + + public JettyWebSocketSession(Session session) { + super(session, ObjectUtils.getIdentityHexString(session), + session.getUpgradeRequest().getRequestURI()); + } + + @Override + protected Mono closeInternal(CloseStatus status) { + getDelegate().close(status.getCode(), status.getReason()); + return Mono.empty(); + } + + @Override + protected boolean writeInternal(WebSocketMessage message) throws IOException { + if (WebSocketMessage.Type.TEXT.equals(message.getType())) { + this.webSocketMessageProcessor.setReady(false); + getDelegate().getRemote().sendString( + new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8), + new WebSocketMessageWriteCallback()); + } + else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { + this.webSocketMessageProcessor.setReady(false); + getDelegate().getRemote().sendBytes(message.getPayload().asByteBuffer(), + new WebSocketMessageWriteCallback()); + } + else if (WebSocketMessage.Type.PING.equals(message.getType())) { + getDelegate().getRemote().sendPing(message.getPayload().asByteBuffer()); + } + else if (WebSocketMessage.Type.PONG.equals(message.getType())) { + getDelegate().getRemote().sendPong(message.getPayload().asByteBuffer()); + } + else { + throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + return true; + } + + private final class WebSocketMessageWriteCallback implements WriteCallback { + + @Override + public void writeFailed(Throwable x) { + webSocketMessageProcessor.cancel(); + webSocketMessageProcessor.onError(x); + } + + @Override + public void writeSuccess() { + webSocketMessageProcessor.setReady(true); + webSocketMessageProcessor.onWritePossible(); + } + + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java new file mode 100644 index 00000000000..82295c6e0a5 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java @@ -0,0 +1,146 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.server.upgrade; + +import java.io.IOException; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.util.DecoratedObjectFactory; +import org.eclipse.jetty.websocket.server.WebSocketServerFactory; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.springframework.context.Lifecycle; +import org.springframework.core.NamedThreadLocal; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.server.reactive.ServletServerHttpRequest; +import org.springframework.http.server.reactive.ServletServerHttpResponse; +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.server.ServerWebExchange; + +import reactor.core.publisher.Mono; + +/** + * A {@link RequestUpgradeStrategy} for use with Jetty. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Lifecycle { + + private static final ThreadLocal wsContainerHolder = + new NamedThreadLocal<>("Jetty WebSocketHandler Adapter"); + + private WebSocketServerFactory factory; + + private ServletContext servletContext; + + private volatile boolean running = false; + + @Override + public Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) { + + JettyWebSocketHandlerAdapter adapter = + new JettyWebSocketHandlerAdapter(webSocketHandler); + + HttpServletRequest servletRequest = getHttpServletRequest(exchange.getRequest()); + HttpServletResponse servletResponse = getHttpServletResponse(exchange.getResponse()); + + if (this.servletContext == null) { + this.servletContext = servletRequest.getServletContext(); + servletContext.setAttribute(DecoratedObjectFactory.ATTR, new DecoratedObjectFactory()); + } + + try { + start(); + + Assert.isTrue(this.factory.isUpgradeRequest(servletRequest, servletResponse), "Not a WebSocket handshake"); + + wsContainerHolder.set(adapter); + this.factory.acceptWebSocket(servletRequest, servletResponse); + } + catch (IOException ex) { + return Mono.error(ex); + } + finally { + wsContainerHolder.remove(); + } + + return Mono.empty(); + } + + @Override + public void start() { + if (!isRunning() && this.servletContext != null) { + this.running = true; + try { + this.factory = new WebSocketServerFactory(this.servletContext); + this.factory.setCreator(new WebSocketCreator() { + + @Override + public Object createWebSocket(ServletUpgradeRequest req, + ServletUpgradeResponse resp) { + JettyWebSocketHandlerAdapter adapter = wsContainerHolder.get(); + Assert.state(adapter != null, "Expected JettyWebSocketHandlerAdapter"); + return adapter; + } + + }); + this.factory.start(); + } + catch (Exception ex) { + throw new IllegalStateException("Unable to start Jetty WebSocketServerFactory", ex); + } + } + } + + @Override + public void stop() { + if (isRunning()) { + this.running = false; + try { + this.factory.stop(); + } + catch (Exception ex) { + throw new IllegalStateException("Unable to stop Jetty WebSocketServerFactory", ex); + } + } + } + + @Override + public boolean isRunning() { + return this.running; + } + + private final HttpServletRequest getHttpServletRequest(ServerHttpRequest request) { + Assert.isTrue(request instanceof ServletServerHttpRequest); + return ((ServletServerHttpRequest) request).getServletRequest(); + } + + private final HttpServletResponse getHttpServletResponse(ServerHttpResponse response) { + Assert.isTrue(response instanceof ServletServerHttpResponse); + return ((ServletServerHttpResponse) response).getServletResponse(); + } + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java index 5d494253cb9..17d7bb49702 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java @@ -31,6 +31,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.bootstrap.HttpServer; import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer; +import org.springframework.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer; import org.springframework.http.server.reactive.bootstrap.TomcatHttpServer; import org.springframework.http.server.reactive.bootstrap.UndertowHttpServer; @@ -39,6 +40,7 @@ import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; +import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.TomcatRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpgradeStrategy; @@ -71,7 +73,8 @@ public abstract class AbstractWebSocketHandlerIntegrationTests { {new ReactorHttpServer(), ReactorNettyConfig.class}, {new RxNettyHttpServer(), RxNettyConfig.class}, {new TomcatHttpServer(base.getAbsolutePath(), WsContextListener.class), TomcatConfig.class}, - {new UndertowHttpServer(), UndertowConfig.class} + {new UndertowHttpServer(), UndertowConfig.class}, + {new JettyHttpServer(), JettyConfig.class} }; } @@ -162,4 +165,13 @@ public abstract class AbstractWebSocketHandlerIntegrationTests { } } + @Configuration + static class JettyConfig extends AbstractHandlerAdapterConfig { + + @Override + protected RequestUpgradeStrategy getUpgradeStrategy() { + return new JettyRequestUpgradeStrategy(); + } + } + }