Browse Source

Zero-copy support

This commit introduces support for zero-copy file transfers in the HTTP
response, through the ZeroCopyHttpOutputMessage subinterface of
ReactiveHttpOutputMessage.
pull/1111/head
Arjen Poutsma 10 years ago
parent
commit
451e296a78
  1. 42
      spring-web-reactive/src/main/java/org/springframework/http/ZeroCopyHttpOutputMessage.java
  2. 2
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java
  3. 13
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java
  4. 31
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java
  5. 12
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java
  6. 34
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
  7. 98
      spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java
  8. BIN
      spring-web-reactive/src/test/resources/org/springframework/http/server/reactive/spring.png

42
spring-web-reactive/src/main/java/org/springframework/http/ZeroCopyHttpOutputMessage.java

@ -0,0 +1,42 @@ @@ -0,0 +1,42 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http;
import java.io.File;
import reactor.core.publisher.Mono;
/**
* Sub-interface of {@code ReactiveOutputMessage} that has support for "zero-copy"
* file transfers.
*
* @author Arjen Poutsma
* @see <a href="https://en.wikipedia.org/wiki/Zero-copy">Zero-copy</a>
*/
public interface ZeroCopyHttpOutputMessage extends ReactiveHttpOutputMessage {
/**
* Set the body of the message to the given {@link File} which will be
* used to write to the underlying HTTP layer.
* @param file the file to transfer
* @param position the position within the file from which the transfer is to begin
* @param count the number of bytes to be transferred
* @return a publisher that indicates completion or error.
*/
Mono<Void> setBody(File file, long position, long count);
}

2
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

