Browse Source

Request streaming for Apache HttpClient

- Added org.springframework.http.StreamingHttpOutputMessage, which
  allows for a settable request body (as opposed to an output stream).

- Added http.client.HttpComponentsStreamingClientHttpRequest, which
  implements the above mentioned interface, mapping setBody() to a
  setEntity() call on the Apache HttpClient HttpEntityEnclosingRequest.

- Added a 'bufferRequestBody' property to the
  HttpComponentsClientHttpRequestFactory. When this property is set to
  false (default is true), we return a
  HttpComponentsStreamingClientHttpRequest instead of a (request
  buffering) HttpComponentsClientHttpRequest.

Issue: SPR-10728
pull/315/head
Arjen Poutsma 13 years ago committed by Rossen Stoyanchev
parent
commit
14ab2c88cc
  1. 54
      spring-web/src/main/java/org/springframework/http/StreamingHttpOutputMessage.java
  2. 13
      spring-web/src/main/java/org/springframework/http/client/AbstractClientHttpRequest.java
  3. 31
      spring-web/src/main/java/org/springframework/http/client/HttpComponentsClientHttpRequest.java
  4. 22
      spring-web/src/main/java/org/springframework/http/client/HttpComponentsClientHttpRequestFactory.java
  5. 168
      spring-web/src/main/java/org/springframework/http/client/HttpComponentsStreamingClientHttpRequest.java
  6. 41
      spring-web/src/main/java/org/springframework/http/converter/AbstractHttpMessageConverter.java
  7. 36
      spring-web/src/test/java/org/springframework/http/client/AbstractHttpRequestFactoryTestCase.java
  8. 40
      spring-web/src/test/java/org/springframework/http/client/StreamingHttpComponentsClientHttpRequestFactoryTests.java

54
spring-web/src/main/java/org/springframework/http/StreamingHttpOutputMessage.java

@ -0,0 +1,54 @@ @@ -0,0 +1,54 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http;
import java.io.IOException;
import java.io.OutputStream;
/**
* Represents a HTTP output message that allows for setting a streaming body.
*
* @author Arjen Poutsma
* @since 4.0
*/
public interface StreamingHttpOutputMessage extends HttpOutputMessage {
/**
* Sets the streaming body for this message.
*
* @param body the streaming body
*/
void setBody(Body body);
/**
* Defines the contract for bodies that can be written directly to a
* {@link OuputStream}. It is useful with HTTP client libraries that provide indirect
* access to an {@link OutputStream} via a callback mechanism.
*/
public interface Body {
/**
* Writes this body to the given {@link OuputStream}.
*
* @param outputStream the output stream to write to
* @throws IOException in case of errors
*/
void writeTo(OutputStream outputStream) throws IOException;
}
}

