@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.Matchers ;
import org.junit.Ignore ;
import org.junit.Test ;
import org.reactivestreams.Publisher ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
import reactor.core.publisher.MonoProcessor ;
@ -30,7 +31,7 @@ import reactor.core.publisher.ReplayProcessor;
@@ -30,7 +31,7 @@ import reactor.core.publisher.ReplayProcessor;
import org.springframework.context.annotation.Bean ;
import org.springframework.context.annotation.Configuration ;
import org.springframework.util.StringUtil s ;
import org.springframework.http.HttpHeader s ;
import org.springframework.web.reactive.HandlerMapping ;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping ;
import org.springframework.web.reactive.socket.HandshakeInfo ;
@ -86,7 +87,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@@ -86,7 +87,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@Test
@Ignore ( "https://github.com/reactor/reactor-netty/issues/20" )
public void subProtocolReactorNetty Client ( ) throws Exception {
public void subProtocolReactorClient ( ) throws Exception {
testSubProtocol ( new ReactorNettyWebSocketClient ( ) ) ;
}
@ -101,7 +102,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@@ -101,7 +102,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
MonoProcessor < Object > output = MonoProcessor . create ( ) ;
client . execute ( getUrl ( "/sub-protocol" ) ,
new SubProtocolWebSocketHandler ( protocol ) {
new WebSocketHandler ( ) {
@Override
public String [ ] getSubProtocols ( ) {
return new String [ ] { protocol } ;
}
@Override
public Mono < Void > handle ( WebSocketSession session ) {
@ -121,10 +127,30 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@@ -121,10 +127,30 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
assertEquals ( "Wrong protocol detected on the server side" , protocol , output . blockMillis ( 5000 ) ) ;
}
@Test
public void customHeaderReactorClient ( ) throws Exception {
testCustomHeader ( new ReactorNettyWebSocketClient ( ) ) ;
}
@Test
public void customHeaders ( ) throws Exception {
// TODO
public void customHeaderRxNettyClient ( ) throws Exception {
testCustomHeader ( new RxNettyWebSocketClient ( ) ) ;
}
private void testCustomHeader ( WebSocketClient client ) throws Exception {
HttpHeaders headers = new HttpHeaders ( ) ;
headers . add ( "my-header" , "my-value" ) ;
MonoProcessor < Object > output = MonoProcessor . create ( ) ;
client . execute ( getUrl ( "/custom-header" ) , headers ,
session - > session . receive ( )
. map ( WebSocketMessage : : getPayloadAsText )
. subscribeWith ( output )
. then ( ) )
. blockMillis ( 5000 ) ;
assertEquals ( "my-header:my-value" , output . blockMillis ( 5000 ) ) ;
}
@ -136,7 +162,8 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@@ -136,7 +162,8 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
Map < String , WebSocketHandler > map = new HashMap < > ( ) ;
map . put ( "/echo" , new EchoWebSocketHandler ( ) ) ;
map . put ( "/sub-protocol" , new SubProtocolWebSocketHandler ( "echo-v1" ) ) ;
map . put ( "/sub-protocol" , new SubProtocolWebSocketHandler ( ) ) ;
map . put ( "/custom-header" , new CustomHeaderHandler ( ) ) ;
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping ( ) ;
mapping . setUrlMap ( map ) ;
@ -156,23 +183,35 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@@ -156,23 +183,35 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
private static class SubProtocolWebSocketHandler implements WebSocketHandler {
private final String subProtocols ;
public SubProtocolWebSocketHandler ( String subProtocols ) {
this . subProtocols = subProtocols ;
}
@Override
public String [ ] getSubProtocols ( ) {
return StringUtils . commaDelimitedListToStringArray ( this . subProtocols ) ;
return new String [ ] { "echo-v1" } ;
}
@Override
public Mono < Void > handle ( WebSocketSession session ) {
String protocol = session . getHandshakeInfo ( ) . getSubProtocol ( ) . orElse ( "none" ) ;
WebSocketMessage message = session . textMessage ( protocol ) ;
return session . send ( Mono . just ( message ) ) ;
return doSend ( session , Mono . just ( message ) ) ;
}
}
private static class CustomHeaderHandler implements WebSocketHandler {
@Override
public Mono < Void > handle ( WebSocketSession session ) {
HttpHeaders headers = session . getHandshakeInfo ( ) . getHeaders ( ) ;
String payload = "my-header:" + headers . getFirst ( "my-header" ) ;
WebSocketMessage message = session . textMessage ( payload ) ;
return doSend ( session , Mono . just ( message ) ) ;
}
}
// TODO: workaround for suspected RxNetty WebSocket client issue
// https://github.com/ReactiveX/RxNetty/issues/560
private static Mono < Void > doSend ( WebSocketSession session , Publisher < WebSocketMessage > output ) {
return session . send ( Mono . delayMillis ( 100 ) . thenMany ( output ) ) ;
}
}