Browse Source

Minor refactoring of suspend/resumeReceiving

suspend/resumeReceiving in the AbstractListenerWebSocketSession are
now abstract methods. In Tomcat/Jetty these methods are no-op
implementations that are then coupled with a buffering strategy via
Flux#onBackpressureBuffer. In Undertow they rely on flow control for
receiving WebSocket messages.

Issue: SPR-14527
pull/1256/merge
Rossen Stoyanchev 9 years ago
parent
commit
db5bc4a24e
  1. 57
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  2. 6
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java
  3. 22
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java
  4. 18
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java
  5. 25
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
  6. 21
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

57
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -36,11 +36,17 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -36,11 +36,17 @@ import org.springframework.web.reactive.socket.WebSocketSession;
* Base class for Listener-based {@link WebSocketSession} adapters.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessionSupport<T> {
private static final int BUFFER_SIZE = 8192;
/**
* The "back-pressure" buffer size to use if the underlying WebSocket API
* does not have flow control for receiving messages.
*/
private static final int RECEIVE_BUFFER_SIZE = 8192;
private final AtomicBoolean sendCalled = new AtomicBoolean();
@ -52,8 +58,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi @@ -52,8 +58,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
private volatile WebSocketSendProcessor sendProcessor;
private volatile boolean suspended = false;
public AbstractListenerWebSocketSession(T delegate, String id, URI uri) {
super(delegate);
@ -80,7 +84,9 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi @@ -80,7 +84,9 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
@Override
public Flux<WebSocketMessage> receive() {
return Flux.from(this.receivePublisher).onBackpressureBuffer(BUFFER_SIZE);
return canSuspendReceiving() ?
Flux.from(this.receivePublisher) :
Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE);
}
@Override
@ -97,18 +103,33 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi @@ -97,18 +103,33 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
}
}
protected void resumeReceives() {
this.suspended = false;
}
protected void suspendReceives() {
this.suspended = true;
}
protected boolean isSuspended() {
return this.suspended;
}
/**
* Resume receiving new message(s) after demand is generated by the
* downstream Subscriber.
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
* flow control for receiving messages, and this method should be a no-op
* and {@link #canSuspendReceiving()} should return {@code false}.
*/
protected abstract void resumeReceiving();
/**
* Suspend receiving until received message(s) are processed and more demand
* is generated by the downstream Subscriber.
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
* flow control for receiving messages, and this method should be a no-op
* and {@link #canSuspendReceiving()} should return {@code false}.
*/
protected abstract void suspendReceiving();
/**
* Whether the underlying WebSocket API has flow control and can suspend and
* resume the receiving of messages.
*/
protected abstract boolean canSuspendReceiving();
/**
* Send the given WebSocket message.
*/
protected abstract boolean sendMessage(WebSocketMessage message) throws IOException;
/** Handle a message callback from the WebSocketHandler adapter */
@ -151,7 +172,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi @@ -151,7 +172,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
if (this.webSocketMessage != null) {
WebSocketMessage result = this.webSocketMessage;
this.webSocketMessage = null;
resumeReceives();
resumeReceiving();
return result;
}
@ -160,7 +181,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi @@ -160,7 +181,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
void handleMessage(WebSocketMessage webSocketMessage) {
this.webSocketMessage = webSocketMessage;
suspendReceives();
suspendReceiving();
onDataAvailable();
}
}

6
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

