diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/HandlerMethod.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/HandlerMethod.java
index 0be63540cd3..30230a2c301 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/handler/HandlerMethod.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/HandlerMethod.java
@@ -298,7 +298,7 @@ public class HandlerMethod {
*/
public String getShortLogMessage() {
int args = this.method.getParameterCount();
- return getBeanType().getName() + "#" + this.method.getName() + "[" + args + " args]";
+ return getBeanType().getSimpleName() + "#" + this.method.getName() + "[" + args + " args]";
}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java
index 25463e70f33..68518837980 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java
@@ -19,21 +19,25 @@ import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.function.Predicate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.EmbeddedValueResolverAware;
+import org.springframework.context.SmartLifecycle;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.Decoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
+import org.springframework.messaging.ReactiveSubscribableChannel;
import org.springframework.messaging.handler.CompositeMessageCondition;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.annotation.MessageMapping;
@@ -51,58 +55,57 @@ import org.springframework.util.StringValueResolver;
import org.springframework.validation.Validator;
/**
- * Extension of {@link AbstractMethodMessageHandler} for
- * {@link MessageMapping @MessageMapping} methods.
+ * Extension of {@link AbstractMethodMessageHandler} for reactive, non-blocking
+ * handling of messages via {@link MessageMapping @MessageMapping} methods.
+ * By default such methods are detected in {@code @Controller} Spring beans but
+ * that can be changed via {@link #setHandlerPredicate(Predicate)}.
*
- *
The payload of incoming messages is decoded through
- * {@link PayloadMethodArgumentResolver} using one of the configured
- * {@link #setDecoders(List)} decoders.
+ *
Payloads for incoming messages are decoded through the configured
+ * {@link #setDecoders(List)} decoders, with the help of
+ * {@link PayloadMethodArgumentResolver}.
*
- *
The {@link #setEncoderReturnValueHandler encoderReturnValueHandler}
- * property must be set to encode and handle return values from
- * {@code @MessageMapping} methods.
+ *
There is no default handling for return values but
+ * {@link #setReturnValueHandlerConfigurer} can be used to configure custom
+ * return value handlers. Sub-classes may also override
+ * {@link #initReturnValueHandlers()} to set up default return value handlers.
*
* @author Rossen Stoyanchev
* @since 5.2
+ * @see AbstractEncoderMethodReturnValueHandler
*/
public class MessageMappingMessageHandler extends AbstractMethodMessageHandler
- implements EmbeddedValueResolverAware {
+ implements SmartLifecycle, EmbeddedValueResolverAware {
- private PathMatcher pathMatcher = new AntPathMatcher();
+ private final ReactiveSubscribableChannel inboundChannel;
private final List> decoders = new ArrayList<>();
@Nullable
private Validator validator;
- @Nullable
- private HandlerMethodReturnValueHandler encoderReturnValueHandler;
+ private PathMatcher pathMatcher;
private ConversionService conversionService = new DefaultFormattingConversionService();
@Nullable
private StringValueResolver valueResolver;
+ private volatile boolean running = false;
- /**
- * Set the PathMatcher implementation to use for matching destinations
- * against configured destination patterns.
- * By default, {@link AntPathMatcher} is used.
- */
- public void setPathMatcher(PathMatcher pathMatcher) {
- Assert.notNull(pathMatcher, "PathMatcher must not be null");
- this.pathMatcher = pathMatcher;
- }
+ private final Object lifecycleMonitor = new Object();
- /**
- * Return the PathMatcher implementation to use for matching destinations.
- */
- public PathMatcher getPathMatcher() {
- return this.pathMatcher;
+
+ public MessageMappingMessageHandler(ReactiveSubscribableChannel inboundChannel) {
+ Assert.notNull(inboundChannel, "`inboundChannel` is required");
+ this.inboundChannel = inboundChannel;
+ this.pathMatcher = new AntPathMatcher();
+ ((AntPathMatcher) this.pathMatcher).setPathSeparator(".");
+ setHandlerPredicate(beanType -> AnnotatedElementUtils.hasAnnotation(beanType, Controller.class));
}
+
/**
- * Configure the decoders to user for incoming payloads.
+ * Configure the decoders to use for incoming payloads.
*/
public void setDecoders(List extends Decoder>> decoders) {
this.decoders.addAll(decoders);
@@ -115,14 +118,6 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandlerBy default this is not configured in which case payload/content return
- * values from {@code @MessageMapping} methods will remain unhandled.
- * @param encoderReturnValueHandler the return value handler to use
- * @see AbstractEncoderMethodReturnValueHandler
+ * Return the configured Validator instance.
*/
- public void setEncoderReturnValueHandler(@Nullable HandlerMethodReturnValueHandler encoderReturnValueHandler) {
- this.encoderReturnValueHandler = encoderReturnValueHandler;
+ @Nullable
+ public Validator getValidator() {
+ return this.validator;
}
/**
- * Return the configured
- * {@link #setEncoderReturnValueHandler encoderReturnValueHandler}.
+ * Set the PathMatcher implementation to use for matching destinations
+ * against configured destination patterns.
+ * By default, {@link AntPathMatcher} is used with separator set to ".".
*/
- @Nullable
- public HandlerMethodReturnValueHandler getEncoderReturnValueHandler() {
- return this.encoderReturnValueHandler;
+ public void setPathMatcher(PathMatcher pathMatcher) {
+ Assert.notNull(pathMatcher, "PathMatcher must not be null");
+ this.pathMatcher = pathMatcher;
+ }
+
+ /**
+ * Return the PathMatcher implementation to use for matching destinations.
+ */
+ public PathMatcher getPathMatcher() {
+ return this.pathMatcher;
}
/**
@@ -204,20 +200,40 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler initReturnValueHandlers() {
- List handlers = new ArrayList<>();
- handlers.addAll(getReturnValueHandlerConfigurer().getCustomHandlers());
- if (this.encoderReturnValueHandler != null) {
- handlers.add(this.encoderReturnValueHandler);
+ return Collections.emptyList();
+ }
+
+
+ @Override
+ public final void start() {
+ synchronized (this.lifecycleMonitor) {
+ this.inboundChannel.subscribe(this);
+ this.running = true;
}
- return handlers;
}
+ @Override
+ public final void stop() {
+ synchronized (this.lifecycleMonitor) {
+ this.running = false;
+ this.inboundChannel.unsubscribe(this);
+ }
+ }
+
+ @Override
+ public final void stop(Runnable callback) {
+ synchronized (this.lifecycleMonitor) {
+ stop();
+ callback.run();
+ }
+ }
@Override
- protected boolean isHandler(Class> beanType) {
- return AnnotatedElementUtils.hasAnnotation(beanType, Controller.class);
+ public final boolean isRunning() {
+ return this.running;
}
+
@Override
protected CompositeMessageCondition getMappingForMethod(Method method, Class> handlerType) {
CompositeMessageCondition methodCondition = getCondition(method);
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java
index 6aad4365729..e6885e21ad9 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java
@@ -19,6 +19,7 @@ import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
@@ -148,38 +149,47 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
* @param message the message from which the content was extracted
* @return a Mono with the result of argument resolution
*
- * @see #extractPayloadContent(MethodParameter, Message)
+ * @see #extractContent(MethodParameter, Message)
* @see #getMimeType(Message)
*/
@Override
public final Mono resolveArgument(MethodParameter parameter, Message> message) {
+
Payload ann = parameter.getParameterAnnotation(Payload.class);
if (ann != null && StringUtils.hasText(ann.expression())) {
throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver");
}
- Publisher content = extractPayloadContent(parameter, message);
- return decodeContent(parameter, message, ann == null || ann.required(), content, getMimeType(message));
+
+ MimeType mimeType = getMimeType(message);
+ mimeType = mimeType != null ? mimeType : MimeTypeUtils.APPLICATION_OCTET_STREAM;
+
+ Flux content = extractContent(parameter, message);
+ return decodeContent(parameter, message, ann == null || ann.required(), content, mimeType);
}
- /**
- * Extract the content to decode from the message. By default, the message
- * payload is expected to be {@code Publisher}. Sub-classes can
- * override this method to change that assumption.
- * @param parameter the target method parameter we're decoding to
- * @param message the input message with the content
- * @return the content to decode
- */
@SuppressWarnings("unchecked")
- protected Publisher extractPayloadContent(MethodParameter parameter, Message> message) {
- Publisher content;
- try {
- content = (Publisher) message.getPayload();
+ private Flux extractContent(MethodParameter parameter, Message> message) {
+ Object payload = message.getPayload();
+ if (payload instanceof DataBuffer) {
+ return Flux.just((DataBuffer) payload);
}
- catch (ClassCastException ex) {
- throw new MethodArgumentResolutionException(
- message, parameter, "Expected Publisher payload", ex);
+ if (payload instanceof Publisher) {
+ return Flux.from((Publisher>) payload).map(value -> {
+ if (value instanceof DataBuffer) {
+ return (DataBuffer) value;
+ }
+ String className = value.getClass().getName();
+ throw getUnexpectedPayloadError(message, parameter, "Publisher<" + className + ">");
+ });
}
- return content;
+ return Flux.error(getUnexpectedPayloadError(message, parameter, payload.getClass().getName()));
+ }
+
+ private MethodArgumentResolutionException getUnexpectedPayloadError(
+ Message> message, MethodParameter parameter, String actualType) {
+
+ return new MethodArgumentResolutionException(message, parameter,
+ "Expected DataBuffer or Publisher for the Message payload, actual: " + actualType);
}
/**
@@ -206,7 +216,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
}
private Mono decodeContent(MethodParameter parameter, Message> message,
- boolean isContentRequired, Publisher content, @Nullable MimeType mimeType) {
+ boolean isContentRequired, Flux content, MimeType mimeType) {
ResolvableType targetType = ResolvableType.forMethodParameter(parameter);
Class> resolvedType = targetType.resolve();
@@ -215,19 +225,14 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
isContentRequired = isContentRequired || (adapter != null && !adapter.supportsEmpty());
Consumer validator = getValidator(message, parameter);
- if (logger.isDebugEnabled()) {
- logger.debug("Mime type:" + mimeType);
- }
- mimeType = mimeType != null ? mimeType : MimeTypeUtils.APPLICATION_OCTET_STREAM;
+ Map hints = Collections.emptyMap();
for (Decoder> decoder : this.decoders) {
if (decoder.canDecode(elementType, mimeType)) {
if (adapter != null && adapter.isMultiValue()) {
- if (logger.isDebugEnabled()) {
- logger.debug("0..N [" + elementType + "]");
- }
- Flux> flux = decoder.decode(content, elementType, mimeType, Collections.emptyMap());
- flux = flux.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
+ Flux> flux = content
+ .concatMap(buffer -> decoder.decode(Mono.just(buffer), elementType, mimeType, hints))
+ .onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
}
@@ -237,12 +242,10 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
return Mono.just(adapter.fromPublisher(flux));
}
else {
- if (logger.isDebugEnabled()) {
- logger.debug("0..1 [" + elementType + "]");
- }
// Single-value (with or without reactive type wrapper)
- Mono> mono = decoder.decodeToMono(content, targetType, mimeType, Collections.emptyMap());
- mono = mono.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
+ Mono> mono = decoder
+ .decodeToMono(content.next(), targetType, mimeType, hints)
+ .onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));
}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractEncoderMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractEncoderMethodReturnValueHandler.java
index d7a3aa96b48..aa5916a4147 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractEncoderMethodReturnValueHandler.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractEncoderMethodReturnValueHandler.java
@@ -32,20 +32,21 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
+import org.springframework.util.MimeType;
/**
- * Base class for a return value handler that encodes the return value, possibly
- * a {@link Publisher} of values, to a {@code Flux} through a
- * compatible {@link Encoder}.
+ * Base class for a return value handler that encodes return values to
+ * {@code Flux} through the configured {@link Encoder}s.
*
* Sub-classes must implement the abstract method
- * {@link #handleEncodedContent} to do something with the resulting encoded
- * content.
+ * {@link #handleEncodedContent} to handle the resulting encoded content.
*
*
This handler should be ordered last since its {@link #supportsReturnType}
* returns {@code true} for any method parameter type.
@@ -67,8 +68,7 @@ public abstract class AbstractEncoderMethodReturnValueHandler implements Handler
private final ReactiveAdapterRegistry adapterRegistry;
- // TODO: configure or passed via MessageHeaders
- private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
+ private DataBufferFactory defaultBufferFactory = new DefaultDataBufferFactory();
protected AbstractEncoderMethodReturnValueHandler(List> encoders, ReactiveAdapterRegistry registry) {
@@ -96,69 +96,104 @@ public abstract class AbstractEncoderMethodReturnValueHandler implements Handler
@Override
public boolean supportsReturnType(MethodParameter returnType) {
+ // We could check canEncode but we're probably last in order anyway
return true;
}
@Override
- public Mono handleReturnValue(Object returnValue, MethodParameter returnType, Message> message) {
- Flux encodedContent = encodeContent(returnValue, returnType, this.bufferFactory);
+ public Mono handleReturnValue(
+ @Nullable Object returnValue, MethodParameter returnType, Message> message) {
+
+ DataBufferFactory bufferFactory = (DataBufferFactory) message.getHeaders()
+ .getOrDefault(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, this.defaultBufferFactory);
+
+ MimeType mimeType = (MimeType) message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
+
+ Flux encodedContent = encodeContent(
+ returnValue, returnType, bufferFactory, mimeType, Collections.emptyMap());
+
return handleEncodedContent(encodedContent, returnType, message);
}
@SuppressWarnings("unchecked")
- private Flux encodeContent(@Nullable Object content, MethodParameter returnType,
- DataBufferFactory bufferFactory) {
+ private Flux encodeContent(
+ @Nullable Object content, MethodParameter returnType, DataBufferFactory bufferFactory,
+ @Nullable MimeType mimeType, Map hints) {
- ResolvableType bodyType = ResolvableType.forMethodParameter(returnType);
- ReactiveAdapter adapter = getAdapterRegistry().getAdapter(bodyType.resolve(), content);
+ ResolvableType returnValueType = ResolvableType.forMethodParameter(returnType);
+ ReactiveAdapter adapter = getAdapterRegistry().getAdapter(returnValueType.resolve(), content);
Publisher> publisher;
ResolvableType elementType;
if (adapter != null) {
publisher = adapter.toPublisher(content);
- ResolvableType genericType = bodyType.getGeneric();
+ ResolvableType genericType = returnValueType.getGeneric();
elementType = getElementType(adapter, genericType);
}
else {
publisher = Mono.justOrEmpty(content);
- elementType = (bodyType.toClass() == Object.class && content != null ?
- ResolvableType.forInstance(content) : bodyType);
+ elementType = returnValueType.toClass() == Object.class && content != null ?
+ ResolvableType.forInstance(content) : returnValueType;
}
if (elementType.resolve() == void.class || elementType.resolve() == Void.class) {
return Flux.from(publisher).cast(DataBuffer.class);
}
- if (logger.isDebugEnabled()) {
- logger.debug((publisher instanceof Mono ? "0..1" : "0..N") + " [" + elementType + "]");
- }
-
- for (Encoder> encoder : getEncoders()) {
- if (encoder.canEncode(elementType, null)) {
- Map hints = Collections.emptyMap();
- return encoder.encode((Publisher) publisher, bufferFactory, elementType, null, hints);
- }
- }
+ Encoder> encoder = getEncoder(elementType, mimeType);
- return Flux.error(new MessagingException("No encoder for " + returnType));
+ return Flux.from((Publisher) publisher).concatMap(value ->
+ encodeValue(value, elementType, encoder, bufferFactory, mimeType, hints));
}
- private ResolvableType getElementType(ReactiveAdapter adapter, ResolvableType genericType) {
+ private ResolvableType getElementType(ReactiveAdapter adapter, ResolvableType type) {
if (adapter.isNoValue()) {
return VOID_RESOLVABLE_TYPE;
}
- else if (genericType != ResolvableType.NONE) {
- return genericType;
+ else if (type != ResolvableType.NONE) {
+ return type;
}
else {
return OBJECT_RESOLVABLE_TYPE;
}
}
+ @Nullable
+ @SuppressWarnings("unchecked")
+ private Encoder getEncoder(ResolvableType elementType, @Nullable MimeType mimeType) {
+ for (Encoder> encoder : getEncoders()) {
+ if (encoder.canEncode(elementType, mimeType)) {
+ return (Encoder) encoder;
+ }
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Mono encodeValue(
+ Object element, ResolvableType elementType, @Nullable Encoder encoder,
+ DataBufferFactory bufferFactory, @Nullable MimeType mimeType,
+ @Nullable Map hints) {
+
+ if (encoder == null) {
+ encoder = getEncoder(ResolvableType.forInstance(element), mimeType);
+ if (encoder == null) {
+ return Mono.error(new MessagingException(
+ "No encoder for " + elementType + ", current value type is " + element.getClass()));
+ }
+ }
+ Mono mono = Mono.just((T) element);
+ Flux dataBuffers = encoder.encode(mono, bufferFactory, elementType, mimeType, hints);
+ return DataBufferUtils.join(dataBuffers);
+ }
+
/**
- * Handle the encoded content in some way, e.g. wrapping it in a message and
- * passing it on for further processing.
- * @param encodedContent the result of data encoding
+ * Sub-classes implement this method to handle encoded values in some way
+ * such as creating and sending messages.
+ *
+ * @param encodedContent the encoded content; each {@code DataBuffer}
+ * represents the fully-aggregated, encoded content for one value
+ * (i.e. payload) returned from the HandlerMethod.
* @param returnType return type of the handler method that produced the data
* @param message the input message handled by the handler method
* @return completion {@code Mono} for the handling
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java
index 684ed8e04d6..23824c3096a 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/AbstractMethodMessageHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -93,6 +94,9 @@ public abstract class AbstractMethodMessageHandler
private ReactiveAdapterRegistry reactiveAdapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
+ @Nullable
+ private Predicate> handlerPredicate;
+
@Nullable
private ApplicationContext applicationContext;
@@ -137,6 +141,24 @@ public abstract class AbstractMethodMessageHandler
return this.returnValueHandlerConfigurer;
}
+ /**
+ * Configure a predicate to decide if which beans in the Spring context
+ * should be checked to see if they have message handling methods.
+ * By default this is not set and sub-classes should configure it in
+ * order to enable auto-detection of message handling methods.
+ */
+ public void setHandlerPredicate(@Nullable Predicate> handlerPredicate) {
+ this.handlerPredicate = handlerPredicate;
+ }
+
+ /**
+ * Return the {@link #setHandlerPredicate configured} handler predicate.
+ */
+ @Nullable
+ public Predicate> getHandlerPredicate() {
+ return this.handlerPredicate;
+ }
+
/**
* Configure the registry for adapting various reactive types.
* By default this is an instance of {@link ReactiveAdapterRegistry} with
@@ -228,6 +250,10 @@ public abstract class AbstractMethodMessageHandler
logger.warn("No ApplicationContext available for detecting beans with message handling methods.");
return;
}
+ if (this.handlerPredicate == null) {
+ logger.warn("'handlerPredicate' not configured: no auto-detection of message handling methods.");
+ return;
+ }
for (String beanName : this.applicationContext.getBeanNamesForType(Object.class)) {
if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) {
Class> beanType = null;
@@ -240,24 +266,22 @@ public abstract class AbstractMethodMessageHandler
logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
}
}
- if (beanType != null && isHandler(beanType)) {
+ if (beanType != null && this.handlerPredicate.test(beanType)) {
detectHandlerMethods(beanName);
}
}
}
}
- /**
- * Whether the given bean could contain message handling methods.
- */
- protected abstract boolean isHandler(Class> beanType);
-
/**
* Detect if the given handler has any methods that can handle messages and if
* so register it with the extracted mapping information.
+ * Note: This method is protected and can be invoked by
+ * sub-classes, but this should be done on startup only as documented in
+ * {@link #registerHandlerMethod}.
* @param handler the handler to check, either an instance of a Spring bean name
*/
- private void detectHandlerMethods(Object handler) {
+ protected final void detectHandlerMethods(Object handler) {
Class> handlerType;
if (handler instanceof String) {
ApplicationContext context = getApplicationContext();
@@ -288,14 +312,17 @@ public abstract class AbstractMethodMessageHandler
protected abstract T getMappingForMethod(Method method, Class> handlerType);
/**
- * Register a handler method and its unique mapping, on startup.
+ * Register a handler method and its unique mapping.
+ * Note: This method is protected and can be invoked by
+ * sub-classes. Keep in mind however that the registration is not protected
+ * for concurrent use, and is expected to be done on startup.
* @param handler the bean name of the handler or the handler instance
* @param method the method to register
* @param mapping the mapping conditions associated with the handler method
* @throws IllegalStateException if another method was already registered
* under the same mapping
*/
- protected void registerHandlerMethod(Object handler, Method method, T mapping) {
+ protected final void registerHandlerMethod(Object handler, Method method, T mapping) {
Assert.notNull(mapping, "Mapping must not be null");
HandlerMethod newHandlerMethod = createHandlerMethod(handler, method);
HandlerMethod oldHandlerMethod = this.handlerMethods.get(mapping);
@@ -348,6 +375,7 @@ public abstract class AbstractMethodMessageHandler
public Mono handleMessage(Message> message) throws MessagingException {
Match match = getHandlerMethod(message);
if (match == null) {
+ // handleNoMatch would have been invoked already
return Mono.empty();
}
HandlerMethod handlerMethod = match.getHandlerMethod().createWithResolvedBean();
@@ -383,6 +411,7 @@ public abstract class AbstractMethodMessageHandler
addMatchesToCollection(allMappings, message, matches);
}
if (matches.isEmpty()) {
+ handleNoMatch(destination, message);
return null;
}
Comparator> comparator = new MatchComparator(getMappingComparator(message));
@@ -443,12 +472,22 @@ public abstract class AbstractMethodMessageHandler
*/
protected abstract Comparator getMappingComparator(Message> message);
+ /**
+ * Invoked when no matching handler is found.
+ * @param destination the destination
+ * @param message the message
+ */
+ @Nullable
+ protected void handleNoMatch(@Nullable String destination, Message> message) {
+ logger.debug("No handlers for destination '" + destination + "'");
+ }
+
private Mono processHandlerException(Message> message, HandlerMethod handlerMethod, Exception ex) {
InvocableHandlerMethod exceptionInvocable = findExceptionHandler(handlerMethod, ex);
if (exceptionInvocable == null) {
logger.error("Unhandled exception from message handling method", ex);
- return Mono.empty();
+ return Mono.error(ex);
}
exceptionInvocable.setArgumentResolvers(this.argumentResolvers.getResolvers());
if (logger.isDebugEnabled()) {
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/HandlerMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/HandlerMethodReturnValueHandler.java
index 9cf6c894ce7..7c72ba6332f 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/HandlerMethodReturnValueHandler.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/HandlerMethodReturnValueHandler.java
@@ -31,6 +31,10 @@ import org.springframework.messaging.Message;
*/
public interface HandlerMethodReturnValueHandler {
+ /** Header containing a DataBufferFactory to use. */
+ public static final String DATA_BUFFER_FACTORY_HEADER = "dataBufferFactoryHeader";
+
+
/**
* Whether the given {@linkplain MethodParameter method return type} is
* supported by this handler.
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java
index 243beb8f8d8..a34ddc8b0c2 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandlerTests.java
@@ -19,15 +19,14 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import org.springframework.beans.factory.config.EmbeddedValueResolver;
import org.springframework.context.support.StaticApplicationContext;
-import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.Decoder;
@@ -38,18 +37,18 @@ import org.springframework.core.env.PropertySource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
-import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
+import org.springframework.messaging.ReactiveSubscribableChannel;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
-import org.springframework.messaging.handler.invocation.reactive.AbstractEncoderMethodReturnValueHandler;
+import org.springframework.messaging.handler.invocation.reactive.TestEncoderMethodReturnValueHandler;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Controller;
import static java.nio.charset.StandardCharsets.*;
import static org.junit.Assert.*;
-import static org.springframework.core.io.buffer.support.DataBufferTestUtils.*;
+import static org.mockito.Mockito.*;
/**
* Unit tests for {@link MessageMappingMessageHandler}.
@@ -61,51 +60,64 @@ public class MessageMappingMessageHandlerTests {
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
- private TestEncoderReturnValueHandler returnValueHandler;
+ private TestEncoderMethodReturnValueHandler returnValueHandler;
@Test
public void handleString() {
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
- messsageHandler.handleMessage(message("/string", "abcdef")).block(Duration.ofSeconds(5));
+ messsageHandler.handleMessage(message("string", "abcdef")).block(Duration.ofSeconds(5));
verifyOutputContent(Collections.singletonList("abcdef::response"));
}
@Test
public void handleMonoString() {
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
- messsageHandler.handleMessage(message("/monoString", "abcdef")).block(Duration.ofSeconds(5));
+ messsageHandler.handleMessage(message("monoString", "abcdef")).block(Duration.ofSeconds(5));
verifyOutputContent(Collections.singletonList("abcdef::response"));
}
@Test
public void handleFluxString() {
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
- messsageHandler.handleMessage(message("/fluxString", "abc\ndef\nghi")).block(Duration.ofSeconds(5));
+ messsageHandler.handleMessage(message("fluxString", "abc\ndef\nghi")).block(Duration.ofSeconds(5));
verifyOutputContent(Arrays.asList("abc::response", "def::response", "ghi::response"));
}
@Test
public void handleWithPlaceholderInMapping() {
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
- messsageHandler.handleMessage(message("/path123", "abcdef")).block(Duration.ofSeconds(5));
+ messsageHandler.handleMessage(message("path123", "abcdef")).block(Duration.ofSeconds(5));
verifyOutputContent(Collections.singletonList("abcdef::response"));
}
@Test
public void handleException() {
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
- messsageHandler.handleMessage(message("/exception", "abc")).block(Duration.ofSeconds(5));
+ messsageHandler.handleMessage(message("exception", "abc")).block(Duration.ofSeconds(5));
verifyOutputContent(Collections.singletonList("rejected::handled"));
}
@Test
public void handleErrorSignal() {
MessageMappingMessageHandler messsageHandler = initMesssageHandler();
- messsageHandler.handleMessage(message("/errorSignal", "abc")).block(Duration.ofSeconds(5));
+ messsageHandler.handleMessage(message("errorSignal", "abc")).block(Duration.ofSeconds(5));
verifyOutputContent(Collections.singletonList("rejected::handled"));
}
+ @Test
+ public void unhandledExceptionShouldFlowThrough() {
+
+ GenericMessage> message = new GenericMessage<>(new Object(),
+ Collections.singletonMap(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, "string"));
+
+ StepVerifier.create(initMesssageHandler().handleMessage(message))
+ .expectErrorSatisfies(ex -> assertTrue(
+ "Actual: " + ex.getMessage(),
+ ex.getMessage().startsWith("Could not resolve method parameter at index 0")))
+ .verify(Duration.ofSeconds(5));
+ }
+
private MessageMappingMessageHandler initMesssageHandler() {
@@ -113,7 +125,7 @@ public class MessageMappingMessageHandlerTests {
List> encoders = Collections.singletonList(CharSequenceEncoder.allMimeTypes());
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
- this.returnValueHandler = new TestEncoderReturnValueHandler(encoders, registry);
+ this.returnValueHandler = new TestEncoderMethodReturnValueHandler(encoders, registry);
PropertySource> source = new MapPropertySource("test", Collections.singletonMap("path", "path123"));
@@ -122,11 +134,13 @@ public class MessageMappingMessageHandlerTests {
context.registerSingleton("testController", TestController.class);
context.refresh();
- MessageMappingMessageHandler messageHandler = new MessageMappingMessageHandler();
+ ReactiveSubscribableChannel channel = mock(ReactiveSubscribableChannel.class);
+
+ MessageMappingMessageHandler messageHandler = new MessageMappingMessageHandler(channel);
+ messageHandler.getReturnValueHandlerConfigurer().addCustomHandler(this.returnValueHandler);
messageHandler.setApplicationContext(context);
messageHandler.setEmbeddedValueResolver(new EmbeddedValueResolver(context.getBeanFactory()));
messageHandler.setDecoders(decoders);
- messageHandler.setEncoderReturnValueHandler(this.returnValueHandler);
messageHandler.afterPropertiesSet();
return messageHandler;
@@ -134,7 +148,7 @@ public class MessageMappingMessageHandlerTests {
private Message> message(String destination, String... content) {
return new GenericMessage<>(
- Flux.fromIterable(Arrays.stream(content).map(this::toDataBuffer).collect(Collectors.toList())),
+ Flux.fromIterable(Arrays.asList(content)).map(payload -> toDataBuffer(payload)),
Collections.singletonMap(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination));
}
@@ -143,42 +157,40 @@ public class MessageMappingMessageHandlerTests {
}
private void verifyOutputContent(List expected) {
- List buffers = this.returnValueHandler.getOutputContent();
- assertNotNull("No output: no matching handler method?", buffers);
- List actual = buffers.stream().map(buffer -> dumpString(buffer, UTF_8)).collect(Collectors.toList());
- assertEquals(expected, actual);
+ Flux result = this.returnValueHandler.getContentAsStrings();
+ StepVerifier.create(result.collectList()).expectNext(expected).verifyComplete();
}
@Controller
static class TestController {
- @MessageMapping("/string")
+ @MessageMapping("string")
String handleString(String payload) {
return payload + "::response";
}
- @MessageMapping("/monoString")
+ @MessageMapping("monoString")
Mono handleMonoString(Mono payload) {
return payload.map(s -> s + "::response").delayElement(Duration.ofMillis(10));
}
- @MessageMapping("/fluxString")
+ @MessageMapping("fluxString")
Flux handleFluxString(Flux payload) {
return payload.map(s -> s + "::response").delayElements(Duration.ofMillis(10));
}
- @MessageMapping("/${path}")
+ @MessageMapping("${path}")
String handleWithPlaceholder(String payload) {
return payload + "::response";
}
- @MessageMapping("/exception")
+ @MessageMapping("exception")
String handleAndThrow() {
throw new IllegalArgumentException("rejected");
}
- @MessageMapping("/errorSignal")
+ @MessageMapping("errorSignal")
Mono handleAndSignalError() {
return Mono.delay(Duration.ofMillis(10))
.flatMap(aLong -> Mono.error(new IllegalArgumentException("rejected")));
@@ -190,29 +202,4 @@ public class MessageMappingMessageHandlerTests {
}
}
-
- private static class TestEncoderReturnValueHandler extends AbstractEncoderMethodReturnValueHandler {
-
- @Nullable
- private volatile List outputContent;
-
-
- TestEncoderReturnValueHandler(List> encoders, ReactiveAdapterRegistry registry) {
- super(encoders, registry);
- }
-
-
- @Nullable
- public List getOutputContent() {
- return this.outputContent;
- }
-
- @Override
- protected Mono handleEncodedContent(
- Flux encodedContent, MethodParameter returnType, Message> message) {
-
- return encodedContent.collectList().doOnNext(buffers -> this.outputContent = buffers).then();
- }
- }
-
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolverTests.java
index 0797440d06c..0e1e05d08ae 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolverTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolverTests.java
@@ -101,8 +101,8 @@ public class PayloadMethodArgumentResolverTests {
public void stringMono() {
String body = "foo";
MethodParameter param = this.testMethod.arg(ResolvableType.forClassWithGenerics(Mono.class, String.class));
- Mono value = Mono.delay(Duration.ofMillis(10)).map(aLong -> toDataBuffer(body));
- Mono mono = resolveValue(param, value, null);
+ Mono mono = resolveValue(param,
+ Mono.delay(Duration.ofMillis(10)).map(aLong -> toDataBuffer(body)), null);
assertEquals(body, mono.block());
}
@@ -112,8 +112,8 @@ public class PayloadMethodArgumentResolverTests {
List body = Arrays.asList("foo", "bar");
ResolvableType type = ResolvableType.forClassWithGenerics(Flux.class, String.class);
MethodParameter param = this.testMethod.arg(type);
- Flux flux = resolveValue(param, Flux.fromIterable(body)
- .delayElements(Duration.ofMillis(10)).map(value -> toDataBuffer(value + "\n")), null);
+ Flux flux = resolveValue(param,
+ Flux.fromIterable(body).delayElements(Duration.ofMillis(10)).map(this::toDataBuffer), null);
assertEquals(body, flux.collectList().block());
}
@@ -141,7 +141,7 @@ public class PayloadMethodArgumentResolverTests {
public void validateStringFlux() {
ResolvableType type = ResolvableType.forClassWithGenerics(Flux.class, String.class);
MethodParameter param = this.testMethod.arg(type);
- Flux flux = resolveValue(param, Flux.just(toDataBuffer("12345678\n12345")), new TestValidator());
+ Flux flux = resolveValue(param, Mono.just(toDataBuffer("12345678\n12345")), new TestValidator());
StepVerifier.create(flux)
.expectNext("12345678")
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java
index b373d671c01..a8cb85b7eee 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/EncoderMethodReturnValueHandlerTests.java
@@ -16,7 +16,6 @@
package org.springframework.messaging.handler.invocation.reactive;
import java.util.Collections;
-import java.util.List;
import io.reactivex.Completable;
import org.junit.Test;
@@ -27,15 +26,10 @@ import reactor.test.StepVerifier;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.CharSequenceEncoder;
-import org.springframework.core.codec.Encoder;
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
-import static java.nio.charset.StandardCharsets.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
import static org.springframework.messaging.handler.invocation.ResolvableMethod.*;
/**
@@ -49,41 +43,43 @@ public class EncoderMethodReturnValueHandlerTests {
Collections.singletonList(CharSequenceEncoder.textPlainOnly()),
ReactiveAdapterRegistry.getSharedInstance());
- private final Message> message = mock(Message.class);
+ private final Message> message = new GenericMessage<>("shouldn't matter");
@Test
public void stringReturnValue() {
MethodParameter parameter = on(TestController.class).resolveReturnType(String.class);
- this.handler.handleReturnValue("foo", parameter, message).block();
- Flux result = this.handler.encodedContent;
+ this.handler.handleReturnValue("foo", parameter, this.message).block();
+ Flux result = this.handler.getContentAsStrings();
- StepVerifier.create(result)
- .consumeNextWith(buffer -> assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8)))
- .verifyComplete();
+ StepVerifier.create(result).expectNext("foo").verifyComplete();
}
@Test
public void objectReturnValue() {
MethodParameter parameter = on(TestController.class).resolveReturnType(Object.class);
- this.handler.handleReturnValue("foo", parameter, message).block();
- Flux result = this.handler.encodedContent;
+ this.handler.handleReturnValue("foo", parameter, this.message).block();
+ Flux result = this.handler.getContentAsStrings();
- StepVerifier.create(result)
- .consumeNextWith(buffer -> assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8)))
- .verifyComplete();
+ StepVerifier.create(result).expectNext("foo").verifyComplete();
}
@Test
public void fluxStringReturnValue() {
MethodParameter parameter = on(TestController.class).resolveReturnType(Flux.class, String.class);
- this.handler.handleReturnValue(Flux.just("foo", "bar"), parameter, message).block();
- Flux result = this.handler.encodedContent;
+ this.handler.handleReturnValue(Flux.just("foo", "bar"), parameter, this.message).block();
+ Flux result = this.handler.getContentAsStrings();
- StepVerifier.create(result)
- .consumeNextWith(buffer -> assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8)))
- .consumeNextWith(buffer -> assertEquals("bar", DataBufferTestUtils.dumpString(buffer, UTF_8)))
- .verifyComplete();
+ StepVerifier.create(result).expectNext("foo").expectNext("bar").verifyComplete();
+ }
+
+ @Test
+ public void fluxObjectReturnValue() {
+ MethodParameter parameter = on(TestController.class).resolveReturnType(Flux.class, Object.class);
+ this.handler.handleReturnValue(Flux.just("foo", "bar"), parameter, this.message).block();
+ Flux result = this.handler.getContentAsStrings();
+
+ StepVerifier.create(result).expectNext("foo").expectNext("bar").verifyComplete();
}
@Test
@@ -91,23 +87,19 @@ public class EncoderMethodReturnValueHandlerTests {
testVoidReturnType(null, on(TestController.class).resolveReturnType(void.class));
testVoidReturnType(Mono.empty(), on(TestController.class).resolveReturnType(Mono.class, Void.class));
testVoidReturnType(Completable.complete(), on(TestController.class).resolveReturnType(Completable.class));
-
}
private void testVoidReturnType(@Nullable Object value, MethodParameter bodyParameter) {
- this.handler.handleReturnValue(value, bodyParameter, message).block();
- Flux result = this.handler.encodedContent;
+ this.handler.handleReturnValue(value, bodyParameter, this.message).block();
+ Flux result = this.handler.getContentAsStrings();
StepVerifier.create(result).expectComplete().verify();
}
@Test
public void noEncoder() {
MethodParameter parameter = on(TestController.class).resolveReturnType(Object.class);
- this.handler.handleReturnValue(new Object(), parameter, message).block();
- Flux result = this.handler.encodedContent;
-
- StepVerifier.create(result)
- .expectErrorMessage("No encoder for method 'object' parameter -1")
+ StepVerifier.create(this.handler.handleReturnValue(new Object(), parameter, this.message))
+ .expectErrorMessage("No encoder for java.lang.Object, current value type is class java.lang.Object")
.verify();
}
@@ -121,6 +113,8 @@ public class EncoderMethodReturnValueHandlerTests {
Flux fluxString() { return null; }
+ Flux fluxObject() { return null; }
+
void voidReturn() { }
Mono monoVoid() { return null; }
@@ -128,27 +122,4 @@ public class EncoderMethodReturnValueHandlerTests {
Completable completable() { return null; }
}
-
- private static class TestEncoderMethodReturnValueHandler extends AbstractEncoderMethodReturnValueHandler {
-
- private Flux encodedContent;
-
-
- public Flux getEncodedContent() {
- return this.encodedContent;
- }
-
- protected TestEncoderMethodReturnValueHandler(List> encoders, ReactiveAdapterRegistry registry) {
- super(encoders, registry);
- }
-
- @Override
- protected Mono handleEncodedContent(
- Flux encodedContent, MethodParameter returnType, Message> message) {
-
- this.encodedContent = encodedContent;
- return Mono.empty();
- }
- }
-
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/MethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/MethodMessageHandlerTests.java
index 32c3cb2638e..bcd7b6eec0b 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/MethodMessageHandlerTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/MethodMessageHandlerTests.java
@@ -192,6 +192,11 @@ public class MethodMessageHandlerTests {
private PathMatcher pathMatcher = new AntPathMatcher();
+ public TestMethodMessageHandler() {
+ setHandlerPredicate(handlerType -> handlerType.getName().endsWith("Controller"));
+ }
+
+
@Override
protected List extends HandlerMethodArgumentResolver> initArgumentResolvers() {
return Collections.emptyList();
@@ -211,11 +216,6 @@ public class MethodMessageHandlerTests {
super.registerHandlerMethod(handler, method, mapping);
}
- @Override
- protected boolean isHandler(Class> handlerType) {
- return handlerType.getName().endsWith("Controller");
- }
-
@Override
protected String getMappingForMethod(Method method, Class> handlerType) {
String methodName = method.getName();
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/TestEncoderMethodReturnValueHandler.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/TestEncoderMethodReturnValueHandler.java
new file mode 100644
index 00000000000..3a47d53af9b
--- /dev/null
+++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/invocation/reactive/TestEncoderMethodReturnValueHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.messaging.handler.invocation.reactive;
+
+import java.util.List;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.MethodParameter;
+import org.springframework.core.ReactiveAdapterRegistry;
+import org.springframework.core.codec.Encoder;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.support.DataBufferTestUtils;
+import org.springframework.messaging.Message;
+
+import static java.nio.charset.StandardCharsets.*;
+
+/**
+ * Implementation of {@link AbstractEncoderMethodReturnValueHandler} for tests.
+ * "Handles" by storing encoded return values.
+ *
+ * @author Rossen Stoyanchev
+ */
+public class TestEncoderMethodReturnValueHandler extends AbstractEncoderMethodReturnValueHandler {
+
+ private Flux encodedContent;
+
+
+ public TestEncoderMethodReturnValueHandler(List> encoders, ReactiveAdapterRegistry registry) {
+ super(encoders, registry);
+ }
+
+
+ public Flux getContent() {
+ return this.encodedContent;
+ }
+
+ public Flux getContentAsStrings() {
+ return this.encodedContent.map(buffer -> DataBufferTestUtils.dumpString(buffer, UTF_8));
+ }
+
+ @Override
+ protected Mono handleEncodedContent(
+ Flux encodedContent, MethodParameter returnType, Message> message) {
+
+ this.encodedContent = encodedContent.cache();
+ return this.encodedContent.then();
+ }
+}