|
|
|
|
@ -29,6 +29,7 @@ import reactor.core.publisher.Flux;
@@ -29,6 +29,7 @@ import reactor.core.publisher.Flux;
|
|
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
|
import reactor.core.publisher.MonoProcessor; |
|
|
|
|
import reactor.core.publisher.ReplayProcessor; |
|
|
|
|
import reactor.util.retry.Retry; |
|
|
|
|
|
|
|
|
|
import org.springframework.context.annotation.Bean; |
|
|
|
|
import org.springframework.context.annotation.Configuration; |
|
|
|
|
@ -39,6 +40,7 @@ import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
@@ -39,6 +40,7 @@ import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
|
|
|
|
|
import org.springframework.web.reactive.socket.client.WebSocketClient; |
|
|
|
|
import org.springframework.web.server.WebFilter; |
|
|
|
|
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; |
|
|
|
|
import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; |
|
|
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
|
|
|
|
|
|
@ -66,18 +68,28 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
@@ -66,18 +68,28 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
|
|
|
|
|
void echo(WebSocketClient client, HttpServer server, Class<?> serverConfigClass) throws Exception { |
|
|
|
|
startServer(client, server, serverConfigClass); |
|
|
|
|
|
|
|
|
|
if (server instanceof TomcatHttpServer) { |
|
|
|
|
Mono.fromRunnable(this::testEcho) |
|
|
|
|
.retryWhen(Retry.max(3).filter(ex -> ex instanceof IllegalStateException)) |
|
|
|
|
.block(); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
testEcho(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void testEcho() { |
|
|
|
|
int count = 100; |
|
|
|
|
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index); |
|
|
|
|
ReplayProcessor<Object> output = ReplayProcessor.create(count); |
|
|
|
|
|
|
|
|
|
this.client.execute(getUrl("/echo"), session -> session |
|
|
|
|
.send(input.map(session::textMessage)) |
|
|
|
|
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText)) |
|
|
|
|
.subscribeWith(output) |
|
|
|
|
.then()) |
|
|
|
|
.block(TIMEOUT); |
|
|
|
|
|
|
|
|
|
assertThat(output.collectList().block(TIMEOUT)).isEqualTo(input.collectList().block(TIMEOUT)); |
|
|
|
|
assertThat(output.isTerminated()).isTrue(); |
|
|
|
|
assertThat(output.collectList().block()).isEqualTo(input.collectList().block()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ParameterizedWebSocketTest |
|
|
|
|
|