@ -71,7 +71,7 @@ public class JettyWebSocketHandlerAdapter { @@ -71,7 +71,7 @@ public class JettyWebSocketHandlerAdapter {
@OnWebSocketMessage
public void onWebSocketText(String message) {
if (this.wsSession != null && !this.wsSession.isSuspended()) {
if (this.wsSession != null) {
WebSocketMessage wsMessage = toMessage(Type.TEXT, message);
this.wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
@ -79,7 +79,7 @@ public class JettyWebSocketHandlerAdapter { @@ -79,7 +79,7 @@ public class JettyWebSocketHandlerAdapter {
@OnWebSocketMessage
public void onWebSocketBinary(byte[] message, int offset, int length) {
if (this.wsSession != null && !this.wsSession.isSuspended()) {
if (this.wsSession != null) {
WebSocketMessage wsMessage = toMessage(Type.BINARY, ByteBuffer.wrap(message, offset, length));
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
@ -87,7 +87,7 @@ public class JettyWebSocketHandlerAdapter { @@ -87,7 +87,7 @@ public class JettyWebSocketHandlerAdapter {
@OnWebSocketFrame
public void onWebSocketFrame(Frame frame) {
if (this.wsSession != null && !this.wsSession.isSuspended()) {
if (this.wsSession != null) {
if (OpCode.PONG == frame.getOpCode()) {
ByteBuffer message = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD;
WebSocketMessage wsMessage = toMessage(Type.PONG, message);

22
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

@ -21,13 +21,13 @@ import java.nio.charset.StandardCharsets; @@ -21,13 +21,13 @@ import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import reactor.core.publisher.Mono;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;
/**
* Spring {@link WebSocketSession} adapter for Jetty's
* {@link org.eclipse.jetty.websocket.api.Session}.
@ -37,11 +37,13 @@ import reactor.core.publisher.Mono; @@ -37,11 +37,13 @@ import reactor.core.publisher.Mono;
*/
public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public JettyWebSocketSession(Session session) {
super(session, ObjectUtils.getIdentityHexString(session),
session.getUpgradeRequest().getRequestURI());
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
getDelegate().close(status.getCode(), status.getReason());
@ -73,6 +75,22 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess @@ -73,6 +75,22 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
return true;
}
@Override
protected void resumeReceiving() {
// No-op
}
@Override
protected void suspendReceiving() {
// No-op
}
@Override
protected boolean canSuspendReceiving() {
return false;
}
private final class WebSocketMessageWriteCallback implements WriteCallback {
@Override

18
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java

@ -64,10 +64,8 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @@ -64,10 +64,8 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint {
@Override
public void onMessage(String message) {
if (!wsSession.isSuspended()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
});
@ -75,10 +73,8 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @@ -75,10 +73,8 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint {
@Override
public void onMessage(ByteBuffer message) {
if (!wsSession.isSuspended()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
});
@ -86,10 +82,8 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @@ -86,10 +82,8 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint {
@Override
public void onMessage(PongMessage message) {
if (!wsSession.isSuspended()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
});

25
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

@ -18,19 +18,18 @@ package org.springframework.web.reactive.socket.adapter; @@ -18,19 +18,18 @@ package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.Session;
import javax.websocket.CloseReason.CloseCodes;
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;
/**
* Spring {@link WebSocketSession} adapter for Tomcat's
* {@link javax.websocket.Session}.
@ -40,10 +39,12 @@ import reactor.core.publisher.Mono; @@ -40,10 +39,12 @@ import reactor.core.publisher.Mono;
*/
public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public TomcatWebSocketSession(Session session) {
super(session, session.getId(), session.getRequestURI());
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
try {
@ -81,6 +82,22 @@ public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Ses @@ -81,6 +82,22 @@ public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Ses
return true;
}
@Override
protected void resumeReceiving() {
// No-op
}
@Override
protected void suspendReceiving() {
// No-op
}
@Override
protected boolean canSuspendReceiving() {
return false;
}
private final class WebSocketMessageSendHandler implements SendHandler {
@Override

21
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -21,17 +21,17 @@ import java.net.URI; @@ -21,17 +21,17 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import io.undertow.websockets.core.CloseMessage;
import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import reactor.core.publisher.Mono;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} adapter for Undertow's
* {@link io.undertow.websockets.core.WebSocketChannel}.
@ -54,16 +54,19 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W @@ -54,16 +54,19 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
return Mono.empty();
}
protected void resumeReceives() {
super.resumeReceives();
protected void resumeReceiving() {
getDelegate().resumeReceives();
}
protected void suspendReceives() {
super.suspendReceives();
protected void suspendReceiving() {
getDelegate().suspendReceives();
}
@Override
protected boolean canSuspendReceiving() {
return true;
}
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {

Loading…
Cancel
Save