13
spring-web/src/main/java/org/springframework/http/client/AbstractClientHttpRequest.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -43,7 +43,7 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest { @@ -43,7 +43,7 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@Override
public final OutputStream getBody() throws IOException {
checkExecuted();
assertNotExecuted();
return getBodyInternal(this.headers);
}
@ -55,13 +55,18 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest { @@ -55,13 +55,18 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@Override
public final ClientHttpResponse execute() throws IOException {
checkExecuted();
assertNotExecuted();
ClientHttpResponse result = executeInternal(this.headers);
this.executed = true;
return result;
}
private void checkExecuted() {
/**
* Asserts that this request has not been {@linkplain #execute() executed} yet.
*
* @throws IllegalStateException if this request has been executed
*/
protected void assertNotExecuted() {
Assert.state(!this.executed, "ClientHttpRequest already executed");
}

31
spring-web/src/main/java/org/springframework/http/client/HttpComponentsClientHttpRequest.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -73,22 +73,35 @@ final class HttpComponentsClientHttpRequest extends AbstractBufferingClientHttpR @@ -73,22 +73,35 @@ final class HttpComponentsClientHttpRequest extends AbstractBufferingClientHttpR
@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
addHeaders(this.httpRequest, headers);
if (this.httpRequest instanceof HttpEntityEnclosingRequest) {
HttpEntityEnclosingRequest entityEnclosingRequest =
(HttpEntityEnclosingRequest) this.httpRequest;
HttpEntity requestEntity = new ByteArrayEntity(bufferedOutput);
entityEnclosingRequest.setEntity(requestEntity);
}
HttpResponse httpResponse =
this.httpClient.execute(this.httpRequest, this.httpContext);
return new HttpComponentsClientHttpResponse(httpResponse);
}
/**
* Adds the given headers to the given HTTP request.
*
* @param httpRequest the request to add the headers to
* @param headers the headers to add
*/
static void addHeaders(HttpUriRequest httpRequest, HttpHeaders headers) {
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
String headerName = entry.getKey();
if (!headerName.equalsIgnoreCase(HTTP.CONTENT_LEN) &&
!headerName.equalsIgnoreCase(HTTP.TRANSFER_ENCODING)) {
for (String headerValue : entry.getValue()) {
this.httpRequest.addHeader(headerName, headerValue);
httpRequest.addHeader(headerName, headerValue);
}
}
}
if (this.httpRequest instanceof HttpEntityEnclosingRequest) {
HttpEntityEnclosingRequest entityEnclosingRequest = (HttpEntityEnclosingRequest) this.httpRequest;
HttpEntity requestEntity = new ByteArrayEntity(bufferedOutput);
entityEnclosingRequest.setEntity(requestEntity);
}
HttpResponse httpResponse = this.httpClient.execute(this.httpRequest, this.httpContext);
return new HttpComponentsClientHttpResponse(httpResponse);
}
}

