|
|
|
|
@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
|
|
|
|
|
/* |
|
|
|
|
* Copyright 2002-2019 the original author or authors. |
|
|
|
|
* Copyright 2002-2020 the original author or authors. |
|
|
|
|
* |
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
|
@ -16,12 +16,10 @@
@@ -16,12 +16,10 @@
|
|
|
|
|
|
|
|
|
|
package org.springframework.messaging.simp.stomp; |
|
|
|
|
|
|
|
|
|
import java.io.ByteArrayOutputStream; |
|
|
|
|
import java.io.DataOutputStream; |
|
|
|
|
import java.io.IOException; |
|
|
|
|
import java.nio.charset.StandardCharsets; |
|
|
|
|
import java.util.Collections; |
|
|
|
|
import java.util.LinkedHashMap; |
|
|
|
|
import java.util.LinkedList; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Map.Entry; |
|
|
|
|
@ -47,9 +45,9 @@ import org.springframework.util.Assert;
@@ -47,9 +45,9 @@ import org.springframework.util.Assert;
|
|
|
|
|
*/ |
|
|
|
|
public class StompEncoder { |
|
|
|
|
|
|
|
|
|
private static final byte LF = '\n'; |
|
|
|
|
private static final Byte LINE_FEED_BYTE = '\n'; |
|
|
|
|
|
|
|
|
|
private static final byte COLON = ':'; |
|
|
|
|
private static final Byte COLON_BYTE = ':'; |
|
|
|
|
|
|
|
|
|
private static final Log logger = SimpLogging.forLogName(StompEncoder.class); |
|
|
|
|
|
|
|
|
|
@ -93,38 +91,28 @@ public class StompEncoder {
@@ -93,38 +91,28 @@ public class StompEncoder {
|
|
|
|
|
Assert.notNull(headers, "'headers' is required"); |
|
|
|
|
Assert.notNull(payload, "'payload' is required"); |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
ByteArrayOutputStream baos = new ByteArrayOutputStream(128 + payload.length); |
|
|
|
|
DataOutputStream output = new DataOutputStream(baos); |
|
|
|
|
|
|
|
|
|
if (SimpMessageType.HEARTBEAT.equals(SimpMessageHeaderAccessor.getMessageType(headers))) { |
|
|
|
|
logger.trace("Encoding heartbeat"); |
|
|
|
|
output.write(StompDecoder.HEARTBEAT_PAYLOAD); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
else { |
|
|
|
|
StompCommand command = StompHeaderAccessor.getCommand(headers); |
|
|
|
|
if (command == null) { |
|
|
|
|
throw new IllegalStateException("Missing STOMP command: " + headers); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
output.write(command.toString().getBytes(StandardCharsets.UTF_8)); |
|
|
|
|
output.write(LF); |
|
|
|
|
writeHeaders(command, headers, payload, output); |
|
|
|
|
output.write(LF); |
|
|
|
|
writeBody(payload, output); |
|
|
|
|
output.write((byte) 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return baos.toByteArray(); |
|
|
|
|
if (SimpMessageType.HEARTBEAT.equals(SimpMessageHeaderAccessor.getMessageType(headers))) { |
|
|
|
|
logger.trace("Encoding heartbeat"); |
|
|
|
|
return StompDecoder.HEARTBEAT_PAYLOAD; |
|
|
|
|
} |
|
|
|
|
catch (IOException ex) { |
|
|
|
|
throw new StompConversionException("Failed to encode STOMP frame, headers=" + headers, ex); |
|
|
|
|
|
|
|
|
|
StompCommand command = StompHeaderAccessor.getCommand(headers); |
|
|
|
|
if (command == null) { |
|
|
|
|
throw new IllegalStateException("Missing STOMP command: " + headers); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Result result = new DefaultResult(); |
|
|
|
|
result.add(command.toString().getBytes(StandardCharsets.UTF_8)); |
|
|
|
|
result.add(LINE_FEED_BYTE); |
|
|
|
|
writeHeaders(command, headers, payload, result); |
|
|
|
|
result.add(LINE_FEED_BYTE); |
|
|
|
|
result.add(payload); |
|
|
|
|
result.add((byte) 0); |
|
|
|
|
return result.toByteArray(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload, |
|
|
|
|
DataOutputStream output) throws IOException { |
|
|
|
|
private void writeHeaders( |
|
|
|
|
StompCommand command, Map<String, Object> headers, byte[] payload, Result result) { |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
Map<String,List<String>> nativeHeaders = |
|
|
|
|
@ -154,18 +142,18 @@ public class StompEncoder {
@@ -154,18 +142,18 @@ public class StompEncoder {
|
|
|
|
|
|
|
|
|
|
byte[] encodedKey = encodeHeaderKey(entry.getKey(), shouldEscape); |
|
|
|
|
for (String value : values) { |
|
|
|
|
output.write(encodedKey); |
|
|
|
|
output.write(COLON); |
|
|
|
|
output.write(encodeHeaderValue(value, shouldEscape)); |
|
|
|
|
output.write(LF); |
|
|
|
|
result.add(encodedKey); |
|
|
|
|
result.add(COLON_BYTE); |
|
|
|
|
result.add(encodeHeaderValue(value, shouldEscape)); |
|
|
|
|
result.add(LINE_FEED_BYTE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (command.requiresContentLength()) { |
|
|
|
|
int contentLength = payload.length; |
|
|
|
|
output.write("content-length:".getBytes(StandardCharsets.UTF_8)); |
|
|
|
|
output.write(Integer.toString(contentLength).getBytes(StandardCharsets.UTF_8)); |
|
|
|
|
output.write(LF); |
|
|
|
|
result.add("content-length:".getBytes(StandardCharsets.UTF_8)); |
|
|
|
|
result.add(Integer.toString(contentLength).getBytes(StandardCharsets.UTF_8)); |
|
|
|
|
result.add(LINE_FEED_BYTE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -229,8 +217,51 @@ public class StompEncoder {
@@ -229,8 +217,51 @@ public class StompEncoder {
|
|
|
|
|
return sb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void writeBody(byte[] payload, DataOutputStream output) throws IOException { |
|
|
|
|
output.write(payload); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Accumulates byte content and returns an aggregated byte[] at the end. |
|
|
|
|
*/ |
|
|
|
|
private interface Result { |
|
|
|
|
|
|
|
|
|
void add(byte[] bytes); |
|
|
|
|
|
|
|
|
|
void add(byte b); |
|
|
|
|
|
|
|
|
|
byte[] toByteArray(); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("serial") |
|
|
|
|
private static class DefaultResult extends LinkedList<Object> implements Result { |
|
|
|
|
|
|
|
|
|
private int size; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void add(byte[] bytes) { |
|
|
|
|
this.size += bytes.length; |
|
|
|
|
super.add(bytes); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void add(byte b) { |
|
|
|
|
this.size++; |
|
|
|
|
super.add(b); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public byte[] toByteArray() { |
|
|
|
|
byte[] result = new byte[this.size]; |
|
|
|
|
int position = 0; |
|
|
|
|
for (Object o : this) { |
|
|
|
|
if (o instanceof byte[]) { |
|
|
|
|
byte[] src = (byte[]) o; |
|
|
|
|
System.arraycopy(src, 0, result, position, src.length); |
|
|
|
|
position += src.length; |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
result[position++] = (Byte) o; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|