Browse Source

Split messages only if configured to do so

See gh-31970
pull/32425/head
rstoyanchev 2 years ago
parent
commit
73ee86c666
  1. 74
      spring-websocket/src/main/java/org/springframework/web/socket/messaging/WebSocketStompClient.java
  2. 25
      spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java

74
spring-websocket/src/main/java/org/springframework/web/socket/messaging/WebSocketStompClient.java

@ -20,6 +20,7 @@ import java.io.IOException; @@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -75,16 +76,13 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif @@ -75,16 +76,13 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
private static final Log logger = LogFactory.getLog(WebSocketStompClient.class);
/**
* The default max size for in&outbound STOMP message.
*/
private static final int DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024;
private final WebSocketClient webSocketClient;
private int inboundMessageSizeLimit = DEFAULT_MESSAGE_MAX_SIZE;
private int inboundMessageSizeLimit = 64 * 1024;
private int outboundMessageSizeLimit = DEFAULT_MESSAGE_MAX_SIZE;
@Nullable
private Integer outboundMessageSizeLimit;
private boolean autoStartup = true;
@ -131,7 +129,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif @@ -131,7 +129,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
* Since a STOMP message can be received in multiple WebSocket messages,
* buffering may be required and this property determines the maximum buffer
* size per message.
* <p>By default this is set to 64 * 1024 (64K), see {@link WebSocketStompClient#DEFAULT_MESSAGE_MAX_SIZE}.
* <p>By default this is set to 64 * 1024 (64K).
*/
public void setInboundMessageSizeLimit(int inboundMessageSizeLimit) {
this.inboundMessageSizeLimit = inboundMessageSizeLimit;
@ -148,10 +146,10 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif @@ -148,10 +146,10 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
* Configure the maximum size allowed for outbound STOMP message.
* If STOMP message's size exceeds {@link WebSocketStompClient#outboundMessageSizeLimit},
* STOMP message is split into multiple frames.
* <p>By default this is set to 64 * 1024 (64K), see {@link WebSocketStompClient#DEFAULT_MESSAGE_MAX_SIZE}.
* <p>By default this is not set in which case each STOMP message are not split.
* @since 6.2
*/
public void setOutboundMessageSizeLimit(int outboundMessageSizeLimit) {
public void setOutboundMessageSizeLimit(Integer outboundMessageSizeLimit) {
this.outboundMessageSizeLimit = outboundMessageSizeLimit;
}
@ -159,7 +157,8 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif @@ -159,7 +157,8 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
* Get the configured outbound message buffer size in bytes.
* @since 6.2
*/
public int getOutboundMessageSizeLimit() {
@Nullable
public Integer getOutboundMessageSizeLimit() {
return this.outboundMessageSizeLimit;
}
@ -479,8 +478,13 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif @@ -479,8 +478,13 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
try {
WebSocketSession session = this.session;
Assert.state(session != null, "No WebSocketSession available");
for (WebSocketMessage<?> webSocketMessage : this.codec.encode(message, session.getClass())) {
session.sendMessage(webSocketMessage);
if (this.codec.hasSplittingEncoder()) {
for (WebSocketMessage<?> outMessage : this.codec.encodeAndSplit(message, session.getClass())) {
session.sendMessage(outMessage);
}
}
else {
session.sendMessage(this.codec.encode(message, session.getClass()));
}
future.complete(null);
}
@ -592,11 +596,13 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif @@ -592,11 +596,13 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
private final BufferingStompDecoder bufferingDecoder;
@Nullable
private final SplittingStompEncoder splittingEncoder;
public StompWebSocketMessageCodec(int inboundMessageSizeLimit, int outboundMessageSizeLimit) {
public StompWebSocketMessageCodec(int inboundMessageSizeLimit, @Nullable Integer outboundMessageSizeLimit) {
this.bufferingDecoder = new BufferingStompDecoder(DECODER, inboundMessageSizeLimit);
this.splittingEncoder = new SplittingStompEncoder(ENCODER, outboundMessageSizeLimit);
this.splittingEncoder = (outboundMessageSizeLimit != null ?
new SplittingStompEncoder(ENCODER, outboundMessageSizeLimit) : null);
}
public List<Message<byte[]>> decode(WebSocketMessage<?> webSocketMessage) {
@ -622,21 +628,41 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif @@ -622,21 +628,41 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
return result;
}
public List<WebSocketMessage<?>> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
public boolean hasSplittingEncoder() {
return (this.splittingEncoder != null);
}
public WebSocketMessage<?> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
StompHeaderAccessor accessor = getStompHeaderAccessor(message);
byte[] payload = message.getPayload();
byte[] frame = ENCODER.encode(accessor.getMessageHeaders(), payload);
return (useBinary(accessor, payload, sessionType) ? new BinaryMessage(frame) : new TextMessage(frame));
}
public List<WebSocketMessage<?>> encodeAndSplit(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
Assert.state(this.splittingEncoder != null, "No SplittingEncoder");
StompHeaderAccessor accessor = getStompHeaderAccessor(message);
byte[] payload = message.getPayload();
List<byte[]> frames = this.splittingEncoder.encode(accessor.getMessageHeaders(), payload);
boolean useBinary = useBinary(accessor, payload, sessionType);
List<WebSocketMessage<?>> messages = new ArrayList<>(frames.size());
frames.forEach(frame -> messages.add(useBinary ? new BinaryMessage(frame) : new TextMessage(frame)));
return messages;
}
private static StompHeaderAccessor getStompHeaderAccessor(Message<byte[]> message) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Assert.notNull(accessor, "No StompHeaderAccessor available");
byte[] payload = message.getPayload();
List<byte[]> frames = splittingEncoder.encode(accessor.getMessageHeaders(), payload);
return accessor;
}
private static boolean useBinary(
StompHeaderAccessor accessor, byte[] payload, Class<? extends WebSocketSession> sessionType) {
boolean useBinary = (payload.length > 0 &&
return (payload.length > 0 &&
!(SockJsSession.class.isAssignableFrom(sessionType)) &&
MimeTypeUtils.APPLICATION_OCTET_STREAM.isCompatibleWith(accessor.getContentType()));
List<WebSocketMessage<?>> messages = new ArrayList<>();
for (byte[] frame : frames) {
messages.add(useBinary ? new BinaryMessage(frame) : new TextMessage(frame));
}
return messages;
}
}

25
spring-websocket/src/test/java/org/springframework/web/socket/messaging/WebSocketStompClientIntegrationTests.java

@ -66,6 +66,8 @@ class WebSocketStompClientIntegrationTests { @@ -66,6 +66,8 @@ class WebSocketStompClientIntegrationTests {
private AnnotationConfigWebApplicationContext wac;
private String url;
@BeforeEach
void setUp(TestInfo testInfo) throws Exception {
@ -83,6 +85,8 @@ class WebSocketStompClientIntegrationTests { @@ -83,6 +85,8 @@ class WebSocketStompClientIntegrationTests {
WebSocketClient webSocketClient = new StandardWebSocketClient();
this.stompClient = new WebSocketStompClient(webSocketClient);
this.stompClient.setMessageConverter(new StringMessageConverter());
this.url = "ws://127.0.0.1:" + this.server.getPort() + "/stomp";
}
@AfterEach
@ -109,17 +113,30 @@ class WebSocketStompClientIntegrationTests { @@ -109,17 +113,30 @@ class WebSocketStompClientIntegrationTests {
@Test
@SuppressWarnings("deprecation")
void publishSubscribe() throws Exception {
String url = "ws://127.0.0.1:" + this.server.getPort() + "/stomp";
TestHandler testHandler = new TestHandler("/topic/foo", "payload");
this.stompClient.connect(url, testHandler);
this.stompClient.connectAsync(this.url, testHandler);
assertThat(testHandler.awaitForMessageCount(1, 5000)).isTrue();
assertThat(testHandler.getReceived()).containsExactly("payload");
}
@Test
void publishSubscribeWithSlitMessage() throws Exception {
StringBuilder sb = new StringBuilder();
while (sb.length() < 1024) {
sb.append("A message with a long body... ");
}
String payload = sb.toString();
TestHandler testHandler = new TestHandler("/topic/foo", payload);
this.stompClient.setOutboundMessageSizeLimit(512);
this.stompClient.connectAsync(this.url, testHandler);
assertThat(testHandler.awaitForMessageCount(1, 5000)).isTrue();
assertThat(testHandler.getReceived()).containsExactly(payload);
}
@Configuration(proxyBeanMethods = false)
static class TestConfig extends WebSocketMessageBrokerConfigurationSupport {

Loading…
Cancel
Save