From 55d6f88dcd4bfc2f3ca993d73bedbff3a9bd5dee Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Mon, 12 Sep 2016 12:18:56 +0200 Subject: [PATCH] Add ResourceRegionEncoder This commit adds the necessary infrastructure for the support of HTTP Range requests. The new `ResourceRegionEncoder` can write `ResourceRegion` objects as streams of bytes. The `ResourceRegionEncoder` relies on an encoding hint `BOUNDARY_STRING_HINT`. If present, the encoder infers that multiple `ResourceRegion`s should be encoded and that the provided boundary String should be used to separate ranges by mime boundaries. If that hint is absent, only a single resource region is encoded. Issue: SPR-14664 --- .../core/codec/ResourceRegionEncoder.java | 146 ++++++++++++++++++ .../core/io/buffer/DataBufferUtils.java | 39 ++++- .../springframework/util/ResourceUtils.java | 24 +++ .../codec/ResourceRegionEncoderTests.java | 145 +++++++++++++++++ .../core/io/buffer/DataBufferUtilsTests.java | 30 ++++ .../http/codec/ResourceHttpMessageWriter.java | 17 +- 6 files changed, 385 insertions(+), 16 deletions(-) create mode 100644 spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java create mode 100644 spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java diff --git a/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java new file mode 100644 index 00000000000..80fd50bb240 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java @@ -0,0 +1,146 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.OptionalLong; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.support.ResourceRegion; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.ResourceUtils; +import org.springframework.util.StreamUtils; + +/** + * Encoder for {@link ResourceRegion}s. + * + * @author Brian Clozel + * @since 5.0 + */ +public class ResourceRegionEncoder extends AbstractEncoder { + + public static final int DEFAULT_BUFFER_SIZE = StreamUtils.BUFFER_SIZE; + + public static final String BOUNDARY_STRING_HINT = ResourceRegionEncoder.class.getName() + ".boundaryString"; + + private final int bufferSize; + + + public ResourceRegionEncoder() { + this(DEFAULT_BUFFER_SIZE); + } + + public ResourceRegionEncoder(int bufferSize) { + super(MimeTypeUtils.APPLICATION_OCTET_STREAM, MimeTypeUtils.ALL); + Assert.isTrue(bufferSize > 0, "'bufferSize' must be larger than 0"); + this.bufferSize = bufferSize; + } + + @Override + public boolean canEncode(ResolvableType elementType, MimeType mimeType) { + + return super.canEncode(elementType, mimeType) + && ResourceRegion.class.isAssignableFrom(elementType.getRawClass()); + } + + @Override + public Flux encode(Publisher inputStream, + DataBufferFactory bufferFactory, ResolvableType elementType, MimeType mimeType, Map hints) { + + Assert.notNull(inputStream, "'inputStream' must not be null"); + Assert.notNull(bufferFactory, "'bufferFactory' must not be null"); + Assert.notNull(elementType, "'elementType' must not be null"); + + if (inputStream instanceof Mono) { + return ((Mono) inputStream) + .flatMap(region -> writeResourceRegion(region, bufferFactory)); + } + else { + Assert.notNull(hints, "'hints' must not be null"); + Assert.isTrue(hints.containsKey(BOUNDARY_STRING_HINT), "'hints' must contain boundaryString hint"); + final String boundaryString = (String) hints.get(BOUNDARY_STRING_HINT); + + byte[] startBoundary = getAsciiBytes("\r\n--" + boundaryString + "\r\n"); + byte[] contentType = getAsciiBytes("Content-Type: " + mimeType.toString() + "\r\n"); + + Flux regions = Flux.from(inputStream). + concatMap(region -> + Flux.concat( + getRegionPrefix(bufferFactory, startBoundary, contentType, region), + writeResourceRegion(region, bufferFactory) + )); + return Flux.concat(regions, getRegionSuffix(bufferFactory, boundaryString)); + } + } + + private Flux getRegionPrefix(DataBufferFactory bufferFactory, byte[] startBoundary, + byte[] contentType, ResourceRegion region) { + + return Flux.just( + bufferFactory.allocateBuffer(startBoundary.length).write(startBoundary), + bufferFactory.allocateBuffer(contentType.length).write(contentType), + bufferFactory.wrap(ByteBuffer.wrap(getContentRangeHeader(region))) + ); + } + + private Flux writeResourceRegion(ResourceRegion region, DataBufferFactory bufferFactory) { + try { + ReadableByteChannel resourceChannel = region.getResource().readableChannel(); + Flux in = DataBufferUtils.read(resourceChannel, bufferFactory, this.bufferSize); + Flux skipped = DataBufferUtils.skipUntilByteCount(in, region.getPosition()); + return DataBufferUtils.takeUntilByteCount(skipped, region.getCount()); + } + catch (IOException exc) { + return Flux.error(exc); + } + } + + private Flux getRegionSuffix(DataBufferFactory bufferFactory, String boundaryString) { + byte[] endBoundary = getAsciiBytes("\r\n--" + boundaryString + "--"); + return Flux.just(bufferFactory.allocateBuffer(endBoundary.length).write(endBoundary)); + } + + private byte[] getAsciiBytes(String in) { + return in.getBytes(StandardCharsets.US_ASCII); + } + + private byte[] getContentRangeHeader(ResourceRegion region) { + long start = region.getPosition(); + long end = start + region.getCount() - 1; + OptionalLong contentLength = ResourceUtils.contentLength(region.getResource()); + if (contentLength.isPresent()) { + return getAsciiBytes("Content-Range: bytes " + start + "-" + end + "/" + contentLength.getAsLong() + "\r\n\r\n"); + } + else { + return getAsciiBytes("Content-Range: bytes " + start + "-" + end + "\r\n\r\n"); + } + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 5dccfffd787..b9172bbf883 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -31,10 +31,11 @@ import reactor.core.publisher.SynchronousSink; import org.springframework.util.Assert; -/**i +/** * Utility class for working with {@link DataBuffer}s. * * @author Arjen Poutsma + * @author Brian Clozel * @since 5.0 */ public abstract class DataBufferUtils { @@ -119,6 +120,42 @@ public abstract class DataBufferUtils { }); } + /** + * Skip buffers from the given {@link Publisher} until the total + * {@linkplain DataBuffer#readableByteCount() byte count} reaches + * the given maximum byte count, or until the publisher is complete. + * @param publisher the publisher to filter + * @param maxByteCount the maximum byte count + * @return a flux with the remaining part of the given publisher + */ + public static Flux skipUntilByteCount(Publisher publisher, long maxByteCount) { + Assert.notNull(publisher, "Publisher must not be null"); + Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); + AtomicLong byteCountDown = new AtomicLong(maxByteCount); + + return Flux.from(publisher). + skipUntil(dataBuffer -> { + int delta = -dataBuffer.readableByteCount(); + long currentCount = byteCountDown.addAndGet(delta); + if(currentCount < 0) { + return true; + } else { + DataBufferUtils.release(dataBuffer); + return false; + } + }). + map(dataBuffer -> { + long currentCount = byteCountDown.get(); + // slice first buffer, then let others flow through + if (currentCount < 0) { + int skip = (int) (currentCount + dataBuffer.readableByteCount()); + byteCountDown.set(0); + return dataBuffer.slice(skip, dataBuffer.readableByteCount() - skip); + } + return dataBuffer; + }); + } + /** * Retain the given data buffer, it it is a {@link PooledDataBuffer}. * @param dataBuffer the data buffer to retain diff --git a/spring-core/src/main/java/org/springframework/util/ResourceUtils.java b/spring-core/src/main/java/org/springframework/util/ResourceUtils.java index 0f37194c056..1ab9a67cac5 100644 --- a/spring-core/src/main/java/org/springframework/util/ResourceUtils.java +++ b/spring-core/src/main/java/org/springframework/util/ResourceUtils.java @@ -18,11 +18,16 @@ package org.springframework.util; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLConnection; +import java.util.OptionalLong; + +import org.springframework.core.io.InputStreamResource; +import org.springframework.core.io.Resource; /** * Utility methods for resolving resource locations to files in the @@ -384,4 +389,23 @@ public abstract class ResourceUtils { con.setUseCaches(con.getClass().getSimpleName().startsWith("JNLP")); } + /** + * Determine, if possible, the contentLength of the given resource + * without reading it. + * @param resource the resource instance + * @return the contentLength of the resource + */ + public static OptionalLong contentLength(Resource resource) { + // Don't try to determine contentLength on InputStreamResource - cannot be read afterwards... + // Note: custom InputStreamResource subclasses could provide a pre-calculated content length! + if (InputStreamResource.class != resource.getClass()) { + try { + return OptionalLong.of(resource.contentLength()); + } + catch (IOException ignored) { + } + } + return OptionalLong.empty(); + } + } diff --git a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java new file mode 100644 index 00000000000..83f1ef3c027 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java @@ -0,0 +1,145 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +import org.junit.Before; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.support.DataBufferTestUtils; +import org.springframework.core.io.support.ResourceRegion; +import org.springframework.tests.TestSubscriber; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.StringUtils; + +/** + * Test cases for {@link ResourceRegionEncoder} class. + * + * @author Brian Clozel + */ +public class ResourceRegionEncoderTests extends AbstractDataBufferAllocatingTestCase { + + private ResourceRegionEncoder encoder; + + private Resource resource; + + @Before + public void setUp() { + this.encoder = new ResourceRegionEncoder(); + String content = "Spring Framework test resource content."; + this.resource = new ByteArrayResource(content.getBytes(StandardCharsets.UTF_8)); + this.bufferFactory = new DefaultDataBufferFactory(); + } + + @Test + public void canEncode() { + ResolvableType resourceRegion = ResolvableType.forClass(ResourceRegion.class); + MimeType allMimeType = MimeType.valueOf("*/*"); + + assertFalse(this.encoder.canEncode(ResolvableType.forClass(Resource.class), + MimeTypeUtils.APPLICATION_OCTET_STREAM)); + assertFalse(this.encoder.canEncode(ResolvableType.forClass(Resource.class), allMimeType)); + assertTrue(this.encoder.canEncode(resourceRegion, MimeTypeUtils.APPLICATION_OCTET_STREAM)); + assertTrue(this.encoder.canEncode(resourceRegion, allMimeType)); + } + + @Test + public void shouldEncodeResourceRegion() throws Exception { + + ResourceRegion region = new ResourceRegion(this.resource, 0, 6); + Flux result = this.encoder.encode(Mono.just(region), this.bufferFactory, + ResolvableType.forClass(ResourceRegion.class), MimeTypeUtils.APPLICATION_OCTET_STREAM + , Collections.emptyMap()); + + TestSubscriber.subscribe(result) + .assertNoError() + .assertComplete() + .assertValuesWith(stringConsumer("Spring")); + } + + @Test + public void shouldEncodeMultipleResourceRegions() throws Exception { + + Flux regions = Flux.just( + new ResourceRegion(this.resource, 0, 6), + new ResourceRegion(this.resource, 7, 9), + new ResourceRegion(this.resource, 17, 4), + new ResourceRegion(this.resource, 22, 17) + ); + String boundary = MimeTypeUtils.generateMultipartBoundaryString(); + + Flux result = this.encoder.encode(regions, this.bufferFactory, + ResolvableType.forClass(ResourceRegion.class), + MimeType.valueOf("text/plain"), + Collections.singletonMap(ResourceRegionEncoder.BOUNDARY_STRING_HINT, boundary) + ); + + Mono reduced = result + .reduce(bufferFactory.allocateBuffer(), (previous, current) -> { + previous.write(current); + DataBufferUtils.release(current); + return previous; + }); + + TestSubscriber + .subscribe(reduced) + .assertNoError() + .assertComplete() + .assertValuesWith(dataBuffer -> { + String content = DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8); + String[] ranges = StringUtils.tokenizeToStringArray(content, "\r\n", false, true); + + assertThat(ranges[0], is("--" + boundary)); + assertThat(ranges[1], is("Content-Type: text/plain")); + assertThat(ranges[2], is("Content-Range: bytes 0-5/39")); + assertThat(ranges[3], is("Spring")); + + assertThat(ranges[4], is("--" + boundary)); + assertThat(ranges[5], is("Content-Type: text/plain")); + assertThat(ranges[6], is("Content-Range: bytes 7-15/39")); + assertThat(ranges[7], is("Framework")); + + assertThat(ranges[8], is("--" + boundary)); + assertThat(ranges[9], is("Content-Type: text/plain")); + assertThat(ranges[10], is("Content-Range: bytes 17-20/39")); + assertThat(ranges[11], is("test")); + + assertThat(ranges[12], is("--" + boundary)); + assertThat(ranges[13], is("Content-Type: text/plain")); + assertThat(ranges[14], is("Content-Range: bytes 22-38/39")); + assertThat(ranges[15], is("resource content.")); + + assertThat(ranges[16], is("--" + boundary + "--")); + }); + } + +} diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index d1fa5551d68..a132952f1d7 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -99,4 +99,34 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { release(baz); } + @Test + public void skipUntilByteCount() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); + Flux result = DataBufferUtils.skipUntilByteCount(flux, 5L); + + TestSubscriber + .subscribe(result) + .assertNoError() + .assertComplete() + .assertValuesWith(stringConsumer("r"), stringConsumer("baz")); + } + + @Test + public void skipUntilByteCountShouldSkipAll() { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + DataBuffer baz = stringBuffer("baz"); + Flux flux = Flux.just(foo, bar, baz); + Flux result = DataBufferUtils.skipUntilByteCount(flux, 9L); + + TestSubscriber + .subscribe(result) + .assertNoError() + .assertNoValues() + .assertComplete(); + } + } diff --git a/spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java index 0541c24a7c5..df025dcc268 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java @@ -28,13 +28,13 @@ import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.codec.ResourceDecoder; import org.springframework.core.codec.ResourceEncoder; -import org.springframework.core.io.InputStreamResource; import org.springframework.core.io.Resource; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.MediaTypeFactory; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.ZeroCopyHttpOutputMessage; +import org.springframework.util.ResourceUtils; /** * Implementation of {@link HttpMessageWriter} that can write @@ -81,7 +81,7 @@ public class ResourceHttpMessageWriter extends EncoderHttpMessageWriter contentLength(Resource resource) { - // Don't try to determine contentLength on InputStreamResource - cannot be read afterwards... - // Note: custom InputStreamResource subclasses could provide a pre-calculated content length! - if (InputStreamResource.class != resource.getClass()) { - try { - return Optional.of(resource.contentLength()); - } - catch (IOException ignored) { - } - } - return Optional.empty(); - } - private static Optional getFile(Resource resource) { if (resource.isFile()) { try {