Browse Source

Clear Netty channel attribute

Closes gh-36158
6.2.x
rstoyanchev 6 days ago
parent
commit
183cd4c8cd
  1. 18
      spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java
  2. 24
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

18
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

@ -21,11 +21,14 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.netty.NettyOutbound; import reactor.netty.NettyOutbound;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.ConnectionProvider;
@ -163,10 +166,14 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name())); .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()));
requestSender = setUri(requestSender, uri); requestSender = setUri(requestSender, uri);
AtomicReference<Channel> channelRef = new AtomicReference<>();
AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>(); AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>();
return requestSender return requestSender
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) .send((request, outbound) -> {
channelRef.set(((ChannelOperations<?, ?>) request).channel());
return requestCallback.apply(adaptRequest(method, uri, request, outbound));
})
.responseConnection((response, connection) -> { .responseConnection((response, connection) -> {
responseRef.set(new ReactorClientHttpResponse(response, connection)); responseRef.set(new ReactorClientHttpResponse(response, connection));
return Mono.just((ClientHttpResponse) responseRef.get()); return Mono.just((ClientHttpResponse) responseRef.get());
@ -177,6 +184,15 @@ public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLif
if (response != null) { if (response != null) {
response.releaseAfterCancel(method); response.releaseAfterCancel(method);
} }
})
.doOnTerminate(() -> {
Channel channel = channelRef.get();
if (channel != null) {
Attribute<Map<String, Object>> attribute = channel.attr(ATTRIBUTES_KEY);
if (attribute != null) {
attribute.set(null);
}
}
}); });
} }

24
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

@ -40,6 +40,7 @@ import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import io.netty.channel.Channel;
import io.netty.util.Attribute; import io.netty.util.Attribute;
import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.MockWebServer;
@ -55,6 +56,7 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
import reactor.netty.channel.ChannelOperations; import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.ConnectionProvider;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@ -1328,6 +1330,28 @@ class WebClientIntegrationTests {
.verify(Duration.ofSeconds(3)); .verify(Duration.ofSeconds(3));
} }
@Test // gh-36158
void reactorNettyAttributes() throws IOException {
startServer(new ReactorClientHttpConnector());
prepareResponse(builder ->
builder.setHeader("Content-Type", "text/plain").body("Hello Spring!"));
AtomicReference<Channel> channelRef = new AtomicReference<>();
Mono<String> result = this.webClient.get().uri("/greeting")
.httpRequest(request -> {
HttpClientRequest reactorRequest = request.getNativeRequest();
channelRef.set(((ChannelOperations<?, ?>) reactorRequest).channel());
})
.retrieve()
.bodyToMono(String.class);
StepVerifier.create(result).expectNext("Hello Spring!").expectComplete().verify(Duration.ofSeconds(3));
assertThat(channelRef.get().attr(ReactorClientHttpConnector.ATTRIBUTES_KEY).get()).isNull();
}
private <T> Mono<T> doMalformedChunkedResponseTest( private <T> Mono<T> doMalformedChunkedResponseTest(
ClientHttpConnector connector, Function<ResponseSpec, Mono<T>> handler) { ClientHttpConnector connector, Function<ResponseSpec, Mono<T>> handler) {

Loading…
Cancel
Save