12 changed files with 869 additions and 7 deletions
@ -0,0 +1,299 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-2019 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package org.springframework.messaging.handler.annotation.support.reactive; |
||||||
|
|
||||||
|
import java.lang.annotation.Annotation; |
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.Collections; |
||||||
|
import java.util.List; |
||||||
|
import java.util.function.Consumer; |
||||||
|
|
||||||
|
import org.apache.commons.logging.Log; |
||||||
|
import org.apache.commons.logging.LogFactory; |
||||||
|
import org.reactivestreams.Publisher; |
||||||
|
import reactor.core.publisher.Flux; |
||||||
|
import reactor.core.publisher.Mono; |
||||||
|
|
||||||
|
import org.springframework.core.Conventions; |
||||||
|
import org.springframework.core.MethodParameter; |
||||||
|
import org.springframework.core.ReactiveAdapter; |
||||||
|
import org.springframework.core.ReactiveAdapterRegistry; |
||||||
|
import org.springframework.core.ResolvableType; |
||||||
|
import org.springframework.core.annotation.AnnotationUtils; |
||||||
|
import org.springframework.core.codec.Decoder; |
||||||
|
import org.springframework.core.codec.DecodingException; |
||||||
|
import org.springframework.core.io.buffer.DataBuffer; |
||||||
|
import org.springframework.lang.Nullable; |
||||||
|
import org.springframework.messaging.Message; |
||||||
|
import org.springframework.messaging.MessageHeaders; |
||||||
|
import org.springframework.messaging.handler.annotation.Payload; |
||||||
|
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException; |
||||||
|
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException; |
||||||
|
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver; |
||||||
|
import org.springframework.util.Assert; |
||||||
|
import org.springframework.util.CollectionUtils; |
||||||
|
import org.springframework.util.MimeType; |
||||||
|
import org.springframework.util.MimeTypeUtils; |
||||||
|
import org.springframework.util.ObjectUtils; |
||||||
|
import org.springframework.util.StringUtils; |
||||||
|
import org.springframework.validation.BeanPropertyBindingResult; |
||||||
|
import org.springframework.validation.SmartValidator; |
||||||
|
import org.springframework.validation.Validator; |
||||||
|
import org.springframework.validation.annotation.Validated; |
||||||
|
|
||||||
|
/** |
||||||
|
* A resolver to extract and decode the payload of a message using a |
||||||
|
* {@link Decoder}, where the payload is expected to be a {@link Publisher} of |
||||||
|
* {@link DataBuffer DataBuffer}. |
||||||
|
* |
||||||
|
* <p>Validation is applied if the method argument is annotated with |
||||||
|
* {@code @javax.validation.Valid} or |
||||||
|
* {@link org.springframework.validation.annotation.Validated}. Validation |
||||||
|
* failure results in an {@link MethodArgumentNotValidException}. |
||||||
|
* |
||||||
|
* <p>This resolver should be ordered last if {@link #useDefaultResolution} is |
||||||
|
* set to {@code true} since in that case it supports all types and does not |
||||||
|
* require the presence of {@link Payload}. |
||||||
|
* |
||||||
|
* @author Rossen Stoyanchev |
||||||
|
* @since 5.2 |
||||||
|
*/ |
||||||
|
public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResolver { |
||||||
|
|
||||||
|
protected final Log logger = LogFactory.getLog(getClass()); |
||||||
|
|
||||||
|
|
||||||
|
private final List<Decoder<?>> decoders; |
||||||
|
|
||||||
|
@Nullable |
||||||
|
private final Validator validator; |
||||||
|
|
||||||
|
private final ReactiveAdapterRegistry adapterRegistry; |
||||||
|
|
||||||
|
private final boolean useDefaultResolution; |
||||||
|
|
||||||
|
|
||||||
|
public PayloadMethodArgumentResolver(List<? extends Decoder<?>> decoders, @Nullable Validator validator, |
||||||
|
@Nullable ReactiveAdapterRegistry registry, boolean useDefaultResolution) { |
||||||
|
|
||||||
|
Assert.isTrue(!CollectionUtils.isEmpty(decoders), "At least one Decoder is required."); |
||||||
|
this.decoders = Collections.unmodifiableList(new ArrayList<>(decoders)); |
||||||
|
this.validator = validator; |
||||||
|
this.adapterRegistry = registry != null ? registry : ReactiveAdapterRegistry.getSharedInstance(); |
||||||
|
this.useDefaultResolution = useDefaultResolution; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* Return a read-only list of the configured decoders. |
||||||
|
*/ |
||||||
|
public List<Decoder<?>> getDecoders() { |
||||||
|
return this.decoders; |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Return the configured validator, if any. |
||||||
|
*/ |
||||||
|
@Nullable |
||||||
|
public Validator getValidator() { |
||||||
|
return this.validator; |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Return the configured {@link ReactiveAdapterRegistry}. |
||||||
|
*/ |
||||||
|
public ReactiveAdapterRegistry getAdapterRegistry() { |
||||||
|
return this.adapterRegistry; |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Whether this resolver is configured to use default resolution, i.e. |
||||||
|
* works for any argument type regardless of whether {@code @Payload} is |
||||||
|
* present or not. |
||||||
|
*/ |
||||||
|
public boolean isUseDefaultResolution() { |
||||||
|
return this.useDefaultResolution; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
@Override |
||||||
|
public boolean supportsParameter(MethodParameter parameter) { |
||||||
|
return parameter.hasParameterAnnotation(Payload.class) || this.useDefaultResolution; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* Decode the content of the given message payload through a compatible |
||||||
|
* {@link Decoder}. |
||||||
|
* |
||||||
|
* <p>Validation is applied if the method argument is annotated with |
||||||
|
* {@code @javax.validation.Valid} or |
||||||
|
* {@link org.springframework.validation.annotation.Validated}. Validation |
||||||
|
* failure results in an {@link MethodArgumentNotValidException}. |
||||||
|
* |
||||||
|
* @param parameter the target method argument that we are decoding to |
||||||
|
* @param message the message from which the content was extracted |
||||||
|
* @return a Mono with the result of argument resolution |
||||||
|
* |
||||||
|
* @see #extractPayloadContent(MethodParameter, Message) |
||||||
|
* @see #getMimeType(Message) |
||||||
|
*/ |
||||||
|
@Override |
||||||
|
public final Mono<Object> resolveArgument(MethodParameter parameter, Message<?> message) { |
||||||
|
Payload ann = parameter.getParameterAnnotation(Payload.class); |
||||||
|
if (ann != null && StringUtils.hasText(ann.expression())) { |
||||||
|
throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver"); |
||||||
|
} |
||||||
|
Publisher<DataBuffer> content = extractPayloadContent(parameter, message); |
||||||
|
return decodeContent(parameter, message, ann == null || ann.required(), content, getMimeType(message)); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Extract the content to decode from the message. By default, the message |
||||||
|
* payload is expected to be {@code Publisher<DataBuffer>}. Sub-classes can |
||||||
|
* override this method to change that assumption. |
||||||
|
* @param parameter the target method parameter we're decoding to |
||||||
|
* @param message the input message with the content |
||||||
|
* @return the content to decode |
||||||
|
*/ |
||||||
|
@SuppressWarnings("unchecked") |
||||||
|
protected Publisher<DataBuffer> extractPayloadContent(MethodParameter parameter, Message<?> message) { |
||||||
|
Publisher<DataBuffer> content; |
||||||
|
try { |
||||||
|
content = (Publisher<DataBuffer>) message.getPayload(); |
||||||
|
} |
||||||
|
catch (ClassCastException ex) { |
||||||
|
throw new MethodArgumentResolutionException( |
||||||
|
message, parameter, "Expected Publisher<DataBuffer> payload", ex); |
||||||
|
} |
||||||
|
return content; |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Return the mime type for the content. By default this method checks the |
||||||
|
* {@link MessageHeaders#CONTENT_TYPE} header expecting to find a |
||||||
|
* {@link MimeType} value or a String to parse to a {@link MimeType}. |
||||||
|
* @param message the input message |
||||||
|
*/ |
||||||
|
@Nullable |
||||||
|
protected MimeType getMimeType(Message<?> message) { |
||||||
|
Object headerValue = message.getHeaders().get(MessageHeaders.CONTENT_TYPE); |
||||||
|
if (headerValue == null) { |
||||||
|
return null; |
||||||
|
} |
||||||
|
else if (headerValue instanceof String) { |
||||||
|
return MimeTypeUtils.parseMimeType((String) headerValue); |
||||||
|
} |
||||||
|
else if (headerValue instanceof MimeType) { |
||||||
|
return (MimeType) headerValue; |
||||||
|
} |
||||||
|
else { |
||||||
|
throw new IllegalArgumentException("Unexpected MimeType value: " + headerValue); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message, |
||||||
|
boolean isContentRequired, Publisher<DataBuffer> content, @Nullable MimeType mimeType) { |
||||||
|
|
||||||
|
ResolvableType targetType = ResolvableType.forMethodParameter(parameter); |
||||||
|
Class<?> resolvedType = targetType.resolve(); |
||||||
|
ReactiveAdapter adapter = (resolvedType != null ? getAdapterRegistry().getAdapter(resolvedType) : null); |
||||||
|
ResolvableType elementType = (adapter != null ? targetType.getGeneric() : targetType); |
||||||
|
isContentRequired = isContentRequired || (adapter != null && !adapter.supportsEmpty()); |
||||||
|
Consumer<Object> validator = getValidator(message, parameter); |
||||||
|
|
||||||
|
if (logger.isDebugEnabled()) { |
||||||
|
logger.debug("Mime type:" + mimeType); |
||||||
|
} |
||||||
|
mimeType = mimeType != null ? mimeType : MimeTypeUtils.APPLICATION_OCTET_STREAM; |
||||||
|
|
||||||
|
for (Decoder<?> decoder : this.decoders) { |
||||||
|
if (decoder.canDecode(elementType, mimeType)) { |
||||||
|
if (adapter != null && adapter.isMultiValue()) { |
||||||
|
if (logger.isDebugEnabled()) { |
||||||
|
logger.debug("0..N [" + elementType + "]"); |
||||||
|
} |
||||||
|
Flux<?> flux = decoder.decode(content, elementType, mimeType, Collections.emptyMap()); |
||||||
|
flux = flux.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex))); |
||||||
|
if (isContentRequired) { |
||||||
|
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message))); |
||||||
|
} |
||||||
|
if (validator != null) { |
||||||
|
flux = flux.doOnNext(validator::accept); |
||||||
|
} |
||||||
|
return Mono.just(adapter.fromPublisher(flux)); |
||||||
|
} |
||||||
|
else { |
||||||
|
if (logger.isDebugEnabled()) { |
||||||
|
logger.debug("0..1 [" + elementType + "]"); |
||||||
|
} |
||||||
|
// Single-value (with or without reactive type wrapper)
|
||||||
|
Mono<?> mono = decoder.decodeToMono(content, targetType, mimeType, Collections.emptyMap()); |
||||||
|
mono = mono.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex))); |
||||||
|
if (isContentRequired) { |
||||||
|
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message))); |
||||||
|
} |
||||||
|
if (validator != null) { |
||||||
|
mono = mono.doOnNext(validator::accept); |
||||||
|
} |
||||||
|
return (adapter != null ? Mono.just(adapter.fromPublisher(mono)) : Mono.from(mono)); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
return Mono.error(new MethodArgumentResolutionException( |
||||||
|
message, parameter, "Cannot decode to [" + targetType + "]" + message)); |
||||||
|
} |
||||||
|
|
||||||
|
private Throwable handleReadError(MethodParameter parameter, Message<?> message, Throwable ex) { |
||||||
|
return ex instanceof DecodingException ? |
||||||
|
new MethodArgumentResolutionException(message, parameter, "Failed to read HTTP message", ex) : ex; |
||||||
|
} |
||||||
|
|
||||||
|
private MethodArgumentResolutionException handleMissingBody(MethodParameter param, Message<?> message) { |
||||||
|
return new MethodArgumentResolutionException(message, param, |
||||||
|
"Payload content is missing: " + param.getExecutable().toGenericString()); |
||||||
|
} |
||||||
|
|
||||||
|
@Nullable |
||||||
|
private Consumer<Object> getValidator(Message<?> message, MethodParameter parameter) { |
||||||
|
if (this.validator == null) { |
||||||
|
return null; |
||||||
|
} |
||||||
|
for (Annotation ann : parameter.getParameterAnnotations()) { |
||||||
|
Validated validatedAnn = AnnotationUtils.getAnnotation(ann, Validated.class); |
||||||
|
if (validatedAnn != null || ann.annotationType().getSimpleName().startsWith("Valid")) { |
||||||
|
Object hints = (validatedAnn != null ? validatedAnn.value() : AnnotationUtils.getValue(ann)); |
||||||
|
Object[] validationHints = (hints instanceof Object[] ? (Object[]) hints : new Object[] {hints}); |
||||||
|
String name = Conventions.getVariableNameForParameter(parameter); |
||||||
|
return target -> { |
||||||
|
BeanPropertyBindingResult bindingResult = new BeanPropertyBindingResult(target, name); |
||||||
|
if (!ObjectUtils.isEmpty(validationHints) && this.validator instanceof SmartValidator) { |
||||||
|
((SmartValidator) this.validator).validate(target, bindingResult, validationHints); |
||||||
|
} |
||||||
|
else { |
||||||
|
this.validator.validate(target, bindingResult); |
||||||
|
} |
||||||
|
if (bindingResult.hasErrors()) { |
||||||
|
throw new MethodArgumentNotValidException(message, parameter, bindingResult); |
||||||
|
} |
||||||
|
}; |
||||||
|
} |
||||||
|
} |
||||||
|
return null; |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,10 @@ |
|||||||
|
/** |
||||||
|
* Support classes for working with annotated message-handling methods with |
||||||
|
* non-blocking, reactive contracts. |
||||||
|
*/ |
||||||
|
@NonNullApi |
||||||
|
@NonNullFields |
||||||
|
package org.springframework.messaging.handler.annotation.support.reactive; |
||||||
|
|
||||||
|
import org.springframework.lang.NonNullApi; |
||||||
|
import org.springframework.lang.NonNullFields; |
||||||
@ -0,0 +1,169 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-2019 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package org.springframework.messaging.handler.invocation.reactive; |
||||||
|
|
||||||
|
import java.util.Collections; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
import org.apache.commons.logging.Log; |
||||||
|
import org.apache.commons.logging.LogFactory; |
||||||
|
import org.reactivestreams.Publisher; |
||||||
|
import reactor.core.publisher.Flux; |
||||||
|
import reactor.core.publisher.Mono; |
||||||
|
|
||||||
|
import org.springframework.core.MethodParameter; |
||||||
|
import org.springframework.core.ReactiveAdapter; |
||||||
|
import org.springframework.core.ReactiveAdapterRegistry; |
||||||
|
import org.springframework.core.ResolvableType; |
||||||
|
import org.springframework.core.codec.Encoder; |
||||||
|
import org.springframework.core.io.buffer.DataBuffer; |
||||||
|
import org.springframework.core.io.buffer.DataBufferFactory; |
||||||
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory; |
||||||
|
import org.springframework.lang.Nullable; |
||||||
|
import org.springframework.messaging.Message; |
||||||
|
import org.springframework.messaging.MessagingException; |
||||||
|
import org.springframework.util.Assert; |
||||||
|
|
||||||
|
/** |
||||||
|
* Base class for a return value handler that encodes the return value, possibly |
||||||
|
* a {@link Publisher} of values, to a {@code Flux<DataBuffer>} through a |
||||||
|
* compatible {@link Encoder}. |
||||||
|
* |
||||||
|
* <p>Sub-classes must implement the abstract method |
||||||
|
* {@link #handleEncodedContent} to do something with the resulting encoded |
||||||
|
* content. |
||||||
|
* |
||||||
|
* <p>This handler should be ordered last since its {@link #supportsReturnType} |
||||||
|
* returns {@code true} for any method parameter type. |
||||||
|
* |
||||||
|
* @author Rossen Stoyanchev |
||||||
|
* @since 5.2 |
||||||
|
*/ |
||||||
|
public abstract class AbstractEncoderMethodReturnValueHandler implements HandlerMethodReturnValueHandler { |
||||||
|
|
||||||
|
private static final ResolvableType VOID_RESOLVABLE_TYPE = ResolvableType.forClass(Void.class); |
||||||
|
|
||||||
|
private static final ResolvableType OBJECT_RESOLVABLE_TYPE = ResolvableType.forClass(Object.class); |
||||||
|
|
||||||
|
|
||||||
|
protected final Log logger = LogFactory.getLog(getClass()); |
||||||
|
|
||||||
|
|
||||||
|
private final List<Encoder<?>> encoders; |
||||||
|
|
||||||
|
private final ReactiveAdapterRegistry adapterRegistry; |
||||||
|
|
||||||
|
// TODO: configure or passed via MessageHeaders
|
||||||
|
private DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); |
||||||
|
|
||||||
|
|
||||||
|
protected AbstractEncoderMethodReturnValueHandler(List<Encoder<?>> encoders, ReactiveAdapterRegistry registry) { |
||||||
|
Assert.notEmpty(encoders, "At least one Encoder is required"); |
||||||
|
Assert.notNull(registry, "ReactiveAdapterRegistry is required"); |
||||||
|
this.encoders = Collections.unmodifiableList(encoders); |
||||||
|
this.adapterRegistry = registry; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* The configured encoders. |
||||||
|
*/ |
||||||
|
public List<Encoder<?>> getEncoders() { |
||||||
|
return this.encoders; |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* The configured adapter registry. |
||||||
|
*/ |
||||||
|
public ReactiveAdapterRegistry getAdapterRegistry() { |
||||||
|
return this.adapterRegistry; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
@Override |
||||||
|
public boolean supportsReturnType(MethodParameter returnType) { |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Mono<Void> handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) { |
||||||
|
Flux<DataBuffer> encodedContent = encodeContent(returnValue, returnType, this.bufferFactory); |
||||||
|
return handleEncodedContent(encodedContent, returnType, message); |
||||||
|
} |
||||||
|
|
||||||
|
@SuppressWarnings("unchecked") |
||||||
|
private Flux<DataBuffer> encodeContent(@Nullable Object content, MethodParameter returnType, |
||||||
|
DataBufferFactory bufferFactory) { |
||||||
|
|
||||||
|
ResolvableType bodyType = ResolvableType.forMethodParameter(returnType); |
||||||
|
ReactiveAdapter adapter = getAdapterRegistry().getAdapter(bodyType.resolve(), content); |
||||||
|
|
||||||
|
Publisher<?> publisher; |
||||||
|
ResolvableType elementType; |
||||||
|
if (adapter != null) { |
||||||
|
publisher = adapter.toPublisher(content); |
||||||
|
ResolvableType genericType = bodyType.getGeneric(); |
||||||
|
elementType = getElementType(adapter, genericType); |
||||||
|
} |
||||||
|
else { |
||||||
|
publisher = Mono.justOrEmpty(content); |
||||||
|
elementType = (bodyType.toClass() == Object.class && content != null ? |
||||||
|
ResolvableType.forInstance(content) : bodyType); |
||||||
|
} |
||||||
|
|
||||||
|
if (elementType.resolve() == void.class || elementType.resolve() == Void.class) { |
||||||
|
return Flux.from(publisher).cast(DataBuffer.class); |
||||||
|
} |
||||||
|
|
||||||
|
if (logger.isDebugEnabled()) { |
||||||
|
logger.debug((publisher instanceof Mono ? "0..1" : "0..N") + " [" + elementType + "]"); |
||||||
|
} |
||||||
|
|
||||||
|
for (Encoder<?> encoder : getEncoders()) { |
||||||
|
if (encoder.canEncode(elementType, null)) { |
||||||
|
Map<String, Object> hints = Collections.emptyMap(); |
||||||
|
return encoder.encode((Publisher) publisher, bufferFactory, elementType, null, hints); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
return Flux.error(new MessagingException("No encoder for " + returnType)); |
||||||
|
} |
||||||
|
|
||||||
|
private ResolvableType getElementType(ReactiveAdapter adapter, ResolvableType genericType) { |
||||||
|
if (adapter.isNoValue()) { |
||||||
|
return VOID_RESOLVABLE_TYPE; |
||||||
|
} |
||||||
|
else if (genericType != ResolvableType.NONE) { |
||||||
|
return genericType; |
||||||
|
} |
||||||
|
else { |
||||||
|
return OBJECT_RESOLVABLE_TYPE; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Handle the encoded content in some way, e.g. wrapping it in a message and |
||||||
|
* passing it on for further processing. |
||||||
|
* @param encodedContent the result of data encoding |
||||||
|
* @param returnType return type of the handler method that produced the data |
||||||
|
* @param message the input message handled by the handler method |
||||||
|
* @return completion {@code Mono<Void>} for the handling |
||||||
|
*/ |
||||||
|
protected abstract Mono<Void> handleEncodedContent( |
||||||
|
Flux<DataBuffer> encodedContent, MethodParameter returnType, Message<?> message); |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,208 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-2019 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package org.springframework.messaging.handler.annotation.support.reactive; |
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets; |
||||||
|
import java.time.Duration; |
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.Arrays; |
||||||
|
import java.util.Collections; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import org.junit.Test; |
||||||
|
import org.reactivestreams.Publisher; |
||||||
|
import reactor.core.publisher.Flux; |
||||||
|
import reactor.core.publisher.Mono; |
||||||
|
import reactor.test.StepVerifier; |
||||||
|
|
||||||
|
import org.springframework.core.MethodParameter; |
||||||
|
import org.springframework.core.ResolvableType; |
||||||
|
import org.springframework.core.codec.Decoder; |
||||||
|
import org.springframework.core.codec.StringDecoder; |
||||||
|
import org.springframework.core.io.buffer.DataBuffer; |
||||||
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory; |
||||||
|
import org.springframework.lang.Nullable; |
||||||
|
import org.springframework.messaging.Message; |
||||||
|
import org.springframework.messaging.MessageHeaders; |
||||||
|
import org.springframework.messaging.handler.annotation.Payload; |
||||||
|
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException; |
||||||
|
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException; |
||||||
|
import org.springframework.messaging.handler.invocation.ResolvableMethod; |
||||||
|
import org.springframework.messaging.support.GenericMessage; |
||||||
|
import org.springframework.util.MimeTypeUtils; |
||||||
|
import org.springframework.validation.Errors; |
||||||
|
import org.springframework.validation.Validator; |
||||||
|
import org.springframework.validation.annotation.Validated; |
||||||
|
|
||||||
|
import static org.junit.Assert.*; |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* Unit tests for {@link PayloadMethodArgumentResolver}. |
||||||
|
* |
||||||
|
* @author Rossen Stoyanchev |
||||||
|
*/ |
||||||
|
public class PayloadMethodArgumentResolverTests { |
||||||
|
|
||||||
|
private final List<Decoder<?>> decoders = new ArrayList<>(); |
||||||
|
|
||||||
|
private final ResolvableMethod testMethod = ResolvableMethod.on(getClass()).named("handle").build(); |
||||||
|
|
||||||
|
|
||||||
|
@Test |
||||||
|
public void supportsParameter() { |
||||||
|
|
||||||
|
boolean useDefaultResolution = true; |
||||||
|
PayloadMethodArgumentResolver resolver = createResolver(null, useDefaultResolution); |
||||||
|
|
||||||
|
assertTrue(resolver.supportsParameter(this.testMethod.annotPresent(Payload.class).arg())); |
||||||
|
assertTrue(resolver.supportsParameter(this.testMethod.annotNotPresent(Payload.class).arg(String.class))); |
||||||
|
|
||||||
|
useDefaultResolution = false; |
||||||
|
resolver = createResolver(null, useDefaultResolution); |
||||||
|
|
||||||
|
assertTrue(resolver.supportsParameter(this.testMethod.annotPresent(Payload.class).arg())); |
||||||
|
assertFalse(resolver.supportsParameter(this.testMethod.annotNotPresent(Payload.class).arg(String.class))); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void emptyBodyWhenRequired() { |
||||||
|
MethodParameter param = this.testMethod.arg(ResolvableType.forClassWithGenerics(Mono.class, String.class)); |
||||||
|
Mono<Object> mono = resolveValue(param, Mono.empty(), null); |
||||||
|
|
||||||
|
StepVerifier.create(mono) |
||||||
|
.consumeErrorWith(ex -> { |
||||||
|
assertEquals(MethodArgumentResolutionException.class, ex.getClass()); |
||||||
|
assertTrue(ex.getMessage(), ex.getMessage().contains("Payload content is missing")); |
||||||
|
}) |
||||||
|
.verify(); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void emptyBodyWhenNotRequired() { |
||||||
|
MethodParameter param = this.testMethod.annotPresent(Payload.class).arg(); |
||||||
|
assertNull(resolveValue(param, Mono.empty(), null)); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void stringMono() { |
||||||
|
String body = "foo"; |
||||||
|
MethodParameter param = this.testMethod.arg(ResolvableType.forClassWithGenerics(Mono.class, String.class)); |
||||||
|
Mono<DataBuffer> value = Mono.delay(Duration.ofMillis(10)).map(aLong -> toDataBuffer(body)); |
||||||
|
Mono<Object> mono = resolveValue(param, value, null); |
||||||
|
|
||||||
|
assertEquals(body, mono.block()); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void stringFlux() { |
||||||
|
List<String> body = Arrays.asList("foo", "bar"); |
||||||
|
ResolvableType type = ResolvableType.forClassWithGenerics(Flux.class, String.class); |
||||||
|
MethodParameter param = this.testMethod.arg(type); |
||||||
|
Flux<Object> flux = resolveValue(param, Flux.fromIterable(body) |
||||||
|
.delayElements(Duration.ofMillis(10)).map(value -> toDataBuffer(value + "\n")), null); |
||||||
|
|
||||||
|
assertEquals(body, flux.collectList().block()); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void string() { |
||||||
|
String body = "foo"; |
||||||
|
MethodParameter param = this.testMethod.annotNotPresent(Payload.class).arg(String.class); |
||||||
|
Object value = resolveValue(param, Mono.just(toDataBuffer(body)), null); |
||||||
|
|
||||||
|
assertEquals(body, value); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void validateStringMono() { |
||||||
|
ResolvableType type = ResolvableType.forClassWithGenerics(Mono.class, String.class); |
||||||
|
MethodParameter param = this.testMethod.arg(type); |
||||||
|
Mono<Object> mono = resolveValue(param, Mono.just(toDataBuffer("12345")), new TestValidator()); |
||||||
|
|
||||||
|
StepVerifier.create(mono).expectNextCount(0) |
||||||
|
.expectError(MethodArgumentNotValidException.class).verify(); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void validateStringFlux() { |
||||||
|
ResolvableType type = ResolvableType.forClassWithGenerics(Flux.class, String.class); |
||||||
|
MethodParameter param = this.testMethod.arg(type); |
||||||
|
Flux<Object> flux = resolveValue(param, Flux.just(toDataBuffer("12345678\n12345")), new TestValidator()); |
||||||
|
|
||||||
|
StepVerifier.create(flux) |
||||||
|
.expectNext("12345678") |
||||||
|
.expectError(MethodArgumentNotValidException.class) |
||||||
|
.verify(); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
private DataBuffer toDataBuffer(String value) { |
||||||
|
return new DefaultDataBufferFactory().wrap(value.getBytes(StandardCharsets.UTF_8)); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked") |
||||||
|
@Nullable |
||||||
|
private <T> T resolveValue(MethodParameter param, Publisher<DataBuffer> content, Validator validator) { |
||||||
|
|
||||||
|
Message<?> message = new GenericMessage<>(content, |
||||||
|
Collections.singletonMap(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)); |
||||||
|
|
||||||
|
Mono<Object> result = createResolver(validator, true).resolveArgument(param, message); |
||||||
|
|
||||||
|
Object value = result.block(Duration.ofSeconds(5)); |
||||||
|
if (value != null) { |
||||||
|
Class<?> expectedType = param.getParameterType(); |
||||||
|
assertTrue("Unexpected return value type: " + value, expectedType.isAssignableFrom(value.getClass())); |
||||||
|
} |
||||||
|
return (T) value; |
||||||
|
} |
||||||
|
|
||||||
|
private PayloadMethodArgumentResolver createResolver(@Nullable Validator validator, boolean useDefaultResolution) { |
||||||
|
if (this.decoders.isEmpty()) { |
||||||
|
this.decoders.add(StringDecoder.allMimeTypes()); |
||||||
|
} |
||||||
|
List<StringDecoder> decoders = Collections.singletonList(StringDecoder.allMimeTypes()); |
||||||
|
return new PayloadMethodArgumentResolver(decoders, validator, null, useDefaultResolution) {}; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
@SuppressWarnings("unused") |
||||||
|
private void handle( |
||||||
|
@Validated Mono<String> valueMono, |
||||||
|
@Validated Flux<String> valueFlux, |
||||||
|
@Payload(required = false) String optionalValue, |
||||||
|
String value) { |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
private static class TestValidator implements Validator { |
||||||
|
|
||||||
|
@Override |
||||||
|
public boolean supports(Class<?> clazz) { |
||||||
|
return clazz.equals(String.class); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void validate(@Nullable Object target, Errors errors) { |
||||||
|
if (target instanceof String && ((String) target).length() < 8) { |
||||||
|
errors.reject("Invalid length"); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,154 @@ |
|||||||
|
/* |
||||||
|
* Copyright 2002-2019 the original author or authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
package org.springframework.messaging.handler.invocation.reactive; |
||||||
|
|
||||||
|
import java.util.Collections; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import io.reactivex.Completable; |
||||||
|
import org.junit.Test; |
||||||
|
import reactor.core.publisher.Flux; |
||||||
|
import reactor.core.publisher.Mono; |
||||||
|
import reactor.test.StepVerifier; |
||||||
|
|
||||||
|
import org.springframework.core.MethodParameter; |
||||||
|
import org.springframework.core.ReactiveAdapterRegistry; |
||||||
|
import org.springframework.core.codec.CharSequenceEncoder; |
||||||
|
import org.springframework.core.codec.Encoder; |
||||||
|
import org.springframework.core.io.buffer.DataBuffer; |
||||||
|
import org.springframework.core.io.buffer.support.DataBufferTestUtils; |
||||||
|
import org.springframework.lang.Nullable; |
||||||
|
import org.springframework.messaging.Message; |
||||||
|
|
||||||
|
import static java.nio.charset.StandardCharsets.*; |
||||||
|
import static org.junit.Assert.*; |
||||||
|
import static org.mockito.Mockito.*; |
||||||
|
import static org.springframework.messaging.handler.invocation.ResolvableMethod.*; |
||||||
|
|
||||||
|
/** |
||||||
|
* Unit tests for {@link AbstractEncoderMethodReturnValueHandler}. |
||||||
|
* |
||||||
|
* @author Rossen Stoyanchev |
||||||
|
*/ |
||||||
|
public class EncoderMethodReturnValueHandlerTests { |
||||||
|
|
||||||
|
private final TestEncoderMethodReturnValueHandler handler = new TestEncoderMethodReturnValueHandler( |
||||||
|
Collections.singletonList(CharSequenceEncoder.textPlainOnly()), |
||||||
|
ReactiveAdapterRegistry.getSharedInstance()); |
||||||
|
|
||||||
|
private final Message<?> message = mock(Message.class); |
||||||
|
|
||||||
|
|
||||||
|
@Test |
||||||
|
public void stringReturnValue() { |
||||||
|
MethodParameter parameter = on(TestController.class).resolveReturnType(String.class); |
||||||
|
this.handler.handleReturnValue("foo", parameter, message).block(); |
||||||
|
Flux<DataBuffer> result = this.handler.encodedContent; |
||||||
|
|
||||||
|
StepVerifier.create(result) |
||||||
|
.consumeNextWith(buffer -> assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8))) |
||||||
|
.verifyComplete(); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void objectReturnValue() { |
||||||
|
MethodParameter parameter = on(TestController.class).resolveReturnType(Object.class); |
||||||
|
this.handler.handleReturnValue("foo", parameter, message).block(); |
||||||
|
Flux<DataBuffer> result = this.handler.encodedContent; |
||||||
|
|
||||||
|
StepVerifier.create(result) |
||||||
|
.consumeNextWith(buffer -> assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8))) |
||||||
|
.verifyComplete(); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void fluxStringReturnValue() { |
||||||
|
MethodParameter parameter = on(TestController.class).resolveReturnType(Flux.class, String.class); |
||||||
|
this.handler.handleReturnValue(Flux.just("foo", "bar"), parameter, message).block(); |
||||||
|
Flux<DataBuffer> result = this.handler.encodedContent; |
||||||
|
|
||||||
|
StepVerifier.create(result) |
||||||
|
.consumeNextWith(buffer -> assertEquals("foo", DataBufferTestUtils.dumpString(buffer, UTF_8))) |
||||||
|
.consumeNextWith(buffer -> assertEquals("bar", DataBufferTestUtils.dumpString(buffer, UTF_8))) |
||||||
|
.verifyComplete(); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void voidReturnValue() { |
||||||
|
testVoidReturnType(null, on(TestController.class).resolveReturnType(void.class)); |
||||||
|
testVoidReturnType(Mono.empty(), on(TestController.class).resolveReturnType(Mono.class, Void.class)); |
||||||
|
testVoidReturnType(Completable.complete(), on(TestController.class).resolveReturnType(Completable.class)); |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
private void testVoidReturnType(@Nullable Object value, MethodParameter bodyParameter) { |
||||||
|
this.handler.handleReturnValue(value, bodyParameter, message).block(); |
||||||
|
Flux<DataBuffer> result = this.handler.encodedContent; |
||||||
|
StepVerifier.create(result).expectComplete().verify(); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void noEncoder() { |
||||||
|
MethodParameter parameter = on(TestController.class).resolveReturnType(Object.class); |
||||||
|
this.handler.handleReturnValue(new Object(), parameter, message).block(); |
||||||
|
Flux<DataBuffer> result = this.handler.encodedContent; |
||||||
|
|
||||||
|
StepVerifier.create(result) |
||||||
|
.expectErrorMessage("No encoder for method 'object' parameter -1") |
||||||
|
.verify(); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
@SuppressWarnings({"unused", "ConstantConditions"}) |
||||||
|
private static class TestController { |
||||||
|
|
||||||
|
String string() { return null; } |
||||||
|
|
||||||
|
Object object() { return null; } |
||||||
|
|
||||||
|
Flux<String> fluxString() { return null; } |
||||||
|
|
||||||
|
void voidReturn() { } |
||||||
|
|
||||||
|
Mono<Void> monoVoid() { return null; } |
||||||
|
|
||||||
|
Completable completable() { return null; } |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
private static class TestEncoderMethodReturnValueHandler extends AbstractEncoderMethodReturnValueHandler { |
||||||
|
|
||||||
|
private Flux<DataBuffer> encodedContent; |
||||||
|
|
||||||
|
|
||||||
|
public Flux<DataBuffer> getEncodedContent() { |
||||||
|
return this.encodedContent; |
||||||
|
} |
||||||
|
|
||||||
|
protected TestEncoderMethodReturnValueHandler(List<Encoder<?>> encoders, ReactiveAdapterRegistry registry) { |
||||||
|
super(encoders, registry); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
protected Mono<Void> handleEncodedContent( |
||||||
|
Flux<DataBuffer> encodedContent, MethodParameter returnType, Message<?> message) { |
||||||
|
|
||||||
|
this.encodedContent = encodedContent; |
||||||
|
return Mono.empty(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
Loading…
Reference in new issue