diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java index f5fe5cd5ece..ec41e8cf6c3 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package org.springframework.web.reactive.socket.server.upgrade; -import java.io.IOException; import java.util.function.Supplier; import javax.servlet.ServletContext; @@ -162,18 +161,18 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life boolean isUpgrade = this.factory.isUpgradeRequest(servletRequest, servletResponse); Assert.isTrue(isUpgrade, "Not a WebSocket handshake"); - try { - adapterHolder.set(new WebSocketHandlerContainer(adapter, subProtocol)); - this.factory.acceptWebSocket(servletRequest, servletResponse); - } - catch (IOException ex) { - return Mono.error(ex); - } - finally { - adapterHolder.remove(); - } - - return Mono.empty(); + // Trigger WebFlux preCommit actions and upgrade + return exchange.getResponse().setComplete() + .then(Mono.fromCallable(() -> { + try { + adapterHolder.set(new WebSocketHandlerContainer(adapter, subProtocol)); + this.factory.acceptWebSocket(servletRequest, servletResponse); + } + finally { + adapterHolder.remove(); + } + return null; + })); } private static HttpServletRequest getNativeRequest(ServerHttpRequest request) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java index f29c4f63564..8753d4bc324 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java @@ -103,8 +103,13 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg HttpServerResponse reactorResponse = getNativeResponse(response); HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); + + // Trigger WebFlux preCommit actions and upgrade return response.setComplete() - .then(Mono.defer(() -> reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, this.handlePing, + .then(Mono.defer(() -> reactorResponse.sendWebsocket( + subProtocol, + this.maxFramePayloadLength, + this.handlePing, (in, out) -> { ReactorNettyWebSocketSession session = new ReactorNettyWebSocketSession( diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java index fb4ef36dd07..ad56c0370f1 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,9 @@ package org.springframework.web.reactive.socket.server.upgrade; -import java.io.IOException; import java.util.Collections; import java.util.function.Supplier; -import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.websocket.Endpoint; @@ -147,15 +145,13 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { config.setSubprotocols(subProtocol != null ? Collections.singletonList(subProtocol) : Collections.emptyList()); - try { - WsServerContainer container = getContainer(servletRequest); - container.doUpgrade(servletRequest, servletResponse, config, Collections.emptyMap()); - } - catch (ServletException | IOException ex) { - return Mono.error(ex); - } - - return Mono.empty(); + // Trigger WebFlux preCommit actions and upgrade + return exchange.getResponse().setComplete() + .then(Mono.fromCallable(() -> { + WsServerContainer container = getContainer(servletRequest); + container.doUpgrade(servletRequest, servletResponse, config, Collections.emptyMap()); + return null; + })); } private static HttpServletRequest getNativeRequest(ServerHttpRequest request) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java index 8307c8eed01..d57fb1b9481 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java @@ -64,6 +64,8 @@ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy { HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory(); + + // Trigger WebFlux preCommit actions and upgrade return exchange.getResponse().setComplete() .then(Mono.fromCallable(() -> { DefaultCallback callback = new DefaultCallback(handshakeInfo, handler, bufferFactory);