Browse Source

Expose WebSocket options for Tomcat/Jetty

pull/1592/merge
Rossen Stoyanchev 8 years ago
parent
commit
5e86049438
  1. 35
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java
  2. 104
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java

35
spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java

@ -22,6 +22,7 @@ import javax.servlet.ServletContext; @@ -22,6 +22,7 @@ import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import reactor.core.publisher.Mono;
@ -41,6 +42,7 @@ import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; @@ -41,6 +42,7 @@ import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
/**
* A {@link RequestUpgradeStrategy} for use with Jetty.
*
@ -54,6 +56,9 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @@ -54,6 +56,9 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
new NamedThreadLocal<>("JettyWebSocketHandlerAdapter");
@Nullable
private WebSocketPolicy webSocketPolicy;
@Nullable
private WebSocketServerFactory factory;
@ -65,6 +70,24 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @@ -65,6 +70,24 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
private final Object lifecycleMonitor = new Object();
/**
* Configure a {@link WebSocketPolicy} to use to initialize
* {@link WebSocketServerFactory}.
* @param webSocketPolicy the WebSocket settings
*/
public void setWebSocketPolicy(WebSocketPolicy webSocketPolicy) {
this.webSocketPolicy = webSocketPolicy;
}
/**
* Return the configured {@link WebSocketPolicy}, if any.
*/
@Nullable
public WebSocketPolicy getWebSocketPolicy() {
return webSocketPolicy;
}
@Override
public void start() {
synchronized (this.lifecycleMonitor) {
@ -72,7 +95,9 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @@ -72,7 +95,9 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
if (!isRunning() && servletContext != null) {
this.running = true;
try {
this.factory = new WebSocketServerFactory(servletContext);
this.factory = this.webSocketPolicy != null ?
new WebSocketServerFactory(servletContext, this.webSocketPolicy) :
new WebSocketServerFactory(servletContext);
this.factory.setCreator((request, response) -> {
WebSocketHandlerContainer container = adapterHolder.get();
String protocol = container.getProtocol();
@ -114,7 +139,9 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @@ -114,7 +139,9 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol) {
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
@Nullable String subProtocol) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
@ -149,12 +176,12 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @@ -149,12 +176,12 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
}
private HttpServletRequest getHttpServletRequest(ServerHttpRequest request) {
Assert.isInstanceOf(AbstractServerHttpRequest.class, request, "ServletServerHttpRequest required");
Assert.isInstanceOf(AbstractServerHttpRequest.class, request);
return ((AbstractServerHttpRequest) request).getNativeRequest();
}
private HttpServletResponse getHttpServletResponse(ServerHttpResponse response) {
Assert.isInstanceOf(AbstractServerHttpResponse.class, response, "ServletServerHttpResponse required");
Assert.isInstanceOf(AbstractServerHttpResponse.class, response);
return ((AbstractServerHttpResponse) response).getNativeResponse();
}

104
spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java

@ -23,6 +23,7 @@ import javax.servlet.ServletException; @@ -23,6 +23,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.websocket.Endpoint;
import javax.websocket.server.ServerContainer;
import org.apache.tomcat.websocket.server.WsServerContainer;
import reactor.core.publisher.Mono;
@ -52,8 +53,79 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -52,8 +53,79 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
private static final String SERVER_CONTAINER_ATTR = "javax.websocket.server.ServerContainer";
@Nullable
private Long asyncSendTimeout;
@Nullable
private Long maxSessionIdleTimeout;
@Nullable
private Integer maxTextMessageBufferSize;
@Nullable
private Integer maxBinaryMessageBufferSize;
@Nullable
private WsServerContainer serverContainer;
/**
* Exposes the underlying config option on
* {@link javax.websocket.server.ServerContainer#setAsyncSendTimeout(long)}.
*/
public void setAsyncSendTimeout(Long timeoutInMillis) {
this.asyncSendTimeout = timeoutInMillis;
}
@Nullable
public Long getAsyncSendTimeout() {
return this.asyncSendTimeout;
}
/**
* Exposes the underlying config option on
* {@link javax.websocket.server.ServerContainer#setDefaultMaxSessionIdleTimeout(long)}.
*/
public void setMaxSessionIdleTimeout(Long timeoutInMillis) {
this.maxSessionIdleTimeout = timeoutInMillis;
}
@Nullable
public Long getMaxSessionIdleTimeout() {
return this.maxSessionIdleTimeout;
}
/**
* Exposes the underlying config option on
* {@link javax.websocket.server.ServerContainer#setDefaultMaxTextMessageBufferSize(int)}.
*/
public void setMaxTextMessageBufferSize(Integer bufferSize) {
this.maxTextMessageBufferSize = bufferSize;
}
@Nullable
public Integer getMaxTextMessageBufferSize() {
return this.maxTextMessageBufferSize;
}
/**
* Exposes the underlying config option on
* {@link javax.websocket.server.ServerContainer#setDefaultMaxBinaryMessageBufferSize(int)}.
*/
public void setMaxBinaryMessageBufferSize(Integer bufferSize) {
this.maxBinaryMessageBufferSize = bufferSize;
}
@Nullable
public Integer getMaxBinaryMessageBufferSize() {
return this.maxBinaryMessageBufferSize;
}
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol){
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
@Nullable String subProtocol){
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
@ -69,7 +141,8 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -69,7 +141,8 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
String requestURI = servletRequest.getRequestURI();
DefaultServerEndpointConfig config = new DefaultServerEndpointConfig(requestURI, endpoint);
config.setSubprotocols(subProtocol != null ? Collections.singletonList(subProtocol) : Collections.emptyList());
config.setSubprotocols(subProtocol != null ?
Collections.singletonList(subProtocol) : Collections.emptyList());
try {
WsServerContainer container = getContainer(servletRequest);
@ -99,10 +172,29 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -99,10 +172,29 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
}
private WsServerContainer getContainer(HttpServletRequest request) {
Object container = request.getServletContext().getAttribute(SERVER_CONTAINER_ATTR);
Assert.state(container instanceof WsServerContainer,
"No 'javax.websocket.server.ServerContainer' ServletContext attribute in a Tomcat container");
return (WsServerContainer) container;
if (this.serverContainer == null) {
Object container = request.getServletContext().getAttribute(SERVER_CONTAINER_ATTR);
Assert.state(container instanceof WsServerContainer,
"ServletContext attribute 'javax.websocket.server.ServerContainer' not found.");
this.serverContainer = (WsServerContainer) container;
initServerContainer(this.serverContainer);
}
return this.serverContainer;
}
private void initServerContainer(ServerContainer serverContainer) {
if (this.asyncSendTimeout != null) {
serverContainer.setAsyncSendTimeout(this.asyncSendTimeout);
}
if (this.maxSessionIdleTimeout != null) {
serverContainer.setDefaultMaxSessionIdleTimeout(this.maxSessionIdleTimeout);
}
if (this.maxTextMessageBufferSize != null) {
serverContainer.setDefaultMaxTextMessageBufferSize(this.maxTextMessageBufferSize);
}
if (this.maxBinaryMessageBufferSize != null) {
serverContainer.setDefaultMaxBinaryMessageBufferSize(this.maxBinaryMessageBufferSize);
}
}
}

Loading…
Cancel
Save