Browse Source

Consistently call setComplete before WS upgrade

Even thought Tomcat and Jetty, which commit the response more lazily,
were not impacted by this issue, it still makes sense for them to
complete the WebFlux response (and pre-commit actions) at the same time
as for other servers for consistent behavior.

See gh-24475
pull/24552/head
Rossen Stoyanchev 6 years ago
parent
commit
fb6ee01281
  1. 27
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java
  2. 7
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java
  3. 20
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java
  4. 2
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java

27
spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import java.util.function.Supplier;
import javax.servlet.ServletContext;
@ -162,18 +161,18 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @@ -162,18 +161,18 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life
boolean isUpgrade = this.factory.isUpgradeRequest(servletRequest, servletResponse);
Assert.isTrue(isUpgrade, "Not a WebSocket handshake");
try {
adapterHolder.set(new WebSocketHandlerContainer(adapter, subProtocol));
this.factory.acceptWebSocket(servletRequest, servletResponse);
}
catch (IOException ex) {
return Mono.error(ex);
}
finally {
adapterHolder.remove();
}
return Mono.empty();
// Trigger WebFlux preCommit actions and upgrade
return exchange.getResponse().setComplete()
.then(Mono.fromCallable(() -> {
try {
adapterHolder.set(new WebSocketHandlerContainer(adapter, subProtocol));
this.factory.acceptWebSocket(servletRequest, servletResponse);
}
finally {
adapterHolder.remove();
}
return null;
}));
}
private static HttpServletRequest getNativeRequest(ServerHttpRequest request) {

7
spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java

@ -103,8 +103,13 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg @@ -103,8 +103,13 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
HttpServerResponse reactorResponse = getNativeResponse(response);
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
// Trigger WebFlux preCommit actions and upgrade
return response.setComplete()
.then(Mono.defer(() -> reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, this.handlePing,
.then(Mono.defer(() -> reactorResponse.sendWebsocket(
subProtocol,
this.maxFramePayloadLength,
this.handlePing,
(in, out) -> {
ReactorNettyWebSocketSession session =
new ReactorNettyWebSocketSession(

20
spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,11 +16,9 @@ @@ -16,11 +16,9 @@
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import java.util.Collections;
import java.util.function.Supplier;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.websocket.Endpoint;
@ -147,15 +145,13 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -147,15 +145,13 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
config.setSubprotocols(subProtocol != null ?
Collections.singletonList(subProtocol) : Collections.emptyList());
try {
WsServerContainer container = getContainer(servletRequest);
container.doUpgrade(servletRequest, servletResponse, config, Collections.emptyMap());
}
catch (ServletException | IOException ex) {
return Mono.error(ex);
}
return Mono.empty();
// Trigger WebFlux preCommit actions and upgrade
return exchange.getResponse().setComplete()
.then(Mono.fromCallable(() -> {
WsServerContainer container = getContainer(servletRequest);
container.doUpgrade(servletRequest, servletResponse, config, Collections.emptyMap());
return null;
}));
}
private static HttpServletRequest getNativeRequest(ServerHttpRequest request) {

2
spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java

@ -64,6 +64,8 @@ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy { @@ -64,6 +64,8 @@ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy {
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
// Trigger WebFlux preCommit actions and upgrade
return exchange.getResponse().setComplete()
.then(Mono.fromCallable(() -> {
DefaultCallback callback = new DefaultCallback(handshakeInfo, handler, bufferFactory);

Loading…
Cancel
Save