Browse Source

Minor refactoring + polish reactive WebSocket support

Rename classes not specific to Tomcat:
TomcatWebSocketSession -> StandardWebSocketSession
TomcatWebSocketHandlerAdapter -> StandardWebSocketHandlerAdapter

WebSocketSessionSupport is renamed to AbstractWebSocketSession since it
actually is a WebSocketSession and pre-implements a number of methods.

ServerEndpointRegistration is now package private (mainly for use in
upgrade strategies) and renamed to DefaultServerEndpointConfig.
pull/1274/head
Rossen Stoyanchev 9 years ago
parent
commit
47e141675f
  1. 5
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java
  2. 24
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/HandshakeInfo.java
  3. 9
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java
  4. 23
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java
  5. 10
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java
  6. 6
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  7. 29
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractWebSocketSession.java
  8. 52
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java
  9. 7
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java
  10. 6
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java
  11. 15
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java
  12. 8
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java
  13. 63
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketHandlerAdapter.java
  14. 7
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java
  15. 38
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java
  16. 5
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java
  17. 31
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java
  18. 2
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java
  19. 2
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java
  20. 4
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/WebSocketClient.java
  21. 2
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/package-info.java
  22. 21
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java
  23. 7
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/WebSocketService.java
  24. 7
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java
  25. 10
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/WebSocketHandlerAdapter.java
  26. 32
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/DefaultServerEndpointConfig.java
  27. 20
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java
  28. 19
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java
  29. 19
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java
  30. 22
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java
  31. 30
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java
  32. 2
      spring-websocket/src/main/java/org/springframework/web/socket/server/standard/ServerEndpointRegistration.java

5
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java

