diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java index 5b1cc14525a..806f967f691 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.hamcrest.Matchers; import org.junit.Ignore; import org.junit.Test; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; @@ -30,7 +31,7 @@ import reactor.core.publisher.ReplayProcessor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.util.StringUtils; +import org.springframework.http.HttpHeaders; import org.springframework.web.reactive.HandlerMapping; import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; import org.springframework.web.reactive.socket.HandshakeInfo; @@ -86,7 +87,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests @Test @Ignore("https://github.com/reactor/reactor-netty/issues/20") - public void subProtocolReactorNettyClient() throws Exception { + public void subProtocolReactorClient() throws Exception { testSubProtocol(new ReactorNettyWebSocketClient()); } @@ -101,7 +102,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests MonoProcessor output = MonoProcessor.create(); client.execute(getUrl("/sub-protocol"), - new SubProtocolWebSocketHandler(protocol) { + new WebSocketHandler() { + + @Override + public String[] getSubProtocols() { + return new String[] {protocol}; + } @Override public Mono handle(WebSocketSession session) { @@ -121,10 +127,30 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests assertEquals("Wrong protocol detected on the server side", protocol, output.blockMillis(5000)); } + @Test + public void customHeaderReactorClient() throws Exception { + testCustomHeader(new ReactorNettyWebSocketClient()); + } @Test - public void customHeaders() throws Exception { - // TODO + public void customHeaderRxNettyClient() throws Exception { + testCustomHeader(new RxNettyWebSocketClient()); + } + + private void testCustomHeader(WebSocketClient client) throws Exception { + + HttpHeaders headers = new HttpHeaders(); + headers.add("my-header", "my-value"); + MonoProcessor output = MonoProcessor.create(); + + client.execute(getUrl("/custom-header"), headers, + session -> session.receive() + .map(WebSocketMessage::getPayloadAsText) + .subscribeWith(output) + .then()) + .blockMillis(5000); + + assertEquals("my-header:my-value", output.blockMillis(5000)); } @@ -136,7 +162,8 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests Map map = new HashMap<>(); map.put("/echo", new EchoWebSocketHandler()); - map.put("/sub-protocol", new SubProtocolWebSocketHandler("echo-v1")); + map.put("/sub-protocol", new SubProtocolWebSocketHandler()); + map.put("/custom-header", new CustomHeaderHandler()); SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); mapping.setUrlMap(map); @@ -156,23 +183,35 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests private static class SubProtocolWebSocketHandler implements WebSocketHandler { - private final String subProtocols; - - public SubProtocolWebSocketHandler(String subProtocols) { - this.subProtocols = subProtocols; - } - @Override public String[] getSubProtocols() { - return StringUtils.commaDelimitedListToStringArray(this.subProtocols); + return new String[] {"echo-v1"}; } @Override public Mono handle(WebSocketSession session) { String protocol = session.getHandshakeInfo().getSubProtocol().orElse("none"); WebSocketMessage message = session.textMessage(protocol); - return session.send(Mono.just(message)); + return doSend(session, Mono.just(message)); } } + private static class CustomHeaderHandler implements WebSocketHandler { + + @Override + public Mono handle(WebSocketSession session) { + HttpHeaders headers = session.getHandshakeInfo().getHeaders(); + String payload = "my-header:" + headers.getFirst("my-header"); + WebSocketMessage message = session.textMessage(payload); + return doSend(session, Mono.just(message)); + } + } + + // TODO: workaround for suspected RxNetty WebSocket client issue + // https://github.com/ReactiveX/RxNetty/issues/560 + + private static Mono doSend(WebSocketSession session, Publisher output) { + return session.send(Mono.delayMillis(100).thenMany(output)); + } + } diff --git a/spring-web-reactive/src/test/resources/log4j2-test.xml b/spring-web-reactive/src/test/resources/log4j2-test.xml index 1b0782fb8f3..7cacb266ab4 100644 --- a/spring-web-reactive/src/test/resources/log4j2-test.xml +++ b/spring-web-reactive/src/test/resources/log4j2-test.xml @@ -9,6 +9,7 @@ +