diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java index bd10b604f8e..1babc663ebf 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java @@ -19,6 +19,7 @@ package org.springframework.messaging.simp.broker; import java.util.Collection; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import org.apache.commons.logging.Log; @@ -58,6 +59,9 @@ public abstract class AbstractBrokerMessageHandler private final Collection destinationPrefixes; + @Nullable + private Predicate userDestinationPredicate; + private boolean preservePublishOrder = false; @Nullable @@ -135,6 +139,21 @@ public abstract class AbstractBrokerMessageHandler return this.destinationPrefixes; } + /** + * Configure a Predicate to identify messages with a user destination. When + * no {@link #getDestinationPrefixes() destination prefixes} are configured, + * this helps to recognize and skip user destination messages that need to + * be pre-processed by the + * {@link org.springframework.messaging.simp.user.UserDestinationMessageHandler} + * before they reach the broker. + * @param predicate the predicate to identify user messages with a non-null + * destination as messages with a user destinations. + * @since 5.3.4 + */ + public void setUserDestinationPredicate(@Nullable Predicate predicate) { + this.userDestinationPredicate = predicate; + } + /** * Whether the client must receive messages in the order of publication. *

By default messages sent to the {@code "clientOutboundChannel"} may @@ -265,10 +284,27 @@ public abstract class AbstractBrokerMessageHandler protected abstract void handleMessageInternal(Message message); + /** + * Whether a message with the given destination should be processed. This is + * the case if one of the following conditions is true: + *

    + *
  1. The destination starts with one of the configured + * {@link #getDestinationPrefixes() destination prefixes}. + *
  2. No prefixes are configured and the destination isn't matched + * by the {@link #setUserDestinationPredicate(Predicate) + * userDestinationPredicate}. + *
  3. The message has no destination. + *
+ * @param destination the destination to check + * @return whether to process (true) or skip (false) the destination + */ protected boolean checkDestinationPrefix(@Nullable String destination) { - if (destination == null || CollectionUtils.isEmpty(this.destinationPrefixes)) { + if (destination == null) { return true; } + if (CollectionUtils.isEmpty(this.destinationPrefixes)) { + return !isUserDestination(destination); + } for (String prefix : this.destinationPrefixes) { if (destination.startsWith(prefix)) { return true; @@ -277,6 +313,10 @@ public abstract class AbstractBrokerMessageHandler return false; } + private boolean isUserDestination(String destination) { + return (this.userDestinationPredicate != null && this.userDestinationPredicate.test(destination)); + } + protected void publishBrokerAvailableEvent() { boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true); if (this.eventPublisher != null && shouldPublish) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index 575a6dbc2d6..df9b5320fb1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -64,6 +64,7 @@ import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import org.springframework.util.MimeTypeUtils; import org.springframework.util.PathMatcher; +import org.springframework.util.StringUtils; import org.springframework.validation.Errors; import org.springframework.validation.Validator; @@ -346,15 +347,21 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC if (handler == null) { return null; } - updateUserDestinationResolver(handler, userDestinationResolver); + updateUserDestinationResolver(handler, userDestinationResolver, registry.getUserDestinationPrefix()); return handler; } - private void updateUserDestinationResolver(AbstractBrokerMessageHandler handler, UserDestinationResolver userDestinationResolver) { + private void updateUserDestinationResolver( + AbstractBrokerMessageHandler handler, UserDestinationResolver userDestinationResolver, + @Nullable String userDestinationPrefix) { + Collection prefixes = handler.getDestinationPrefixes(); if (!prefixes.isEmpty() && !prefixes.iterator().next().startsWith("/")) { ((DefaultUserDestinationResolver) userDestinationResolver).setRemoveLeadingSlash(true); } + if (StringUtils.hasText(userDestinationPrefix)) { + handler.setUserDestinationPredicate(destination -> destination.startsWith(userDestinationPrefix)); + } } @Bean @@ -379,7 +386,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC subscriptions.put(destination, userRegistryMessageHandler); } handler.setSystemSubscriptions(subscriptions); - updateUserDestinationResolver(handler, userDestinationResolver); + updateUserDestinationResolver(handler, userDestinationResolver, registry.getUserDestinationPrefix()); return handler; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java index 90183650c8a..16746e25f4a 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,10 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.GenericMessage; +import org.springframework.messaging.support.MessageBuilder; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -74,28 +77,25 @@ public class BrokerMessageHandlerTests { @Test public void publishBrokerAvailableEvent() { - assertThat(this.handler.isBrokerAvailable()).isFalse(); assertThat(this.handler.availabilityEvents).isEqualTo(Collections.emptyList()); this.handler.publishBrokerAvailableEvent(); assertThat(this.handler.isBrokerAvailable()).isTrue(); - assertThat(this.handler.availabilityEvents).isEqualTo(Arrays.asList(true)); + assertThat(this.handler.availabilityEvents).isEqualTo(Collections.singletonList(true)); } @Test public void publishBrokerAvailableEventWhenAlreadyAvailable() { - this.handler.publishBrokerAvailableEvent(); this.handler.publishBrokerAvailableEvent(); - assertThat(this.handler.availabilityEvents).isEqualTo(Arrays.asList(true)); + assertThat(this.handler.availabilityEvents).isEqualTo(Collections.singletonList(true)); } @Test public void publishBrokerUnavailableEvent() { - this.handler.publishBrokerAvailableEvent(); assertThat(this.handler.isBrokerAvailable()).isTrue(); @@ -107,7 +107,6 @@ public class BrokerMessageHandlerTests { @Test public void publishBrokerUnavailableEventWhenAlreadyUnavailable() { - this.handler.publishBrokerAvailableEvent(); this.handler.publishBrokerUnavailableEvent(); this.handler.publishBrokerUnavailableEvent(); @@ -115,6 +114,47 @@ public class BrokerMessageHandlerTests { assertThat(this.handler.availabilityEvents).isEqualTo(Arrays.asList(true, false)); } + @Test + public void checkDestination() { + TestBrokerMessageHandler theHandler = new TestBrokerMessageHandler("/topic"); + theHandler.start(); + + SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); + accessor.setLeaveMutable(true); + + accessor.setDestination("/topic/foo"); + theHandler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders())); + + accessor.setDestination("/app/foo"); + theHandler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders())); + + accessor.setDestination(null); + theHandler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders())); + + List> list = theHandler.messages; + assertThat(list).hasSize(2); + assertThat(list.get(0).getHeaders().get(SimpMessageHeaderAccessor.DESTINATION_HEADER)).isEqualTo("/topic/foo"); + assertThat(list.get(1).getHeaders().get(SimpMessageHeaderAccessor.DESTINATION_HEADER)).isNull(); + } + + @Test + public void checkDestinationWithoutConfiguredPrefixes() { + this.handler.setUserDestinationPredicate(destination -> destination.startsWith("/user/")); + this.handler.start(); + + SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); + accessor.setLeaveMutable(true); + + accessor.setDestination("/user/1/foo"); + this.handler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders())); + + accessor.setDestination("/foo"); + this.handler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders())); + + List> list = this.handler.messages; + assertThat(list).hasSize(1); + assertThat(list.get(0).getHeaders().get(SimpMessageHeaderAccessor.DESTINATION_HEADER)).isEqualTo("/foo"); + } private static class TestBrokerMessageHandler extends AbstractBrokerMessageHandler implements ApplicationEventPublisher { @@ -124,14 +164,19 @@ public class BrokerMessageHandlerTests { private final List availabilityEvents = new ArrayList<>(); - private TestBrokerMessageHandler() { - super(mock(SubscribableChannel.class), mock(MessageChannel.class), mock(SubscribableChannel.class)); + TestBrokerMessageHandler(String... destinationPrefixes) { + super(mock(SubscribableChannel.class), mock(MessageChannel.class), + mock(SubscribableChannel.class), Arrays.asList(destinationPrefixes)); + setApplicationEventPublisher(this); } @Override protected void handleMessageInternal(Message message) { - this.messages.add(message); + String destination = (String) message.getHeaders().get(SimpMessageHeaderAccessor.DESTINATION_HEADER); + if (checkDestinationPrefix(destination)) { + this.messages.add(message); + } } @Override