@ -34,7 +34,6 @@ import java.time.Duration;
import java.util.Arrays ;
import java.util.Arrays ;
import java.util.List ;
import java.util.List ;
import java.util.Map ;
import java.util.Map ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.function.Consumer ;
import java.util.function.Consumer ;
import java.util.function.Function ;
import java.util.function.Function ;
import java.util.stream.Collectors ;
import java.util.stream.Collectors ;
@ -50,6 +49,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource ;
import org.junit.jupiter.params.provider.MethodSource ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
import reactor.core.publisher.Mono ;
import reactor.core.publisher.Sinks ;
import reactor.netty.http.client.HttpClient ;
import reactor.netty.http.client.HttpClient ;
import reactor.netty.resources.ConnectionProvider ;
import reactor.netty.resources.ConnectionProvider ;
import reactor.test.StepVerifier ;
import reactor.test.StepVerifier ;
@ -1281,12 +1281,13 @@ class WebClientIntegrationTests {
private < T > Mono < T > doMalformedChunkedResponseTest (
private < T > Mono < T > doMalformedChunkedResponseTest (
ClientHttpConnector connector , Function < ResponseSpec , Mono < T > > handler ) {
ClientHttpConnector connector , Function < ResponseSpec , Mono < T > > handler ) {
AtomicInteger port = new AtomicInteger ( ) ;
Sinks . One < Integer > portSink = Sinks . one ( ) ;
Thread serverThread = new Thread ( ( ) - > {
Thread serverThread = new Thread ( ( ) - > {
// No way to simulate a malformed chunked response through MockWebServer.
// No way to simulate a malformed chunked response through MockWebServer.
try ( ServerSocket serverSocket = new ServerSocket ( 0 ) ) {
try ( ServerSocket serverSocket = new ServerSocket ( 0 ) ) {
port . set ( serverSocket . getLocalPort ( ) ) ;
Sinks . EmitResult result = portSink . tryEmitValue ( serverSocket . getLocalPort ( ) ) ;
assertThat ( result ) . isEqualTo ( Sinks . EmitResult . OK ) ;
Socket socket = serverSocket . accept ( ) ;
Socket socket = serverSocket . accept ( ) ;
InputStream is = socket . getInputStream ( ) ;
InputStream is = socket . getInputStream ( ) ;
@ -1310,12 +1311,13 @@ class WebClientIntegrationTests {
serverThread . start ( ) ;
serverThread . start ( ) ;
WebClient client = WebClient . builder ( )
return portSink . asMono ( ) . flatMap ( port - > {
. clientConnector ( connector )
WebClient client = WebClient . builder ( )
. baseUrl ( "http://localhost:" + port )
. clientConnector ( connector )
. build ( ) ;
. baseUrl ( "http://localhost:" + port )
. build ( ) ;
return handler . apply ( client . post ( ) . retrieve ( ) ) ;
return handler . apply ( client . post ( ) . retrieve ( ) ) ;
} ) ;
}
}
private void prepareResponse ( Consumer < MockResponse > consumer ) {
private void prepareResponse ( Consumer < MockResponse > consumer ) {