@ -19,19 +19,21 @@ package org.springframework.web.reactive.result.method.annotation;
@@ -19,19 +19,21 @@ package org.springframework.web.reactive.result.method.annotation;
import java.time.Duration ;
import org.junit.Before ;
import org.junit.Ignore ;
import org.junit.Test ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.MonoProcessor ;
import reactor.test.StepVerifier ;
import org.springframework.context.annotation.AnnotationConfigApplicationContext ;
import org.springframework.context.annotation.Bean ;
import org.springframework.context.annotation.Configuration ;
import org.springframework.core.ParameterizedTypeReference ;
import org.springframework.core.ResolvableType ;
import org.springframework.http.codec.ServerSentEvent ;
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests ;
import org.springframework.http.server.reactive.HttpHandler ;
import org.springframework.web.bind.annotation.RequestMapping ;
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer ;
import org.springframework.web.bind.annotation.GetMapping ;
import org.springframework.web.bind.annotation.RestController ;
import org.springframework.web.reactive.DispatcherHandler ;
import org.springframework.web.reactive.config.EnableWebFlux ;
@ -39,9 +41,9 @@ import org.springframework.web.reactive.function.client.WebClient;
@@ -39,9 +41,9 @@ import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder ;
import static org.junit.Assert.* ;
import static org.springframework.core.ResolvableType.forClassWithGenerics ;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM ;
import static org.springframework.web.reactive.function.BodyExtractors.toFlux ;
import static org.junit.Assume.* ;
import static org.springframework.http.MediaType.* ;
import static org.springframework.web.reactive.function.BodyExtractors.* ;
/ * *
* @author Sebastien Deleuze
@ -102,7 +104,6 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@@ -102,7 +104,6 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Test
public void sseAsEvent ( ) {
ResolvableType type = forClassWithGenerics ( ServerSentEvent . class , String . class ) ;
Flux < ServerSentEvent < String > > result = this . webClient . get ( )
. uri ( "/event" )
. accept ( TEXT_EVENT_STREAM )
@ -157,6 +158,28 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@@ -157,6 +158,28 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
. verify ( Duration . ofSeconds ( 5L ) ) ;
}
@Test // SPR-16494
@Ignore // https://github.com/reactor/reactor-netty/issues/283
public void serverDetectsClientDisconnect ( ) {
assumeTrue ( this . server instanceof ReactorHttpServer ) ;
Flux < String > result = this . webClient . get ( )
. uri ( "/infinite" )
. accept ( TEXT_EVENT_STREAM )
. exchange ( )
. flatMapMany ( response - > response . bodyToFlux ( String . class ) ) ;
StepVerifier . create ( result )
. expectNext ( "foo 0" )
. expectNext ( "foo 1" )
. thenCancel ( )
. verify ( Duration . ofSeconds ( 5L ) ) ;
SseController controller = this . wac . getBean ( SseController . class ) ;
controller . cancellation . block ( Duration . ofSeconds ( 5 ) ) ;
}
@RestController
@SuppressWarnings ( "unused" )
@ -164,18 +187,20 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@@ -164,18 +187,20 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
private static final Flux < Long > INTERVAL = interval ( Duration . ofMillis ( 100 ) , 50 ) ;
private MonoProcessor < Void > cancellation = MonoProcessor . create ( ) ;
@RequestMapping ( "/sse/string" )
@GetMapping ( "/sse/string" )
Flux < String > string ( ) {
return INTERVAL . map ( l - > "foo " + l ) ;
}
@Reques tMapping ( "/sse/person" )
@Ge tMapping ( "/sse/person" )
Flux < Person > person ( ) {
return INTERVAL . map ( l - > new Person ( "foo " + l ) ) ;
}
@Reques tMapping ( "/sse/event" )
@Ge tMapping ( "/sse/event" )
Flux < ServerSentEvent < String > > sse ( ) {
return INTERVAL . map ( l - > ServerSentEvent . builder ( "foo" )
. id ( Long . toString ( l ) )
@ -183,6 +208,12 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@@ -183,6 +208,12 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
. build ( ) ) ;
}
@GetMapping ( "/sse/infinite" )
Flux < String > infinite ( ) {
return Flux . just ( 0 , 1 ) . map ( l - > "foo " + l )
. mergeWith ( Flux . never ( ) )
. doOnCancel ( ( ) - > cancellation . onComplete ( ) ) ;
}
}