22
spring-web/src/main/java/org/springframework/http/client/HttpComponentsClientHttpRequestFactory.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -64,6 +64,7 @@ public class HttpComponentsClientHttpRequestFactory implements ClientHttpRequest @@ -64,6 +64,7 @@ public class HttpComponentsClientHttpRequestFactory implements ClientHttpRequest
private HttpClient httpClient;
private boolean bufferRequestBody = true;
/**
* Create a new instance of the HttpComponentsClientHttpRequestFactory with a default
@ -128,11 +129,28 @@ public class HttpComponentsClientHttpRequestFactory implements ClientHttpRequest @@ -128,11 +129,28 @@ public class HttpComponentsClientHttpRequestFactory implements ClientHttpRequest
getHttpClient().getParams().setIntParameter(CoreConnectionPNames.SO_TIMEOUT, timeout);
}
/**
* Indicates whether this request factory should buffer the request body internally.
*
* <p>Default is {@code true}. When sending large amounts of data via POST or PUT, it is
* recommended to change this property to {@code false}, so as not to run out of memory.
*/
public void setBufferRequestBody(boolean bufferRequestBody) {
this.bufferRequestBody = bufferRequestBody;
}
@Override
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
HttpUriRequest httpRequest = createHttpUriRequest(httpMethod, uri);
postProcessHttpRequest(httpRequest);
return new HttpComponentsClientHttpRequest(getHttpClient(), httpRequest, createHttpContext(httpMethod, uri));
if (bufferRequestBody) {
return new HttpComponentsClientHttpRequest(getHttpClient(), httpRequest,
createHttpContext(httpMethod, uri));
}
else {
return new HttpComponentsStreamingClientHttpRequest(getHttpClient(),
httpRequest, createHttpContext(httpMethod, uri));
}
}
/**

168
spring-web/src/main/java/org/springframework/http/client/HttpComponentsStreamingClientHttpRequest.java

@ -0,0 +1,168 @@ @@ -0,0 +1,168 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.StreamingHttpOutputMessage;
/**
* {@link ClientHttpRequest} implementation that uses Apache HttpComponents HttpClient to
* execute requests.
*
* <p>Created via the {@link org.springframework.http.client.HttpComponentsClientHttpRequestFactory}.
*
* @author Arjen Poutsma
* @see org.springframework.http.client.HttpComponentsClientHttpRequestFactory#createRequest(java.net.URI,
* org.springframework.http.HttpMethod)
* @since 4.0
*/
final class HttpComponentsStreamingClientHttpRequest extends AbstractClientHttpRequest
implements StreamingHttpOutputMessage {
private final HttpClient httpClient;
private final HttpUriRequest httpRequest;
private final HttpContext httpContext;
private Body body;
public HttpComponentsStreamingClientHttpRequest(HttpClient httpClient,
HttpUriRequest httpRequest, HttpContext httpContext) {
this.httpClient = httpClient;
this.httpRequest = httpRequest;
this.httpContext = httpContext;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.httpRequest.getMethod());
}
@Override
public URI getURI() {
return this.httpRequest.getURI();
}
@Override
public void setBody(Body body) {
assertNotExecuted();
this.body = body;
}
@Override
protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException {
throw new UnsupportedOperationException(
"getBody not supported when bufferRequestBody is false");
}
@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
HttpComponentsClientHttpRequest.addHeaders(this.httpRequest, headers);
if (this.httpRequest instanceof HttpEntityEnclosingRequest && body != null) {
HttpEntityEnclosingRequest entityEnclosingRequest =
(HttpEntityEnclosingRequest) this.httpRequest;
HttpEntity requestEntity = new StreamingHttpEntity(getHeaders(), body);
entityEnclosingRequest.setEntity(requestEntity);
}
HttpResponse httpResponse =
this.httpClient.execute(this.httpRequest, this.httpContext);
return new HttpComponentsClientHttpResponse(httpResponse);
}
private static class StreamingHttpEntity implements HttpEntity {
private final HttpHeaders headers;
private final StreamingHttpOutputMessage.Body body;
private StreamingHttpEntity(HttpHeaders headers,
StreamingHttpOutputMessage.Body body) {
this.headers = headers;
this.body = body;
}
@Override
public boolean isRepeatable() {
return false;
}
@Override
public boolean isChunked() {
return false;
}
@Override
public long getContentLength() {
return headers.getContentLength();
}
@Override
public Header getContentType() {
MediaType contentType = headers.getContentType();
return contentType != null ?
new BasicHeader("Content-Type", contentType.toString()) : null;
}
@Override
public Header getContentEncoding() {
String contentEncoding = headers.getFirst("Content-Encoding");
return contentEncoding != null ?
new BasicHeader("Content-Encoding", contentEncoding) : null;
}
@Override
public InputStream getContent() throws IOException, IllegalStateException {
throw new IllegalStateException();
}
@Override
public void writeTo(OutputStream outputStream) throws IOException {
body.writeTo(outputStream);
}
@Override
public boolean isStreaming() {
return true;
}
@Override
public void consumeContent() throws IOException {
throw new UnsupportedOperationException();
}
}
}

