From 3c486c02abf3896e036e8f03fa0a0addd95eeebd Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 20 Apr 2016 13:53:36 +0200 Subject: [PATCH] Reactive HttpMessageConverter This commit introduces a reactive version of the HttpMessageConverter. During the implementation of zero-copy support, it became apparent that it was ueful to have a common abstraction between client and server that operated on HttpMessages rather than DataBuffers. Two HttpMessageConverter implementations are provided: - The CodecHttpMessageConverter, based on Encoder/Decoder. - The ResourceHttpMessageConverter, using zero-copy if available. --- .../reactive/CodecHttpMessageConverter.java | 102 ++++++++ .../reactive/HttpMessageConverter.java | 91 +++++++ .../ResourceHttpMessageConverter.java | 231 ++++++++++++++++++ 3 files changed, 424 insertions(+) create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/HttpMessageConverter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java diff --git a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java new file mode 100644 index 00000000000..b55ad9c4cb2 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java @@ -0,0 +1,102 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.converter.reactive; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.codec.Decoder; +import org.springframework.core.codec.Encoder; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferAllocator; +import org.springframework.http.MediaType; +import org.springframework.http.ReactiveHttpInputMessage; +import org.springframework.http.ReactiveHttpOutputMessage; +import org.springframework.http.support.MediaTypeUtils; + +/** + * @author Arjen Poutsma + */ +public class CodecHttpMessageConverter implements HttpMessageConverter { + + private final Encoder encoder; + + private final Decoder decoder; + + public CodecHttpMessageConverter(Encoder encoder, Decoder decoder) { + this.encoder = encoder; + this.decoder = decoder; + } + + @Override + public boolean canRead(ResolvableType type, MediaType mediaType) { + return this.decoder != null && this.decoder.canDecode(type, mediaType); + } + + @Override + public boolean canWrite(ResolvableType type, MediaType mediaType) { + return this.encoder != null && this.encoder.canEncode(type, mediaType); + } + + @Override + public List getReadableMediaTypes() { + return this.decoder != null ? this.decoder.getSupportedMimeTypes().stream(). + map(MediaTypeUtils::toMediaType). + collect(Collectors.toList()) : Collections.emptyList(); + } + + @Override + public List getWritableMediaTypes() { + return this.encoder != null ? this.encoder.getSupportedMimeTypes().stream(). + map(MediaTypeUtils::toMediaType). + collect(Collectors.toList()) : Collections.emptyList(); + } + + @Override + public Flux read(ResolvableType type, ReactiveHttpInputMessage inputMessage) { + if (this.decoder == null) { + return Flux.error(new IllegalStateException("No decoder set")); + } + MediaType contentType = inputMessage.getHeaders().getContentType(); + if (contentType == null) { + contentType = MediaType.APPLICATION_OCTET_STREAM; + } + + Flux body = inputMessage.getBody(); + + return this.decoder.decode(body, type, contentType); + } + + @Override + public Mono write(Publisher inputStream, ResolvableType type, + MediaType contentType, + ReactiveHttpOutputMessage outputMessage) { + if (this.encoder == null) { + return Mono.error(new IllegalStateException("No decoder set")); + } + outputMessage.getHeaders().setContentType(contentType); + DataBufferAllocator allocator = outputMessage.allocator(); + Flux body = encoder.encode(inputStream, allocator, type, contentType); + return outputMessage.setBody(body); + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/HttpMessageConverter.java b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/HttpMessageConverter.java new file mode 100644 index 00000000000..ce001c5d086 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/HttpMessageConverter.java @@ -0,0 +1,91 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.converter.reactive; + +import java.util.List; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.http.MediaType; +import org.springframework.http.ReactiveHttpInputMessage; +import org.springframework.http.ReactiveHttpOutputMessage; + +/** + * Strategy interface that specifies a converter that can convert from and to HTTP + * requests and responses. + * @author Arjen Poutsma + */ +public interface HttpMessageConverter { + + /** + * Indicates whether the given class can be read by this converter. + * @param type the type to test for readability + * @param mediaType the media type to read, can be {@code null} if not specified. + * Typically the value of a {@code Content-Type} header. + * @return {@code true} if readable; {@code false} otherwise + */ + boolean canRead(ResolvableType type, MediaType mediaType); + + /** + * Return the list of {@link MediaType} objects that can be read by this converter. + * @return the list of supported readable media types + */ + List getReadableMediaTypes(); + + /** + * Read an object of the given type form the given input message, and returns it. + * @param type the type of object to return. This type must have previously been + * passed to the + * {@link #canRead canRead} method of this interface, which must have returned {@code + * true}. + * @param inputMessage the HTTP input message to read from + * @return the converted object + */ + Flux read(ResolvableType type, ReactiveHttpInputMessage inputMessage); + + /** + * Indicates whether the given class can be written by this converter. + * @param type the class to test for writability + * @param mediaType the media type to write, can be {@code null} if not specified. + * Typically the value of an {@code Accept} header. + * @return {@code true} if writable; {@code false} otherwise + */ + boolean canWrite(ResolvableType type, MediaType mediaType); + + /** + * Return the list of {@link MediaType} objects that can be written by this + * converter. + * @return the list of supported readable media types + */ + List getWritableMediaTypes(); + + /** + * Write an given object to the given output message. + * @param inputStream the input stream to write + * @param type the stream element type to process. + * @param contentType the content type to use when writing. May be {@code null} to + * indicate that the default content type of the converter must be used. + * @param outputMessage the message to write to + * @return + */ + Mono write(Publisher inputStream, + ResolvableType type, MediaType contentType, + ReactiveHttpOutputMessage outputMessage); +} \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java new file mode 100644 index 00000000000..5091fc6fd2a --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java @@ -0,0 +1,231 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.converter.reactive; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.DescriptiveResource; +import org.springframework.core.io.InputStreamResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.support.DataBufferUtils; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpRangeResource; +import org.springframework.http.MediaType; +import org.springframework.http.ReactiveHttpInputMessage; +import org.springframework.http.ReactiveHttpOutputMessage; +import org.springframework.http.ZeroCopyHttpOutputMessage; +import org.springframework.http.support.MediaTypeUtils; +import org.springframework.util.MimeTypeUtils2; +import org.springframework.util.ResourceUtils; +import org.springframework.util.StreamUtils; + +/** + * @author Arjen Poutsma + */ +public class ResourceHttpMessageConverter implements HttpMessageConverter { + + private static final int BUFFER_SIZE = StreamUtils.BUFFER_SIZE; + + private static final List SUPPORTED_MEDIA_TYPES = + Collections.singletonList(MediaType.ALL); + + @Override + public boolean canRead(ResolvableType type, MediaType mediaType) { + return Resource.class.isAssignableFrom(type.getRawClass()); + } + + @Override + public boolean canWrite(ResolvableType type, MediaType mediaType) { + return Resource.class.isAssignableFrom(type.getRawClass()); + } + + @Override + public List getReadableMediaTypes() { + return SUPPORTED_MEDIA_TYPES; + } + + @Override + public List getWritableMediaTypes() { + return SUPPORTED_MEDIA_TYPES; + } + + @Override + public Flux read(ResolvableType type, + ReactiveHttpInputMessage inputMessage) { + Class clazz = type.getRawClass(); + + Flux body = inputMessage.getBody(); + + if (InputStreamResource.class.equals(clazz)) { + InputStream is = DataBufferUtils.toInputStream(body); + return Flux.just(new InputStreamResource(is)); + } + else if (clazz.isAssignableFrom(ByteArrayResource.class)) { + Mono singleBuffer = body.reduce(DataBuffer::write); + return Flux.from(singleBuffer.map(buffer -> { + byte[] bytes = new byte[buffer.readableByteCount()]; + buffer.read(bytes); + return new ByteArrayResource(bytes); + })); + } + else { + return Flux.error(new IllegalStateException( + "Unsupported resource class: " + clazz)); + } + } + + @Override + public Mono write(Publisher inputStream, + ResolvableType type, MediaType contentType, + ReactiveHttpOutputMessage outputMessage) { + + if (inputStream instanceof Mono) { + // single resource + return Mono.from(Flux.from(inputStream). + flatMap(resource -> { + HttpHeaders headers = outputMessage.getHeaders(); + addHeaders(headers, resource, contentType); + + if (resource instanceof HttpRangeResource) { + return writePartialContent((HttpRangeResource) resource, + outputMessage); + } + else { + return writeContent(resource, outputMessage, 0, -1); + } + + + })); + } + else { + // multiple resources, not supported! + return Mono.error(new IllegalArgumentException( + "Multiple resources not yet supported")); + } + } + + protected void addHeaders(HttpHeaders headers, Resource resource, + MediaType contentType) { + if (headers.getContentType() == null) { + if (contentType == null || + !contentType.isConcrete() || + MediaType.APPLICATION_OCTET_STREAM.equals(contentType)) { + contentType = MimeTypeUtils2.getMimeType(resource.getFilename()). + map(MediaTypeUtils::toMediaType). + orElse(MediaType.APPLICATION_OCTET_STREAM); + } + headers.setContentType(contentType); + } + if (headers.getContentLength() < 0) { + contentLength(resource).ifPresent(headers::setContentLength); + } + headers.add(HttpHeaders.ACCEPT_RANGES, "bytes"); + } + + private Mono writeContent(Resource resource, + ReactiveHttpOutputMessage outputMessage, long position, long count) { + if (outputMessage instanceof ZeroCopyHttpOutputMessage) { + Optional file = getFile(resource); + if (file.isPresent()) { + ZeroCopyHttpOutputMessage zeroCopyResponse = + (ZeroCopyHttpOutputMessage) outputMessage; + + if (count < 0) { + count = file.get().length(); + } + + return zeroCopyResponse.setBody(file.get(), position, count); + } + } + + // non-zero copy fallback + try { + InputStream is = resource.getInputStream(); + long skipped = is.skip(position); + if (skipped < position) { + return Mono.error(new IOException( + "Skipped only " + skipped + " bytes out of " + count + + " required.")); + } + + Flux responseBody = + DataBufferUtils.read(is, outputMessage.allocator(), BUFFER_SIZE); + if (count > 0) { + responseBody = DataBufferUtils.takeUntilByteCount(responseBody, count); + } + + return outputMessage.setBody(responseBody); + } + catch (IOException ex) { + return Mono.error(ex); + } + } + + protected Mono writePartialContent(HttpRangeResource resource, + ReactiveHttpOutputMessage outputMessage) { + + // TODO: implement + + return Mono.empty(); + } + + private static Optional contentLength(Resource resource) { + // Don't try to determine contentLength on InputStreamResource - cannot be read afterwards... + // Note: custom InputStreamResource subclasses could provide a pre-calculated content length! + if (InputStreamResource.class != resource.getClass()) { + try { + return Optional.of(resource.contentLength()); + } + catch (IOException ignored) { + } + } + return Optional.empty(); + } + + private static Optional getFile(Resource resource) { + // TODO: introduce Resource.hasFile() property to bypass the potential IOException thrown in Resource.getFile() + // the following Resource implementations do not support getURI/getFile + if (!(resource instanceof ByteArrayResource || + resource instanceof DescriptiveResource || + resource instanceof InputStreamResource)) { + try { + URI resourceUri = resource.getURI(); + if (ResourceUtils.URL_PROTOCOL_FILE.equals(resourceUri.getScheme())) { + return Optional.of(ResourceUtils.getFile(resourceUri)); + } + } + catch (IOException ignored) { + } + } + return Optional.empty(); + } + + +}