diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 287609df3f7..886c48336b0 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -19,19 +19,21 @@ package org.springframework.web.reactive.result.method.annotation; import java.time.Duration; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import reactor.core.publisher.Flux; +import reactor.core.publisher.MonoProcessor; import reactor.test.StepVerifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.ParameterizedTypeReference; -import org.springframework.core.ResolvableType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; import org.springframework.http.server.reactive.HttpHandler; -import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.config.EnableWebFlux; @@ -39,9 +41,9 @@ import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import static org.junit.Assert.*; -import static org.springframework.core.ResolvableType.forClassWithGenerics; -import static org.springframework.http.MediaType.TEXT_EVENT_STREAM; -import static org.springframework.web.reactive.function.BodyExtractors.toFlux; +import static org.junit.Assume.*; +import static org.springframework.http.MediaType.*; +import static org.springframework.web.reactive.function.BodyExtractors.*; /** * @author Sebastien Deleuze @@ -102,7 +104,6 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @Test public void sseAsEvent() { - ResolvableType type = forClassWithGenerics(ServerSentEvent.class, String.class); Flux> result = this.webClient.get() .uri("/event") .accept(TEXT_EVENT_STREAM) @@ -157,6 +158,28 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { .verify(Duration.ofSeconds(5L)); } + @Test // SPR-16494 + @Ignore // https://github.com/reactor/reactor-netty/issues/283 + public void serverDetectsClientDisconnect() { + + assumeTrue(this.server instanceof ReactorHttpServer); + + Flux result = this.webClient.get() + .uri("/infinite") + .accept(TEXT_EVENT_STREAM) + .exchange() + .flatMapMany(response -> response.bodyToFlux(String.class)); + + StepVerifier.create(result) + .expectNext("foo 0") + .expectNext("foo 1") + .thenCancel() + .verify(Duration.ofSeconds(5L)); + + SseController controller = this.wac.getBean(SseController.class); + controller.cancellation.block(Duration.ofSeconds(5)); + } + @RestController @SuppressWarnings("unused") @@ -164,18 +187,20 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { private static final Flux INTERVAL = interval(Duration.ofMillis(100), 50); + private MonoProcessor cancellation = MonoProcessor.create(); - @RequestMapping("/sse/string") + + @GetMapping("/sse/string") Flux string() { return INTERVAL.map(l -> "foo " + l); } - @RequestMapping("/sse/person") + @GetMapping("/sse/person") Flux person() { return INTERVAL.map(l -> new Person("foo " + l)); } - @RequestMapping("/sse/event") + @GetMapping("/sse/event") Flux> sse() { return INTERVAL.map(l -> ServerSentEvent.builder("foo") .id(Long.toString(l)) @@ -183,6 +208,12 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { .build()); } + @GetMapping("/sse/infinite") + Flux infinite() { + return Flux.just(0, 1).map(l -> "foo " + l) + .mergeWith(Flux.never()) + .doOnCancel(() -> cancellation.onComplete()); + } }