From 545c4effb11d71aaf7ac07fe1433cd3ab7e0eb2a Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 24 Mar 2014 13:52:37 -0400 Subject: [PATCH] Polish StompDecoder and the new Buffering sub-class Issue: SPR-11527 --- .../simp/stomp/BufferingStompDecoder.java | 56 ++++++++++++++----- .../messaging/simp/stomp/StompDecoder.java | 48 ++++++++++------ .../messaging/StompSubProtocolHandler.java | 14 ++++- 3 files changed, 85 insertions(+), 33 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java index 25bc9e2c350..6d61c0b40e1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java @@ -23,7 +23,6 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Queue; @@ -32,13 +31,17 @@ import java.util.concurrent.LinkedBlockingQueue; /** * An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder} - * that chunks any bytes remaining after a single full STOMP frame has been read. - * The remaining bytes may contain more STOMP frames or an incomplete STOMP frame. + * that buffers content remaining in the input ByteBuffer after the parent + * class has read all (complete) STOMP frames from it. The remaining content + * represents an incomplete STOMP frame. When called repeatedly with additional + * data, the decode method returns one or more messages or, if there is not + * enough data still, continues to buffer. * - *

Similarly if there is not enough content for a full STOMP frame, the content - * is buffered until more input is received. That means the - * {@link #decode(java.nio.ByteBuffer)} effectively never returns {@code null} as - * the parent class does. + *

A single instance of this decoder can be invoked repeatedly to read all + * messages from a single stream (e.g. WebSocket session) as long as decoding + * does not fail. If there is an exception, StompDecoder instance should not + * be used any more as its internal state is not guaranteed to be consistent. + * It is expected that the underlying session is closed at that point. * * @author Rossen Stoyanchev * @since 4.0.3 @@ -58,10 +61,16 @@ public class BufferingStompDecoder extends StompDecoder { } + /** + * Return the configured buffer size limit. + */ public int getBufferSizeLimit() { return this.bufferSizeLimit; } + /** + * Calculate the current buffer size. + */ public int getBufferSize() { int size = 0; for (ByteBuffer buffer : this.chunks) { @@ -70,15 +79,36 @@ public class BufferingStompDecoder extends StompDecoder { return size; } + /** + * Get the expected content length of the currently buffered, incomplete STOMP frame. + */ public Integer getExpectedContentLength() { return this.expectedContentLength; } + /** + * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a + * list of {@link Message}s. + * + *

If there was enough data to parse a "content-length" header, then the + * value is used to determine how much more data is needed before a new + * attempt to decode is made. + * + *

If there was not enough data to parse the "content-length", or if there + * is "content-length" header, every subsequent call to decode attempts to + * parse again with all available data. Therefore the presence of a "content-length" + * header helps to optimize the decoding of large messages. + * + * @param newBuffer a buffer containing new data to decode + * + * @return decoded messages or an empty list + * @throws StompConversionException raised in case of decoding issues + */ @Override - public List> decode(ByteBuffer newData) { + public List> decode(ByteBuffer newBuffer) { - this.chunks.add(newData); + this.chunks.add(newBuffer); checkBufferLimits(); @@ -86,13 +116,13 @@ public class BufferingStompDecoder extends StompDecoder { return Collections.>emptyList(); } - ByteBuffer buffer = assembleChunksAndReset(); + ByteBuffer bufferToDecode = assembleChunksAndReset(); MultiValueMap headers = new LinkedMultiValueMap(); - List> messages = decode(buffer, headers); + List> messages = decode(bufferToDecode, headers); - if (buffer.hasRemaining()) { - this.chunks.add(buffer); + if (bufferToDecode.hasRemaining()) { + this.chunks.add(bufferToDecode); this.expectedContentLength = getContentLength(headers); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java index 10e7df65823..2edd0202963 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java @@ -28,14 +28,18 @@ import org.apache.commons.logging.LogFactory; import org.springframework.messaging.Message; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; /** - * Decodes one or more STOMP frames from a {@link ByteBuffer}. If the buffer - * contains any additional (incomplete) data, or perhaps not enough data to - * form even one Message, the the buffer is reset and the value returned is - * an empty list indicating that no more message can be read. + * Decodes one or more STOMP frames contained in a {@link ByteBuffer}. + * + *

An attempt is made to read all complete STOMP frames from the buffer, which + * could be zero, one, or more. If there is any left-over content, i.e. an incomplete + * STOMP frame, at the end the buffer is reset to point to the beginning of the + * partial content. The caller is then responsible for dealing with that + * incomplete content by buffering until there is more input available. * * @author Andy Wilkinson * @author Rossen Stoyanchev @@ -52,10 +56,8 @@ public class StompDecoder { /** - * Decodes one or more STOMP frames from the given {@code buffer} into a - * list of {@link Message}s. - * - *

If the given ByteBuffer contains partial STOMP frame content, or additional + * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a + * list of {@link Message}s. If the input buffer contains any incplcontains partial STOMP frame content, or additional * content with a partial STOMP frame, the buffer is reset and {@code null} is * returned. * @@ -68,27 +70,37 @@ public class StompDecoder { } /** - * Decodes one or more STOMP frames from the given {@code buffer} into a - * list of {@link Message}s. + * Decodes one or more STOMP frames from the given {@code buffer} and returns + * a list of {@link Message}s. * - *

If the given ByteBuffer contains partial STOMP frame content, or additional - * content with a partial STOMP frame, the buffer is reset and {@code null} is - * returned. + *

If the given ByteBuffer contains only partial STOMP frame content and no + * complete STOMP frames, an empty list is returned, and the buffer is reset to + * to where it was. + * + *

If the buffer contains one ore more STOMP frames, those are returned and + * the buffer reset to point to the beginning of the unused partial content. + * + *

The input headers map is used to store successfully parsed headers and + * is cleared after ever successfully read message. So when partial content is + * read the caller can check if a "content-length" header was read, which helps + * to determine how much more content is needed before the next STOMP frame + * can be decoded. * * @param buffer The buffer to decode the STOMP frame from - * @param headers an empty map that will be filled with the successfully parsed - * headers of the last decoded message, or the last attempt at decoding an - * (incomplete) STOMP frame. This can be useful for detecting 'content-length'. + * @param headers an empty map that will contain successfully parsed headers + * in cases where the partial buffer ended with a partial STOMP frame * - * @return the decoded messages or an empty list + * @return decoded messages or an empty list + * @throws StompConversionException raised in case of decoding issues */ public List> decode(ByteBuffer buffer, MultiValueMap headers) { + Assert.notNull(headers, "headers is required"); List> messages = new ArrayList>(); while (buffer.hasRemaining()) { - headers.clear(); Message m = decodeMessage(buffer, headers); if (m != null) { messages.add(m); + headers.clear(); } else { break; diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 7b44bda20b0..97275eeb62f 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -79,7 +79,16 @@ public class StompSubProtocolHandler implements SubProtocolHandler { /** - * Set the message buffer size limit in bytes. + * Configure the maximum size of the buffer used when a STOMP message has been + * split over multiple WebSocket messages. + * + *

While the STOMP spec version 1.2 (current as of 4.0.3) does not discuss + * STOMP over WebSocket explicitly, a number of clients already split messages + * around 16K boundaries. Therefore partial content must be buffered before a + * full message can be assembled. + * + *

By default this property is set to 64K. + * * @since 4.0.3 */ public void setMessageBufferSizeLimit(int messageBufferSizeLimit) { @@ -87,7 +96,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler { } /** - * Get the message buffer size limit in bytes. + * Get the configured message buffer size limit in bytes. + * * @since 4.0.3 */ public int getMessageBufferSizeLimit() {