Browse Source

Refactor AbstractListenerWebSocketSession

- Added suspended flag to indicate whether the ReceivePublisher
is able to process the incoming messages.
- Use buffer strategy for the incoming messages.

Issue: SPR-14527
pull/1256/merge
Violeta Georgieva 9 years ago committed by Rossen Stoyanchev
parent
commit
08edec006b
  1. 18
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  2. 6
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java
  3. 27
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java
  4. 2
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java
  5. 3
      spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java

18
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -40,6 +40,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -40,6 +40,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
*/
public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessionSupport<T> {
private static final int BUFFER_SIZE = 8192;
private final AtomicBoolean sendCalled = new AtomicBoolean();
private final String id;
@ -50,6 +52,8 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi @@ -50,6 +52,8 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
private volatile WebSocketSendProcessor sendProcessor;
private volatile boolean suspended = false;
public AbstractListenerWebSocketSession(T delegate, String id, URI uri) {
super(delegate);
@ -76,7 +80,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi @@ -76,7 +80,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
@Override
public Flux<WebSocketMessage> receive() {
return Flux.from(this.receivePublisher);
return Flux.from(this.receivePublisher).onBackpressureBuffer(BUFFER_SIZE);
}
@Override
@ -94,15 +98,15 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi @@ -94,15 +98,15 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
}
protected void resumeReceives() {
// no-op
this.suspended = false;
}
protected void suspendReceives() {
// no-op
this.suspended = true;
}
protected boolean isReadyToReceive() {
return this.receivePublisher.isReadyToReceive();
protected boolean isSuspended() {
return this.suspended;
}
protected abstract boolean sendMessage(WebSocketMessage message) throws IOException;
@ -159,10 +163,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi @@ -159,10 +163,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
suspendReceives();
onDataAvailable();
}
boolean isReadyToReceive() {
return this.webSocketMessage == null;
}
}
protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> {

6
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

@ -71,7 +71,7 @@ public class JettyWebSocketHandlerAdapter { @@ -71,7 +71,7 @@ public class JettyWebSocketHandlerAdapter {
@OnWebSocketMessage
public void onWebSocketText(String message) {
if (this.wsSession != null) {
if (this.wsSession != null && !this.wsSession.isSuspended()) {
WebSocketMessage wsMessage = toMessage(Type.TEXT, message);
this.wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
@ -79,7 +79,7 @@ public class JettyWebSocketHandlerAdapter { @@ -79,7 +79,7 @@ public class JettyWebSocketHandlerAdapter {
@OnWebSocketMessage
public void onWebSocketBinary(byte[] message, int offset, int length) {
if (this.wsSession != null) {
if (this.wsSession != null && !this.wsSession.isSuspended()) {
WebSocketMessage wsMessage = toMessage(Type.BINARY, ByteBuffer.wrap(message, offset, length));
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
@ -87,7 +87,7 @@ public class JettyWebSocketHandlerAdapter { @@ -87,7 +87,7 @@ public class JettyWebSocketHandlerAdapter {
@OnWebSocketFrame
public void onWebSocketFrame(Frame frame) {
if (this.wsSession != null) {
if (this.wsSession != null && !this.wsSession.isSuspended()) {
if (OpCode.PONG == frame.getOpCode()) {
ByteBuffer message = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD;
WebSocketMessage wsMessage = toMessage(Type.PONG, message);

27
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java

@ -64,12 +64,9 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @@ -64,12 +64,9 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint {
@Override
public void onMessage(String message) {
while (true) {
if (wsSession.isReadyToReceive()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
break;
}
if (!wsSession.isSuspended()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
}
@ -78,12 +75,9 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @@ -78,12 +75,9 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint {
@Override
public void onMessage(ByteBuffer message) {
while (true) {
if (wsSession.isReadyToReceive()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
break;
}
if (!wsSession.isSuspended()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
}
@ -92,12 +86,9 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @@ -92,12 +86,9 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint {
@Override
public void onMessage(PongMessage message) {
while (true) {
if (wsSession.isReadyToReceive()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
break;
}
if (!wsSession.isSuspended()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
}

2
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -55,10 +55,12 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W @@ -55,10 +55,12 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
}
protected void resumeReceives() {
super.resumeReceives();
getDelegate().resumeReceives();
}
protected void suspendReceives() {
super.suspendReceives();
getDelegate().suspendReceives();
}

3
spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java

@ -61,8 +61,7 @@ public class BasicWebSocketHandlerIntegrationTests extends AbstractWebSocketHand @@ -61,8 +61,7 @@ public class BasicWebSocketHandlerIntegrationTests extends AbstractWebSocketHand
.flatMap(WebSocketResponse::getWebSocketConnection)
.flatMap(conn -> conn.write(messages
.map(TextWebSocketFrame::new)
.cast(WebSocketFrame.class)
.concatWith(Observable.just(new CloseWebSocketFrame())))
.cast(WebSocketFrame.class))
.cast(WebSocketFrame.class)
.mergeWith(conn.getInput())
)

Loading…
Cancel
Save