diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java index 5cd1d888da0..54cec200e3b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java @@ -24,9 +24,12 @@ import java.util.Collections; import java.util.Comparator; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Predicate; +import reactor.core.publisher.Mono; + import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; @@ -39,6 +42,7 @@ import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.handler.CompositeMessageCondition; import org.springframework.messaging.handler.DestinationPatternsMessageCondition; +import org.springframework.messaging.handler.HandlerMethod; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.support.AnnotationExceptionHandlerMethodResolver; import org.springframework.messaging.handler.invocation.AbstractExceptionHandlerMethodResolver; @@ -46,9 +50,11 @@ import org.springframework.messaging.handler.invocation.reactive.AbstractEncoder import org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler; import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver; import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler; +import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.stereotype.Controller; import org.springframework.util.AntPathMatcher; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import org.springframework.util.PathMatcher; import org.springframework.util.StringValueResolver; import org.springframework.validation.Validator; @@ -311,4 +317,19 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler handleMatch(CompositeMessageCondition mapping, HandlerMethod handlerMethod, Message message) { + Set patterns = mapping.getCondition(DestinationPatternsMessageCondition.class).getPatterns(); + if (!CollectionUtils.isEmpty(patterns)) { + String pattern = patterns.iterator().next(); + String destination = getDestination(message); + Map vars = getPathMatcher().extractUriTemplateVariables(pattern, destination); + if (!CollectionUtils.isEmpty(vars)) { + MessageHeaderAccessor mha = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); + Assert.state(mha != null && mha.isMutable(), "Mutable MessageHeaderAccessor required"); + mha.setHeader(DestinationVariableMethodArgumentResolver.DESTINATION_TEMPLATE_VARIABLES_HEADER, vars); + } + } + return super.handleMatch(mapping, handlerMethod, message); + } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java index 9c95eeec03e..948384589ca 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java @@ -381,7 +381,11 @@ public abstract class AbstractMethodMessageHandler // handleNoMatch would have been invoked already return Mono.empty(); } - HandlerMethod handlerMethod = match.getHandlerMethod().createWithResolvedBean(); + return handleMatch(match.mapping, match.handlerMethod, message); + } + + protected Mono handleMatch(T mapping, HandlerMethod handlerMethod, Message message) { + handlerMethod = handlerMethod.createWithResolvedBean(); return this.invocableHelper.handleMessage(handlerMethod, message); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java index 8ae7fd14a47..604ce471780 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java @@ -180,6 +180,7 @@ class MessagingRSocket extends AbstractRSocket { private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor replyMono) { MessageHeaderAccessor headers = new MessageHeaderAccessor(); + headers.setLeaveMutable(true); headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination); if (this.dataMimeType != null) { headers.setContentType(this.dataMimeType); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java index 35ca0276739..5496e87eeb2 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java @@ -39,10 +39,13 @@ import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.messaging.Message; import org.springframework.messaging.handler.DestinationPatternsMessageCondition; +import org.springframework.messaging.handler.annotation.DestinationVariable; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.invocation.reactive.TestEncoderMethodReturnValueHandler; import org.springframework.messaging.support.GenericMessage; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.stereotype.Controller; import static java.nio.charset.StandardCharsets.*; @@ -89,6 +92,13 @@ public class MessageMappingMessageHandlerTests { verifyOutputContent(Collections.singletonList("abcdef::response")); } + @Test + public void handleWithDestinationVariable() { + MessageMappingMessageHandler messsageHandler = initMesssageHandler(); + messsageHandler.handleMessage(message("destination.test", "abcdef")).block(Duration.ofSeconds(5)); + verifyOutputContent(Collections.singletonList("test::abcdef::response")); + } + @Test public void handleException() { MessageMappingMessageHandler messsageHandler = initMesssageHandler(); @@ -143,9 +153,11 @@ public class MessageMappingMessageHandlerTests { } private Message message(String destination, String... content) { - return new GenericMessage<>( - Flux.fromIterable(Arrays.asList(content)).map(payload -> toDataBuffer(payload)), - Collections.singletonMap(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination)); + Flux payload = Flux.fromIterable(Arrays.asList(content)).map(parts -> toDataBuffer(parts)); + MessageHeaderAccessor headers = new MessageHeaderAccessor(); + headers.setLeaveMutable(true); + headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination); + return MessageBuilder.createMessage(payload, headers.getMessageHeaders()); } private DataBuffer toDataBuffer(String payload) { @@ -181,6 +193,11 @@ public class MessageMappingMessageHandlerTests { return payload + "::response"; } + @MessageMapping("destination.{variable}") + String handleWithDestinationVariable(@DestinationVariable String variable, String payload) { + return variable + "::" + payload + "::response"; + } + @MessageMapping("exception") String handleAndThrow() { throw new IllegalArgumentException("rejected");