diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceDecoder.java new file mode 100644 index 00000000000..71b3ac8810a --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceDecoder.java @@ -0,0 +1,77 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec.support; + +import java.io.InputStream; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.InputStreamResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.support.DataBufferUtils; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +/** + * A decoder for {@link Resource}s. + * + * @author Arjen Poutsma + */ +public class ResourceDecoder extends AbstractDecoder { + + public ResourceDecoder() { + super(MimeTypeUtils.ALL); + } + + @Override + public boolean canDecode(ResolvableType type, MimeType mimeType, Object... hints) { + Class clazz = type.getRawClass(); + return (InputStreamResource.class.equals(clazz) || + clazz.isAssignableFrom(ByteArrayResource.class)) && + super.canDecode(type, mimeType, hints); + } + + @Override + public Flux decode(Publisher inputStream, ResolvableType type, + MimeType mimeType, Object... hints) { + Class clazz = type.getRawClass(); + + Flux body = Flux.from(inputStream); + + if (InputStreamResource.class.equals(clazz)) { + InputStream is = DataBufferUtils.toInputStream(body); + return Flux.just(new InputStreamResource(is)); + } + else if (clazz.isAssignableFrom(ByteArrayResource.class)) { + Mono singleBuffer = body.reduce(DataBuffer::write); + return Flux.from(singleBuffer.map(buffer -> { + byte[] bytes = new byte[buffer.readableByteCount()]; + buffer.read(bytes); + return new ByteArrayResource(bytes); + })); + } + else { + return Flux.error(new IllegalStateException( + "Unsupported resource class: " + clazz)); + } + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceEncoder.java new file mode 100644 index 00000000000..2451ecf9a84 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceEncoder.java @@ -0,0 +1,78 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec.support; + +import java.io.IOException; +import java.io.InputStream; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.core.io.buffer.support.DataBufferUtils; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.StreamUtils; + +/** + * An encoder for {@link Resource}s. + * @author Arjen Poutsma + */ +public class ResourceEncoder extends AbstractEncoder { + + public static final int DEFAULT_BUFFER_SIZE = StreamUtils.BUFFER_SIZE; + + private final int bufferSize; + + public ResourceEncoder() { + this(DEFAULT_BUFFER_SIZE); + } + + public ResourceEncoder(int bufferSize) { + super(MimeTypeUtils.ALL); + Assert.isTrue(bufferSize > 0, "'bufferSize' must be larger than 0"); + this.bufferSize = bufferSize; + } + + @Override + public boolean canEncode(ResolvableType type, MimeType mimeType, Object... hints) { + Class clazz = type.getRawClass(); + return (super.canEncode(type, mimeType, hints) && + Resource.class.isAssignableFrom(clazz)); + } + + @Override + public Flux encode(Publisher inputStream, + DataBufferAllocator allocator, ResolvableType type, MimeType mimeType, + Object... hints) { + return Flux.from(inputStream). + concatMap(resource -> { + try { + InputStream is = resource.getInputStream(); + return DataBufferUtils.read(is, allocator, this.bufferSize); + } + catch (IOException ex) { + return Mono.error(ex); + } + }); + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java index 810d6de3110..14914e4b158 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java @@ -45,7 +45,7 @@ public class StringEncoder extends AbstractEncoder { @Override public boolean canEncode(ResolvableType type, MimeType mimeType, Object... hints) { Class clazz = type.getRawClass(); - return (super.canEncode(type, mimeType, hints) && String.class.isAssignableFrom(clazz)); + return (super.canEncode(type, mimeType, hints) && String.class.equals(clazz)); } @Override diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/support/ResourceUtils2.java b/spring-web-reactive/src/main/java/org/springframework/core/io/support/ResourceUtils2.java new file mode 100644 index 00000000000..94a3af3f14f --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/support/ResourceUtils2.java @@ -0,0 +1,59 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.support; + +import java.io.IOException; +import java.net.URI; + +import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.DescriptiveResource; +import org.springframework.core.io.InputStreamResource; +import org.springframework.core.io.Resource; +import org.springframework.util.Assert; +import org.springframework.util.ResourceUtils; + +/** + * @author Arjen Poutsma + */ +public abstract class ResourceUtils2 { + + /** + * Indicates whether the given resource has a file, so that {@link + * Resource#getFile()} + * can be called without an {@link java.io.IOException}. + * @param resource the resource to check + * @return {@code true} if the given resource has a file; {@code false} otherwise + */ + // TODO: refactor into Resource.hasFile() method + public static boolean hasFile(Resource resource) { + Assert.notNull(resource, "'resource' must not be null"); + + // the following Resource implementations do not support getURI/getFile + if (resource instanceof ByteArrayResource || + resource instanceof DescriptiveResource || + resource instanceof InputStreamResource) { + return false; + } + try { + URI resourceUri = resource.getURI(); + return ResourceUtils.URL_PROTOCOL_FILE.equals(resourceUri.getScheme()); + } + catch (IOException ignored) { + } + return false; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java index b55ad9c4cb2..4444a42619f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java @@ -29,12 +29,16 @@ import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.support.MediaTypeUtils; /** + * Implementation of the {@link HttpMessageConverter} interface that delegates to + * {@link Encoder} and {@link Decoder}. + * * @author Arjen Poutsma */ public class CodecHttpMessageConverter implements HttpMessageConverter { @@ -43,6 +47,32 @@ public class CodecHttpMessageConverter implements HttpMessageConverter { private final Decoder decoder; + /** + * Create a {@code CodecHttpMessageConverter} with the given {@link Encoder}. When + * using this constructor, all read-related methods will in {@code false} or an + * {@link IllegalStateException}. + * @param encoder the encoder to use + */ + public CodecHttpMessageConverter(Encoder encoder) { + this(encoder, null); + } + + /** + * Create a {@code CodecHttpMessageConverter} with the given {@link Decoder}. When + * using this constructor, all write-related methods will in {@code false} or an + * {@link IllegalStateException}. + * @param decoder the decoder to use + */ + public CodecHttpMessageConverter(Decoder decoder) { + this(null, decoder); + } + + /** + * Create a {@code CodecHttpMessageConverter} with the given {@link Encoder} and + * {@link Decoder}. + * @param encoder the encoder to use, can be {@code null} + * @param decoder the decoder to use, can be {@code null} + */ public CodecHttpMessageConverter(Encoder encoder, Decoder decoder) { this.encoder = encoder; this.decoder = decoder; @@ -94,9 +124,13 @@ public class CodecHttpMessageConverter implements HttpMessageConverter { if (this.encoder == null) { return Mono.error(new IllegalStateException("No decoder set")); } - outputMessage.getHeaders().setContentType(contentType); + HttpHeaders headers = outputMessage.getHeaders(); + if (headers.getContentType() == null) { + headers.setContentType(contentType); + } DataBufferAllocator allocator = outputMessage.allocator(); - Flux body = encoder.encode(inputStream, allocator, type, contentType); + Flux body = + this.encoder.encode(inputStream, allocator, type, contentType); return outputMessage.setBody(body); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java index 5091fc6fd2a..79392a518d5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java @@ -18,10 +18,6 @@ package org.springframework.http.converter.reactive; import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.util.Collections; -import java.util.List; import java.util.Optional; import org.reactivestreams.Publisher; @@ -29,106 +25,46 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; -import org.springframework.core.io.ByteArrayResource; -import org.springframework.core.io.DescriptiveResource; +import org.springframework.core.codec.support.ResourceDecoder; +import org.springframework.core.codec.support.ResourceEncoder; import org.springframework.core.io.InputStreamResource; import org.springframework.core.io.Resource; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.support.DataBufferUtils; +import org.springframework.core.io.support.ResourceUtils2; import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpRangeResource; import org.springframework.http.MediaType; -import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.ZeroCopyHttpOutputMessage; import org.springframework.http.support.MediaTypeUtils; import org.springframework.util.MimeTypeUtils2; -import org.springframework.util.ResourceUtils; -import org.springframework.util.StreamUtils; /** + * Implementation of {@link HttpMessageConverter} that can read and write + * {@link Resource Resources} and supports byte range requests. + ** * @author Arjen Poutsma */ -public class ResourceHttpMessageConverter implements HttpMessageConverter { +public class ResourceHttpMessageConverter extends CodecHttpMessageConverter { - private static final int BUFFER_SIZE = StreamUtils.BUFFER_SIZE; - - private static final List SUPPORTED_MEDIA_TYPES = - Collections.singletonList(MediaType.ALL); - - @Override - public boolean canRead(ResolvableType type, MediaType mediaType) { - return Resource.class.isAssignableFrom(type.getRawClass()); - } - - @Override - public boolean canWrite(ResolvableType type, MediaType mediaType) { - return Resource.class.isAssignableFrom(type.getRawClass()); - } - - @Override - public List getReadableMediaTypes() { - return SUPPORTED_MEDIA_TYPES; + public ResourceHttpMessageConverter() { + super(new ResourceEncoder(), new ResourceDecoder()); } - @Override - public List getWritableMediaTypes() { - return SUPPORTED_MEDIA_TYPES; - } - - @Override - public Flux read(ResolvableType type, - ReactiveHttpInputMessage inputMessage) { - Class clazz = type.getRawClass(); - - Flux body = inputMessage.getBody(); - - if (InputStreamResource.class.equals(clazz)) { - InputStream is = DataBufferUtils.toInputStream(body); - return Flux.just(new InputStreamResource(is)); - } - else if (clazz.isAssignableFrom(ByteArrayResource.class)) { - Mono singleBuffer = body.reduce(DataBuffer::write); - return Flux.from(singleBuffer.map(buffer -> { - byte[] bytes = new byte[buffer.readableByteCount()]; - buffer.read(bytes); - return new ByteArrayResource(bytes); - })); - } - else { - return Flux.error(new IllegalStateException( - "Unsupported resource class: " + clazz)); - } + public ResourceHttpMessageConverter(int bufferSize) { + super(new ResourceEncoder(bufferSize), new ResourceDecoder()); } @Override public Mono write(Publisher inputStream, ResolvableType type, MediaType contentType, ReactiveHttpOutputMessage outputMessage) { - - if (inputStream instanceof Mono) { - // single resource - return Mono.from(Flux.from(inputStream). - flatMap(resource -> { - HttpHeaders headers = outputMessage.getHeaders(); - addHeaders(headers, resource, contentType); - - if (resource instanceof HttpRangeResource) { - return writePartialContent((HttpRangeResource) resource, - outputMessage); - } - else { - return writeContent(resource, outputMessage, 0, -1); - } - - - })); - } - else { - // multiple resources, not supported! - return Mono.error(new IllegalArgumentException( - "Multiple resources not yet supported")); - } + return Mono.from(Flux.from(inputStream). + take(1). + concatMap(resource -> { + HttpHeaders headers = outputMessage.getHeaders(); + addHeaders(headers, resource, contentType); + + return writeContent(resource, type, contentType, outputMessage); + })); } protected void addHeaders(HttpHeaders headers, Resource resource, @@ -146,54 +82,24 @@ public class ResourceHttpMessageConverter implements HttpMessageConverter writeContent(Resource resource, - ReactiveHttpOutputMessage outputMessage, long position, long count) { + private Mono writeContent(Resource resource, ResolvableType type, + MediaType contentType, ReactiveHttpOutputMessage outputMessage) { if (outputMessage instanceof ZeroCopyHttpOutputMessage) { Optional file = getFile(resource); if (file.isPresent()) { ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) outputMessage; - if (count < 0) { - count = file.get().length(); - } - - return zeroCopyResponse.setBody(file.get(), position, count); - } - } - - // non-zero copy fallback - try { - InputStream is = resource.getInputStream(); - long skipped = is.skip(position); - if (skipped < position) { - return Mono.error(new IOException( - "Skipped only " + skipped + " bytes out of " + count + - " required.")); - } - - Flux responseBody = - DataBufferUtils.read(is, outputMessage.allocator(), BUFFER_SIZE); - if (count > 0) { - responseBody = DataBufferUtils.takeUntilByteCount(responseBody, count); + return zeroCopyResponse + .setBody(file.get(), (long) 0, file.get().length()); } - - return outputMessage.setBody(responseBody); - } - catch (IOException ex) { - return Mono.error(ex); } - } - - protected Mono writePartialContent(HttpRangeResource resource, - ReactiveHttpOutputMessage outputMessage) { - - // TODO: implement - return Mono.empty(); + // non-zero copy fallback, using ResourceEncoder + return super.write(Mono.just(resource), type, + outputMessage.getHeaders().getContentType(), outputMessage); } private static Optional contentLength(Resource resource) { @@ -210,18 +116,12 @@ public class ResourceHttpMessageConverter implements HttpMessageConverter getFile(Resource resource) { - // TODO: introduce Resource.hasFile() property to bypass the potential IOException thrown in Resource.getFile() - // the following Resource implementations do not support getURI/getFile - if (!(resource instanceof ByteArrayResource || - resource instanceof DescriptiveResource || - resource instanceof InputStreamResource)) { + if (ResourceUtils2.hasFile(resource)) { try { - URI resourceUri = resource.getURI(); - if (ResourceUtils.URL_PROTOCOL_FILE.equals(resourceUri.getScheme())) { - return Optional.of(ResourceUtils.getFile(resourceUri)); - } + return Optional.of(resource.getFile()); } catch (IOException ignored) { + // should not happen } } return Optional.empty(); diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceDecoderTests.java new file mode 100644 index 00000000000..e2df2423649 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceDecoderTests.java @@ -0,0 +1,80 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec.support; + +import java.io.IOException; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.test.TestSubscriber; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.InputStreamResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.MediaType; +import org.springframework.util.StreamUtils; + +import static org.junit.Assert.*; + +/** + * @author Arjen Poutsma + */ +public class ResourceDecoderTests extends AbstractAllocatingTestCase { + + private final ResourceDecoder decoder = new ResourceDecoder(); + + @Test + public void canDecode() throws Exception { + assertTrue(decoder.canDecode(ResolvableType.forClass(InputStreamResource.class), + MediaType.TEXT_PLAIN)); + assertTrue(decoder.canDecode(ResolvableType.forClass(ByteArrayResource.class), + MediaType.TEXT_PLAIN)); + assertTrue(decoder.canDecode(ResolvableType.forClass(Resource.class), + MediaType.TEXT_PLAIN)); + assertTrue(decoder.canDecode(ResolvableType.forClass(InputStreamResource.class), + MediaType.APPLICATION_JSON)); + } + + @Test + public void decode() throws Exception { + DataBuffer fooBuffer = stringBuffer("foo"); + DataBuffer barBuffer = stringBuffer("bar"); + Flux source = Flux.just(fooBuffer, barBuffer); + + Flux result = + decoder.decode(source, ResolvableType.forClass(Resource.class), null); + + TestSubscriber testSubscriber = new TestSubscriber<>(); + testSubscriber.bindTo(result). + assertNoError(). + assertComplete(). + assertValuesWith(resource -> { + try { + byte[] bytes = + StreamUtils.copyToByteArray(resource.getInputStream()); + assertEquals("foobar", new String(bytes)); + } + catch (IOException e) { + fail(e.getMessage()); + } + }); + + } + +} \ No newline at end of file diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceEncoderTests.java new file mode 100644 index 00000000000..45421ccdcf1 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceEncoderTests.java @@ -0,0 +1,71 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec.support; + +import java.nio.charset.StandardCharsets; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.test.TestSubscriber; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.InputStreamResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.MediaType; + +import static org.junit.Assert.assertTrue; + +/** + * @author Arjen Poutsma + */ +public class ResourceEncoderTests extends AbstractAllocatingTestCase { + + private final ResourceEncoder encoder = new ResourceEncoder(); + + @Test + public void canEncode() throws Exception { + assertTrue(encoder.canEncode(ResolvableType.forClass(InputStreamResource.class), + MediaType.TEXT_PLAIN)); + assertTrue(encoder.canEncode(ResolvableType.forClass(ByteArrayResource.class), + MediaType.TEXT_PLAIN)); + assertTrue(encoder.canEncode(ResolvableType.forClass(Resource.class), + MediaType.TEXT_PLAIN)); + assertTrue(encoder.canEncode(ResolvableType.forClass(InputStreamResource.class), + MediaType.APPLICATION_JSON)); + } + + @Test + public void encode() throws Exception { + String s = "foo"; + Resource resource = new ByteArrayResource(s.getBytes(StandardCharsets.UTF_8)); + + Mono source = Mono.just(resource); + + Flux output = + encoder.encode(source, allocator, ResolvableType.forClass(Resource.class), + null); + + TestSubscriber testSubscriber = new TestSubscriber<>(); + testSubscriber.bindTo(output).assertNoError().assertComplete() + .assertValues(stringBuffer(s)); + + } + +} \ No newline at end of file