@ -23,11 +23,10 @@ import org.springframework.util.ObjectUtils; @@ -23,11 +23,10 @@ import org.springframework.util.ObjectUtils;
* Representation of WebSocket "close" status codes and reasons. Status codes
* in the 1xxx range are pre-defined by the protocol.
*
* <p>See <a href="https://tools.ietf.org/html/rfc6455#section-7.4.1">
* RFC 6455, Section 7.4.1 "Defined Status Codes"</a>.
*
* @author Rossen Stoyanchev
* @since 5.0
* @see <a href="https://tools.ietf.org/html/rfc6455#section-7.4.1">
* RFC 6455, Section 7.4.1 "Defined Status Codes"</a>
*/
public final class CloseStatus {

24
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/HandshakeInfo.java

@ -24,10 +24,12 @@ import org.springframework.http.HttpHeaders; @@ -24,10 +24,12 @@ import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
/**
* Simple container of information from a WebSocket handshake request.
* Simple container of information related to the handshake request that started
* the {@link WebSocketSession} session.
*
* @author Rossen Stoyanchev
* @since 5.0
* @see WebSocketSession#getHandshakeInfo()
*/
public class HandshakeInfo {
@ -35,31 +37,41 @@ public class HandshakeInfo { @@ -35,31 +37,41 @@ public class HandshakeInfo {
private final HttpHeaders headers;
private final Mono<Principal> principal;
private final Mono<Principal> principalMono;
public HandshakeInfo(URI uri, HttpHeaders headers, Mono<Principal> principal) {
public HandshakeInfo(URI uri, HttpHeaders headers, Mono<Principal> principalMono) {
Assert.notNull(uri, "URI is required.");
Assert.notNull(headers, "HttpHeaders are required.");
Assert.notNull(principal, "Prinicpal is required.");
Assert.notNull(principalMono, "Principal is required.");
this.uri = uri;
this.headers = headers;
this.principal = principal;
this.principalMono = principalMono;
}
/**
* Return the URL for the WebSocket endpoint.
*/
public URI getUri() {
return this.uri;
}
/**
* Return the headers from the handshake HTTP request.
*/
public HttpHeaders getHeaders() {
return this.headers;
}
/**
* Return the principal associated with the handshake HTTP request, if any.
*/
public Mono<Principal> getPrincipal() {
return this.principal;
return this.principalMono;
}
@Override
public String toString() {
return "HandshakeInfo[uri=" + this.uri + ", headers=" + this.headers + "]";

9
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java

@ -21,7 +21,7 @@ import java.util.List; @@ -21,7 +21,7 @@ import java.util.List;
import reactor.core.publisher.Mono;
/**
* Handler for a WebSocket-style session interaction.
* Handler for a WebSocket session.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -37,9 +37,10 @@ public interface WebSocketHandler { @@ -37,9 +37,10 @@ public interface WebSocketHandler {
}
/**
* Handle the given WebSocket session.
* @param session the session
* @return signals completion for session handling
* Handle the WebSocket session.
* @param session the session to handle
* @return completion {@code Mono<Void>} to indicate the outcome of the
* WebSocket session handling.
*/
Mono<Void> handle(WebSocketSession session);

23
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java

@ -24,7 +24,9 @@ import org.springframework.util.ObjectUtils; @@ -24,7 +24,9 @@ import org.springframework.util.ObjectUtils;
/**
* Representation of a WebSocket message.
* Use one of the static factory methods in this class to create a message.
* <p>See static factory methods in {@link WebSocketSession} for creating messages
* with the {@link org.springframework.core.io.buffer.DataBufferFactory
* DataBufferFactory} for the session.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -37,15 +39,10 @@ public class WebSocketMessage { @@ -37,15 +39,10 @@ public class WebSocketMessage {
/**
* Constructor for a WebSocketMessage. To create, see factory methods:
* <ul>
* <li>{@link WebSocketSession#textMessage}
* <li>{@link WebSocketSession#binaryMessage}
* <li>{@link WebSocketSession#pingMessage}
* <li>{@link WebSocketSession#pongMessage}
* </ul>
* <p>Alternatively use {@link WebSocketSession#bufferFactory()} to create
* the payload and then invoke this constructor.
* Constructor for a WebSocketMessage.
* <p>See static factory methods in {@link WebSocketSession} or alternatively
* use {@link WebSocketSession#bufferFactory()} to create the payload and
* then invoke this constructor.
*/
public WebSocketMessage(Type type, DataBuffer payload) {
Assert.notNull(type, "'type' must not be null");
@ -81,7 +78,7 @@ public class WebSocketMessage { @@ -81,7 +78,7 @@ public class WebSocketMessage {
/**
* Retain the data buffer for the message payload, which is useful on
* runtimes with pooled buffers, e.g. Netty. A shortcut for:
* runtimes (e.g. Netty) with pooled buffers. A shortcut for:
* <pre>
* DataBuffer payload = message.getPayload();
* DataBufferUtils.retain(payload);
@ -94,8 +91,8 @@ public class WebSocketMessage { @@ -94,8 +91,8 @@ public class WebSocketMessage {
}
/**
* Release the payload {@code DataBuffer} which is useful on runtimes with
* pooled buffers such as Netty. Effectively a shortcut for:
* Release the payload {@code DataBuffer} which is useful on runtimes
* (e.g. Netty) with pooled buffers such as Netty. A shortcut for:
* <pre>
* DataBuffer payload = message.getPayload();
* DataBufferUtils.release(payload);

10
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java

@ -25,7 +25,15 @@ import org.springframework.core.io.buffer.DataBuffer; @@ -25,7 +25,15 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
/**
* Representation for a WebSocket session.
* Represents a WebSocket session with Reactive Streams input and output.
*
* <p>On the server side a WebSocket session can be handled by mapping
* requests to a {@link WebSocketHandler} and ensuring there is a
* {@link org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
* WebSocketHandlerAdapter} strategy registered in Spring configuration.
* On the client side a {@link WebSocketHandler} can be provided to a
* {@link org.springframework.web.reactive.socket.client.WebSocketClient
* WebSocketClient}.
*
* @author Rossen Stoyanchev
* @since 5.0

6
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -33,13 +33,15 @@ import org.springframework.web.reactive.socket.WebSocketMessage.Type; @@ -33,13 +33,15 @@ import org.springframework.web.reactive.socket.WebSocketMessage.Type;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Base class for Listener-based {@link WebSocketSession} adapters.
* Base class for {@link WebSocketSession} implementations that bridge between
* event-listener WebSocket APIs (e.g. Java WebSocket API JSR-356, Jetty,
* Undertow) and Reactive Streams.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessionSupport<T> {
public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSocketSession<T> {
/**
* The "back-pressure" buffer size to use if the underlying WebSocket API

29
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java → spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractWebSocketSession.java

@ -28,20 +28,19 @@ import reactor.core.publisher.Mono; @@ -28,20 +28,19 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Base class for {@link WebSocketSession} implementations wrapping and
* delegating to the native WebSocket session (or connection) of the underlying
* WebSocket runtime.
* Convenient base class for {@link WebSocketSession} implementations that
* holds common fields and exposes accessors. Also implements the
* {@code WebSocketMessage} factory methods.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
public abstract class AbstractWebSocketSession<T> implements WebSocketSession {
protected final Log logger = LogFactory.getLog(getClass());
@ -58,7 +57,7 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession { @@ -58,7 +57,7 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
/**
* Create a new instance and associate the given attributes with it.
*/
protected WebSocketSessionSupport(T delegate, String id, HandshakeInfo handshakeInfo,
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo,
DataBufferFactory bufferFactory) {
Assert.notNull(delegate, "Native session is required.");
@ -73,10 +72,7 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession { @@ -73,10 +72,7 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
}
/**
* Return the native session of the underlying runtime.
*/
public T getDelegate() {
protected T getDelegate() {
return this.delegate;
}
@ -105,6 +101,9 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession { @@ -105,6 +101,9 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
return this.bufferFactory;
}
// WebSocketMessage factory methods
@Override
public WebSocketMessage textMessage(String payload) {
byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
@ -130,16 +129,6 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession { @@ -130,16 +129,6 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
return new WebSocketMessage(WebSocketMessage.Type.PONG, payload);
}
@Override
public final Mono<Void> close(CloseStatus status) {
if (logger.isDebugEnabled()) {
logger.debug("Closing " + this);
}
return closeInternal(status);
}
protected abstract Mono<Void> closeInternal(CloseStatus status);
@Override
public String toString() {

52
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

@ -40,10 +40,11 @@ import org.springframework.web.reactive.socket.WebSocketMessage; @@ -40,10 +40,11 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
/**
* Jetty {@code WebSocketHandler} implementation adapting and
* delegating to a Spring {@link WebSocketHandler}.
* Jetty {@link WebSocket @WebSocket} handler that delegates events to a
* reactive {@link WebSocketHandler} and its session.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
@WebSocket
@ -51,47 +52,47 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport @@ -51,47 +52,47 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]);
private JettyWebSocketSession session;
private JettyWebSocketSession delegateSession;
public JettyWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory,
WebSocketHandler delegate) {
public JettyWebSocketHandlerAdapter(WebSocketHandler delegate, HandshakeInfo info,
DataBufferFactory bufferFactory) {
super(handshakeInfo, bufferFactory, delegate);
super(delegate, info, bufferFactory);
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
this.session = new JettyWebSocketSession(session, getHandshakeInfo(), getBufferFactory());
this.delegateSession = new JettyWebSocketSession(session, getHandshakeInfo(), bufferFactory());
HandlerResultSubscriber subscriber = new HandlerResultSubscriber();
getDelegate().handle(this.session).subscribe(subscriber);
getDelegate().handle(this.delegateSession).subscribe(subscriber);
}
@OnWebSocketMessage
public void onWebSocketText(String message) {
if (this.session != null) {
if (this.delegateSession != null) {
WebSocketMessage webSocketMessage = toMessage(Type.TEXT, message);
this.session.handleMessage(webSocketMessage.getType(), webSocketMessage);
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
}
}
@OnWebSocketMessage
public void onWebSocketBinary(byte[] message, int offset, int length) {
if (this.session != null) {
if (this.delegateSession != null) {
ByteBuffer buffer = ByteBuffer.wrap(message, offset, length);
WebSocketMessage webSocketMessage = toMessage(Type.BINARY, buffer);
session.handleMessage(webSocketMessage.getType(), webSocketMessage);
delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
}
}
@OnWebSocketFrame
public void onWebSocketFrame(Frame frame) {
if (this.session != null) {
if (this.delegateSession != null) {
if (OpCode.PONG == frame.getOpCode()) {
ByteBuffer buffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD);
WebSocketMessage webSocketMessage = toMessage(Type.PONG, buffer);
session.handleMessage(webSocketMessage.getType(), webSocketMessage);
delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
}
}
}
@ -99,15 +100,15 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport @@ -99,15 +100,15 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
private <T> WebSocketMessage toMessage(Type type, T message) {
if (Type.TEXT.equals(type)) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = getBufferFactory().wrap(bytes);
DataBuffer buffer = bufferFactory().wrap(bytes);
return new WebSocketMessage(Type.TEXT, buffer);
}
else if (Type.BINARY.equals(type)) {
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
DataBuffer buffer = bufferFactory().wrap((ByteBuffer) message);
return new WebSocketMessage(Type.BINARY, buffer);
}
else if (Type.PONG.equals(type)) {
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
DataBuffer buffer = bufferFactory().wrap((ByteBuffer) message);
return new WebSocketMessage(Type.PONG, buffer);
}
else {
@ -117,15 +118,15 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport @@ -117,15 +118,15 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
@OnWebSocketClose
public void onWebSocketClose(int statusCode, String reason) {
if (this.session != null) {
this.session.handleClose(new CloseStatus(statusCode, reason));
if (this.delegateSession != null) {
this.delegateSession.handleClose(new CloseStatus(statusCode, reason));
}
}
@OnWebSocketError
public void onWebSocketError(Throwable cause) {
if (this.session != null) {
this.session.handleError(cause);
if (this.delegateSession != null) {
this.delegateSession.handleError(cause);
}
}
@ -144,15 +145,16 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport @@ -144,15 +145,16 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
@Override
public void onError(Throwable ex) {
if (session != null) {
session.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
if (delegateSession != null) {
int code = CloseStatus.SERVER_ERROR.getCode();
delegateSession.close(new CloseStatus(code, ex.getMessage()));
}
}
@Override
public void onComplete() {
if (session != null) {
session.close();
if (delegateSession != null) {
delegateSession.close();
}
}
}

7
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

@ -32,10 +32,11 @@ import org.springframework.web.reactive.socket.WebSocketMessage; @@ -32,10 +32,11 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} adapter for Jetty's
* {@link org.eclipse.jetty.websocket.api.Session}.
* Spring {@link WebSocketSession} implementation that adapts to a Jetty
* WebSocket {@link org.eclipse.jetty.websocket.api.Session}.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> {
@ -86,7 +87,7 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess @@ -86,7 +87,7 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
public Mono<Void> close(CloseStatus status) {
getDelegate().close(status.getCode(), status.getReason());
return Mono.empty();
}

6
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java

@ -36,12 +36,14 @@ import org.springframework.web.reactive.socket.WebSocketMessage; @@ -36,12 +36,14 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Base class for Netty-based {@link WebSocketSession} adapters.
* Base class for Netty-based {@link WebSocketSession} adapters that provides
* convenience methods to convert Netty {@link WebSocketFrame}s to and from
* {@link WebSocketMessage}s.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class NettyWebSocketSessionSupport<T> extends WebSocketSessionSupport<T> {
public abstract class NettyWebSocketSessionSupport<T> extends AbstractWebSocketSession<T> {
private static final Map<Class<?>, WebSocketMessage.Type> MESSAGE_TYPES;

15
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java

@ -31,8 +31,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -31,8 +31,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} adapter for RxNetty's
* {@link io.reactivex.netty.protocol.http.ws.WebSocketConnection}.
* Spring {@link WebSocketSession} implementation that adapts to Reactor Netty's
* WebSocket {@link NettyInbound} and {@link NettyOutbound}.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -42,9 +42,9 @@ public class ReactorNettyWebSocketSession @@ -42,9 +42,9 @@ public class ReactorNettyWebSocketSession
public ReactorNettyWebSocketSession(NettyInbound inbound, NettyOutbound outbound,
HandshakeInfo handshakeInfo, NettyDataBufferFactory bufferFactory) {
HandshakeInfo info, NettyDataBufferFactory bufferFactory) {
super(new WebSocketConnection(inbound, outbound), handshakeInfo, bufferFactory);
super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
}
@ -64,10 +64,11 @@ public class ReactorNettyWebSocketSession @@ -64,10 +64,11 @@ public class ReactorNettyWebSocketSession
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
public Mono<Void> close(CloseStatus status) {
return Mono.error(new UnsupportedOperationException(
"Currently in Reactor Netty applications are expected to use the Cancellation" +
"returned from subscribing to the input Flux to close the WebSocket session."));
"Currently in Reactor Netty applications are expected to use the " +
"Cancellation returned from subscribing to the \"receive\"-side Flux " +
"in order to close the WebSocket session."));
}

8
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java

@ -31,7 +31,7 @@ import org.springframework.web.reactive.socket.WebSocketMessage; @@ -31,7 +31,7 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} adapter for RxNetty's
* Spring {@link WebSocketSession} implementation that adapts to the RxNetty
* {@link io.reactivex.netty.protocol.http.ws.WebSocketConnection}.
*
* @author Rossen Stoyanchev
@ -40,7 +40,9 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -40,7 +40,9 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSocketConnection> {
public RxNettyWebSocketSession(WebSocketConnection conn, HandshakeInfo info, NettyDataBufferFactory factory) {
public RxNettyWebSocketSession(WebSocketConnection conn, HandshakeInfo info,
NettyDataBufferFactory factory) {
super(conn, info, factory);
}
@ -60,7 +62,7 @@ public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSoc @@ -60,7 +62,7 @@ public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSoc
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
public Mono<Void> close(CloseStatus status) {
Observable<Void> completion = getDelegate().close();
return Mono.from(RxReactiveStreams.toPublisher(completion));
}

63
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java → spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketHandlerAdapter.java

@ -36,21 +36,22 @@ import org.springframework.web.reactive.socket.WebSocketMessage; @@ -36,21 +36,22 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
/**
* Tomcat {@code WebSocketHandler} implementation adapting and
* delegating to a Spring {@link WebSocketHandler}.
*
* Adapter for Java WebSocket API (JSR-356) {@link Endpoint} delegating events
* to a reactive {@link WebSocketHandler} and its session.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport {
public class StandardWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport {
private TomcatWebSocketSession session;
private StandardWebSocketSession delegateSession;
public TomcatWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory,
WebSocketHandler delegate) {
public StandardWebSocketHandlerAdapter(WebSocketHandler delegate, HandshakeInfo info,
DataBufferFactory bufferFactory) {
super(handshakeInfo, bufferFactory, delegate);
super(delegate, info, bufferFactory);
}
@ -58,47 +59,42 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor @@ -58,47 +59,42 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor
return new StandardEndpoint();
}
private TomcatWebSocketSession getSession() {
return this.session;
}
private class StandardEndpoint extends Endpoint {
@Override
public void onOpen(Session session, EndpointConfig config) {
public void onOpen(Session nativeSession, EndpointConfig config) {
TomcatWebSocketHandlerAdapter.this.session = new TomcatWebSocketSession(
session, getHandshakeInfo(), getBufferFactory());
delegateSession = new StandardWebSocketSession(nativeSession, getHandshakeInfo(), bufferFactory());
session.addMessageHandler(String.class, message -> {
nativeSession.addMessageHandler(String.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
getSession().handleMessage(webSocketMessage.getType(), webSocketMessage);
delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
});
session.addMessageHandler(ByteBuffer.class, message -> {
nativeSession.addMessageHandler(ByteBuffer.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
getSession().handleMessage(webSocketMessage.getType(), webSocketMessage);
delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
});
session.addMessageHandler(PongMessage.class, message -> {
nativeSession.addMessageHandler(PongMessage.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
getSession().handleMessage(webSocketMessage.getType(), webSocketMessage);
delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
});
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber();
getDelegate().handle(TomcatWebSocketHandlerAdapter.this.session).subscribe(resultSubscriber);
getDelegate().handle(delegateSession).subscribe(resultSubscriber);
}
private <T> WebSocketMessage toMessage(T message) {
if (message instanceof String) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes));
return new WebSocketMessage(Type.TEXT, bufferFactory().wrap(bytes));
}
else if (message instanceof ByteBuffer) {
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
DataBuffer buffer = bufferFactory().wrap((ByteBuffer) message);
return new WebSocketMessage(Type.BINARY, buffer);
}
else if (message instanceof PongMessage) {
DataBuffer buffer = getBufferFactory().wrap(((PongMessage) message).getApplicationData());
DataBuffer buffer = bufferFactory().wrap(((PongMessage) message).getApplicationData());
return new WebSocketMessage(Type.PONG, buffer);
}
else {
@ -108,16 +104,16 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor @@ -108,16 +104,16 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor
@Override
public void onClose(Session session, CloseReason reason) {
if (getSession() != null) {
if (delegateSession != null) {
int code = reason.getCloseCode().getCode();
getSession().handleClose(new CloseStatus(code, reason.getReasonPhrase()));
delegateSession.handleClose(new CloseStatus(code, reason.getReasonPhrase()));
}
}
@Override
public void onError(Session session, Throwable exception) {
if (getSession() != null) {
getSession().handleError(exception);
if (delegateSession != null) {
delegateSession.handleError(exception);
}
}
}
@ -136,15 +132,16 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor @@ -136,15 +132,16 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor
@Override
public void onError(Throwable ex) {
if (getSession() != null) {
getSession().close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
if (delegateSession != null) {
int code = CloseStatus.SERVER_ERROR.getCode();
delegateSession.close(new CloseStatus(code, ex.getMessage()));
}
}
@Override
public void onComplete() {
if (getSession() != null) {
getSession().close();
if (delegateSession != null) {
delegateSession.close();
}
}
}

7
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java → spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java

@ -38,12 +38,13 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -38,12 +38,13 @@ import org.springframework.web.reactive.socket.WebSocketSession;
* {@link javax.websocket.Session}.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public class StandardWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory bufferFactory) {
public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory bufferFactory) {
super(session, session.getId(), info, bufferFactory);
}
@ -88,7 +89,7 @@ public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Ses @@ -88,7 +89,7 @@ public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Ses
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
public Mono<Void> close(CloseStatus status) {
try {
CloseReason.CloseCode code = CloseCodes.getCloseCode(status.getCode());
getDelegate().close(new CloseReason(code, status.getReason()));

38
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java

@ -38,33 +38,34 @@ import org.springframework.web.reactive.socket.WebSocketMessage; @@ -38,33 +38,34 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
/**
* Undertow {@code WebSocketHandler} implementation adapting and
* delegating to a Spring {@link WebSocketHandler}.
* Undertow {@link WebSocketConnectionCallback} implementation that adapts and
* delegates to a Spring {@link WebSocketHandler}.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
implements WebSocketConnectionCallback {
private UndertowWebSocketSession session;
private UndertowWebSocketSession delegateSession;
public UndertowWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory,
WebSocketHandler delegate) {
public UndertowWebSocketHandlerAdapter(WebSocketHandler delegate, HandshakeInfo info,
DataBufferFactory bufferFactory) {
super(handshakeInfo, bufferFactory, delegate);
super(delegate, info, bufferFactory);
}
@Override
public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) {
this.session = new UndertowWebSocketSession(channel, getHandshakeInfo(), getBufferFactory());
this.delegateSession = new UndertowWebSocketSession(channel, getHandshakeInfo(), bufferFactory());
channel.getReceiveSetter().set(new UndertowReceiveListener());
channel.resumeReceives();
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber();
getDelegate().handle(this.session).subscribe(resultSubscriber);
getDelegate().handle(this.delegateSession).subscribe(resultSubscriber);
}
@ -72,44 +73,44 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp @@ -72,44 +73,44 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp
@Override
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
session.handleMessage(Type.TEXT, toMessage(Type.TEXT, message.getData()));
delegateSession.handleMessage(Type.TEXT, toMessage(Type.TEXT, message.getData()));
}
@Override
protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
session.handleMessage(Type.BINARY, toMessage(Type.BINARY, message.getData().getResource()));
delegateSession.handleMessage(Type.BINARY, toMessage(Type.BINARY, message.getData().getResource()));
message.getData().free();
}
@Override
protected void onFullPongMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
session.handleMessage(Type.PONG, toMessage(Type.PONG, message.getData().getResource()));
delegateSession.handleMessage(Type.PONG, toMessage(Type.PONG, message.getData().getResource()));
message.getData().free();
}
@Override
protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
CloseMessage closeMessage = new CloseMessage(message.getData().getResource());
session.handleClose(new CloseStatus(closeMessage.getCode(), closeMessage.getReason()));
delegateSession.handleClose(new CloseStatus(closeMessage.getCode(), closeMessage.getReason()));
message.getData().free();
}
@Override
protected void onError(WebSocketChannel channel, Throwable error) {
session.handleError(error);
delegateSession.handleError(error);
}
private <T> WebSocketMessage toMessage(Type type, T message) {
if (Type.TEXT.equals(type)) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes));
return new WebSocketMessage(Type.TEXT, bufferFactory().wrap(bytes));
}
else if (Type.BINARY.equals(type)) {
DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message);
DataBuffer buffer = bufferFactory().allocateBuffer().write((ByteBuffer[]) message);
return new WebSocketMessage(Type.BINARY, buffer);
}
else if (Type.PONG.equals(type)) {
DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message);
DataBuffer buffer = bufferFactory().allocateBuffer().write((ByteBuffer[]) message);
return new WebSocketMessage(Type.PONG, buffer);
}
else {
@ -133,12 +134,13 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp @@ -133,12 +134,13 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp
@Override
public void onError(Throwable ex) {
session.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
int code = CloseStatus.SERVER_ERROR.getCode();
delegateSession.close(new CloseStatus(code, ex.getMessage()));
}
@Override
public void onComplete() {
session.close();
delegateSession.close();
}
}

5
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -34,10 +34,11 @@ import org.springframework.web.reactive.socket.WebSocketMessage; @@ -34,10 +34,11 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} adapter for Undertow's
* Spring {@link WebSocketSession} implementation that adapts to an Undertow
* {@link io.undertow.websockets.core.WebSocketChannel}.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> {
@ -90,7 +91,7 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W @@ -90,7 +91,7 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
public Mono<Void> close(CloseStatus status) {
CloseMessage cm = new CloseMessage(status.getCode(), status.getReason());
if (!getDelegate().isCloseFrameSent()) {
WebSockets.sendClose(cm, getDelegate(), null);

31
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java

@ -21,44 +21,45 @@ import org.springframework.web.reactive.socket.HandshakeInfo; @@ -21,44 +21,45 @@ import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
/**
* Base class for {@link WebSocketHandler} adapters to WebSocket handler APIs
* of underlying runtimes.
* Base class for adapters from event-listener WebSocket APIs (e.g. Java
* WebSocket API JSR-356, Jetty, Undertow) to the Reactive Streams based
* {@link WebSocketHandler}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class WebSocketHandlerAdapterSupport {
private final HandshakeInfo handshakeInfo;
private final WebSocketHandler delegate;
private final HandshakeInfo handshakeInfo;
private final DataBufferFactory bufferFactory;
protected WebSocketHandlerAdapterSupport(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory,
WebSocketHandler handler) {
protected WebSocketHandlerAdapterSupport(WebSocketHandler delegate, HandshakeInfo info,
DataBufferFactory bufferFactory) {
Assert.notNull(handshakeInfo, "HandshakeInfo is required.");
Assert.notNull(delegate, "WebSocketHandler delegate is required");
Assert.notNull(info, "HandshakeInfo is required.");
Assert.notNull(bufferFactory, "DataBufferFactory is required");
Assert.notNull(handler, "WebSocketHandler handler is required");
this.handshakeInfo = handshakeInfo;
this.delegate = delegate;
this.handshakeInfo = info;
this.bufferFactory = bufferFactory;
this.delegate = handler;
}
protected HandshakeInfo getHandshakeInfo() {
return this.handshakeInfo;
}
protected WebSocketHandler getDelegate() {
return this.delegate;
}
protected HandshakeInfo getHandshakeInfo() {
return this.handshakeInfo;
}
@SuppressWarnings("unchecked")
protected <T extends DataBufferFactory> T getBufferFactory() {
protected <T extends DataBufferFactory> T bufferFactory() {
return (T) this.bufferFactory;
}

2
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java

@ -34,7 +34,7 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -34,7 +34,7 @@ import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
/**
* A {@link WebSocketClient} based on Reactor Netty.
* A Reactor Netty based implementation of {@link WebSocketClient}.
*
* @author Rossen Stoyanchev
* @since 5.0

2
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java

@ -39,7 +39,7 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -39,7 +39,7 @@ import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession;
/**
* A {@link WebSocketClient} based on RxNetty.
* An RxNetty based implementation of {@link WebSocketClient}.
*
* @author Rossen Stoyanchev
* @since 5.0

4
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/WebSocketClient.java

@ -36,7 +36,7 @@ public interface WebSocketClient { @@ -36,7 +36,7 @@ public interface WebSocketClient {
* @param url the handshake url
* @param handler the handler of the WebSocket session
* @return completion {@code Mono<Void>} to indicate the outcome of the
* WebSocket session handling
* WebSocket session handling.
*/
Mono<Void> execute(URI url, WebSocketHandler handler);
@ -46,7 +46,7 @@ public interface WebSocketClient { @@ -46,7 +46,7 @@ public interface WebSocketClient {
* @param headers custom headers for the handshake request
* @param handler the handler of the WebSocket session
* @return completion {@code Mono<Void>} to indicate the outcome of the
* WebSocket session handling
* WebSocket session handling.
*/
Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler);

2
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/package-info.java

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
/**
* Abstractions and support classes for WebSocket interactions.
* Abstractions and support classes for reactive WebSocket interactions.
*/
package org.springframework.web.reactive.socket;

21
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java

@ -15,8 +15,6 @@ @@ -15,8 +15,6 @@
*/
package org.springframework.web.reactive.socket.server;
import java.util.Map;
import reactor.core.publisher.Mono;
import org.springframework.http.server.reactive.ServerHttpRequest;
@ -25,14 +23,13 @@ import org.springframework.web.reactive.socket.WebSocketHandler; @@ -25,14 +23,13 @@ import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.server.ServerWebExchange;
/**
* A strategy for upgrading an HTTP request to a WebSocket interaction depending
* on the underlying HTTP runtime.
* A strategy for upgrading an HTTP request to a WebSocket session depending
* on the underlying network runtime.
*
* <p>Typically there is one such strategy for every {@link ServerHttpRequest}
* and {@link ServerHttpResponse} implementation type except in the case of
* Servlet containers for which there is no standard API to upgrade a request.
* JSR-356 does have programmatic endpoint registration but that is only
* intended for use on startup and not per request.
* and {@link ServerHttpResponse} type except in the case of Servlet containers
* for which the standard Java WebSocket API JSR-356 does not define a way to
* upgrade a request so a custom strategy is needed for every Servlet container.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -40,11 +37,11 @@ import org.springframework.web.server.ServerWebExchange; @@ -40,11 +37,11 @@ import org.springframework.web.server.ServerWebExchange;
public interface RequestUpgradeStrategy {
/**
* Upgrade the request to a WebSocket interaction and adapt the given
* Spring {@link WebSocketHandler} to the underlying runtime WebSocket API.
* Upgrade to a WebSocket session and handle it with the given handler.
* @param exchange the current exchange
* @param webSocketHandler handler for WebSocket session
* @return a completion Mono for the WebSocket session handling
* @param webSocketHandler handler for the WebSocket session
* @return completion {@code Mono<Void>} to indicate the outcome of the
* WebSocket session handling.
*/
Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler);

7
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/WebSocketService.java

@ -18,19 +18,18 @@ package org.springframework.web.reactive.socket.server; @@ -18,19 +18,18 @@ package org.springframework.web.reactive.socket.server;
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.server.ServerWebExchange;
/**
* A service to delegate WebSocket-related HTTP requests to.
*
* <p>For a straight-up WebSocket endpoint this means handling the initial
* handshake request but for a SockJS endpoint this means handling all HTTP
* <p>For a WebSocket endpoint this means handling the initial WebSocket HTTP
* handshake request. For a SockJS endpoint it could mean handling all HTTP
* requests defined in the SockJS protocol.
*
* @author Rossen Stoyanchev
* @since 5.0
* @see HandshakeWebSocketService
* @see org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
*/
public interface WebSocketService {

7
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java

@ -37,9 +37,10 @@ import org.springframework.web.server.MethodNotAllowedException; @@ -37,9 +37,10 @@ import org.springframework.web.server.MethodNotAllowedException;
import org.springframework.web.server.ServerWebExchange;
/**
* A {@code WebSocketService} implementation that handles a WebSocket handshake
* and upgrades to a WebSocket interaction through the configured or
* auto-detected {@link RequestUpgradeStrategy}.
* {@code WebSocketService} implementation that handles a WebSocket HTTP
* handshake request by delegating to a {@link RequestUpgradeStrategy} which
* is either auto-detected (no-arg constructor) from the classpath but can
* also be explicitly configured.
*
* @author Rossen Stoyanchev
* @since 5.0

10
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/WebSocketHandlerAdapter.java

@ -26,9 +26,10 @@ import org.springframework.web.reactive.socket.server.WebSocketService; @@ -26,9 +26,10 @@ import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.server.ServerWebExchange;
/**
* {@code HandlerAdapter} that allows using a {@link WebSocketHandler} contract
* with the generic {@link DispatcherHandler} mapping URLs directly to such
* handlers. Requests are handled through the configured {@link WebSocketService}.
* {@link HandlerAdapter} that allows using a {@link WebSocketHandler} with the
* generic {@link DispatcherHandler} mapping URLs directly to such handlers.
* Requests are handled by delegating to the configured {@link WebSocketService}
* which by default is {@link HandshakeWebSocketService}.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -40,8 +41,7 @@ public class WebSocketHandlerAdapter implements HandlerAdapter { @@ -40,8 +41,7 @@ public class WebSocketHandlerAdapter implements HandlerAdapter {
/**
* Default constructor that creates and uses a
* {@link HandshakeWebSocketService} for a straight-up WebSocket interaction,
* i.e. treating incoming requests as WebSocket handshake requests.
* {@link HandshakeWebSocketService}.
*/
public WebSocketHandlerAdapter() {
this(new HandshakeWebSocketService());

32
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ServerEndpointRegistration.java → spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/DefaultServerEndpointConfig.java

@ -30,38 +30,34 @@ import javax.websocket.server.ServerEndpointConfig; @@ -30,38 +30,34 @@ import javax.websocket.server.ServerEndpointConfig;
import org.springframework.util.Assert;
/**
* An implementation of {@link javax.websocket.server.ServerEndpointConfig} for use in
* Spring applications.
*
* <p>Class constructor accept a singleton {@link javax.websocket.Endpoint} instance.
*
* <p>This class also extends
* {@link javax.websocket.server.ServerEndpointConfig.Configurator} to make it easier to
* override methods for customizing the handshake process.
* Default implementation of {@link javax.websocket.server.ServerEndpointConfig}
* for use in {@code RequestUpgradeStrategy} implementations.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
public class ServerEndpointRegistration extends ServerEndpointConfig.Configurator
class DefaultServerEndpointConfig extends ServerEndpointConfig.Configurator
implements ServerEndpointConfig {
private final String path;
private final Endpoint endpoint;
/**
* Create a new {@link ServerEndpointRegistration} instance from an
* {@code javax.websocket.Endpoint} instance.
* Constructor with a path and an {@code javax.websocket.Endpoint}.
* @param path the endpoint path
* @param endpoint the endpoint instance
*/
public ServerEndpointRegistration(String path, Endpoint endpoint) {
public DefaultServerEndpointConfig(String path, Endpoint endpoint) {
Assert.hasText(path, "path must not be empty");
Assert.notNull(endpoint, "endpoint must not be null");
this.path = path;
this.endpoint = endpoint;
}
@Override
public List<Class<? extends Encoder>> getEncoders() {
return new ArrayList<>();
@ -82,10 +78,6 @@ public class ServerEndpointRegistration extends ServerEndpointConfig.Configurato @@ -82,10 +78,6 @@ public class ServerEndpointRegistration extends ServerEndpointConfig.Configurato
return this.endpoint.getClass();
}
public Endpoint getEndpoint() {
return this.endpoint;
}
@Override
public String getPath() {
return this.path;
@ -108,13 +100,13 @@ public class ServerEndpointRegistration extends ServerEndpointConfig.Configurato @@ -108,13 +100,13 @@ public class ServerEndpointRegistration extends ServerEndpointConfig.Configurato
@SuppressWarnings("unchecked")
@Override
public <T> T getEndpointInstance(Class<T> endpointClass)
throws InstantiationException {
return (T) getEndpoint();
public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
return (T) this.endpoint;
}
@Override
public String toString() {
return "ServerEndpointRegistration for path '" + getPath() + "': " + getEndpointClass();
return "DefaultServerEndpointConfig for path '" + getPath() + "': " + getEndpointClass();
}
}

20
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java

@ -17,8 +17,6 @@ @@ -17,8 +17,6 @@
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -30,14 +28,13 @@ import reactor.core.publisher.Mono; @@ -30,14 +28,13 @@ import reactor.core.publisher.Mono;
import org.springframework.context.Lifecycle;
import org.springframework.core.NamedThreadLocal;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServletServerHttpRequest;
import org.springframework.http.server.reactive.ServletServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
@ -111,13 +108,9 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @@ -111,13 +108,9 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
HttpServletRequest servletRequest = getHttpServletRequest(request);
HttpServletResponse servletResponse = getHttpServletResponse(response);
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo info = new HandshakeInfo(uri, headers, principal);
DataBufferFactory bufferFactory = response.bufferFactory();
JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(info, bufferFactory, handler);
HandshakeInfo info = getHandshakeInfo(exchange);
DataBufferFactory factory = response.bufferFactory();
JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(handler, info, factory);
startLazily(servletRequest);
@ -148,6 +141,11 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @@ -148,6 +141,11 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
return ((ServletServerHttpResponse) response).getServletResponse();
}
private HandshakeInfo getHandshakeInfo(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
return new HandshakeInfo(request.getURI(), request.getHeaders(), exchange.getPrincipal());
}
private void startLazily(HttpServletRequest request) {
if (this.servletContext != null) {
return;

19
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java

@ -15,19 +15,16 @@ @@ -15,19 +15,16 @@
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.net.URI;
import java.security.Principal;
import java.util.List;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ReactorServerHttpRequest;
import org.springframework.http.server.reactive.ReactorServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
@ -43,13 +40,8 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg @@ -43,13 +40,8 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler) {
ReactorServerHttpRequest request = (ReactorServerHttpRequest) exchange.getRequest();
ReactorServerHttpResponse response = (ReactorServerHttpResponse) exchange.getResponse();
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo handshakeInfo = new HandshakeInfo(uri, headers, principal);
HandshakeInfo handshakeInfo = getHandshakeInfo(exchange);
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(handler));
@ -60,6 +52,11 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg @@ -60,6 +52,11 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
new ReactorNettyWebSocketSession(inbound, outbound, handshakeInfo, bufferFactory)));
}
private HandshakeInfo getHandshakeInfo(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
return new HandshakeInfo(request.getURI(), request.getHeaders(), exchange.getPrincipal());
}
private static String[] getSubProtocols(WebSocketHandler webSocketHandler) {
List<String> subProtocols = webSocketHandler.getSubProtocols();
return subProtocols.toArray(new String[subProtocols.size()]);

19
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java

@ -15,8 +15,6 @@ @@ -15,8 +15,6 @@
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.net.URI;
import java.security.Principal;
import java.util.List;
import reactor.core.publisher.Mono;
@ -24,12 +22,11 @@ import rx.Observable; @@ -24,12 +22,11 @@ import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.RxNettyServerHttpRequest;
import org.springframework.http.server.reactive.RxNettyServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
@ -46,13 +43,8 @@ public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -46,13 +43,8 @@ public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler) {
RxNettyServerHttpRequest request = (RxNettyServerHttpRequest) exchange.getRequest();
RxNettyServerHttpResponse response = (RxNettyServerHttpResponse) exchange.getResponse();
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo handshakeInfo = new HandshakeInfo(uri, headers, principal);
HandshakeInfo handshakeInfo = getHandshakeInfo(exchange);
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
Observable<Void> completion = response.getRxNettyResponse()
@ -65,6 +57,11 @@ public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -65,6 +57,11 @@ public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
private HandshakeInfo getHandshakeInfo(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
return new HandshakeInfo(request.getURI(), request.getHeaders(), exchange.getPrincipal());
}
private static String[] getSubProtocols(WebSocketHandler webSocketHandler) {
List<String> subProtocols = webSocketHandler.getSubProtocols();
return subProtocols.toArray(new String[subProtocols.size()]);

22
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java

@ -17,8 +17,6 @@ @@ -17,8 +17,6 @@
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.Collections;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
@ -31,7 +29,6 @@ import org.apache.tomcat.websocket.server.WsServerContainer; @@ -31,7 +29,6 @@ import org.apache.tomcat.websocket.server.WsServerContainer;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServletServerHttpRequest;
@ -39,7 +36,7 @@ import org.springframework.http.server.reactive.ServletServerHttpResponse; @@ -39,7 +36,7 @@ import org.springframework.http.server.reactive.ServletServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.TomcatWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.adapter.StandardWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
@ -63,15 +60,13 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -63,15 +60,13 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
HttpServletRequest servletRequest = getHttpServletRequest(request);
HttpServletResponse servletResponse = getHttpServletResponse(response);
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo info = new HandshakeInfo(uri, headers, principal);
DataBufferFactory bufferFactory = response.bufferFactory();
Endpoint endpoint = new TomcatWebSocketHandlerAdapter(info, bufferFactory, handler).getEndpoint();
HandshakeInfo info = getHandshakeInfo(exchange);
DataBufferFactory factory = response.bufferFactory();
Endpoint endpoint = new StandardWebSocketHandlerAdapter(handler, info, factory).getEndpoint();
String requestURI = servletRequest.getRequestURI();
ServerEndpointConfig config = new ServerEndpointRegistration(requestURI, endpoint);
ServerEndpointConfig config = new DefaultServerEndpointConfig(requestURI, endpoint);
try {
WsServerContainer container = getContainer(servletRequest);
container.doUpgrade(servletRequest, servletResponse, config, Collections.emptyMap());
@ -93,6 +88,11 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -93,6 +88,11 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
return ((ServletServerHttpResponse) response).getServletResponse();
}
private HandshakeInfo getHandshakeInfo(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
return new HandshakeInfo(request.getURI(), request.getHeaders(), exchange.getPrincipal());
}
private WsServerContainer getContainer(HttpServletRequest request) {
ServletContext servletContext = request.getServletContext();
Object container = servletContext.getAttribute(SERVER_CONTAINER_ATTR);

30
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java

@ -16,26 +16,22 @@ @@ -16,26 +16,22 @@
package org.springframework.web.reactive.socket.server.upgrade;
import java.net.URI;
import java.security.Principal;
import io.undertow.server.HttpServerExchange;
import io.undertow.websockets.WebSocketConnectionCallback;
import io.undertow.websockets.WebSocketProtocolHandshakeHandler;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.UndertowServerHttpRequest;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
import io.undertow.server.HttpServerExchange;
import io.undertow.websockets.WebSocketConnectionCallback;
import io.undertow.websockets.WebSocketProtocolHandshakeHandler;
import reactor.core.publisher.Mono;
/**
* A {@link RequestUpgradeStrategy} for use with Undertow.
*
@ -51,18 +47,15 @@ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -51,18 +47,15 @@ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
URI uri = request.getURI();
HttpHeaders headers = request.getHeaders();
Mono<Principal> principal = exchange.getPrincipal();
HandshakeInfo info = new HandshakeInfo(uri, headers, principal);
HandshakeInfo info = getHandshakeInfo(exchange);
DataBufferFactory bufferFactory = response.bufferFactory();
WebSocketConnectionCallback callback =
new UndertowWebSocketHandlerAdapter(info, bufferFactory, handler);
Assert.isTrue(request instanceof UndertowServerHttpRequest);
HttpServerExchange httpExchange = ((UndertowServerHttpRequest) request).getUndertowExchange();
WebSocketConnectionCallback callback =
new UndertowWebSocketHandlerAdapter(handler, info, bufferFactory);
try {
new WebSocketProtocolHandshakeHandler(callback).handleRequest(httpExchange);
}
@ -73,4 +66,9 @@ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -73,4 +66,9 @@ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy {
return Mono.empty();
}
private HandshakeInfo getHandshakeInfo(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
return new HandshakeInfo(request.getURI(), request.getHeaders(), exchange.getPrincipal());
}
}

2
spring-websocket/src/main/java/org/springframework/web/socket/server/standard/ServerEndpointRegistration.java

@ -48,7 +48,7 @@ import org.springframework.web.socket.handler.BeanCreatingHandlerProvider; @@ -48,7 +48,7 @@ import org.springframework.web.socket.handler.BeanCreatingHandlerProvider;
* override methods for customizing the handshake process.
*
* @author Rossen Stoyanchev
* @since 4.0
* @since 5.0
* @see ServerEndpointExporter
*/
public class ServerEndpointRegistration extends ServerEndpointConfig.Configurator

Loading…
Cancel
Save