Browse Source

Switch to suspended mode before demand

After this commit, Tomcat and Undertow WebSocketSession imlpementations
start out in suspended mode and wait for demand.

The JettyWebSocketSession is capable of suspending but it doesn't seem
to work if invoked before any messages are received. That may become an
issue if there is a case where no demand appears long enough for more
messages to accumulate than we can hold.

UnderowServerHttpRequest would ideally also start in suspended mode but
that also doesn't work. It is not an issue in this case since we can
ignore the read notifications.

Servlet API requires a proactive check before it calls you back so
there is no need to suspend.

Issue: SPR-16207
pull/1605/head
Rossen Stoyanchev 8 years ago
parent
commit
f44366877c
  1. 7
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  2. 2
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java
  3. 1
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
  4. 1
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

7
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -128,6 +128,8 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc @@ -128,6 +128,8 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
/**
* Whether the underlying WebSocket API has flow control and can suspend and
* resume the receiving of messages.
* <p><strong>Note:</strong> Sub-classes are encouraged to start out in
* suspended mode, if possible, and wait until demand is received.
*/
protected abstract boolean canSuspendReceiving();
@ -238,7 +240,10 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc @@ -238,7 +240,10 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
}
void handleMessage(WebSocketMessage webSocketMessage) {
this.pendingMessages.offer(webSocketMessage);
if (!this.pendingMessages.offer(webSocketMessage)) {
throw new IllegalStateException("Too many messages received. " +
"Please ensure WebSocketSession.receive() is subscribed to.");
}
onDataAvailable();
}
}

2
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

@ -57,6 +57,8 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess @@ -57,6 +57,8 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
@Nullable MonoProcessor<Void> completionMono) {
super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
// TODO: suspend causes failures if invoked at this stage
// suspendReceiving();
}

1
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

@ -49,6 +49,7 @@ public class TomcatWebSocketSession extends StandardWebSocketSession { @@ -49,6 +49,7 @@ public class TomcatWebSocketSession extends StandardWebSocketSession {
MonoProcessor<Void> completionMono) {
super(session, info, factory, completionMono);
suspendReceiving();
}

1
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -53,6 +53,7 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W @@ -53,6 +53,7 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
DataBufferFactory factory, @Nullable MonoProcessor<Void> completionMono) {
super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionMono);
suspendReceiving();
}

Loading…
Cancel
Save