Browse Source

StompEncoder performance improvement plus related polishing

Issue: SPR-14747
pull/1191/head
Juergen Hoeller 10 years ago
parent
commit
6c764f6b8a
  1. 93
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java
  2. 34
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java
  3. 3
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java
  4. 28
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java

93
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2014 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@
package org.springframework.messaging.simp.stomp; package org.springframework.messaging.simp.stomp;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -28,7 +27,6 @@ import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
/** /**
* An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder} * An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
* that buffers content remaining in the input ByteBuffer after the parent * that buffers content remaining in the input ByteBuffer after the parent
@ -45,6 +43,7 @@ import org.springframework.util.MultiValueMap;
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0.3 * @since 4.0.3
* @see StompDecoder
*/ */
public class BufferingStompDecoder { public class BufferingStompDecoder {
@ -57,70 +56,50 @@ public class BufferingStompDecoder {
private volatile Integer expectedContentLength; private volatile Integer expectedContentLength;
/**
* Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}.
* @param stompDecoder the target decoder to wrap
* @param bufferSizeLimit the buffer size limit
*/
public BufferingStompDecoder(StompDecoder stompDecoder, int bufferSizeLimit) { public BufferingStompDecoder(StompDecoder stompDecoder, int bufferSizeLimit) {
Assert.notNull(stompDecoder, "'stompDecoder' is required"); Assert.notNull(stompDecoder, "StompDecoder is required");
Assert.isTrue(bufferSizeLimit > 0, "Buffer size must be greater than 0"); Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0");
this.stompDecoder = stompDecoder; this.stompDecoder = stompDecoder;
this.bufferSizeLimit = bufferSizeLimit; this.bufferSizeLimit = bufferSizeLimit;
} }
/** /**
* Return the wrapped * Return the wrapped {@link StompDecoder}.
* {@link org.springframework.messaging.simp.stomp.StompDecoder}.
*/ */
public StompDecoder getStompDecoder() { public final StompDecoder getStompDecoder() {
return this.stompDecoder; return this.stompDecoder;
} }
/** /**
* Return the configured buffer size limit. * Return the configured buffer size limit.
*/ */
public int getBufferSizeLimit() { public final int getBufferSizeLimit() {
return this.bufferSizeLimit; return this.bufferSizeLimit;
} }
/**
* Calculate the current buffer size.
*/
public int getBufferSize() {
int size = 0;
for (ByteBuffer buffer : this.chunks) {
size = size + buffer.remaining();
}
return size;
}
/**
* Get the expected content length of the currently buffered, incomplete STOMP frame.
*/
public Integer getExpectedContentLength() {
return this.expectedContentLength;
}
/** /**
* Decodes one or more STOMP frames from the given {@code ByteBuffer} into a * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
* list of {@link Message}s. * list of {@link Message}s.
*
* <p>If there was enough data to parse a "content-length" header, then the * <p>If there was enough data to parse a "content-length" header, then the
* value is used to determine how much more data is needed before a new * value is used to determine how much more data is needed before a new
* attempt to decode is made. * attempt to decode is made.
*
* <p>If there was not enough data to parse the "content-length", or if there * <p>If there was not enough data to parse the "content-length", or if there
* is "content-length" header, every subsequent call to decode attempts to * is "content-length" header, every subsequent call to decode attempts to
* parse again with all available data. Therefore the presence of a "content-length" * parse again with all available data. Therefore the presence of a "content-length"
* header helps to optimize the decoding of large messages. * header helps to optimize the decoding of large messages.
*
* @param newBuffer a buffer containing new data to decode * @param newBuffer a buffer containing new data to decode
*
* @return decoded messages or an empty list * @return decoded messages or an empty list
* @throws StompConversionException raised in case of decoding issues * @throws StompConversionException raised in case of decoding issues
*/ */
public List<Message<byte[]>> decode(ByteBuffer newBuffer) { public List<Message<byte[]>> decode(ByteBuffer newBuffer) {
this.chunks.add(newBuffer); this.chunks.add(newBuffer);
checkBufferLimits(); checkBufferLimits();
if (getExpectedContentLength() != null && getBufferSize() < this.expectedContentLength) { if (getExpectedContentLength() != null && getBufferSize() < this.expectedContentLength) {
@ -128,7 +107,6 @@ public class BufferingStompDecoder {
} }
ByteBuffer bufferToDecode = assembleChunksAndReset(); ByteBuffer bufferToDecode = assembleChunksAndReset();
MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>(); MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
List<Message<byte[]>> messages = this.stompDecoder.decode(bufferToDecode, headers); List<Message<byte[]>> messages = this.stompDecoder.decode(bufferToDecode, headers);
@ -140,21 +118,6 @@ public class BufferingStompDecoder {
return messages; return messages;
} }
private void checkBufferLimits() {
if (getExpectedContentLength() != null) {
if (getExpectedContentLength() > getBufferSizeLimit()) {
throw new StompConversionException(
"The 'content-length' header " + getExpectedContentLength() +
" exceeds the configured message buffer size limit " + getBufferSizeLimit());
}
}
if (getBufferSize() > getBufferSizeLimit()) {
throw new StompConversionException("The configured stomp frame buffer size limit of " +
getBufferSizeLimit() + " bytes has been exceeded");
}
}
private ByteBuffer assembleChunksAndReset() { private ByteBuffer assembleChunksAndReset() {
ByteBuffer result; ByteBuffer result;
if (this.chunks.size() == 1) { if (this.chunks.size() == 1) {
@ -172,4 +135,36 @@ public class BufferingStompDecoder {
return result; return result;
} }
private void checkBufferLimits() {
if (this.expectedContentLength != null) {
if (this.expectedContentLength > this.bufferSizeLimit) {
throw new StompConversionException(
"'content-length' header value " + this.expectedContentLength +
" exceeds configured message buffer size limit " + this.bufferSizeLimit);
}
}
if (getBufferSize() > this.bufferSizeLimit) {
throw new StompConversionException("The configured stomp frame buffer size limit of " +
this.bufferSizeLimit + " bytes has been exceeded");
}
}
/**
* Calculate the current buffer size.
*/
public int getBufferSize() {
int size = 0;
for (ByteBuffer buffer : this.chunks) {
size = size + buffer.remaining();
}
return size;
}
/**
* Get the expected content length of the currently buffered, incomplete STOMP frame.
*/
public Integer getExpectedContentLength() {
return this.expectedContentLength;
}
} }

34
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2015 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -26,15 +26,14 @@ import org.springframework.util.Assert;
/** /**
* Base class for STOMP client implementations. * Base class for STOMP client implementations.
* *
* <p>Subclasses can connect over WebSocket or TCP using any library. * <p>Subclasses can connect over WebSocket or TCP using any library. When creating
* When creating a new connection a sub-class can create an instance of * a new connection, a subclass can create an instance of @link DefaultStompSession}
* {@link DefaultStompSession} which extends * which extends {@link org.springframework.messaging.tcp.TcpConnectionHandler}
* {@link org.springframework.messaging.tcp.TcpConnectionHandler * whose lifecycle methods the subclass must then invoke.
* TcpConnectionHandler} whose lifecycle methods the sub-class must then invoke.
* *
* <p>In effect {@code TcpConnectionHandler} and {@code TcpConnection} are the * <p>In effect, {@code TcpConnectionHandler} and {@code TcpConnection} are the
* contracts any sub-class must adapt to while using {@link StompEncoder} and * contracts that any subclass must adapt to while using {@link StompEncoder}
* {@link StompDecoder} to encode and decode STOMP messages. * and {@link StompDecoder} to encode and decode STOMP messages.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.2 * @since 4.2
@ -58,7 +57,7 @@ public abstract class StompClientSupport {
* @param messageConverter the message converter to use * @param messageConverter the message converter to use
*/ */
public void setMessageConverter(MessageConverter messageConverter) { public void setMessageConverter(MessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' must not be null"); Assert.notNull(messageConverter, "MessageConverter must not be null");
this.messageConverter = messageConverter; this.messageConverter = messageConverter;
} }
@ -92,7 +91,7 @@ public abstract class StompClientSupport {
* CONNECT frame. The first number represents how often the client will write * CONNECT frame. The first number represents how often the client will write
* or send a heart-beat. The second is how often the server should write. * or send a heart-beat. The second is how often the server should write.
* A value of 0 means no heart-beats. * A value of 0 means no heart-beats.
* <p>By default this is set to "10000,10000" but sub-classes may override * <p>By default this is set to "10000,10000" but subclasses may override
* that default and for example set it to "0,0" if they require a * that default and for example set it to "0,0" if they require a
* TaskScheduler to be configured first. * TaskScheduler to be configured first.
* @param heartbeat the value for the CONNECT "heart-beat" header * @param heartbeat the value for the CONNECT "heart-beat" header
@ -100,22 +99,23 @@ public abstract class StompClientSupport {
* http://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a> * http://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a>
*/ */
public void setDefaultHeartbeat(long[] heartbeat) { public void setDefaultHeartbeat(long[] heartbeat) {
Assert.notNull(heartbeat); if (heartbeat == null || heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) {
Assert.isTrue(heartbeat[0] >= 0 && heartbeat[1] >=0 , "Invalid heart-beat: " + Arrays.toString(heartbeat)); throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat));
}
this.defaultHeartbeat = heartbeat; this.defaultHeartbeat = heartbeat;
} }
/** /**
* Return the configured default heart-beat value, never {@code null}. * Return the configured default heart-beat value (never {@code null}).
*/ */
public long[] getDefaultHeartbeat() { public long[] getDefaultHeartbeat() {
return this.defaultHeartbeat; return this.defaultHeartbeat;
} }
/** /**
* Whether heartbeats are enabled. Returns {@code false} if * Determine whether heartbeats are enabled.
* {@link #setDefaultHeartbeat defaultHeartbeat} is set to "0,0", and * <p>Returns {@code false} if {@link #setDefaultHeartbeat defaultHeartbeat}
* {@code true} otherwise. * is set to "0,0", and {@code true} otherwise.
*/ */
public boolean isDefaultHeartbeatEnabled() { public boolean isDefaultHeartbeatEnabled() {
return (getDefaultHeartbeat() != null && getDefaultHeartbeat()[0] != 0 && getDefaultHeartbeat()[1] != 0); return (getDefaultHeartbeat() != null && getDefaultHeartbeat()[0] != 0 && getDefaultHeartbeat()[1] != 0);

3
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2015 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -53,7 +53,6 @@ public class StompDecoder {
private static final Log logger = LogFactory.getLog(StompDecoder.class); private static final Log logger = LogFactory.getLog(StompDecoder.class);
private MessageHeaderInitializer headerInitializer; private MessageHeaderInitializer headerInitializer;

28
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2014 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -19,7 +19,7 @@ package org.springframework.messaging.simp.stomp;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -39,14 +39,15 @@ import org.springframework.util.Assert;
* @author Andy Wilkinson * @author Andy Wilkinson
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
* @see StompDecoder
*/ */
public final class StompEncoder { public class StompEncoder {
private static final byte LF = '\n'; private static final byte LF = '\n';
private static final byte COLON = ':'; private static final byte COLON = ':';
private final Log logger = LogFactory.getLog(StompEncoder.class); private static final Log logger = LogFactory.getLog(StompEncoder.class);
/** /**
@ -78,9 +79,13 @@ public final class StompEncoder {
} }
output.write(StompDecoder.HEARTBEAT_PAYLOAD); output.write(StompDecoder.HEARTBEAT_PAYLOAD);
} }
else { else {
StompCommand command = StompHeaderAccessor.getCommand(headers); StompCommand command = StompHeaderAccessor.getCommand(headers);
Assert.notNull(command, "Missing STOMP command: " + headers); if (command == null) {
throw new IllegalStateException("Missing STOMP command: " + headers);
}
output.write(command.toString().getBytes(StompDecoder.UTF8_CHARSET)); output.write(command.toString().getBytes(StompDecoder.UTF8_CHARSET));
output.write(LF); output.write(LF);
writeHeaders(command, headers, payload, output); writeHeaders(command, headers, payload, output);
@ -96,8 +101,8 @@ public final class StompEncoder {
} }
} }
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload, DataOutputStream output) private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload,
throws IOException { DataOutputStream output) throws IOException {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String,List<String>> nativeHeaders = Map<String,List<String>> nativeHeaders =
@ -114,22 +119,25 @@ public final class StompEncoder {
boolean shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED); boolean shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED);
for (Entry<String, List<String>> entry : nativeHeaders.entrySet()) { for (Entry<String, List<String>> entry : nativeHeaders.entrySet()) {
byte[] key = encodeHeaderString(entry.getKey(), shouldEscape);
if (command.requiresContentLength() && "content-length".equals(entry.getKey())) { if (command.requiresContentLength() && "content-length".equals(entry.getKey())) {
continue; continue;
} }
List<String> values = entry.getValue(); List<String> values = entry.getValue();
if (StompCommand.CONNECT.equals(command) && if (StompCommand.CONNECT.equals(command) &&
StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(entry.getKey())) { StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(entry.getKey())) {
values = Arrays.asList(StompHeaderAccessor.getPasscode(headers)); values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers));
} }
byte[] encodedKey = encodeHeaderString(entry.getKey(), shouldEscape);
for (String value : values) { for (String value : values) {
output.write(key); output.write(encodedKey);
output.write(COLON); output.write(COLON);
output.write(encodeHeaderString(value, shouldEscape)); output.write(encodeHeaderString(value, shouldEscape));
output.write(LF); output.write(LF);
} }
} }
if (command.requiresContentLength()) { if (command.requiresContentLength()) {
int contentLength = payload.length; int contentLength = payload.length;
output.write("content-length:".getBytes(StompDecoder.UTF8_CHARSET)); output.write("content-length:".getBytes(StompDecoder.UTF8_CHARSET));

Loading…
Cancel
Save