Browse Source

Release RSocket setup payload when it is not handled

This commit counterbalances the retain in the MessagingRSocket
handleConnectionSetupPayload method with a conditional release on SETUP
frame type in handleNoMatch, preventing Netty buffer leaks.

Closes gh-32424
pull/32864/head
Simon Baslé 2 years ago
parent
commit
55361fa4f6
  1. 6
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java
  2. 41
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandlerTests.java

6
spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,6 +37,7 @@ import org.springframework.core.ReactiveAdapterRegistry; @@ -37,6 +37,7 @@ import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
@ -381,6 +382,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -381,6 +382,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
protected void handleNoMatch(@Nullable RouteMatcher.Route destination, Message<?> message) {
FrameType frameType = RSocketFrameTypeMessageCondition.getFrameType(message);
if (frameType == FrameType.SETUP || frameType == FrameType.METADATA_PUSH) {
if (frameType == FrameType.SETUP && message.getPayload() instanceof PooledDataBuffer pooledDataBuffer) {
pooledDataBuffer.release();
}
return; // optional handling
}
if (frameType == FrameType.REQUEST_FNF) {

41
spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandlerTests.java

@ -16,10 +16,12 @@ @@ -16,10 +16,12 @@
package org.springframework.messaging.rsocket.annotation.support;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.rsocket.frame.FrameType;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
@ -33,6 +35,8 @@ import org.springframework.core.codec.ByteBufferDecoder; @@ -33,6 +35,8 @@ import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.CompositeMessageCondition;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
@ -250,6 +254,10 @@ class RSocketMessageHandlerTests { @@ -250,6 +254,10 @@ class RSocketMessageHandlerTests {
}
private static void testHandleNoMatch(FrameType frameType) {
testHandleNoMatch(frameType, "");
}
private static void testHandleNoMatch(FrameType frameType, Object payload) {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setDecoders(Collections.singletonList(StringDecoder.allMimeTypes()));
handler.setEncoders(Collections.singletonList(CharSequenceEncoder.allMimeTypes()));
@ -260,11 +268,42 @@ class RSocketMessageHandlerTests { @@ -260,11 +268,42 @@ class RSocketMessageHandlerTests {
MessageHeaderAccessor headers = new MessageHeaderAccessor();
headers.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, frameType);
Message<Object> message = MessageBuilder.createMessage("", headers.getMessageHeaders());
Message<Object> message = MessageBuilder.createMessage(payload, headers.getMessageHeaders());
handler.handleNoMatch(route, message);
}
@Test
void handleNoMatchWithNettyBufferPayload() {
testHandleNoMatchBuffer(FrameType.SETUP, true);
testHandleNoMatchBuffer(FrameType.METADATA_PUSH, false);
testHandleNoMatchBuffer(FrameType.REQUEST_FNF, false);
assertThatThrownBy(() -> testHandleNoMatchBuffer(FrameType.REQUEST_RESPONSE, false))
.hasMessage("No handler for destination 'path'");
}
private static void testHandleNoMatchBuffer(FrameType frameType, boolean expectReleased) {
NettyDataBufferFactory factory = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT);
NettyDataBuffer buf = factory.allocateBuffer(5);
buf.write("hello", StandardCharsets.UTF_8);
assertThat(buf.getNativeBuffer().refCnt()).as(frameType + " refCnt").isOne();
try {
testHandleNoMatch(frameType, buf);
}
finally {
if (expectReleased) {
assertThat(buf.getNativeBuffer().refCnt()).as(frameType + " is released").isZero();
}
else {
assertThat(buf.getNativeBuffer().refCnt()).as("is not released").isOne();
}
}
}
private static class SimpleController {

Loading…
Cancel
Save