From 451e296a78615430ff6ee857c95a285990020f4a Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 20 Apr 2016 13:37:23 +0200 Subject: [PATCH] Zero-copy support This commit introduces support for zero-copy file transfers in the HTTP response, through the ZeroCopyHttpOutputMessage subinterface of ReactiveHttpOutputMessage. --- .../http/ZeroCopyHttpOutputMessage.java | 42 ++++++++ .../reactive/AbstractServerHttpResponse.java | 2 +- .../reactive/ReactorServerHttpResponse.java | 13 ++- .../reactive/RxNettyServerHttpResponse.java | 31 ++++++ .../reactive/UndertowHttpHandlerAdapter.java | 12 ++- .../reactive/UndertowServerHttpResponse.java | 34 +++++- .../reactive/ZeroCopyIntegrationTests.java | 98 ++++++++++++++++++ .../http/server/reactive/spring.png | Bin 0 -> 951 bytes 8 files changed, 225 insertions(+), 7 deletions(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/ZeroCopyHttpOutputMessage.java create mode 100644 spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java create mode 100644 spring-web-reactive/src/test/resources/org/springframework/http/server/reactive/spring.png diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ZeroCopyHttpOutputMessage.java b/spring-web-reactive/src/main/java/org/springframework/http/ZeroCopyHttpOutputMessage.java new file mode 100644 index 00000000000..9499c8d81f3 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/ZeroCopyHttpOutputMessage.java @@ -0,0 +1,42 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http; + +import java.io.File; + +import reactor.core.publisher.Mono; + +/** + * Sub-interface of {@code ReactiveOutputMessage} that has support for "zero-copy" + * file transfers. + * + * @author Arjen Poutsma + * @see Zero-copy + */ +public interface ZeroCopyHttpOutputMessage extends ReactiveHttpOutputMessage { + + /** + * Set the body of the message to the given {@link File} which will be + * used to write to the underlying HTTP layer. + * @param file the file to transfer + * @param position the position within the file from which the transfer is to begin + * @param count the number of bytes to be transferred + * @return a publisher that indicates completion or error. + */ + Mono setBody(File file, long position, long count); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index edf2a51a239..22041600e8c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -94,7 +94,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { applyBeforeCommit().after(() -> setBodyInternal(writePublisher))); } - private Mono applyBeforeCommit() { + protected Mono applyBeforeCommit() { Mono mono = Mono.empty(); if (this.state.compareAndSet(STATE_NEW, STATE_COMMITTING)) { for (Supplier> action : this.beforeCommitActions) { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index 8eabb393142..98dbe3f7aaa 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -16,6 +16,8 @@ package org.springframework.http.server.reactive; +import java.io.File; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpResponseStatus; @@ -31,6 +33,7 @@ import org.springframework.core.io.buffer.DataBufferAllocator; import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; +import org.springframework.http.ZeroCopyHttpOutputMessage; import org.springframework.util.Assert; /** @@ -39,7 +42,8 @@ import org.springframework.util.Assert; * @author Stephane Maldini * @author Rossen Stoyanchev */ -public class ReactorServerHttpResponse extends AbstractServerHttpResponse { +public class ReactorServerHttpResponse extends AbstractServerHttpResponse + implements ZeroCopyHttpOutputMessage { private final HttpChannel channel; @@ -99,4 +103,11 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse { return Unpooled.wrappedBuffer(buffer.asByteBuffer()); } } + + @Override + public Mono setBody(File file, long position, long count) { + return applyBeforeCommit().after(() -> { + return this.channel.sendFile(file, position, count); + }); + } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java index a45c2b1b17b..f22b7944338 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java @@ -104,4 +104,35 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { } } +/* + While the underlying implementation of {@link ZeroCopyHttpOutputMessage} seems to + work; it does bypass {@link #applyBeforeCommit} and more importantly it doesn't change + its {@linkplain #state()). Therefore it's commented out, for now. + + We should revisit this code once + https://github.com/ReactiveX/RxNetty/issues/194 has been fixed. + + + @Override + public Mono setBody(File file, long position, long count) { + Channel channel = this.response.unsafeNettyChannel(); + + HttpResponse httpResponse = + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + io.netty.handler.codec.http.HttpHeaders headers = httpResponse.headers(); + + for (Map.Entry> header : getHeaders().entrySet()) { + String headerName = header.getKey(); + for (String headerValue : header.getValue()) { + headers.add(headerName, headerValue); + } + } + Mono responseWrite = MonoChannelFuture.from(channel.write(httpResponse)); + + FileRegion fileRegion = new DefaultFileRegion(file, position, count); + Mono fileWrite = MonoChannelFuture.from(channel.writeAndFlush(fileRegion)); + + return Flux.concat(applyBeforeCommit(), responseWrite, fileWrite).after(); + } +*/ } \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index 8acffe45bbe..05cf650a5e6 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -67,9 +67,12 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle requestBody.registerListener(); ServerHttpRequest request = new UndertowServerHttpRequest(exchange, requestBody); - ResponseBodySubscriber responseBody = new ResponseBodySubscriber(exchange); + StreamSinkChannel responseChannel = exchange.getResponseChannel(); + ResponseBodySubscriber responseBody = + new ResponseBodySubscriber(exchange, responseChannel); responseBody.registerListener(); - ServerHttpResponse response = new UndertowServerHttpResponse(exchange, + ServerHttpResponse response = + new UndertowServerHttpResponse(exchange, responseChannel, publisher -> Mono.from(subscriber -> publisher.subscribe(responseBody)), allocator); @@ -202,9 +205,10 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle private Subscription subscription; - public ResponseBodySubscriber(HttpServerExchange exchange) { + public ResponseBodySubscriber(HttpServerExchange exchange, + StreamSinkChannel responseChannel) { this.exchange = exchange; - this.responseChannel = exchange.getResponseChannel(); + this.responseChannel = responseChannel; } public void registerListener() { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 1d3d1598da0..67dd0a893ce 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -16,6 +16,10 @@ package org.springframework.http.server.reactive; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -25,12 +29,14 @@ import io.undertow.server.handlers.Cookie; import io.undertow.server.handlers.CookieImpl; import io.undertow.util.HttpString; import org.reactivestreams.Publisher; +import org.xnio.channels.StreamSinkChannel; import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferAllocator; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; +import org.springframework.http.ZeroCopyHttpOutputMessage; import org.springframework.util.Assert; /** @@ -39,19 +45,25 @@ import org.springframework.util.Assert; * @author Marek Hawrylczak * @author Rossen Stoyanchev */ -public class UndertowServerHttpResponse extends AbstractServerHttpResponse { +public class UndertowServerHttpResponse extends AbstractServerHttpResponse + implements ZeroCopyHttpOutputMessage { private final HttpServerExchange exchange; + private final StreamSinkChannel responseChannel; + private final Function, Mono> responseBodyWriter; public UndertowServerHttpResponse(HttpServerExchange exchange, + StreamSinkChannel responseChannel, Function, Mono> responseBodyWriter, DataBufferAllocator allocator) { super(allocator); Assert.notNull(exchange, "'exchange' is required."); + Assert.notNull(responseChannel, "'responseChannel' must not be null"); Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null"); this.exchange = exchange; + this.responseChannel = responseChannel; this.responseBodyWriter = responseBodyWriter; } @@ -71,6 +83,26 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse { return this.responseBodyWriter.apply(publisher); } + @Override + public Mono setBody(File file, long position, long count) { + writeHeaders(); + writeCookies(); + try { + FileChannel in = new FileInputStream(file).getChannel(); + long result = this.responseChannel.transferFrom(in, position, count); + if (result < count) { + return Mono.error(new IOException("Could only write " + result + + " out of " + count + " bytes")); + } + else { + return Mono.empty(); + } + } + catch (IOException ex) { + return Mono.error(ex); + } + } + @Override protected void writeHeaders() { for (Map.Entry> entry : getHeaders().entrySet()) { diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java new file mode 100644 index 00000000000..bee5462c1a1 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java @@ -0,0 +1,98 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.io.File; +import java.net.URI; + +import org.junit.Test; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.http.ZeroCopyHttpOutputMessage; +import org.springframework.http.server.reactive.boot.ReactorHttpServer; +import org.springframework.http.server.reactive.boot.UndertowHttpServer; +import org.springframework.web.client.RestTemplate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +/** + * @author Arjen Poutsma + */ +public class ZeroCopyIntegrationTests extends AbstractHttpHandlerIntegrationTests { + + private final ZeroCopyHandler handler = new ZeroCopyHandler(); + + @Override + protected HttpHandler createHttpHandler() { + return handler; + } + + @Test + public void zeroCopy() throws Exception { + // Zero-copy only does not support servlet + assumeTrue(server instanceof ReactorHttpServer || + server instanceof UndertowHttpServer); + + RestTemplate restTemplate = new RestTemplate(); + + RequestEntity request = + RequestEntity.get(new URI("http://localhost:" + port)).build(); + + ResponseEntity response = restTemplate.exchange(request, byte[].class); + + Resource logo = + new ClassPathResource("spring.png", ZeroCopyIntegrationTests.class); + + assertTrue(response.hasBody()); + assertEquals(logo.contentLength(), response.getHeaders().getContentLength()); + assertEquals(logo.contentLength(), response.getBody().length); + assertEquals(MediaType.IMAGE_PNG, response.getHeaders().getContentType()); + + } + + private static class ZeroCopyHandler implements HttpHandler { + + @Override + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { + try { + ZeroCopyHttpOutputMessage zeroCopyResponse = + (ZeroCopyHttpOutputMessage) response; + + Resource logo = new ClassPathResource("spring.png", + ZeroCopyIntegrationTests.class); + File logoFile = logo.getFile(); + zeroCopyResponse.getHeaders().setContentType(MediaType.IMAGE_PNG); + zeroCopyResponse.getHeaders().setContentLength(logoFile.length()); + return zeroCopyResponse.setBody(logoFile, 0, logoFile.length()); + + } + catch (Throwable ex) { + return Mono.error(ex); + } + + + } + } + +} \ No newline at end of file diff --git a/spring-web-reactive/src/test/resources/org/springframework/http/server/reactive/spring.png b/spring-web-reactive/src/test/resources/org/springframework/http/server/reactive/spring.png new file mode 100644 index 0000000000000000000000000000000000000000..2fec781a5e31ff09dd56ed4e01c1ee1927c67b2b GIT binary patch literal 951 zcmV;o14#UdP)R$ekqBST9o%}ZHk zR|8$mS|QMuiXyr^6htK5e2BCQVxInInzd=2ot<&DorU?}_i|=t=XZ|Z`OcZ6kB=UD z=%I%mDx%|IP@y-=Zvq4ac%ytI=odP$&fDap!FNHPQtB<@<3I)&6v58hJXJk2N4I=A{zxnc%9YW!vscwtsRZ zZvd}6E#J~*AoX{QQr8CLXbs$*_ z@@=*t5?^HEQ%G?Kgk}7*r{%3+a~VgnF0gX<^-FbNqZUSg_c@kC=bK5pP4>QSsSaHC zw0wFEOG5Vrlj>HZt9xxy9r)-eaaw6o&Tu3oGnnj`PaQjTj`Dt$whjMHoQm$M{F5@21e8omm*VI9z$B?T z&^P8K61iS{jTsts%GzV9ah7A@i3IiT(pniHy_Du;wFL@?n+qhHN-t;zh;c6uL;&v>8YG`;o4;(TiQ3 z2+(|`l2sNXIevp7hc2=twTk0QVMMlhn6P-%_tvtmR#-XCEByjn1i80#Czx`U<3}>4 zz|6}@ji@U^gH-%w)?+ZZP{I`vbehUexa% z1xJ8t`2dRO*{GsihuB=uE0TX=zqYNMYl=H+`hq-eKGMh}3Hzl9PA2{gz<@9i54M9G zP$8UbScP