diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 77f33fb71c0..102162ba743 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -21,11 +21,14 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import io.netty.channel.Channel; +import io.netty.util.Attribute; import io.netty.util.AttributeKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; import reactor.netty.NettyOutbound; +import reactor.netty.channel.ChannelOperations; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.resources.ConnectionProvider; @@ -163,10 +166,14 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name())); requestSender = setUri(requestSender, uri); + AtomicReference channelRef = new AtomicReference<>(); AtomicReference responseRef = new AtomicReference<>(); return requestSender - .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) + .send((request, outbound) -> { + channelRef.set(((ChannelOperations) request).channel()); + return requestCallback.apply(adaptRequest(method, uri, request, outbound)); + }) .responseConnection((response, connection) -> { responseRef.set(new ReactorClientHttpResponse(response, connection)); return Mono.just((ClientHttpResponse) responseRef.get()); @@ -177,6 +184,15 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif if (response != null) { response.releaseAfterCancel(method); } + }) + .doOnTerminate(() -> { + Channel channel = channelRef.get(); + if (channel != null) { + Attribute> attribute = channel.attr(ATTRIBUTES_KEY); + if (attribute != null) { + attribute.set(null); + } + } }); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index f338bd8e5dd..7acd8ccec7b 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -40,6 +40,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import io.netty.channel.Channel; import io.netty.util.Attribute; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; @@ -55,6 +56,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.netty.channel.ChannelOperations; import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientRequest; import reactor.netty.resources.ConnectionProvider; import reactor.test.StepVerifier; @@ -1328,6 +1330,28 @@ class WebClientIntegrationTests { .verify(Duration.ofSeconds(3)); } + @Test // gh-36158 + void reactorNettyAttributes() throws IOException { + startServer(new ReactorClientHttpConnector()); + + prepareResponse(builder -> + builder.setHeader("Content-Type", "text/plain").body("Hello Spring!")); + + AtomicReference channelRef = new AtomicReference<>(); + + Mono result = this.webClient.get().uri("/greeting") + .httpRequest(request -> { + HttpClientRequest reactorRequest = request.getNativeRequest(); + channelRef.set(((ChannelOperations) reactorRequest).channel()); + }) + .retrieve() + .bodyToMono(String.class); + + StepVerifier.create(result).expectNext("Hello Spring!").expectComplete().verify(Duration.ofSeconds(3)); + + assertThat(channelRef.get().attr(ReactorClientHttpConnector.ATTRIBUTES_KEY).get()).isNull(); + } + private Mono doMalformedChunkedResponseTest( ClientHttpConnector connector, Function> handler) {