@ -19,15 +19,14 @@ import java.time.Duration;
@@ -19,15 +19,14 @@ import java.time.Duration;
import java.util.Arrays ;
import java.util.Collections ;
import java.util.List ;
import java.util.stream.Collectors ;
import org.junit.Test ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
import reactor.test.StepVerifier ;
import org.springframework.beans.factory.config.EmbeddedValueResolver ;
import org.springframework.context.support.StaticApplicationContext ;
import org.springframework.core.MethodParameter ;
import org.springframework.core.ReactiveAdapterRegistry ;
import org.springframework.core.codec.CharSequenceEncoder ;
import org.springframework.core.codec.Decoder ;
@ -38,18 +37,18 @@ import org.springframework.core.env.PropertySource;
@@ -38,18 +37,18 @@ import org.springframework.core.env.PropertySource;
import org.springframework.core.io.buffer.DataBuffer ;
import org.springframework.core.io.buffer.DataBufferFactory ;
import org.springframework.core.io.buffer.DefaultDataBufferFactory ;
import org.springframework.lang.Nullable ;
import org.springframework.messaging.Message ;
import org.springframework.messaging.ReactiveSubscribableChannel ;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition ;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler ;
import org.springframework.messaging.handler.annotation.MessageMapping ;
import org.springframework.messaging.handler.invocation.reactive.Abstrac tEncoderMethodReturnValueHandler ;
import org.springframework.messaging.handler.invocation.reactive.Tes tEncoderMethodReturnValueHandler ;
import org.springframework.messaging.support.GenericMessage ;
import org.springframework.stereotype.Controller ;
import static java.nio.charset.StandardCharsets.* ;
import static org.junit.Assert.* ;
import static org.springframework.core.io.buffer.support.DataBufferTestUtils .* ;
import static org.mockito.Mockito .* ;
/ * *
* Unit tests for { @link MessageMappingMessageHandler } .
@ -61,51 +60,64 @@ public class MessageMappingMessageHandlerTests {
@@ -61,51 +60,64 @@ public class MessageMappingMessageHandlerTests {
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory ( ) ;
private TestEncoderReturnValueHandler returnValueHandler ;
private TestEncoderMethod ReturnValueHandler returnValueHandler ;
@Test
public void handleString ( ) {
MessageMappingMessageHandler messsageHandler = initMesssageHandler ( ) ;
messsageHandler . handleMessage ( message ( "/ string" , "abcdef" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
messsageHandler . handleMessage ( message ( "string" , "abcdef" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
verifyOutputContent ( Collections . singletonList ( "abcdef::response" ) ) ;
}
@Test
public void handleMonoString ( ) {
MessageMappingMessageHandler messsageHandler = initMesssageHandler ( ) ;
messsageHandler . handleMessage ( message ( "/ monoString" , "abcdef" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
messsageHandler . handleMessage ( message ( "monoString" , "abcdef" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
verifyOutputContent ( Collections . singletonList ( "abcdef::response" ) ) ;
}
@Test
public void handleFluxString ( ) {
MessageMappingMessageHandler messsageHandler = initMesssageHandler ( ) ;
messsageHandler . handleMessage ( message ( "/ fluxString" , "abc\ndef\nghi" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
messsageHandler . handleMessage ( message ( "fluxString" , "abc\ndef\nghi" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
verifyOutputContent ( Arrays . asList ( "abc::response" , "def::response" , "ghi::response" ) ) ;
}
@Test
public void handleWithPlaceholderInMapping ( ) {
MessageMappingMessageHandler messsageHandler = initMesssageHandler ( ) ;
messsageHandler . handleMessage ( message ( "/ path123" , "abcdef" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
messsageHandler . handleMessage ( message ( "path123" , "abcdef" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
verifyOutputContent ( Collections . singletonList ( "abcdef::response" ) ) ;
}
@Test
public void handleException ( ) {
MessageMappingMessageHandler messsageHandler = initMesssageHandler ( ) ;
messsageHandler . handleMessage ( message ( "/ exception" , "abc" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
messsageHandler . handleMessage ( message ( "exception" , "abc" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
verifyOutputContent ( Collections . singletonList ( "rejected::handled" ) ) ;
}
@Test
public void handleErrorSignal ( ) {
MessageMappingMessageHandler messsageHandler = initMesssageHandler ( ) ;
messsageHandler . handleMessage ( message ( "/ errorSignal" , "abc" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
messsageHandler . handleMessage ( message ( "errorSignal" , "abc" ) ) . block ( Duration . ofSeconds ( 5 ) ) ;
verifyOutputContent ( Collections . singletonList ( "rejected::handled" ) ) ;
}
@Test
public void unhandledExceptionShouldFlowThrough ( ) {
GenericMessage < ? > message = new GenericMessage < > ( new Object ( ) ,
Collections . singletonMap ( DestinationPatternsMessageCondition . LOOKUP_DESTINATION_HEADER , "string" ) ) ;
StepVerifier . create ( initMesssageHandler ( ) . handleMessage ( message ) )
. expectErrorSatisfies ( ex - > assertTrue (
"Actual: " + ex . getMessage ( ) ,
ex . getMessage ( ) . startsWith ( "Could not resolve method parameter at index 0" ) ) )
. verify ( Duration . ofSeconds ( 5 ) ) ;
}
private MessageMappingMessageHandler initMesssageHandler ( ) {
@ -113,7 +125,7 @@ public class MessageMappingMessageHandlerTests {
@@ -113,7 +125,7 @@ public class MessageMappingMessageHandlerTests {
List < Encoder < ? > > encoders = Collections . singletonList ( CharSequenceEncoder . allMimeTypes ( ) ) ;
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry . getSharedInstance ( ) ;
this . returnValueHandler = new TestEncoderReturnValueHandler ( encoders , registry ) ;
this . returnValueHandler = new TestEncoderMethod ReturnValueHandler ( encoders , registry ) ;
PropertySource < ? > source = new MapPropertySource ( "test" , Collections . singletonMap ( "path" , "path123" ) ) ;
@ -122,11 +134,13 @@ public class MessageMappingMessageHandlerTests {
@@ -122,11 +134,13 @@ public class MessageMappingMessageHandlerTests {
context . registerSingleton ( "testController" , TestController . class ) ;
context . refresh ( ) ;
MessageMappingMessageHandler messageHandler = new MessageMappingMessageHandler ( ) ;
ReactiveSubscribableChannel channel = mock ( ReactiveSubscribableChannel . class ) ;
MessageMappingMessageHandler messageHandler = new MessageMappingMessageHandler ( channel ) ;
messageHandler . getReturnValueHandlerConfigurer ( ) . addCustomHandler ( this . returnValueHandler ) ;
messageHandler . setApplicationContext ( context ) ;
messageHandler . setEmbeddedValueResolver ( new EmbeddedValueResolver ( context . getBeanFactory ( ) ) ) ;
messageHandler . setDecoders ( decoders ) ;
messageHandler . setEncoderReturnValueHandler ( this . returnValueHandler ) ;
messageHandler . afterPropertiesSet ( ) ;
return messageHandler ;
@ -134,7 +148,7 @@ public class MessageMappingMessageHandlerTests {
@@ -134,7 +148,7 @@ public class MessageMappingMessageHandlerTests {
private Message < ? > message ( String destination , String . . . content ) {
return new GenericMessage < > (
Flux . fromIterable ( Arrays . stream ( content ) . map ( this : : toDataBuffer ) . collect ( Collectors . toList ( ) ) ) ,
Flux . fromIterable ( Arrays . asLi st( content ) ) . map ( payload - > toDataBuffer ( payload ) ) ,
Collections . singletonMap ( DestinationPatternsMessageCondition . LOOKUP_DESTINATION_HEADER , destination ) ) ;
}
@ -143,42 +157,40 @@ public class MessageMappingMessageHandlerTests {
@@ -143,42 +157,40 @@ public class MessageMappingMessageHandlerTests {
}
private void verifyOutputContent ( List < String > expected ) {
List < DataBuffer > buffers = this . returnValueHandler . getOutputContent ( ) ;
assertNotNull ( "No output: no matching handler method?" , buffers ) ;
List < String > actual = buffers . stream ( ) . map ( buffer - > dumpString ( buffer , UTF_8 ) ) . collect ( Collectors . toList ( ) ) ;
assertEquals ( expected , actual ) ;
Flux < String > result = this . returnValueHandler . getContentAsStrings ( ) ;
StepVerifier . create ( result . collectList ( ) ) . expectNext ( expected ) . verifyComplete ( ) ;
}
@Controller
static class TestController {
@MessageMapping ( "/ string" )
@MessageMapping ( "string" )
String handleString ( String payload ) {
return payload + "::response" ;
}
@MessageMapping ( "/ monoString" )
@MessageMapping ( "monoString" )
Mono < String > handleMonoString ( Mono < String > payload ) {
return payload . map ( s - > s + "::response" ) . delayElement ( Duration . ofMillis ( 10 ) ) ;
}
@MessageMapping ( "/ fluxString" )
@MessageMapping ( "fluxString" )
Flux < String > handleFluxString ( Flux < String > payload ) {
return payload . map ( s - > s + "::response" ) . delayElements ( Duration . ofMillis ( 10 ) ) ;
}
@MessageMapping ( "/ ${path}" )
@MessageMapping ( "${path}" )
String handleWithPlaceholder ( String payload ) {
return payload + "::response" ;
}
@MessageMapping ( "/ exception" )
@MessageMapping ( "exception" )
String handleAndThrow ( ) {
throw new IllegalArgumentException ( "rejected" ) ;
}
@MessageMapping ( "/ errorSignal" )
@MessageMapping ( "errorSignal" )
Mono < String > handleAndSignalError ( ) {
return Mono . delay ( Duration . ofMillis ( 10 ) )
. flatMap ( aLong - > Mono . error ( new IllegalArgumentException ( "rejected" ) ) ) ;
@ -190,29 +202,4 @@ public class MessageMappingMessageHandlerTests {
@@ -190,29 +202,4 @@ public class MessageMappingMessageHandlerTests {
}
}
private static class TestEncoderReturnValueHandler extends AbstractEncoderMethodReturnValueHandler {
@Nullable
private volatile List < DataBuffer > outputContent ;
TestEncoderReturnValueHandler ( List < Encoder < ? > > encoders , ReactiveAdapterRegistry registry ) {
super ( encoders , registry ) ;
}
@Nullable
public List < DataBuffer > getOutputContent ( ) {
return this . outputContent ;
}
@Override
protected Mono < Void > handleEncodedContent (
Flux < DataBuffer > encodedContent , MethodParameter returnType , Message < ? > message ) {
return encodedContent . collectList ( ) . doOnNext ( buffers - > this . outputContent = buffers ) . then ( ) ;
}
}
}