diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java index bf5218749da..ac4fbbbb7da 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package org.springframework.messaging.simp.stomp; - import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -28,7 +27,6 @@ import org.springframework.util.Assert; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; - /** * An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder} * that buffers content remaining in the input ByteBuffer after the parent @@ -45,6 +43,7 @@ import org.springframework.util.MultiValueMap; * * @author Rossen Stoyanchev * @since 4.0.3 + * @see StompDecoder */ public class BufferingStompDecoder { @@ -57,70 +56,50 @@ public class BufferingStompDecoder { private volatile Integer expectedContentLength; + /** + * Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}. + * @param stompDecoder the target decoder to wrap + * @param bufferSizeLimit the buffer size limit + */ public BufferingStompDecoder(StompDecoder stompDecoder, int bufferSizeLimit) { - Assert.notNull(stompDecoder, "'stompDecoder' is required"); - Assert.isTrue(bufferSizeLimit > 0, "Buffer size must be greater than 0"); + Assert.notNull(stompDecoder, "StompDecoder is required"); + Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0"); this.stompDecoder = stompDecoder; this.bufferSizeLimit = bufferSizeLimit; } /** - * Return the wrapped - * {@link org.springframework.messaging.simp.stomp.StompDecoder}. + * Return the wrapped {@link StompDecoder}. */ - public StompDecoder getStompDecoder() { + public final StompDecoder getStompDecoder() { return this.stompDecoder; } /** * Return the configured buffer size limit. */ - public int getBufferSizeLimit() { + public final int getBufferSizeLimit() { return this.bufferSizeLimit; } - /** - * Calculate the current buffer size. - */ - public int getBufferSize() { - int size = 0; - for (ByteBuffer buffer : this.chunks) { - size = size + buffer.remaining(); - } - return size; - } - - /** - * Get the expected content length of the currently buffered, incomplete STOMP frame. - */ - public Integer getExpectedContentLength() { - return this.expectedContentLength; - } - /** * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a * list of {@link Message}s. - * *

If there was enough data to parse a "content-length" header, then the * value is used to determine how much more data is needed before a new * attempt to decode is made. - * *

If there was not enough data to parse the "content-length", or if there * is "content-length" header, every subsequent call to decode attempts to * parse again with all available data. Therefore the presence of a "content-length" * header helps to optimize the decoding of large messages. - * * @param newBuffer a buffer containing new data to decode - * * @return decoded messages or an empty list * @throws StompConversionException raised in case of decoding issues */ public List> decode(ByteBuffer newBuffer) { - this.chunks.add(newBuffer); - checkBufferLimits(); if (getExpectedContentLength() != null && getBufferSize() < this.expectedContentLength) { @@ -128,7 +107,6 @@ public class BufferingStompDecoder { } ByteBuffer bufferToDecode = assembleChunksAndReset(); - MultiValueMap headers = new LinkedMultiValueMap(); List> messages = this.stompDecoder.decode(bufferToDecode, headers); @@ -140,21 +118,6 @@ public class BufferingStompDecoder { return messages; } - private void checkBufferLimits() { - if (getExpectedContentLength() != null) { - if (getExpectedContentLength() > getBufferSizeLimit()) { - throw new StompConversionException( - "The 'content-length' header " + getExpectedContentLength() + - " exceeds the configured message buffer size limit " + getBufferSizeLimit()); - } - } - if (getBufferSize() > getBufferSizeLimit()) { - throw new StompConversionException("The configured stomp frame buffer size limit of " + - getBufferSizeLimit() + " bytes has been exceeded"); - - } - } - private ByteBuffer assembleChunksAndReset() { ByteBuffer result; if (this.chunks.size() == 1) { @@ -172,4 +135,36 @@ public class BufferingStompDecoder { return result; } + private void checkBufferLimits() { + if (this.expectedContentLength != null) { + if (this.expectedContentLength > this.bufferSizeLimit) { + throw new StompConversionException( + "'content-length' header value " + this.expectedContentLength + + " exceeds configured message buffer size limit " + this.bufferSizeLimit); + } + } + if (getBufferSize() > this.bufferSizeLimit) { + throw new StompConversionException("The configured stomp frame buffer size limit of " + + this.bufferSizeLimit + " bytes has been exceeded"); + } + } + + /** + * Calculate the current buffer size. + */ + public int getBufferSize() { + int size = 0; + for (ByteBuffer buffer : this.chunks) { + size = size + buffer.remaining(); + } + return size; + } + + /** + * Get the expected content length of the currently buffered, incomplete STOMP frame. + */ + public Integer getExpectedContentLength() { + return this.expectedContentLength; + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java index e56b2a3703e..708f277b9cc 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,15 +26,14 @@ import org.springframework.util.Assert; /** * Base class for STOMP client implementations. * - *

Subclasses can connect over WebSocket or TCP using any library. - * When creating a new connection a sub-class can create an instance of - * {@link DefaultStompSession} which extends - * {@link org.springframework.messaging.tcp.TcpConnectionHandler - * TcpConnectionHandler} whose lifecycle methods the sub-class must then invoke. + *

Subclasses can connect over WebSocket or TCP using any library. When creating + * a new connection, a subclass can create an instance of @link DefaultStompSession} + * which extends {@link org.springframework.messaging.tcp.TcpConnectionHandler} + * whose lifecycle methods the subclass must then invoke. * - *

In effect {@code TcpConnectionHandler} and {@code TcpConnection} are the - * contracts any sub-class must adapt to while using {@link StompEncoder} and - * {@link StompDecoder} to encode and decode STOMP messages. + *

In effect, {@code TcpConnectionHandler} and {@code TcpConnection} are the + * contracts that any subclass must adapt to while using {@link StompEncoder} + * and {@link StompDecoder} to encode and decode STOMP messages. * * @author Rossen Stoyanchev * @since 4.2 @@ -58,7 +57,7 @@ public abstract class StompClientSupport { * @param messageConverter the message converter to use */ public void setMessageConverter(MessageConverter messageConverter) { - Assert.notNull(messageConverter, "'messageConverter' must not be null"); + Assert.notNull(messageConverter, "MessageConverter must not be null"); this.messageConverter = messageConverter; } @@ -92,7 +91,7 @@ public abstract class StompClientSupport { * CONNECT frame. The first number represents how often the client will write * or send a heart-beat. The second is how often the server should write. * A value of 0 means no heart-beats. - *

By default this is set to "10000,10000" but sub-classes may override + *

By default this is set to "10000,10000" but subclasses may override * that default and for example set it to "0,0" if they require a * TaskScheduler to be configured first. * @param heartbeat the value for the CONNECT "heart-beat" header @@ -100,22 +99,23 @@ public abstract class StompClientSupport { * http://stomp.github.io/stomp-specification-1.2.html#Heart-beating */ public void setDefaultHeartbeat(long[] heartbeat) { - Assert.notNull(heartbeat); - Assert.isTrue(heartbeat[0] >= 0 && heartbeat[1] >=0 , "Invalid heart-beat: " + Arrays.toString(heartbeat)); + if (heartbeat == null || heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) { + throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat)); + } this.defaultHeartbeat = heartbeat; } /** - * Return the configured default heart-beat value, never {@code null}. + * Return the configured default heart-beat value (never {@code null}). */ public long[] getDefaultHeartbeat() { return this.defaultHeartbeat; } /** - * Whether heartbeats are enabled. Returns {@code false} if - * {@link #setDefaultHeartbeat defaultHeartbeat} is set to "0,0", and - * {@code true} otherwise. + * Determine whether heartbeats are enabled. + *

Returns {@code false} if {@link #setDefaultHeartbeat defaultHeartbeat} + * is set to "0,0", and {@code true} otherwise. */ public boolean isDefaultHeartbeatEnabled() { return (getDefaultHeartbeat() != null && getDefaultHeartbeat()[0] != 0 && getDefaultHeartbeat()[1] != 0); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java index 28a636a984a..3bd92ebedf5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,7 +53,6 @@ public class StompDecoder { private static final Log logger = LogFactory.getLog(StompDecoder.class); - private MessageHeaderInitializer headerInitializer; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java index 228fbcdb577..86d7a7bfa58 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ package org.springframework.messaging.simp.stomp; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -39,14 +39,15 @@ import org.springframework.util.Assert; * @author Andy Wilkinson * @author Rossen Stoyanchev * @since 4.0 + * @see StompDecoder */ -public final class StompEncoder { +public class StompEncoder { private static final byte LF = '\n'; private static final byte COLON = ':'; - private final Log logger = LogFactory.getLog(StompEncoder.class); + private static final Log logger = LogFactory.getLog(StompEncoder.class); /** @@ -78,9 +79,13 @@ public final class StompEncoder { } output.write(StompDecoder.HEARTBEAT_PAYLOAD); } + else { StompCommand command = StompHeaderAccessor.getCommand(headers); - Assert.notNull(command, "Missing STOMP command: " + headers); + if (command == null) { + throw new IllegalStateException("Missing STOMP command: " + headers); + } + output.write(command.toString().getBytes(StompDecoder.UTF8_CHARSET)); output.write(LF); writeHeaders(command, headers, payload, output); @@ -96,8 +101,8 @@ public final class StompEncoder { } } - private void writeHeaders(StompCommand command, Map headers, byte[] payload, DataOutputStream output) - throws IOException { + private void writeHeaders(StompCommand command, Map headers, byte[] payload, + DataOutputStream output) throws IOException { @SuppressWarnings("unchecked") Map> nativeHeaders = @@ -114,22 +119,25 @@ public final class StompEncoder { boolean shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED); for (Entry> entry : nativeHeaders.entrySet()) { - byte[] key = encodeHeaderString(entry.getKey(), shouldEscape); if (command.requiresContentLength() && "content-length".equals(entry.getKey())) { continue; } + List values = entry.getValue(); if (StompCommand.CONNECT.equals(command) && StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(entry.getKey())) { - values = Arrays.asList(StompHeaderAccessor.getPasscode(headers)); + values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers)); } + + byte[] encodedKey = encodeHeaderString(entry.getKey(), shouldEscape); for (String value : values) { - output.write(key); + output.write(encodedKey); output.write(COLON); output.write(encodeHeaderString(value, shouldEscape)); output.write(LF); } } + if (command.requiresContentLength()) { int contentLength = payload.length; output.write("content-length:".getBytes(StompDecoder.UTF8_CHARSET));