@ -129,13 +129,22 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -129,13 +129,22 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
public Flux < Message > decode ( Publisher < DataBuffer > inputStream , ResolvableType elementType ,
@Nullable MimeType mimeType , @Nullable Map < String , Object > hints ) {
MessageDecoderFunction decoderFunction = new MessageDecoderFunction ( elementType , this . maxMessageSize ) ;
MessageDecoderFunction decoderFunction =
new MessageDecoderFunction ( elementType , this . maxMessageSize , initMessageSizeReader ( ) ) ;
return Flux . from ( inputStream )
. flatMapIterable ( decoderFunction )
. doOnTerminate ( decoderFunction : : discard ) ;
}
/ * *
* Return a reader for message size information encoded in the input stream .
* @since 7 . 0
* /
protected MessageSizeReader initMessageSizeReader ( ) {
return new DefaultMessageSizeReader ( ) ;
}
@Override
public Mono < Message > decodeToMono ( Publisher < DataBuffer > inputStream , ResolvableType elementType ,
@Nullable MimeType mimeType , @Nullable Map < String , Object > hints ) {
@ -150,9 +159,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -150,9 +159,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
try {
Message . Builder builder = getMessageBuilder ( targetType . toClass ( ) ) ;
ByteBuffer byteBuffer = ByteBuffer . allocate ( dataBuffer . readableByteCount ( ) ) ;
dataBuffer . toByteBuffer ( byteBuffer ) ;
builder . mergeFrom ( CodedInputStream . newInstance ( byteBuffer ) , this . extensionRegistry ) ;
merge ( dataBuffer , builder ) ;
return builder . build ( ) ;
}
catch ( IOException ex ) {
@ -166,6 +173,17 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -166,6 +173,17 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
}
}
/ * *
* Use merge methods on { @link Message . Builder } to read a single message
* from the given { @code DataBuffer } .
* @since 7 . 0
* /
protected void merge ( DataBuffer dataBuffer , Message . Builder builder ) throws IOException {
ByteBuffer byteBuffer = ByteBuffer . allocate ( dataBuffer . readableByteCount ( ) ) ;
dataBuffer . toByteBuffer ( byteBuffer ) ;
builder . mergeFrom ( CodedInputStream . newInstance ( byteBuffer ) , this . extensionRegistry ) ;
}
/ * *
* Create a new { @code Message . Builder } instance for the given class .
@ -196,15 +214,14 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -196,15 +214,14 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
private int messageBytesToRead ;
private int offset ;
private final MessageSizeReader messageSizeReader ;
public MessageDecoderFunction ( ResolvableType elementType , int maxMessageSize ) {
public MessageDecoderFunction ( ResolvableType elementType , int maxMessageSize , MessageSizeReader messageSizeReader ) {
this . elementType = elementType ;
this . maxMessageSize = maxMessageSize ;
this . messageSizeReader = messageSizeReader ;
}
@Override
public Iterable < ? extends Message > apply ( DataBuffer input ) {
try {
@ -214,9 +231,11 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -214,9 +231,11 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
do {
if ( this . output = = null ) {
if ( ! readMessageSize ( input ) ) {
Integer messageSize = this . messageSizeReader . readMessageSize ( input ) ;
if ( messageSize = = null ) {
return messages ;
}
this . messageBytesToRead = messageSize ;
if ( this . maxMessageSize > 0 & & this . messageBytesToRead > this . maxMessageSize ) {
throw new DataBufferLimitException (
"The number of bytes to read for message " +
@ -262,60 +281,89 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -262,60 +281,89 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
}
}
public void discard ( ) {
if ( this . output ! = null ) {
DataBufferUtils . release ( this . output ) ;
}
}
}
/ * *
* Component to read the size of a message . Implementations must be
* stateful and expect size information is potentially split
* across input chunks .
* @since 7 . 0
* /
protected interface MessageSizeReader {
/ * *
* Parse message size as a varint from the input stream , updating { @code messageBytesToRead } and
* { @code offset } fields if needed to allow processing of upcoming chunks .
* Inspired from { @link CodedInputStream # readRawVarint32 ( int , java . io . InputStream ) }
* @return { @code true } when the message size is parsed successfully , { @code false } when the message size is
* truncated
* @see < a href = "https://developers.google.com/protocol-buffers/docs/encoding#varints" > Base 128 Varints < / a >
* Read the message size from the given buffer . This method may be
* called multiple times before the message size is fully read .
* @return return the message size , or { @code null } if the data in the
* input buffer was insufficient
* /
private boolean readMessageSize ( DataBuffer input ) {
@Nullable Integer readMessageSize ( DataBuffer input ) ;
}
/ * *
* Default reader for Protobuf messages .
* < p > Parses the message size as a varint from the input stream .
* Inspired by { @link CodedInputStream # readRawVarint32 ( int , java . io . InputStream ) } ,
* @see < a href = "https://developers.google.com/protocol-buffers/docs/encoding#varints" > Base 128 Varints < / a >
* /
private static class DefaultMessageSizeReader implements MessageSizeReader {
private int offset ;
private int messageSize ;
@Override
public @Nullable Integer readMessageSize ( DataBuffer input ) {
if ( this . offset = = 0 ) {
if ( input . readableByteCount ( ) = = 0 ) {
return false ;
return null ;
}
int firstByte = input . read ( ) ;
if ( ( firstByte & 0x80 ) = = 0 ) {
this . messageBytesToRead = firstByte ;
return true ;
this . messageSize = firstByte ;
return getAndReset ( ) ;
}
this . messageBytesToRead = firstByte & 0x7f ;
this . messageSize = firstByte & 0x7f ;
this . offset = 7 ;
}
if ( this . offset < 32 ) {
for ( ; this . offset < 32 ; this . offset + = 7 ) {
if ( input . readableByteCount ( ) = = 0 ) {
return false ;
return null ;
}
final int b = input . read ( ) ;
this . messageBytesToRead | = ( b & 0x7f ) < < this . offset ;
this . messageSize | = ( b & 0x7f ) < < this . offset ;
if ( ( b & 0x80 ) = = 0 ) {
this . offset = 0 ;
return true ;
return getAndReset ( ) ;
}
}
}
// Keep reading up to 64 bits.
for ( ; this . offset < 64 ; this . offset + = 7 ) {
if ( input . readableByteCount ( ) = = 0 ) {
return false ;
return null ;
}
final int b = input . read ( ) ;
if ( ( b & 0x80 ) = = 0 ) {
this . offset = 0 ;
return true ;
return getAndReset ( ) ;
}
}
this . offset = 0 ;
getAndReset ( ) ;
throw new DecodingException ( "Cannot parse message size: malformed varint" ) ;
}
public void discard ( ) {
if ( this . output ! = null ) {
DataBufferUtils . release ( this . output ) ;
}
private @Nullable Integer getAndReset ( ) {
Integer result = ( this . messageSize ! = 0 ? this . messageSize : null ) ;
this . offset = 0 ;
this . messageSize = 0 ;
return result ;
}
}