Browse Source

Refine solution to clear Netty channel attribute

Closes gh-36158
pull/36190/head
rstoyanchev 2 weeks ago
parent
commit
4119ecc25b
  1. 29
      spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

29
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

@ -21,15 +21,13 @@ import java.util.Map; @@ -21,15 +21,13 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.NettyOutbound;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.resources.ConnectionProvider;
@ -163,17 +161,14 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif @@ -163,17 +161,14 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()));
requestSender = setUri(requestSender, uri);
AtomicReference<Channel> channelRef = new AtomicReference<>();
AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>();
return requestSender
.send((request, outbound) -> {
channelRef.set(((ChannelOperations<?, ?>) request).channel());
return requestCallback.apply(adaptRequest(method, uri, request, outbound));
})
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((response, connection) -> {
ReactorClientHttpResponse clientResponse = new ReactorClientHttpResponse(response, connection);
responseRef.set(clientResponse);
clearChannelAttribute(connection);
return Mono.just((ClientHttpResponse) clientResponse);
})
.next()
@ -182,15 +177,6 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif @@ -182,15 +177,6 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
if (response != null) {
response.releaseAfterCancel(method);
}
})
.doOnTerminate(() -> {
Channel channel = channelRef.get();
if (channel != null) {
Attribute<Map<String, Object>> attribute = channel.attr(ATTRIBUTES_KEY);
if (attribute != null) {
attribute.set(null);
}
}
});
}
@ -212,6 +198,15 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif @@ -212,6 +198,15 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
}
private static void clearChannelAttribute(Connection connection) {
connection.onTerminate().subscribe(
aVoid -> {}, ex -> clearAttribute(connection), () -> clearAttribute(connection));
}
private static void clearAttribute(Connection connection) {
connection.channel().attr(ATTRIBUTES_KEY).set(null);
}
@Override
public void start() {

Loading…
Cancel
Save