@ -94,7 +94,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { @@ -94,7 +94,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
applyBeforeCommit().after(() -> setBodyInternal(writePublisher)));
}
private Mono<Void> applyBeforeCommit() {
protected Mono<Void> applyBeforeCommit() {
Mono<Void> mono = Mono.empty();
if (this.state.compareAndSet(STATE_NEW, STATE_COMMITTING)) {
for (Supplier<? extends Mono<Void>> action : this.beforeCommitActions) {

13
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java

@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
package org.springframework.http.server.reactive;
import java.io.File;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpResponseStatus;
@ -31,6 +33,7 @@ import org.springframework.core.io.buffer.DataBufferAllocator; @@ -31,6 +33,7 @@ import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.util.Assert;
/**
@ -39,7 +42,8 @@ import org.springframework.util.Assert; @@ -39,7 +42,8 @@ import org.springframework.util.Assert;
* @author Stephane Maldini
* @author Rossen Stoyanchev
*/
public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
public class ReactorServerHttpResponse extends AbstractServerHttpResponse
implements ZeroCopyHttpOutputMessage {
private final HttpChannel channel;
@ -99,4 +103,11 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse { @@ -99,4 +103,11 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
return Unpooled.wrappedBuffer(buffer.asByteBuffer());
}
}
@Override
public Mono<Void> setBody(File file, long position, long count) {
return applyBeforeCommit().after(() -> {
return this.channel.sendFile(file, position, count);
});
}
}

31
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java

@ -104,4 +104,35 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { @@ -104,4 +104,35 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
}
}
/*
While the underlying implementation of {@link ZeroCopyHttpOutputMessage} seems to
work; it does bypass {@link #applyBeforeCommit} and more importantly it doesn't change
its {@linkplain #state()). Therefore it's commented out, for now.
We should revisit this code once
https://github.com/ReactiveX/RxNetty/issues/194 has been fixed.
@Override
public Mono<Void> setBody(File file, long position, long count) {
Channel channel = this.response.unsafeNettyChannel();
HttpResponse httpResponse =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
io.netty.handler.codec.http.HttpHeaders headers = httpResponse.headers();
for (Map.Entry<String, List<String>> header : getHeaders().entrySet()) {
String headerName = header.getKey();
for (String headerValue : header.getValue()) {
headers.add(headerName, headerValue);
}
}
Mono<Void> responseWrite = MonoChannelFuture.from(channel.write(httpResponse));
FileRegion fileRegion = new DefaultFileRegion(file, position, count);
Mono<Void> fileWrite = MonoChannelFuture.from(channel.writeAndFlush(fileRegion));
return Flux.concat(applyBeforeCommit(), responseWrite, fileWrite).after();
}
*/
}

12
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java

@ -67,9 +67,12 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle @@ -67,9 +67,12 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
requestBody.registerListener();
ServerHttpRequest request = new UndertowServerHttpRequest(exchange, requestBody);
ResponseBodySubscriber responseBody = new ResponseBodySubscriber(exchange);
StreamSinkChannel responseChannel = exchange.getResponseChannel();
ResponseBodySubscriber responseBody =
new ResponseBodySubscriber(exchange, responseChannel);
responseBody.registerListener();
ServerHttpResponse response = new UndertowServerHttpResponse(exchange,
ServerHttpResponse response =
new UndertowServerHttpResponse(exchange, responseChannel,
publisher -> Mono.from(subscriber -> publisher.subscribe(responseBody)),
allocator);
@ -202,9 +205,10 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle @@ -202,9 +205,10 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
private Subscription subscription;
public ResponseBodySubscriber(HttpServerExchange exchange) {
public ResponseBodySubscriber(HttpServerExchange exchange,
StreamSinkChannel responseChannel) {
this.exchange = exchange;
this.responseChannel = exchange.getResponseChannel();
this.responseChannel = responseChannel;
}
public void registerListener() {

34
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

@ -16,6 +16,10 @@ @@ -16,6 +16,10 @@
package org.springframework.http.server.reactive;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -25,12 +29,14 @@ import io.undertow.server.handlers.Cookie; @@ -25,12 +29,14 @@ import io.undertow.server.handlers.Cookie;
import io.undertow.server.handlers.CookieImpl;
import io.undertow.util.HttpString;
import org.reactivestreams.Publisher;
import org.xnio.channels.StreamSinkChannel;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.util.Assert;
/**
@ -39,19 +45,25 @@ import org.springframework.util.Assert; @@ -39,19 +45,25 @@ import org.springframework.util.Assert;
* @author Marek Hawrylczak
* @author Rossen Stoyanchev
*/
public class UndertowServerHttpResponse extends AbstractServerHttpResponse {
public class UndertowServerHttpResponse extends AbstractServerHttpResponse
implements ZeroCopyHttpOutputMessage {
private final HttpServerExchange exchange;
private final StreamSinkChannel responseChannel;
private final Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter;
public UndertowServerHttpResponse(HttpServerExchange exchange,
StreamSinkChannel responseChannel,
Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter,
DataBufferAllocator allocator) {
super(allocator);
Assert.notNull(exchange, "'exchange' is required.");
Assert.notNull(responseChannel, "'responseChannel' must not be null");
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
this.exchange = exchange;
this.responseChannel = responseChannel;
this.responseBodyWriter = responseBodyWriter;
}
@ -71,6 +83,26 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse { @@ -71,6 +83,26 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse {
return this.responseBodyWriter.apply(publisher);
}
@Override
public Mono<Void> setBody(File file, long position, long count) {
writeHeaders();
writeCookies();
try {
FileChannel in = new FileInputStream(file).getChannel();
long result = this.responseChannel.transferFrom(in, position, count);
if (result < count) {
return Mono.error(new IOException("Could only write " + result +
" out of " + count + " bytes"));
}
else {
return Mono.empty();
}
}
catch (IOException ex) {
return Mono.error(ex);
}
}
@Override
protected void writeHeaders() {
for (Map.Entry<String, List<String>> entry : getHeaders().entrySet()) {

98
spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java

@ -0,0 +1,98 @@ @@ -0,0 +1,98 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.io.File;
import java.net.URI;
import org.junit.Test;
import reactor.core.publisher.Mono;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.server.reactive.boot.ReactorHttpServer;
import org.springframework.http.server.reactive.boot.UndertowHttpServer;
import org.springframework.web.client.RestTemplate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
/**
* @author Arjen Poutsma
*/
public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTests {
private final ZeroCopyHandler handler = new ZeroCopyHandler();
@Override
protected HttpHandler createHttpHandler() {
return handler;
}
@Test
public void zeroCopy() throws Exception {
// Zero-copy only does not support servlet
assumeTrue(server instanceof ReactorHttpServer ||
server instanceof UndertowHttpServer);
RestTemplate restTemplate = new RestTemplate();
RequestEntity request =
RequestEntity.get(new URI("http://localhost:" + port)).build();
ResponseEntity<byte[]> response = restTemplate.exchange(request, byte[].class);
Resource logo =
new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class);
assertTrue(response.hasBody());
assertEquals(logo.contentLength(), response.getHeaders().getContentLength());
assertEquals(logo.contentLength(), response.getBody().length);
assertEquals(MediaType.IMAGE_PNG, response.getHeaders().getContentType());
}
private static class ZeroCopyHandler implements HttpHandler {
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
try {
ZeroCopyHttpOutputMessage zeroCopyResponse =
(ZeroCopyHttpOutputMessage) response;
Resource logo = new ClassPathResource("spring.png",
ZeroCopyIntegrationTests.class);
File logoFile = logo.getFile();
zeroCopyResponse.getHeaders().setContentType(MediaType.IMAGE_PNG);
zeroCopyResponse.getHeaders().setContentLength(logoFile.length());
return zeroCopyResponse.setBody(logoFile, 0, logoFile.length());
}
catch (Throwable ex) {
return Mono.error(ex);
}
}
}
}

BIN
spring-web-reactive/src/test/resources/org/springframework/http/server/reactive/spring.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 951 B

Loading…
Cancel
Save