Browse Source

Merge branch '5.1.x'

pull/22617/head
Arjen Poutsma 7 years ago
parent
commit
228cae216e
  1. 71
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

71
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

@ -27,9 +27,9 @@ import io.undertow.server.handlers.Cookie; @@ -27,9 +27,9 @@ import io.undertow.server.handlers.Cookie;
import io.undertow.server.handlers.CookieImpl;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.xnio.channels.Channels;
import org.xnio.channels.StreamSinkChannel;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -124,19 +124,25 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl @@ -124,19 +124,25 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
@Override
public Mono<Void> writeWith(Path file, long position, long count) {
return doCommit(() ->
Mono.defer(() -> {
try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
Mono.create(sink -> {
try {
FileChannel source = FileChannel.open(file, StandardOpenOption.READ);
TransferBodyListener listener = new TransferBodyListener(source, position,
count, sink);
sink.onDispose(listener::closeSource);
StreamSinkChannel destination = this.exchange.getResponseChannel();
Channels.transferBlocking(destination, source, position, count);
return Mono.empty();
destination.getWriteSetter().set(listener::transfer);
listener.transfer(destination);
}
catch (IOException ex) {
return Mono.error(ex);
sink.error(ex);
}
}));
}
@Override
protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlushProcessor() {
return new ResponseBodyFlushProcessor();
@ -296,4 +302,55 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl @@ -296,4 +302,55 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
}
}
private static class TransferBodyListener {
private final FileChannel source;
private final MonoSink<Void> sink;
private long position;
private long count;
public TransferBodyListener(FileChannel source, long position, long count, MonoSink<Void> sink) {
this.source = source;
this.sink = sink;
this.position = position;
this.count = count;
}
public void transfer(StreamSinkChannel destination) {
try {
while (this.count > 0) {
long len = destination.transferFrom(this.source, this.position, this.count);
if (len != 0) {
this.position += len;
this.count -= len;
}
else {
destination.resumeWrites();
return;
}
}
this.sink.success();
}
catch (IOException ex) {
this.sink.error(ex);
}
}
public void closeSource() {
try {
this.source.close();
}
catch (IOException ignore) {
}
}
}
}

Loading…
Cancel
Save