41
spring-web/src/main/java/org/springframework/http/converter/AbstractHttpMessageConverter.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.http.converter;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -25,10 +26,12 @@ import java.util.List; @@ -25,10 +26,12 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.Cookies;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.StreamingHttpOutputMessage;
import org.springframework.util.Assert;
/**
@ -163,10 +166,10 @@ public abstract class AbstractHttpMessageConverter<T> implements HttpMessageConv @@ -163,10 +166,10 @@ public abstract class AbstractHttpMessageConverter<T> implements HttpMessageConv
* on the output message. It then calls {@link #writeInternal}.
*/
@Override
public final void write(T t, MediaType contentType, HttpOutputMessage outputMessage)
public final void write(final T t, MediaType contentType, HttpOutputMessage outputMessage)
throws IOException, HttpMessageNotWritableException {
HttpHeaders headers = outputMessage.getHeaders();
final HttpHeaders headers = outputMessage.getHeaders();
if (headers.getContentType() == null) {
if (contentType == null || contentType.isWildcardType() || contentType.isWildcardSubtype()) {
contentType = getDefaultContentType(t);
@ -181,8 +184,36 @@ public abstract class AbstractHttpMessageConverter<T> implements HttpMessageConv @@ -181,8 +184,36 @@ public abstract class AbstractHttpMessageConverter<T> implements HttpMessageConv
headers.setContentLength(contentLength);
}
}
writeInternal(t, outputMessage);
outputMessage.getBody().flush();
if (outputMessage instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage =
(StreamingHttpOutputMessage) outputMessage;
streamingOutputMessage.setBody(new StreamingHttpOutputMessage.Body() {
@Override
public void writeTo(final OutputStream outputStream) throws IOException {
writeInternal(t, new HttpOutputMessage() {
@Override
public OutputStream getBody() throws IOException {
return outputStream;
}
@Override
public HttpHeaders getHeaders() {
return headers;
}
@Override
public Cookies getCookies() {
return null;
}
});
}
});
}
else {
writeInternal(t, outputMessage);
outputMessage.getBody().flush();
}
}
/**

36
spring-web/src/test/java/org/springframework/http/client/AbstractHttpRequestFactoryTestCase.java

@ -18,6 +18,7 @@ package org.springframework.http.client; @@ -18,6 +18,7 @@ package org.springframework.http.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.Enumeration;
@ -40,8 +41,10 @@ import org.junit.BeforeClass; @@ -40,8 +41,10 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.StreamingHttpOutputMessage;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.SocketUtils;
import org.springframework.util.StreamUtils;
import static org.junit.Assert.*;
@ -111,9 +114,21 @@ public abstract class AbstractHttpRequestFactoryTestCase { @@ -111,9 +114,21 @@ public abstract class AbstractHttpRequestFactoryTestCase {
request.getHeaders().add(headerName, headerValue1);
String headerValue2 = "value2";
request.getHeaders().add(headerName, headerValue2);
byte[] body = "Hello World".getBytes("UTF-8");
final byte[] body = "Hello World".getBytes("UTF-8");
request.getHeaders().setContentLength(body.length);
FileCopyUtils.copy(body, request.getBody());
if (request instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingRequest =
(StreamingHttpOutputMessage) request;
streamingRequest.setBody(new StreamingHttpOutputMessage.Body() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
StreamUtils.copy(body, outputStream);
}
});
}
else {
StreamUtils.copy(body, request.getBody());
}
ClientHttpResponse response = request.execute();
try {
assertEquals("Invalid status code", HttpStatus.OK, response.getStatusCode());
@ -131,8 +146,21 @@ public abstract class AbstractHttpRequestFactoryTestCase { @@ -131,8 +146,21 @@ public abstract class AbstractHttpRequestFactoryTestCase {
@Test(expected = IllegalStateException.class)
public void multipleWrites() throws Exception {
ClientHttpRequest request = factory.createRequest(new URI(baseUrl + "/echo"), HttpMethod.POST);
byte[] body = "Hello World".getBytes("UTF-8");
FileCopyUtils.copy(body, request.getBody());
final byte[] body = "Hello World".getBytes("UTF-8");
if (request instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingRequest =
(StreamingHttpOutputMessage) request;
streamingRequest.setBody(new StreamingHttpOutputMessage.Body() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
StreamUtils.copy(body, outputStream);
}
});
}
else {
StreamUtils.copy(body, request.getBody());
}
ClientHttpResponse response = request.execute();
try {
FileCopyUtils.copy(body, request.getBody());

40
spring-web/src/test/java/org/springframework/http/client/StreamingHttpComponentsClientHttpRequestFactoryTests.java

@ -0,0 +1,40 @@ @@ -0,0 +1,40 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client;
import org.junit.Test;
import org.springframework.http.HttpMethod;
public class StreamingHttpComponentsClientHttpRequestFactoryTests
extends AbstractHttpRequestFactoryTestCase {
@Override
protected ClientHttpRequestFactory createRequestFactory() {
HttpComponentsClientHttpRequestFactory requestFactory =
new HttpComponentsClientHttpRequestFactory();
requestFactory.setBufferRequestBody(false);
return requestFactory;
}
@Override
@Test
public void httpMethods() throws Exception {
assertHttpMethod("patch", HttpMethod.PATCH);
}
}
Loading…
Cancel
Save