From db5bc4a24e001f0aadc13755afd4c86f511a1815 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 12 Dec 2016 15:20:02 -0500 Subject: [PATCH] Minor refactoring of suspend/resumeReceiving suspend/resumeReceiving in the AbstractListenerWebSocketSession are now abstract methods. In Tomcat/Jetty these methods are no-op implementations that are then coupled with a buffering strategy via Flux#onBackpressureBuffer. In Undertow they rely on flow control for receiving WebSocket messages. Issue: SPR-14527 --- .../AbstractListenerWebSocketSession.java | 57 +++++++++++++------ .../adapter/JettyWebSocketHandlerAdapter.java | 6 +- .../socket/adapter/JettyWebSocketSession.java | 22 ++++++- .../TomcatWebSocketHandlerAdapter.java | 18 ++---- .../adapter/TomcatWebSocketSession.java | 25 ++++++-- .../adapter/UndertowWebSocketSession.java | 21 ++++--- 6 files changed, 101 insertions(+), 48 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 880ee61f2fb..82035d46082 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -36,11 +36,17 @@ import org.springframework.web.reactive.socket.WebSocketSession; * Base class for Listener-based {@link WebSocketSession} adapters. * * @author Violeta Georgieva + * @author Rossen Stoyanchev * @since 5.0 */ public abstract class AbstractListenerWebSocketSession extends WebSocketSessionSupport { - private static final int BUFFER_SIZE = 8192; + /** + * The "back-pressure" buffer size to use if the underlying WebSocket API + * does not have flow control for receiving messages. + */ + private static final int RECEIVE_BUFFER_SIZE = 8192; + private final AtomicBoolean sendCalled = new AtomicBoolean(); @@ -52,8 +58,6 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi private volatile WebSocketSendProcessor sendProcessor; - private volatile boolean suspended = false; - public AbstractListenerWebSocketSession(T delegate, String id, URI uri) { super(delegate); @@ -80,7 +84,9 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi @Override public Flux receive() { - return Flux.from(this.receivePublisher).onBackpressureBuffer(BUFFER_SIZE); + return canSuspendReceiving() ? + Flux.from(this.receivePublisher) : + Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE); } @Override @@ -97,18 +103,33 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi } } - protected void resumeReceives() { - this.suspended = false; - } - - protected void suspendReceives() { - this.suspended = true; - } - - protected boolean isSuspended() { - return this.suspended; - } - + /** + * Resume receiving new message(s) after demand is generated by the + * downstream Subscriber. + *

Note: if the underlying WebSocket API does not provide + * flow control for receiving messages, and this method should be a no-op + * and {@link #canSuspendReceiving()} should return {@code false}. + */ + protected abstract void resumeReceiving(); + + /** + * Suspend receiving until received message(s) are processed and more demand + * is generated by the downstream Subscriber. + *

Note: if the underlying WebSocket API does not provide + * flow control for receiving messages, and this method should be a no-op + * and {@link #canSuspendReceiving()} should return {@code false}. + */ + protected abstract void suspendReceiving(); + + /** + * Whether the underlying WebSocket API has flow control and can suspend and + * resume the receiving of messages. + */ + protected abstract boolean canSuspendReceiving(); + + /** + * Send the given WebSocket message. + */ protected abstract boolean sendMessage(WebSocketMessage message) throws IOException; /** Handle a message callback from the WebSocketHandler adapter */ @@ -151,7 +172,7 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi if (this.webSocketMessage != null) { WebSocketMessage result = this.webSocketMessage; this.webSocketMessage = null; - resumeReceives(); + resumeReceiving(); return result; } @@ -160,7 +181,7 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi void handleMessage(WebSocketMessage webSocketMessage) { this.webSocketMessage = webSocketMessage; - suspendReceives(); + suspendReceiving(); onDataAvailable(); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index 553f4402ebf..815bd2da79e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -71,7 +71,7 @@ public class JettyWebSocketHandlerAdapter { @OnWebSocketMessage public void onWebSocketText(String message) { - if (this.wsSession != null && !this.wsSession.isSuspended()) { + if (this.wsSession != null) { WebSocketMessage wsMessage = toMessage(Type.TEXT, message); this.wsSession.handleMessage(wsMessage.getType(), wsMessage); } @@ -79,7 +79,7 @@ public class JettyWebSocketHandlerAdapter { @OnWebSocketMessage public void onWebSocketBinary(byte[] message, int offset, int length) { - if (this.wsSession != null && !this.wsSession.isSuspended()) { + if (this.wsSession != null) { WebSocketMessage wsMessage = toMessage(Type.BINARY, ByteBuffer.wrap(message, offset, length)); wsSession.handleMessage(wsMessage.getType(), wsMessage); } @@ -87,7 +87,7 @@ public class JettyWebSocketHandlerAdapter { @OnWebSocketFrame public void onWebSocketFrame(Frame frame) { - if (this.wsSession != null && !this.wsSession.isSuspended()) { + if (this.wsSession != null) { if (OpCode.PONG == frame.getOpCode()) { ByteBuffer message = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD; WebSocketMessage wsMessage = toMessage(Type.PONG, message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 806f03295f2..56f04611c4f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -21,13 +21,13 @@ import java.nio.charset.StandardCharsets; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WriteCallback; +import reactor.core.publisher.Mono; + import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; -import reactor.core.publisher.Mono; - /** * Spring {@link WebSocketSession} adapter for Jetty's * {@link org.eclipse.jetty.websocket.api.Session}. @@ -37,11 +37,13 @@ import reactor.core.publisher.Mono; */ public class JettyWebSocketSession extends AbstractListenerWebSocketSession { + public JettyWebSocketSession(Session session) { super(session, ObjectUtils.getIdentityHexString(session), session.getUpgradeRequest().getRequestURI()); } + @Override protected Mono closeInternal(CloseStatus status) { getDelegate().close(status.getCode(), status.getReason()); @@ -73,6 +75,22 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession { + public TomcatWebSocketSession(Session session) { super(session, session.getId(), session.getRequestURI()); } + @Override protected Mono closeInternal(CloseStatus status) { try { @@ -81,6 +82,22 @@ public class TomcatWebSocketSession extends AbstractListenerWebSocketSession