From 55361fa4f6ed8a571314e19eb6eff47fc7ef2714 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 25 Apr 2024 10:09:05 +0200 Subject: [PATCH] Release RSocket setup payload when it is not handled This commit counterbalances the retain in the MessagingRSocket handleConnectionSetupPayload method with a conditional release on SETUP frame type in handleNoMatch, preventing Netty buffer leaks. Closes gh-32424 --- .../support/RSocketMessageHandler.java | 6 ++- .../support/RSocketMessageHandlerTests.java | 41 ++++++++++++++++++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java index a13942d77e2..37f6c6a811f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; +import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageDeliveryException; @@ -381,6 +382,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { protected void handleNoMatch(@Nullable RouteMatcher.Route destination, Message message) { FrameType frameType = RSocketFrameTypeMessageCondition.getFrameType(message); if (frameType == FrameType.SETUP || frameType == FrameType.METADATA_PUSH) { + if (frameType == FrameType.SETUP && message.getPayload() instanceof PooledDataBuffer pooledDataBuffer) { + pooledDataBuffer.release(); + } return; // optional handling } if (frameType == FrameType.REQUEST_FNF) { diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandlerTests.java index fc557428278..bad6da50b2e 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandlerTests.java @@ -16,10 +16,12 @@ package org.springframework.messaging.rsocket.annotation.support; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.Map; +import io.netty.buffer.UnpooledByteBufAllocator; import io.rsocket.frame.FrameType; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; @@ -33,6 +35,8 @@ import org.springframework.core.codec.ByteBufferDecoder; import org.springframework.core.codec.ByteBufferEncoder; import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.codec.StringDecoder; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.messaging.Message; import org.springframework.messaging.handler.CompositeMessageCondition; import org.springframework.messaging.handler.DestinationPatternsMessageCondition; @@ -250,6 +254,10 @@ class RSocketMessageHandlerTests { } private static void testHandleNoMatch(FrameType frameType) { + testHandleNoMatch(frameType, ""); + } + + private static void testHandleNoMatch(FrameType frameType, Object payload) { RSocketMessageHandler handler = new RSocketMessageHandler(); handler.setDecoders(Collections.singletonList(StringDecoder.allMimeTypes())); handler.setEncoders(Collections.singletonList(CharSequenceEncoder.allMimeTypes())); @@ -260,11 +268,42 @@ class RSocketMessageHandlerTests { MessageHeaderAccessor headers = new MessageHeaderAccessor(); headers.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, frameType); - Message message = MessageBuilder.createMessage("", headers.getMessageHeaders()); + Message message = MessageBuilder.createMessage(payload, headers.getMessageHeaders()); handler.handleNoMatch(route, message); } + @Test + void handleNoMatchWithNettyBufferPayload() { + + testHandleNoMatchBuffer(FrameType.SETUP, true); + testHandleNoMatchBuffer(FrameType.METADATA_PUSH, false); + testHandleNoMatchBuffer(FrameType.REQUEST_FNF, false); + + assertThatThrownBy(() -> testHandleNoMatchBuffer(FrameType.REQUEST_RESPONSE, false)) + .hasMessage("No handler for destination 'path'"); + } + + private static void testHandleNoMatchBuffer(FrameType frameType, boolean expectReleased) { + NettyDataBufferFactory factory = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT); + NettyDataBuffer buf = factory.allocateBuffer(5); + buf.write("hello", StandardCharsets.UTF_8); + + assertThat(buf.getNativeBuffer().refCnt()).as(frameType + " refCnt").isOne(); + + try { + testHandleNoMatch(frameType, buf); + } + finally { + if (expectReleased) { + assertThat(buf.getNativeBuffer().refCnt()).as(frameType + " is released").isZero(); + } + else { + assertThat(buf.getNativeBuffer().refCnt()).as("is not released").isOne(); + } + } + } + private static class SimpleController {