|
|
|
|
@ -16,7 +16,6 @@
@@ -16,7 +16,6 @@
|
|
|
|
|
|
|
|
|
|
package org.springframework.messaging.simp.stomp; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import java.nio.ByteBuffer; |
|
|
|
|
import java.util.Collections; |
|
|
|
|
import java.util.List; |
|
|
|
|
@ -28,7 +27,6 @@ import org.springframework.util.Assert;
@@ -28,7 +27,6 @@ import org.springframework.util.Assert;
|
|
|
|
|
import org.springframework.util.LinkedMultiValueMap; |
|
|
|
|
import org.springframework.util.MultiValueMap; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder} |
|
|
|
|
* that buffers content remaining in the input ByteBuffer after the parent |
|
|
|
|
@ -45,6 +43,7 @@ import org.springframework.util.MultiValueMap;
@@ -45,6 +43,7 @@ import org.springframework.util.MultiValueMap;
|
|
|
|
|
* |
|
|
|
|
* @author Rossen Stoyanchev |
|
|
|
|
* @since 4.0.3 |
|
|
|
|
* @see StompDecoder |
|
|
|
|
*/ |
|
|
|
|
public class BufferingStompDecoder { |
|
|
|
|
|
|
|
|
|
@ -52,84 +51,63 @@ public class BufferingStompDecoder {
@@ -52,84 +51,63 @@ public class BufferingStompDecoder {
|
|
|
|
|
|
|
|
|
|
private final int bufferSizeLimit; |
|
|
|
|
|
|
|
|
|
private final Queue<ByteBuffer> chunks = new LinkedBlockingQueue<>(); |
|
|
|
|
private final Queue<ByteBuffer> chunks = new LinkedBlockingQueue<ByteBuffer>(); |
|
|
|
|
|
|
|
|
|
private volatile Integer expectedContentLength; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}. |
|
|
|
|
* @param stompDecoder the target decoder to wrap |
|
|
|
|
* @param bufferSizeLimit the buffer size limit |
|
|
|
|
*/ |
|
|
|
|
public BufferingStompDecoder(StompDecoder stompDecoder, int bufferSizeLimit) { |
|
|
|
|
Assert.notNull(stompDecoder, "'stompDecoder' is required"); |
|
|
|
|
Assert.isTrue(bufferSizeLimit > 0, "Buffer size must be greater than 0"); |
|
|
|
|
Assert.notNull(stompDecoder, "StompDecoder is required"); |
|
|
|
|
Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0"); |
|
|
|
|
this.stompDecoder = stompDecoder; |
|
|
|
|
this.bufferSizeLimit = bufferSizeLimit; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Return the wrapped |
|
|
|
|
* {@link org.springframework.messaging.simp.stomp.StompDecoder}. |
|
|
|
|
* Return the wrapped {@link StompDecoder}. |
|
|
|
|
*/ |
|
|
|
|
public StompDecoder getStompDecoder() { |
|
|
|
|
public final StompDecoder getStompDecoder() { |
|
|
|
|
return this.stompDecoder; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Return the configured buffer size limit. |
|
|
|
|
*/ |
|
|
|
|
public int getBufferSizeLimit() { |
|
|
|
|
public final int getBufferSizeLimit() { |
|
|
|
|
return this.bufferSizeLimit; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Calculate the current buffer size. |
|
|
|
|
*/ |
|
|
|
|
public int getBufferSize() { |
|
|
|
|
int size = 0; |
|
|
|
|
for (ByteBuffer buffer : this.chunks) { |
|
|
|
|
size = size + buffer.remaining(); |
|
|
|
|
} |
|
|
|
|
return size; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Get the expected content length of the currently buffered, incomplete STOMP frame. |
|
|
|
|
*/ |
|
|
|
|
public Integer getExpectedContentLength() { |
|
|
|
|
return this.expectedContentLength; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Decodes one or more STOMP frames from the given {@code ByteBuffer} into a |
|
|
|
|
* list of {@link Message}s. |
|
|
|
|
* |
|
|
|
|
* <p>If there was enough data to parse a "content-length" header, then the |
|
|
|
|
* value is used to determine how much more data is needed before a new |
|
|
|
|
* attempt to decode is made. |
|
|
|
|
* |
|
|
|
|
* <p>If there was not enough data to parse the "content-length", or if there |
|
|
|
|
* is "content-length" header, every subsequent call to decode attempts to |
|
|
|
|
* parse again with all available data. Therefore the presence of a "content-length" |
|
|
|
|
* header helps to optimize the decoding of large messages. |
|
|
|
|
* |
|
|
|
|
* @param newBuffer a buffer containing new data to decode |
|
|
|
|
* |
|
|
|
|
* @return decoded messages or an empty list |
|
|
|
|
* @throws StompConversionException raised in case of decoding issues |
|
|
|
|
*/ |
|
|
|
|
public List<Message<byte[]>> decode(ByteBuffer newBuffer) { |
|
|
|
|
|
|
|
|
|
this.chunks.add(newBuffer); |
|
|
|
|
|
|
|
|
|
checkBufferLimits(); |
|
|
|
|
|
|
|
|
|
if (getExpectedContentLength() != null && getBufferSize() < this.expectedContentLength) { |
|
|
|
|
return Collections.<Message<byte[]>>emptyList(); |
|
|
|
|
if (this.expectedContentLength != null && getBufferSize() < this.expectedContentLength) { |
|
|
|
|
return Collections.emptyList(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ByteBuffer bufferToDecode = assembleChunksAndReset(); |
|
|
|
|
|
|
|
|
|
MultiValueMap<String, String> headers = new LinkedMultiValueMap<>(); |
|
|
|
|
MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>(); |
|
|
|
|
List<Message<byte[]>> messages = this.stompDecoder.decode(bufferToDecode, headers); |
|
|
|
|
|
|
|
|
|
if (bufferToDecode.hasRemaining()) { |
|
|
|
|
@ -140,21 +118,6 @@ public class BufferingStompDecoder {
@@ -140,21 +118,6 @@ public class BufferingStompDecoder {
|
|
|
|
|
return messages; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void checkBufferLimits() { |
|
|
|
|
if (getExpectedContentLength() != null) { |
|
|
|
|
if (getExpectedContentLength() > getBufferSizeLimit()) { |
|
|
|
|
throw new StompConversionException( |
|
|
|
|
"The 'content-length' header " + getExpectedContentLength() + |
|
|
|
|
" exceeds the configured message buffer size limit " + getBufferSizeLimit()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (getBufferSize() > getBufferSizeLimit()) { |
|
|
|
|
throw new StompConversionException("The configured stomp frame buffer size limit of " + |
|
|
|
|
getBufferSizeLimit() + " bytes has been exceeded"); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private ByteBuffer assembleChunksAndReset() { |
|
|
|
|
ByteBuffer result; |
|
|
|
|
if (this.chunks.size() == 1) { |
|
|
|
|
@ -172,4 +135,36 @@ public class BufferingStompDecoder {
@@ -172,4 +135,36 @@ public class BufferingStompDecoder {
|
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void checkBufferLimits() { |
|
|
|
|
if (this.expectedContentLength != null) { |
|
|
|
|
if (this.expectedContentLength > this.bufferSizeLimit) { |
|
|
|
|
throw new StompConversionException( |
|
|
|
|
"STOMP 'content-length' header value " + this.expectedContentLength + |
|
|
|
|
" exceeds configured buffer size limit " + this.bufferSizeLimit); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (getBufferSize() > this.bufferSizeLimit) { |
|
|
|
|
throw new StompConversionException("The configured STOMP buffer size limit of " + |
|
|
|
|
this.bufferSizeLimit + " bytes has been exceeded"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Calculate the current buffer size. |
|
|
|
|
*/ |
|
|
|
|
public int getBufferSize() { |
|
|
|
|
int size = 0; |
|
|
|
|
for (ByteBuffer buffer : this.chunks) { |
|
|
|
|
size = size + buffer.remaining(); |
|
|
|
|
} |
|
|
|
|
return size; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Get the expected content length of the currently buffered, incomplete STOMP frame. |
|
|
|
|
*/ |
|
|
|
|
public Integer getExpectedContentLength() { |
|
|
|
|
return this.expectedContentLength; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|