diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 02b7a591694..880ee61f2fb 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -40,6 +40,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public abstract class AbstractListenerWebSocketSession extends WebSocketSessionSupport { + private static final int BUFFER_SIZE = 8192; + private final AtomicBoolean sendCalled = new AtomicBoolean(); private final String id; @@ -50,6 +52,8 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi private volatile WebSocketSendProcessor sendProcessor; + private volatile boolean suspended = false; + public AbstractListenerWebSocketSession(T delegate, String id, URI uri) { super(delegate); @@ -76,7 +80,7 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi @Override public Flux receive() { - return Flux.from(this.receivePublisher); + return Flux.from(this.receivePublisher).onBackpressureBuffer(BUFFER_SIZE); } @Override @@ -94,15 +98,15 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi } protected void resumeReceives() { - // no-op + this.suspended = false; } protected void suspendReceives() { - // no-op + this.suspended = true; } - protected boolean isReadyToReceive() { - return this.receivePublisher.isReadyToReceive(); + protected boolean isSuspended() { + return this.suspended; } protected abstract boolean sendMessage(WebSocketMessage message) throws IOException; @@ -159,10 +163,6 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi suspendReceives(); onDataAvailable(); } - - boolean isReadyToReceive() { - return this.webSocketMessage == null; - } } protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index 815bd2da79e..553f4402ebf 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -71,7 +71,7 @@ public class JettyWebSocketHandlerAdapter { @OnWebSocketMessage public void onWebSocketText(String message) { - if (this.wsSession != null) { + if (this.wsSession != null && !this.wsSession.isSuspended()) { WebSocketMessage wsMessage = toMessage(Type.TEXT, message); this.wsSession.handleMessage(wsMessage.getType(), wsMessage); } @@ -79,7 +79,7 @@ public class JettyWebSocketHandlerAdapter { @OnWebSocketMessage public void onWebSocketBinary(byte[] message, int offset, int length) { - if (this.wsSession != null) { + if (this.wsSession != null && !this.wsSession.isSuspended()) { WebSocketMessage wsMessage = toMessage(Type.BINARY, ByteBuffer.wrap(message, offset, length)); wsSession.handleMessage(wsMessage.getType(), wsMessage); } @@ -87,7 +87,7 @@ public class JettyWebSocketHandlerAdapter { @OnWebSocketFrame public void onWebSocketFrame(Frame frame) { - if (this.wsSession != null) { + if (this.wsSession != null && !this.wsSession.isSuspended()) { if (OpCode.PONG == frame.getOpCode()) { ByteBuffer message = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD; WebSocketMessage wsMessage = toMessage(Type.PONG, message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java index 405ef5b3883..28319a9934e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java @@ -64,12 +64,9 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @Override public void onMessage(String message) { - while (true) { - if (wsSession.isReadyToReceive()) { - WebSocketMessage wsMessage = toMessage(message); - wsSession.handleMessage(wsMessage.getType(), wsMessage); - break; - } + if (!wsSession.isSuspended()) { + WebSocketMessage wsMessage = toMessage(message); + wsSession.handleMessage(wsMessage.getType(), wsMessage); } } @@ -78,12 +75,9 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @Override public void onMessage(ByteBuffer message) { - while (true) { - if (wsSession.isReadyToReceive()) { - WebSocketMessage wsMessage = toMessage(message); - wsSession.handleMessage(wsMessage.getType(), wsMessage); - break; - } + if (!wsSession.isSuspended()) { + WebSocketMessage wsMessage = toMessage(message); + wsSession.handleMessage(wsMessage.getType(), wsMessage); } } @@ -92,12 +86,9 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @Override public void onMessage(PongMessage message) { - while (true) { - if (wsSession.isReadyToReceive()) { - WebSocketMessage wsMessage = toMessage(message); - wsSession.handleMessage(wsMessage.getType(), wsMessage); - break; - } + if (!wsSession.isSuspended()) { + WebSocketMessage wsMessage = toMessage(message); + wsSession.handleMessage(wsMessage.getType(), wsMessage); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index b6f8a9dae92..dc3c305fca8 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -55,10 +55,12 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession conn.write(messages .map(TextWebSocketFrame::new) - .cast(WebSocketFrame.class) - .concatWith(Observable.just(new CloseWebSocketFrame()))) + .cast(WebSocketFrame.class)) .cast(WebSocketFrame.class) .mergeWith(conn.getInput()) )