diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteArrayEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteArrayEncoder.java index 6b538dbf7e0..6eef1a1f771 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ByteArrayEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteArrayEncoder.java @@ -52,15 +52,21 @@ public class ByteArrayEncoder extends AbstractEncoder { DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - // The following (byte[] bytes) lambda signature declaration is necessary for Eclipse. - return Flux.from(inputStream).map((byte[] bytes) -> { - DataBuffer dataBuffer = bufferFactory.wrap(bytes); - if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { - String logPrefix = Hints.getLogPrefix(hints); - logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes"); - } - return dataBuffer; - }); + // Use (byte[] bytes) for Eclipse + return Flux.from(inputStream).map((byte[] bytes) -> + encodeValue(bytes, bufferFactory, elementType, mimeType, hints)); + } + + @Override + public DataBuffer encodeValue(byte[] bytes, DataBufferFactory bufferFactory, + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { + + DataBuffer dataBuffer = bufferFactory.wrap(bytes); + if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { + String logPrefix = Hints.getLogPrefix(hints); + logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes"); + } + return dataBuffer; } } diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferEncoder.java index 1f394302b81..8d600661526 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,14 +53,20 @@ public class ByteBufferEncoder extends AbstractEncoder { DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Flux.from(inputStream).map(byteBuffer -> { - DataBuffer dataBuffer = bufferFactory.wrap(byteBuffer); - if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { - String logPrefix = Hints.getLogPrefix(hints); - logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes"); - } - return dataBuffer; - }); + return Flux.from(inputStream).map(byteBuffer -> + encodeValue(byteBuffer, bufferFactory, elementType, mimeType, hints)); + } + + @Override + public DataBuffer encodeValue(ByteBuffer byteBuffer, DataBufferFactory bufferFactory, + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { + + DataBuffer dataBuffer = bufferFactory.wrap(byteBuffer); + if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { + String logPrefix = Hints.getLogPrefix(hints); + logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes"); + } + return dataBuffer; } } diff --git a/spring-core/src/main/java/org/springframework/core/codec/CharSequenceEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/CharSequenceEncoder.java index 10088ebb4cb..267a68421f5 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/CharSequenceEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/CharSequenceEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,32 +71,37 @@ public final class CharSequenceEncoder extends AbstractEncoder { DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - Charset charset = getCharset(mimeType); + return Flux.from(inputStream).map(charSequence -> + encodeValue(charSequence, bufferFactory, elementType, mimeType, hints)); + } - return Flux.from(inputStream).map(charSequence -> { - if (!Hints.isLoggingSuppressed(hints)) { - LogFormatUtils.traceDebug(logger, traceOn -> { - String formatted = LogFormatUtils.formatValue(charSequence, !traceOn); - return Hints.getLogPrefix(hints) + "Writing " + formatted; - }); - } - boolean release = true; - int capacity = calculateCapacity(charSequence, charset); - DataBuffer dataBuffer = bufferFactory.allocateBuffer(capacity); - try { - dataBuffer.write(charSequence, charset); - release = false; - } - catch (CoderMalfunctionError ex) { - throw new EncodingException("String encoding error: " + ex.getMessage(), ex); - } - finally { - if (release) { - DataBufferUtils.release(dataBuffer); - } + @Override + public DataBuffer encodeValue(CharSequence charSequence, DataBufferFactory bufferFactory, + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { + + if (!Hints.isLoggingSuppressed(hints)) { + LogFormatUtils.traceDebug(logger, traceOn -> { + String formatted = LogFormatUtils.formatValue(charSequence, !traceOn); + return Hints.getLogPrefix(hints) + "Writing " + formatted; + }); + } + boolean release = true; + Charset charset = getCharset(mimeType); + int capacity = calculateCapacity(charSequence, charset); + DataBuffer dataBuffer = bufferFactory.allocateBuffer(capacity); + try { + dataBuffer.write(charSequence, charset); + release = false; + } + catch (CoderMalfunctionError ex) { + throw new EncodingException("String encoding error: " + ex.getMessage(), ex); + } + finally { + if (release) { + DataBufferUtils.release(dataBuffer); } - return dataBuffer; - }); + } + return dataBuffer; } int calculateCapacity(CharSequence sequence, Charset charset) { diff --git a/spring-core/src/main/java/org/springframework/core/codec/DataBufferEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/DataBufferEncoder.java index 3a7c853afa4..88e8f1cc2a9 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/DataBufferEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/DataBufferEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,15 +53,25 @@ public class DataBufferEncoder extends AbstractEncoder { @Nullable Map hints) { Flux flux = Flux.from(inputStream); + if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { + flux = flux.doOnNext(buffer -> logValue(buffer, hints)); + } + return flux; + } + + @Override + public DataBuffer encodeValue(DataBuffer buffer, DataBufferFactory bufferFactory, + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { - flux = flux.doOnNext(buffer -> { - String logPrefix = Hints.getLogPrefix(hints); - logger.debug(logPrefix + "Writing " + buffer.readableByteCount() + " bytes"); - }); + logValue(buffer, hints); } + return buffer; + } - return flux; + private void logValue(DataBuffer buffer, @Nullable Map hints) { + String logPrefix = Hints.getLogPrefix(hints); + logger.debug(logPrefix + "Writing " + buffer.readableByteCount() + " bytes"); } } diff --git a/spring-core/src/main/java/org/springframework/core/codec/Encoder.java b/spring-core/src/main/java/org/springframework/core/codec/Encoder.java index 7b1ee00190b..00f42009ae0 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/Encoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/Encoder.java @@ -60,13 +60,35 @@ public interface Encoder { * @param elementType the expected type of elements in the input stream; * this type must have been previously passed to the {@link #canEncode} * method and it must have returned {@code true}. - * @param mimeType the MIME type for the output stream (optional) - * @param hints additional information about how to do encode + * @param mimeType the MIME type for the output content (optional) + * @param hints additional information about how to encode * @return the output stream */ Flux encode(Publisher inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints); + /** + * Encode an Object of type T to a data buffer. This is useful for scenarios + * that produce a stream of discrete messages (or events) and the + * content for each is encoded individually. + *

By default this method raises {@link UnsupportedOperationException} + * and it is expected that some encoders cannot produce a single buffer or + * cannot do so synchronously (e.g. encoding a {@code Resource}). + * @param value the value to be encoded + * @param bufferFactory for creating the output {@code DataBuffer} + * @param valueType the type for the value being encoded + * @param mimeType the MIME type for the output content (optional) + * @param hints additional information about how to encode + * @return the encoded content + * @since 5.2 + */ + default DataBuffer encodeValue(T value, DataBufferFactory bufferFactory, + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { + + // It may not be possible to produce a single DataBuffer synchronously + throw new UnsupportedOperationException(); + } + /** * Return the list of mime types this encoder supports. */ diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index b12f9434b2d..96dd72abcd1 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -441,6 +441,10 @@ public abstract class DataBufferUtils { public static Mono join(Publisher dataBuffers) { Assert.notNull(dataBuffers, "'dataBuffers' must not be null"); + if (dataBuffers instanceof Mono) { + return (Mono) dataBuffers; + } + return Flux.from(dataBuffers) .collectList() .filter(list -> !list.isEmpty()) diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java index 652ecb5d289..f9f4d73eda5 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java @@ -119,7 +119,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple if (inputStream instanceof Mono) { return Mono.from(inputStream).map(value -> - encodeValue(value, mimeType, bufferFactory, elementType, hints, encoding)).flux(); + encodeValue(value, bufferFactory, elementType, mimeType, hints, encoding)).flux(); } else { return this.streamingMediaTypes.stream() @@ -129,7 +129,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple byte[] separator = STREAM_SEPARATORS.getOrDefault(mediaType, NEWLINE_SEPARATOR); return Flux.from(inputStream).map(value -> { DataBuffer buffer = encodeValue( - value, mimeType, bufferFactory, elementType, hints, encoding); + value, bufferFactory, elementType, mimeType, hints, encoding); if (separator != null) { buffer.write(separator); } @@ -139,13 +139,20 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple .orElseGet(() -> { ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); return Flux.from(inputStream).collectList().map(list -> - encodeValue(list, mimeType, bufferFactory, listType, hints, encoding)).flux(); + encodeValue(list, bufferFactory, listType, mimeType, hints, encoding)).flux(); }); } } - private DataBuffer encodeValue(Object value, @Nullable MimeType mimeType, DataBufferFactory bufferFactory, - ResolvableType elementType, @Nullable Map hints, JsonEncoding encoding) { + @Override + public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { + + return encodeValue(value, bufferFactory, valueType, mimeType, hints, getJsonEncoding(mimeType)); + } + + private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, ResolvableType valueType, + @Nullable MimeType mimeType, @Nullable Map hints, JsonEncoding encoding) { if (!Hints.isLoggingSuppressed(hints)) { LogFormatUtils.traceDebug(logger, traceOn -> { @@ -154,7 +161,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple }); } - JavaType javaType = getJavaType(elementType.getType(), null); + JavaType javaType = getJavaType(valueType.getType(), null); Class jsonView = (hints != null ? (Class) hints.get(Jackson2CodecSupport.JSON_VIEW_HINT) : null); ObjectWriter writer = (jsonView != null ? getObjectMapper().writerWithView(jsonView) : getObjectMapper().writer()); @@ -163,7 +170,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple writer = writer.forType(javaType); } - writer = customizeWriter(writer, mimeType, elementType, hints); + writer = customizeWriter(writer, mimeType, valueType, hints); DataBuffer buffer = bufferFactory.allocateBuffer(); boolean release = true; diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java index 3be1ac47744..3d7b3fe0320 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufEncoder.java @@ -73,29 +73,39 @@ public class ProtobufEncoder extends ProtobufCodecSupport implements HttpMessage public Flux encode(Publisher inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Flux.from(inputStream) - .map(message -> { - DataBuffer buffer = bufferFactory.allocateBuffer(); - boolean release = true; - try { - if (!(inputStream instanceof Mono)) { - message.writeDelimitedTo(buffer.asOutputStream()); - } - else { - message.writeTo(buffer.asOutputStream()); - } - release = false; - return buffer; - } - catch (IOException ex) { - throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex); - } - finally { - if (release) { - DataBufferUtils.release(buffer); - } - } - }); + return Flux.from(inputStream).map(message -> + encodeValue(message, bufferFactory, !(inputStream instanceof Mono))); + } + + @Override + public DataBuffer encodeValue(Message message, DataBufferFactory bufferFactory, + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { + + return encodeValue(message, bufferFactory, false); + } + + private DataBuffer encodeValue(Message message, DataBufferFactory bufferFactory, boolean delimited) { + + DataBuffer buffer = bufferFactory.allocateBuffer(); + boolean release = true; + try { + if (delimited) { + message.writeDelimitedTo(buffer.asOutputStream()); + } + else { + message.writeTo(buffer.asOutputStream()); + } + release = false; + return buffer; + } + catch (IOException ex) { + throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex); + } + finally { + if (release) { + DataBufferUtils.release(buffer); + } + } } @Override diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlEncoder.java index 59a11970add..9a8ee7890f5 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlEncoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlEncoder.java @@ -99,7 +99,15 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder { @Override protected Flux encode(Object value, DataBufferFactory bufferFactory, - ResolvableType type, @Nullable MimeType mimeType, @Nullable Map hints) { + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { + + // we're relying on doOnDiscard in base class + return Mono.fromCallable(() -> encodeValue(value, bufferFactory, valueType, mimeType, hints)).flux(); + } + + @Override + public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { if (!Hints.isLoggingSuppressed(hints)) { LogFormatUtils.traceDebug(logger, traceOn -> { @@ -108,30 +116,27 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder { }); } - return Flux.defer(() -> { - boolean release = true; - DataBuffer buffer = bufferFactory.allocateBuffer(1024); - try { - OutputStream outputStream = buffer.asOutputStream(); - Class clazz = ClassUtils.getUserClass(value); - Marshaller marshaller = initMarshaller(clazz); - marshaller.marshal(value, outputStream); - release = false; - return Mono.fromCallable(() -> buffer); // relying on doOnDiscard in base class - } - catch (MarshalException ex) { - return Flux.error(new EncodingException( - "Could not marshal " + value.getClass() + " to XML", ex)); - } - catch (JAXBException ex) { - return Flux.error(new CodecException("Invalid JAXB configuration", ex)); - } - finally { - if (release) { - DataBufferUtils.release(buffer); - } + boolean release = true; + DataBuffer buffer = bufferFactory.allocateBuffer(1024); + try { + OutputStream outputStream = buffer.asOutputStream(); + Class clazz = ClassUtils.getUserClass(value); + Marshaller marshaller = initMarshaller(clazz); + marshaller.marshal(value, outputStream); + release = false; + return buffer; + } + catch (MarshalException ex) { + throw new EncodingException("Could not marshal " + value.getClass() + " to XML", ex); + } + catch (JAXBException ex) { + throw new CodecException("Invalid JAXB configuration", ex); + } + finally { + if (release) { + DataBufferUtils.release(buffer); } - }); + } } private Marshaller initMarshaller(Class clazz) throws JAXBException {