Browse Source

Refine solution to clear Netty channel attribute

Closes gh-36158
6.2.x
rstoyanchev 2 weeks ago
parent
commit
9f1332c716
  1. 34
      spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

34
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

@ -21,14 +21,12 @@ import java.util.Map; @@ -21,14 +21,12 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.NettyOutbound;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.resources.ConnectionProvider;
@ -166,17 +164,15 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif @@ -166,17 +164,15 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()));
requestSender = setUri(requestSender, uri);
AtomicReference<Channel> channelRef = new AtomicReference<>();
AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>();
return requestSender
.send((request, outbound) -> {
channelRef.set(((ChannelOperations<?, ?>) request).channel());
return requestCallback.apply(adaptRequest(method, uri, request, outbound));
})
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((response, connection) -> {
responseRef.set(new ReactorClientHttpResponse(response, connection));
return Mono.just((ClientHttpResponse) responseRef.get());
ReactorClientHttpResponse clientResponse = new ReactorClientHttpResponse(response, connection);
responseRef.set(clientResponse);
clearChannelAttribute(connection);
return Mono.just((ClientHttpResponse) clientResponse);
})
.next()
.doOnCancel(() -> {
@ -184,15 +180,6 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif @@ -184,15 +180,6 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
if (response != null) {
response.releaseAfterCancel(method);
}
})
.doOnTerminate(() -> {
Channel channel = channelRef.get();
if (channel != null) {
Attribute<Map<String, Object>> attribute = channel.attr(ATTRIBUTES_KEY);
if (attribute != null) {
attribute.set(null);
}
}
});
}
@ -214,6 +201,15 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif @@ -214,6 +201,15 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
}
private static void clearChannelAttribute(Connection connection) {
connection.onTerminate().subscribe(
aVoid -> {}, ex -> clearAttribute(connection), () -> clearAttribute(connection));
}
private static void clearAttribute(Connection connection) {
connection.channel().attr(ATTRIBUTES_KEY).set(null);
}
@Override
public void start() {

Loading…
Cancel
Save