@ -23,6 +23,7 @@ import java.util.Collections;
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Map ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import org.junit.Before ;
import org.junit.Test ;
import org.reactivestreams.Publisher ;
import reactor.core.publisher.Flux ;
@ -31,6 +32,8 @@ import reactor.test.StepVerifier;
@@ -31,6 +32,8 @@ import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType ;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase ;
import org.springframework.core.io.buffer.DataBufferUtils ;
import org.springframework.core.io.buffer.support.DataBufferTestUtils ;
import org.springframework.http.MediaType ;
import org.springframework.http.codec.json.Jackson2JsonEncoder ;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder ;
@ -45,6 +48,7 @@ import static org.springframework.core.ResolvableType.forClass;
@@ -45,6 +48,7 @@ import static org.springframework.core.ResolvableType.forClass;
* @author Sebastien Deleuze
* @author Rossen Stoyanchev
* /
@SuppressWarnings ( "rawtypes" )
public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAllocatingTestCase {
private static final Map < String , Object > HINTS = Collections . emptyMap ( ) ;
@ -52,6 +56,15 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
@@ -52,6 +56,15 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
private ServerSentEventHttpMessageWriter messageWriter =
new ServerSentEventHttpMessageWriter ( new Jackson2JsonEncoder ( ) ) ;
private MockServerHttpResponse outputMessage ;
@Before
public void setUp ( ) {
this . outputMessage = new MockServerHttpResponse ( this . bufferFactory ) ;
}
@Test
public void canWrite ( ) {
@ -72,41 +85,28 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
@@ -72,41 +85,28 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
. comment ( "bla\nbla bla\nbla bla bla" ) . retry ( Duration . ofMillis ( 123L ) ) . build ( ) ;
Mono < ServerSentEvent > source = Mono . just ( event ) ;
MockServerHttpResponse outputMessage = new MockServerHttpResponse ( ) ;
testWrite ( source , outputMessage , ServerSentEvent . class ) ;
StepVerifier . create ( outputMessage . getBodyAsString ( ) )
. expectNext ( "id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:bar\n\n" )
StepVerifier . create ( outputMessage . getBody ( ) )
. consumeNextWith ( stringConsumer ( "id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:" ) )
. consumeNextWith ( stringConsumer ( "bar\n" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. expectComplete ( )
. verify ( ) ;
}
@Test
@SuppressWarnings ( "rawtypes" )
public void writeServerSentEventError ( ) {
ServerSentEvent < ? > event = ServerSentEvent . builder ( ) . data ( "bar" ) . id ( "c42" ) . event ( "foo" )
. comment ( "bla\nbla bla\nbla bla bla" ) . retry ( Duration . ofMillis ( 123L ) ) . build ( ) ;
Flux < ServerSentEvent > source = Flux . concat (
Flux . just ( event ) ,
Flux . error ( new RuntimeException ( ) ) ) ;
MockServerHttpResponse outputMessage = new MockServerHttpResponse ( ) ;
Mono < Void > result = this . messageWriter . write ( source , forClass ( ServerSentEvent . class ) ,
MediaType . TEXT_EVENT_STREAM , outputMessage , HINTS ) ;
StepVerifier . create ( result )
. verifyError ( RuntimeException . class ) ;
}
@Test
public void writeString ( ) {
Flux < String > source = Flux . just ( "foo" , "bar" ) ;
MockServerHttpResponse outputMessage = new MockServerHttpResponse ( ) ;
testWrite ( source , outputMessage , String . class ) ;
StepVerifier . create ( outputMessage . getBodyAsString ( ) )
. expectNext ( "data:foo\n\ndata:bar\n\n" )
StepVerifier . create ( outputMessage . getBody ( ) )
. consumeNextWith ( stringConsumer ( "data:" ) )
. consumeNextWith ( stringConsumer ( "foo\n" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. consumeNextWith ( stringConsumer ( "data:" ) )
. consumeNextWith ( stringConsumer ( "bar\n" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. expectComplete ( )
. verify ( ) ;
}
@ -114,11 +114,15 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
@@ -114,11 +114,15 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
@Test
public void writeMultiLineString ( ) {
Flux < String > source = Flux . just ( "foo\nbar" , "foo\nbaz" ) ;
MockServerHttpResponse outputMessage = new MockServerHttpResponse ( ) ;
testWrite ( source , outputMessage , String . class ) ;
StepVerifier . create ( outputMessage . getBodyAsString ( ) )
. expectNext ( "data:foo\ndata:bar\n\ndata:foo\ndata:baz\n\n" )
StepVerifier . create ( outputMessage . getBody ( ) )
. consumeNextWith ( stringConsumer ( "data:" ) )
. consumeNextWith ( stringConsumer ( "foo\ndata:bar\n" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. consumeNextWith ( stringConsumer ( "data:" ) )
. consumeNextWith ( stringConsumer ( "foo\ndata:baz\n" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. expectComplete ( )
. verify ( ) ;
}
@ -128,22 +132,36 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
@@ -128,22 +132,36 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
Flux < String > source = Flux . just ( "\u00A3" ) ;
Charset charset = StandardCharsets . ISO_8859_1 ;
MediaType mediaType = new MediaType ( "text" , "event-stream" , charset ) ;
MockServerHttpResponse outputMessage = new MockServerHttpResponse ( ) ;
testWrite ( source , mediaType , outputMessage , String . class ) ;
assertEquals ( mediaType , outputMessage . getHeaders ( ) . getContentType ( ) ) ;
StepVerifier . create ( outputMessage . getBodyAsString ( ) ) . expectNext ( "data:\u00A3\n\n" ) . verifyComplete ( ) ;
StepVerifier . create ( outputMessage . getBody ( ) )
. consumeNextWith ( stringConsumer ( "data:" ) )
. consumeNextWith ( dataBuffer - > {
String value =
DataBufferTestUtils . dumpString ( dataBuffer , charset ) ;
DataBufferUtils . release ( dataBuffer ) ;
assertEquals ( "\u00A3\n" , value ) ;
} )
. consumeNextWith ( stringConsumer ( "\n" ) )
. expectComplete ( )
. verify ( ) ;
}
@Test
public void writePojo ( ) {
Flux < Pojo > source = Flux . just ( new Pojo ( "foofoo" , "barbar" ) , new Pojo ( "foofoofoo" , "barbarbar" ) ) ;
MockServerHttpResponse outputMessage = new MockServerHttpResponse ( ) ;
testWrite ( source , outputMessage , Pojo . class ) ;
StepVerifier . create ( outputMessage . getBodyAsString ( ) )
. expectNext ( "data:{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n\n" +
"data:{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n\n" )
StepVerifier . create ( outputMessage . getBody ( ) )
. consumeNextWith ( stringConsumer ( "data:" ) )
. consumeNextWith ( stringConsumer ( "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. consumeNextWith ( stringConsumer ( "data:" ) )
. consumeNextWith ( stringConsumer ( "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. expectComplete ( )
. verify ( ) ;
}
@ -154,16 +172,21 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
@@ -154,16 +172,21 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
this . messageWriter = new ServerSentEventHttpMessageWriter ( new Jackson2JsonEncoder ( mapper ) ) ;
Flux < Pojo > source = Flux . just ( new Pojo ( "foofoo" , "barbar" ) , new Pojo ( "foofoofoo" , "barbarbar" ) ) ;
MockServerHttpResponse outputMessage = new MockServerHttpResponse ( ) ;
testWrite ( source , outputMessage , Pojo . class ) ;
StepVerifier . create ( outputMessage . getBodyAsString ( ) )
. expectNext ( "data:{\n" +
StepVerifier . create ( outputMessage . getBody ( ) )
. consumeNextWith ( stringConsumer ( "data:" ) )
. consumeNextWith ( stringConsumer ( "{\n" +
"data: \"foo\" : \"foofoo\",\n" +
"data: \"bar\" : \"barbar\"\n" + "data:}\n\n" +
"data:{\n" +
"data: \"bar\" : \"barbar\"\n" + "data:}" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. consumeNextWith ( stringConsumer ( "data:" ) )
. consumeNextWith ( stringConsumer ( "{\n" +
"data: \"foo\" : \"foofoofoo\",\n" +
"data: \"bar\" : \"barbarbar\"\n" + "data:}\n\n" )
"data: \"bar\" : \"barbarbar\"\n" + "data:}" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. consumeNextWith ( stringConsumer ( "\n" ) )
. expectComplete ( )
. verify ( ) ;
}
@ -173,13 +196,35 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
@@ -173,13 +196,35 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll
Flux < Pojo > source = Flux . just ( new Pojo ( "foo\uD834\uDD1E" , "bar\uD834\uDD1E" ) ) ;
Charset charset = StandardCharsets . UTF_16LE ;
MediaType mediaType = new MediaType ( "text" , "event-stream" , charset ) ;
MockServerHttpResponse outputMessage = new MockServerHttpResponse ( ) ;
testWrite ( source , mediaType , outputMessage , Pojo . class ) ;
assertEquals ( mediaType , outputMessage . getHeaders ( ) . getContentType ( ) ) ;
StepVerifier . create ( outputMessage . getBodyAsString ( ) )
. expectNext ( "data:{\"foo\":\"foo\uD834\uDD1E\",\"bar\":\"bar\uD834\uDD1E\"}\n\n" )
. verifyComplete ( ) ;
StepVerifier . create ( outputMessage . getBody ( ) )
. consumeNextWith ( dataBuffer1 - > {
String value1 =
DataBufferTestUtils . dumpString ( dataBuffer1 , charset ) ;
DataBufferUtils . release ( dataBuffer1 ) ;
assertEquals ( "data:" , value1 ) ;
} )
. consumeNextWith ( dataBuffer - > {
String value = DataBufferTestUtils . dumpString ( dataBuffer , charset ) ;
DataBufferUtils . release ( dataBuffer ) ;
assertEquals ( "{\"foo\":\"foo\uD834\uDD1E\",\"bar\":\"bar\uD834\uDD1E\"}" , value ) ;
} )
. consumeNextWith ( dataBuffer2 - > {
String value2 =
DataBufferTestUtils . dumpString ( dataBuffer2 , charset ) ;
DataBufferUtils . release ( dataBuffer2 ) ;
assertEquals ( "\n" , value2 ) ;
} )
. consumeNextWith ( dataBuffer3 - > {
String value3 =
DataBufferTestUtils . dumpString ( dataBuffer3 , charset ) ;
DataBufferUtils . release ( dataBuffer3 ) ;
assertEquals ( "\n" , value3 ) ;
} )
. expectComplete ( )
. verify ( ) ;
}