@ -16,10 +16,13 @@
@@ -16,10 +16,13 @@
package org.springframework.web.servlet.mvc.method.annotation ;
import java.io.IOException ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.Collections ;
import java.util.List ;
import java.util.Set ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.stream.Collectors ;
import org.junit.Before ;
import org.junit.Test ;
@ -52,8 +55,8 @@ import org.springframework.web.servlet.HandlerMapping;
@@ -52,8 +55,8 @@ import org.springframework.web.servlet.HandlerMapping;
import static junit.framework.TestCase.assertNull ;
import static org.junit.Assert.assertEquals ;
import static org.junit.Assert.assertFalse ;
import static org.junit.Assert.assertNotNull ;
import static org.junit.Assert.assertTrue ;
import static org.springframework.core.ResolvableType.forClass ;
import static org.springframework.web.method.ResolvableMethod.on ;
/ * *
@ -108,27 +111,27 @@ public class ReactiveTypeHandlerTests {
@@ -108,27 +111,27 @@ public class ReactiveTypeHandlerTests {
// Mono
MonoProcessor < String > mono = MonoProcessor . create ( ) ;
testDeferredResultSubscriber ( mono , Mono . class , ( ) - > mono . onNext ( "foo" ) , "foo" ) ;
testDeferredResultSubscriber ( mono , Mono . class , forClass ( String . class ) , ( ) - > mono . onNext ( "foo" ) , "foo" ) ;
// Mono empty
MonoProcessor < String > monoEmpty = MonoProcessor . create ( ) ;
testDeferredResultSubscriber ( monoEmpty , Mono . class , monoEmpty : : onComplete , null ) ;
testDeferredResultSubscriber ( monoEmpty , Mono . class , forClass ( String . class ) , monoEmpty : : onComplete , null ) ;
// RxJava 1 Single
AtomicReference < SingleEmitter < String > > ref = new AtomicReference < > ( ) ;
Single < String > single = Single . fromEmitter ( ref : : set ) ;
testDeferredResultSubscriber ( single , Single . class , ( ) - > ref . get ( ) . onSuccess ( "foo" ) , "foo" ) ;
testDeferredResultSubscriber ( single , Single . class , forClass ( String . class ) , ( ) - > ref . get ( ) . onSuccess ( "foo" ) , "foo" ) ;
// RxJava 2 Single
AtomicReference < io . reactivex . SingleEmitter < String > > ref2 = new AtomicReference < > ( ) ;
io . reactivex . Single < String > single2 = io . reactivex . Single . create ( ref2 : : set ) ;
testDeferredResultSubscriber ( single2 , io . reactivex . Single . class , ( ) - > ref2 . get ( ) . onSuccess ( "foo" ) , "foo" ) ;
testDeferredResultSubscriber ( single2 , io . reactivex . Single . class , forClass ( String . class ) , ( ) - > ref2 . get ( ) . onSuccess ( "foo" ) , "foo" ) ;
}
@Test
public void deferredResultSubscriberWithNoValues ( ) throws Exception {
MonoProcessor < String > monoEmpty = MonoProcessor . create ( ) ;
testDeferredResultSubscriber ( monoEmpty , Mono . class , monoEmpty : : onComplete , null ) ;
testDeferredResultSubscriber ( monoEmpty , Mono . class , forClass ( String . class ) , monoEmpty : : onComplete , null ) ;
}
@Test
@ -137,13 +140,15 @@ public class ReactiveTypeHandlerTests {
@@ -137,13 +140,15 @@ public class ReactiveTypeHandlerTests {
// JSON must be preferred for Flux<String> -> List<String> or else we stream
this . servletRequest . addHeader ( "Accept" , "application/json" ) ;
EmitterProcessor < String > emitter = EmitterProcessor . create ( ) ;
testDeferredResultSubscriber ( emitter , Flux . class , ( ) - > {
emitter . onNext ( "foo" ) ;
emitter . onNext ( "bar" ) ;
emitter . onNext ( "baz" ) ;
Bar bar1 = new Bar ( "foo" ) ;
Bar bar2 = new Bar ( "bar" ) ;
EmitterProcessor < Bar > emitter = EmitterProcessor . create ( ) ;
testDeferredResultSubscriber ( emitter , Flux . class , forClass ( Bar . class ) , ( ) - > {
emitter . onNext ( bar1 ) ;
emitter . onNext ( bar2 ) ;
emitter . onComplete ( ) ;
} , Arrays . asList ( "foo" , "bar" , "baz" ) ) ;
} , Arrays . asList ( bar1 , bar2 ) ) ;
}
@Test
@ -153,48 +158,17 @@ public class ReactiveTypeHandlerTests {
@@ -153,48 +158,17 @@ public class ReactiveTypeHandlerTests {
// Mono
MonoProcessor < String > mono = MonoProcessor . create ( ) ;
testDeferredResultSubscriber ( mono , Mono . class , ( ) - > mono . onError ( ex ) , ex ) ;
testDeferredResultSubscriber ( mono , Mono . class , forClass ( String . class ) , ( ) - > mono . onError ( ex ) , ex ) ;
// RxJava 1 Single
AtomicReference < SingleEmitter < String > > ref = new AtomicReference < > ( ) ;
Single < String > single = Single . fromEmitter ( ref : : set ) ;
testDeferredResultSubscriber ( single , Single . class , ( ) - > ref . get ( ) . onError ( ex ) , ex ) ;
testDeferredResultSubscriber ( single , Single . class , forClass ( String . class ) , ( ) - > ref . get ( ) . onError ( ex ) , ex ) ;
// RxJava 2 Single
AtomicReference < io . reactivex . SingleEmitter < String > > ref2 = new AtomicReference < > ( ) ;
io . reactivex . Single < String > single2 = io . reactivex . Single . create ( ref2 : : set ) ;
testDeferredResultSubscriber ( single2 , io . reactivex . Single . class , ( ) - > ref2 . get ( ) . onError ( ex ) , ex ) ;
}
@Test
public void jsonArrayOfStrings ( ) throws Exception {
// Empty -> null
testJsonNotPreferred ( "text/plain" ) ;
testJsonNotPreferred ( "text/plain, application/json" ) ;
testJsonNotPreferred ( "text/markdown" ) ;
testJsonNotPreferred ( "foo/bar" ) ;
// Empty -> List[0] when JSON is preferred
testJsonPreferred ( "application/json" ) ;
testJsonPreferred ( "application/foo+json" ) ;
testJsonPreferred ( "application/json, text/plain" ) ;
testJsonPreferred ( "*/*, application/json, text/plain" ) ;
}
private void testJsonNotPreferred ( String acceptHeaderValue ) throws Exception {
resetRequest ( ) ;
this . servletRequest . addHeader ( "Accept" , acceptHeaderValue ) ;
EmitterProcessor < String > processor = EmitterProcessor . create ( ) ;
ResponseBodyEmitter emitter = handleValue ( processor , Flux . class ) ;
assertNotNull ( emitter ) ;
}
private void testJsonPreferred ( String acceptHeaderValue ) throws Exception {
resetRequest ( ) ;
this . servletRequest . addHeader ( "Accept" , acceptHeaderValue ) ;
EmitterProcessor < String > processor = EmitterProcessor . create ( ) ;
testDeferredResultSubscriber ( processor , Flux . class , processor : : onComplete , Collections . emptyList ( ) ) ;
testDeferredResultSubscriber ( single2 , io . reactivex . Single . class , forClass ( String . class ) , ( ) - > ref2 . get ( ) . onError ( ex ) , ex ) ;
}
@Test
@ -211,14 +185,10 @@ public class ReactiveTypeHandlerTests {
@@ -211,14 +185,10 @@ public class ReactiveTypeHandlerTests {
// No media type preferences
testSseResponse ( false ) ;
// Requested media types are sorted
testJsonPreferred ( "text/plain;q=0.8, application/json;q=1.0" ) ;
testJsonNotPreferred ( "text/plain, application/json" ) ;
}
private void testSseResponse ( boolean expectSseEimtter ) throws Exception {
ResponseBodyEmitter emitter = handleValue ( Flux . empty ( ) , Flux . class ) ;
ResponseBodyEmitter emitter = handleValue ( Flux . empty ( ) , Flux . class , forClass ( String . class ) ) ;
assertEquals ( expectSseEimtter , emitter instanceof SseEmitter ) ;
resetRequest ( ) ;
}
@ -228,7 +198,7 @@ public class ReactiveTypeHandlerTests {
@@ -228,7 +198,7 @@ public class ReactiveTypeHandlerTests {
this . servletRequest . addHeader ( "Accept" , "text/event-stream" ) ;
EmitterProcessor < String > processor = EmitterProcessor . create ( ) ;
SseEmitter sseEmitter = ( SseEmitter ) handleValue ( processor , Flux . class ) ;
SseEmitter sseEmitter = ( SseEmitter ) handleValue ( processor , Flux . class , forClass ( String . class ) ) ;
EmitterHandler emitterHandler = new EmitterHandler ( ) ;
sseEmitter . initialize ( emitterHandler ) ;
@ -238,11 +208,11 @@ public class ReactiveTypeHandlerTests {
@@ -238,11 +208,11 @@ public class ReactiveTypeHandlerTests {
processor . onNext ( "baz" ) ;
processor . onComplete ( ) ;
assertEquals ( "data:foo\n\ndata:bar\n\ndata:baz\n\n" , emitterHandler . getOutpu t ( ) ) ;
assertEquals ( "data:foo\n\ndata:bar\n\ndata:baz\n\n" , emitterHandler . getValuesAsTex t ( ) ) ;
}
@Test
public void writeSentEventsWithBuilder ( ) throws Exception {
public void writeServerSe ntEventsWithBuilder ( ) throws Exception {
ResolvableType type = ResolvableType . forClassWithGenerics ( ServerSentEvent . class , String . class ) ;
@ -258,7 +228,7 @@ public class ReactiveTypeHandlerTests {
@@ -258,7 +228,7 @@ public class ReactiveTypeHandlerTests {
processor . onComplete ( ) ;
assertEquals ( "id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n" ,
emitterHandler . getOutpu t ( ) ) ;
emitterHandler . getValuesAsTex t ( ) ) ;
}
@Test
@ -266,8 +236,8 @@ public class ReactiveTypeHandlerTests {
@@ -266,8 +236,8 @@ public class ReactiveTypeHandlerTests {
this . servletRequest . addHeader ( "Accept" , "application/stream+json" ) ;
EmitterProcessor < String > processor = EmitterProcessor . create ( ) ;
ResponseBodyEmitter emitter = handleValue ( processor , Flux . class ) ;
EmitterProcessor < Bar > processor = EmitterProcessor . create ( ) ;
ResponseBodyEmitter emitter = handleValue ( processor , Flux . class , forClass ( Bar . class ) ) ;
EmitterHandler emitterHandler = new EmitterHandler ( ) ;
emitter . initialize ( emitterHandler ) ;
@ -275,19 +245,22 @@ public class ReactiveTypeHandlerTests {
@@ -275,19 +245,22 @@ public class ReactiveTypeHandlerTests {
ServletServerHttpResponse message = new ServletServerHttpResponse ( this . servletResponse ) ;
emitter . extendResponse ( message ) ;
processor . onNext ( "[\"foo\",\"bar\"]" ) ;
processor . onNext ( "[\"bar\",\"baz\"]" ) ;
Bar bar1 = new Bar ( "foo" ) ;
Bar bar2 = new Bar ( "bar" ) ;
processor . onNext ( bar1 ) ;
processor . onNext ( bar2 ) ;
processor . onComplete ( ) ;
assertEquals ( "application/stream+json" , message . getHeaders ( ) . getContentType ( ) . toString ( ) ) ;
assertEquals ( "[\"foo\",\"bar\"]\n[\"bar\",\"baz\"]\n" , emitterHandler . getOutput ( ) ) ;
assertEquals ( Arrays . asList ( bar1 , "\n" , bar2 , "\n" ) , emitterHandler . getValues ( ) ) ;
}
@Test
public void writeText ( ) throws Exception {
EmitterProcessor < String > processor = EmitterProcessor . create ( ) ;
ResponseBodyEmitter emitter = handleValue ( processor , Flux . class ) ;
ResponseBodyEmitter emitter = handleValue ( processor , Flux . class , forClass ( String . class ) ) ;
EmitterHandler emitterHandler = new EmitterHandler ( ) ;
emitter . initialize ( emitterHandler ) ;
@ -297,31 +270,35 @@ public class ReactiveTypeHandlerTests {
@@ -297,31 +270,35 @@ public class ReactiveTypeHandlerTests {
processor . onNext ( "the lazy dog" ) ;
processor . onComplete ( ) ;
assertEquals ( "The quick brown fox jumps over the lazy dog" , emitterHandler . getOutpu t ( ) ) ;
assertEquals ( "The quick brown fox jumps over the lazy dog" , emitterHandler . getValuesAsTex t ( ) ) ;
}
@Test
public void writeTextContentType ( ) throws Exception {
public void writeFluxOfString ( ) throws Exception {
// Default to "text/plain"
testEmitterContentType ( "text/plain" ) ;
// Any requested, concrete, "text" media type
// Same if no concrete media type
this . servletRequest . addHeader ( "Accept" , "text/*" ) ;
testEmitterContentType ( "text/plain" ) ;
// Otherwise pick concrete media type
this . servletRequest . addHeader ( "Accept" , "*/*, text/*, text/markdown" ) ;
testEmitterContentType ( "text/markdown" ) ;
// Or any requested concrete media type
// Any concrete media type
this . servletRequest . addHeader ( "Accept" , "*/*, text/*, foo/bar" ) ;
testEmitterContentType ( "foo/bar" ) ;
// Or default to...
testEmitterContentType ( "text/plain" ) ;
// Or default to if not concrete..
this . servletRequest . addHeader ( "Accept" , "text/*" ) ;
testEmitterContentType ( "text/plain" ) ;
// Including json
this . servletRequest . addHeader ( "Accept" , "*/*, text/*, application/json" ) ;
testEmitterContentType ( "application/json" ) ;
}
private void testEmitterContentType ( String expected ) throws Exception {
ServletServerHttpResponse message = new ServletServerHttpResponse ( this . servletResponse ) ;
ResponseBodyEmitter emitter = handleValue ( Flux . empty ( ) , Flux . class ) ;
ResponseBodyEmitter emitter = handleValue ( Flux . empty ( ) , Flux . class , forClass ( String . class ) ) ;
emitter . extendResponse ( message ) ;
assertEquals ( expected , message . getHeaders ( ) . getContentType ( ) . toString ( ) ) ;
resetRequest ( ) ;
@ -329,9 +306,9 @@ public class ReactiveTypeHandlerTests {
@@ -329,9 +306,9 @@ public class ReactiveTypeHandlerTests {
private void testDeferredResultSubscriber ( Object returnValue , Class < ? > asyncType ,
Runnable produceTask , Object expected ) throws Exception {
ResolvableType elementType , R unnable produceTask , Object expected ) throws Exception {
ResponseBodyEmitter emitter = handleValue ( returnValue , asyncType ) ;
ResponseBodyEmitter emitter = handleValue ( returnValue , asyncType , elementType ) ;
assertNull ( emitter ) ;
assertTrue ( this . servletRequest . isAsyncStarted ( ) ) ;
@ -345,10 +322,6 @@ public class ReactiveTypeHandlerTests {
@@ -345,10 +322,6 @@ public class ReactiveTypeHandlerTests {
resetRequest ( ) ;
}
private ResponseBodyEmitter handleValue ( Object returnValue , Class < ? > asyncType ) throws Exception {
return handleValue ( returnValue , asyncType , ResolvableType . forClass ( String . class ) ) ;
}
private ResponseBodyEmitter handleValue ( Object returnValue , Class < ? > asyncType ,
ResolvableType genericType ) throws Exception {
@ -369,7 +342,9 @@ public class ReactiveTypeHandlerTests {
@@ -369,7 +342,9 @@ public class ReactiveTypeHandlerTests {
io . reactivex . Single < String > handleSingleRxJava2 ( ) { return null ; }
Flux < String > handleFlux ( ) { return null ; }
Flux < Bar > handleFlux ( ) { return null ; }
Flux < String > handleFluxString ( ) { return null ; }
Flux < ServerSentEvent < String > > handleFluxSseEventBuilder ( ) { return null ; }
}
@ -377,16 +352,20 @@ public class ReactiveTypeHandlerTests {
@@ -377,16 +352,20 @@ public class ReactiveTypeHandlerTests {
private static class EmitterHandler implements ResponseBodyEmitter . Handler {
private final StringBuilder stringBuilder = new StringBuilder ( ) ;
private final List < Object > values = new ArrayList < > ( ) ;
public String getOutput ( ) {
return this . stringBuilder . toString ( ) ;
public List < ? > getValues ( ) {
return this . values ;
}
public String getValuesAsText ( ) {
return this . values . stream ( ) . map ( Object : : toString ) . collect ( Collectors . joining ( ) ) ;
}
@Override
public void send ( Object data , MediaType mediaType ) throws IOException {
this . stringBuilder . appen d( data ) ;
this . values . ad d( data ) ;
}
@Override
@ -406,4 +385,16 @@ public class ReactiveTypeHandlerTests {
@@ -406,4 +385,16 @@ public class ReactiveTypeHandlerTests {
}
}
private static class Bar {
private final String value ;
public Bar ( String value ) {
this . value = value ;
}
public String getValue ( ) {
return this . value ;
}
}
}