Browse Source
This commit introduces a ClientHttpConnector implementation backed by Apache HttpComponents HttpClient 5.0. Fixes gh-24700pull/25006/head
14 changed files with 537 additions and 8 deletions
@ -0,0 +1,159 @@
@@ -0,0 +1,159 @@
|
||||
/* |
||||
* Copyright 2002-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.client.reactive; |
||||
|
||||
import java.net.URI; |
||||
import java.nio.ByteBuffer; |
||||
import java.util.function.BiFunction; |
||||
import java.util.function.Function; |
||||
|
||||
import org.apache.hc.client5.http.cookie.BasicCookieStore; |
||||
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; |
||||
import org.apache.hc.client5.http.impl.async.HttpAsyncClients; |
||||
import org.apache.hc.client5.http.protocol.HttpClientContext; |
||||
import org.apache.hc.core5.concurrent.FutureCallback; |
||||
import org.apache.hc.core5.http.HttpResponse; |
||||
import org.apache.hc.core5.http.Message; |
||||
import org.apache.hc.core5.http.nio.AsyncRequestProducer; |
||||
import org.apache.hc.core5.reactive.ReactiveResponseConsumer; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.core.publisher.MonoSink; |
||||
|
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* {@link ClientHttpConnector} implementation for the Apache HttpComponents HttpClient 5.x. |
||||
* |
||||
* @author Martin Tarjányi |
||||
* @since 5.3 |
||||
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a> |
||||
*/ |
||||
public class HttpComponentsClientHttpConnector implements ClientHttpConnector { |
||||
|
||||
private final CloseableHttpAsyncClient client; |
||||
|
||||
private final BiFunction<HttpMethod, URI, ? extends HttpClientContext> contextProvider; |
||||
|
||||
private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); |
||||
|
||||
|
||||
/** |
||||
* Default constructor that creates and starts a new instance of {@link CloseableHttpAsyncClient}. |
||||
*/ |
||||
public HttpComponentsClientHttpConnector() { |
||||
this(HttpAsyncClients.createDefault()); |
||||
} |
||||
|
||||
/** |
||||
* Constructor with a pre-configured {@link CloseableHttpAsyncClient} instance. |
||||
* @param client the client to use |
||||
*/ |
||||
public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client) { |
||||
this(client, (method, uri) -> HttpClientContext.create()); |
||||
} |
||||
|
||||
/** |
||||
* Constructor with a pre-configured {@link CloseableHttpAsyncClient} instance |
||||
* and a {@link HttpClientContext} supplier lambda which is called before each request |
||||
* and passed to the client. |
||||
* @param client the client to use |
||||
* @param contextProvider a {@link HttpClientContext} supplier |
||||
*/ |
||||
public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client, |
||||
BiFunction<HttpMethod, URI, ? extends HttpClientContext> contextProvider) { |
||||
|
||||
Assert.notNull(client, "Client must not be null"); |
||||
Assert.notNull(contextProvider, "ContextProvider must not be null"); |
||||
|
||||
this.contextProvider = contextProvider; |
||||
this.client = client; |
||||
this.client.start(); |
||||
} |
||||
|
||||
/** |
||||
* Set the buffer factory to be used. |
||||
*/ |
||||
public void setBufferFactory(DataBufferFactory bufferFactory) { |
||||
this.dataBufferFactory = bufferFactory; |
||||
} |
||||
|
||||
@Override |
||||
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, |
||||
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { |
||||
|
||||
HttpClientContext context = this.contextProvider.apply(method, uri); |
||||
|
||||
if (context.getCookieStore() == null) { |
||||
context.setCookieStore(new BasicCookieStore()); |
||||
} |
||||
|
||||
HttpComponentsClientHttpRequest request = new HttpComponentsClientHttpRequest(method, uri, |
||||
context, this.dataBufferFactory); |
||||
|
||||
return requestCallback.apply(request).then(Mono.defer(() -> execute(request, context))); |
||||
} |
||||
|
||||
private Mono<ClientHttpResponse> execute(HttpComponentsClientHttpRequest request, HttpClientContext context) { |
||||
AsyncRequestProducer requestProducer = request.toRequestProducer(); |
||||
|
||||
return Mono.create(sink -> { |
||||
ReactiveResponseConsumer reactiveResponseConsumer = |
||||
new ReactiveResponseConsumer(new MonoFutureCallbackAdapter(sink, this.dataBufferFactory, context)); |
||||
|
||||
this.client.execute(requestProducer, reactiveResponseConsumer, context, null); |
||||
}); |
||||
} |
||||
|
||||
|
||||
private static class MonoFutureCallbackAdapter |
||||
implements FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> { |
||||
|
||||
private final MonoSink<ClientHttpResponse> sink; |
||||
|
||||
private final DataBufferFactory dataBufferFactory; |
||||
|
||||
private final HttpClientContext context; |
||||
|
||||
public MonoFutureCallbackAdapter(MonoSink<ClientHttpResponse> sink, |
||||
DataBufferFactory dataBufferFactory, HttpClientContext context) { |
||||
this.sink = sink; |
||||
this.dataBufferFactory = dataBufferFactory; |
||||
this.context = context; |
||||
} |
||||
|
||||
@Override |
||||
public void completed(Message<HttpResponse, Publisher<ByteBuffer>> result) { |
||||
HttpComponentsClientHttpResponse response = new HttpComponentsClientHttpResponse(this.dataBufferFactory, |
||||
result, this.context); |
||||
this.sink.success(response); |
||||
} |
||||
|
||||
@Override |
||||
public void failed(Exception ex) { |
||||
this.sink.error(ex); |
||||
} |
||||
|
||||
@Override |
||||
public void cancelled() { |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,160 @@
@@ -0,0 +1,160 @@
|
||||
/* |
||||
* Copyright 2002-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.client.reactive; |
||||
|
||||
import java.net.URI; |
||||
import java.net.URISyntaxException; |
||||
import java.nio.ByteBuffer; |
||||
import java.util.Collection; |
||||
|
||||
import org.apache.hc.client5.http.cookie.CookieStore; |
||||
import org.apache.hc.client5.http.impl.cookie.BasicClientCookie; |
||||
import org.apache.hc.client5.http.protocol.HttpClientContext; |
||||
import org.apache.hc.core5.http.ContentType; |
||||
import org.apache.hc.core5.http.HttpRequest; |
||||
import org.apache.hc.core5.http.message.BasicHttpRequest; |
||||
import org.apache.hc.core5.http.nio.AsyncRequestProducer; |
||||
import org.apache.hc.core5.http.nio.support.BasicRequestProducer; |
||||
import org.apache.hc.core5.reactive.ReactiveEntityProducer; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.lang.Nullable; |
||||
|
||||
import static org.springframework.http.MediaType.ALL_VALUE; |
||||
|
||||
/** |
||||
* {@link ClientHttpRequest} implementation for the Apache HttpComponents HttpClient 5.x. |
||||
* |
||||
* @author Martin Tarjányi |
||||
* @since 5.3 |
||||
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a> |
||||
*/ |
||||
class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest { |
||||
|
||||
private final HttpRequest httpRequest; |
||||
|
||||
private final DataBufferFactory dataBufferFactory; |
||||
|
||||
private final HttpClientContext context; |
||||
|
||||
@Nullable |
||||
private Flux<ByteBuffer> byteBufferFlux; |
||||
|
||||
|
||||
public HttpComponentsClientHttpRequest(HttpMethod method, URI uri, HttpClientContext context, |
||||
DataBufferFactory dataBufferFactory) { |
||||
|
||||
this.context = context; |
||||
this.httpRequest = new BasicHttpRequest(method.name(), uri); |
||||
this.dataBufferFactory = dataBufferFactory; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public HttpMethod getMethod() { |
||||
return HttpMethod.resolve(this.httpRequest.getMethod()); |
||||
} |
||||
|
||||
@Override |
||||
public URI getURI() { |
||||
try { |
||||
return this.httpRequest.getUri(); |
||||
} |
||||
catch (URISyntaxException ex) { |
||||
throw new IllegalArgumentException("Invalid URI syntax.", ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public DataBufferFactory bufferFactory() { |
||||
return this.dataBufferFactory; |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { |
||||
return doCommit(() -> { |
||||
this.byteBufferFlux = Flux.from(body).map(DataBuffer::asByteBuffer); |
||||
return Mono.empty(); |
||||
}); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) { |
||||
return writeWith(Flux.from(body).flatMap(p -> p)); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> setComplete() { |
||||
return doCommit(); |
||||
} |
||||
|
||||
@Override |
||||
protected void applyHeaders() { |
||||
HttpHeaders headers = getHeaders(); |
||||
|
||||
headers.entrySet() |
||||
.stream() |
||||
.filter(entry -> !HttpHeaders.CONTENT_LENGTH.equals(entry.getKey())) |
||||
.forEach(entry -> entry.getValue().forEach(v -> this.httpRequest.addHeader(entry.getKey(), v))); |
||||
|
||||
if (!this.httpRequest.containsHeader(HttpHeaders.ACCEPT)) { |
||||
this.httpRequest.addHeader(HttpHeaders.ACCEPT, ALL_VALUE); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected void applyCookies() { |
||||
if (getCookies().isEmpty()) { |
||||
return; |
||||
} |
||||
|
||||
CookieStore cookieStore = this.context.getCookieStore(); |
||||
|
||||
getCookies().values() |
||||
.stream() |
||||
.flatMap(Collection::stream) |
||||
.forEach(cookie -> { |
||||
BasicClientCookie clientCookie = new BasicClientCookie(cookie.getName(), cookie.getValue()); |
||||
clientCookie.setDomain(getURI().getHost()); |
||||
clientCookie.setPath(getURI().getPath()); |
||||
cookieStore.addCookie(clientCookie); |
||||
}); |
||||
} |
||||
|
||||
public AsyncRequestProducer toRequestProducer() { |
||||
ReactiveEntityProducer reactiveEntityProducer = null; |
||||
|
||||
if (this.byteBufferFlux != null) { |
||||
String contentEncoding = getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING); |
||||
ContentType contentType = null; |
||||
if (getHeaders().getContentType() != null) { |
||||
contentType = ContentType.parse(getHeaders().getContentType().toString()); |
||||
} |
||||
reactiveEntityProducer = new ReactiveEntityProducer(this.byteBufferFlux, getHeaders().getContentLength(), |
||||
contentType, contentEncoding); |
||||
} |
||||
|
||||
return new BasicRequestProducer(this.httpRequest, reactiveEntityProducer); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,114 @@
@@ -0,0 +1,114 @@
|
||||
/* |
||||
* Copyright 2002-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.client.reactive; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.util.Arrays; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
|
||||
import org.apache.hc.client5.http.cookie.Cookie; |
||||
import org.apache.hc.client5.http.protocol.HttpClientContext; |
||||
import org.apache.hc.core5.http.HttpResponse; |
||||
import org.apache.hc.core5.http.Message; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferFactory; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.http.ResponseCookie; |
||||
import org.springframework.util.LinkedMultiValueMap; |
||||
import org.springframework.util.MultiValueMap; |
||||
|
||||
/** |
||||
* {@link ClientHttpResponse} implementation for the Apache HttpComponents HttpClient 5.x. |
||||
* |
||||
* @author Martin Tarjányi |
||||
* @since 5.3 |
||||
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a> |
||||
*/ |
||||
class HttpComponentsClientHttpResponse implements ClientHttpResponse { |
||||
|
||||
private final DataBufferFactory dataBufferFactory; |
||||
|
||||
private final Message<HttpResponse, Publisher<ByteBuffer>> message; |
||||
|
||||
private final HttpClientContext context; |
||||
|
||||
private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); |
||||
|
||||
|
||||
public HttpComponentsClientHttpResponse(DataBufferFactory dataBufferFactory, |
||||
Message<HttpResponse, Publisher<ByteBuffer>> message, |
||||
HttpClientContext context) { |
||||
|
||||
this.dataBufferFactory = dataBufferFactory; |
||||
this.message = message; |
||||
this.context = context; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public HttpStatus getStatusCode() { |
||||
return HttpStatus.valueOf(this.message.getHead().getCode()); |
||||
} |
||||
|
||||
@Override |
||||
public int getRawStatusCode() { |
||||
return this.message.getHead().getCode(); |
||||
} |
||||
|
||||
@Override |
||||
public MultiValueMap<String, ResponseCookie> getCookies() { |
||||
LinkedMultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>(); |
||||
this.context.getCookieStore().getCookies().forEach(cookie -> |
||||
result.add(cookie.getName(), ResponseCookie.fromClientResponse(cookie.getName(), cookie.getValue()) |
||||
.domain(cookie.getDomain()) |
||||
.path(cookie.getPath()) |
||||
.maxAge(getMaxAgeSeconds(cookie)) |
||||
.secure(cookie.isSecure()) |
||||
.httpOnly(cookie.containsAttribute("httponly")) |
||||
.build())); |
||||
return result; |
||||
} |
||||
|
||||
private long getMaxAgeSeconds(Cookie cookie) { |
||||
String maxAgeAttribute = cookie.getAttribute(Cookie.MAX_AGE_ATTR); |
||||
|
||||
return maxAgeAttribute == null ? -1 : Long.parseLong(maxAgeAttribute); |
||||
} |
||||
|
||||
@Override |
||||
public Flux<DataBuffer> getBody() { |
||||
return Flux.from(this.message.getBody()) |
||||
.doOnSubscribe(s -> { |
||||
if (!this.rejectSubscribers.compareAndSet(false, true)) { |
||||
throw new IllegalStateException("The client response body can only be consumed once."); |
||||
} |
||||
}) |
||||
.map(this.dataBufferFactory::wrap); |
||||
} |
||||
|
||||
@Override |
||||
public HttpHeaders getHeaders() { |
||||
return Arrays.stream(this.message.getHead().getHeaders()) |
||||
.collect(HttpHeaders::new, |
||||
(httpHeaders, header) -> httpHeaders.add(header.getName(), header.getValue()), |
||||
HttpHeaders::putAll); |
||||
} |
||||
} |
||||
Loading…
Reference in new issue