Browse Source

Introduce MessageHeader accessor types

A new type MessageHeaderAccesssor provides read/write access to
MessageHeaders along with typed getter/setter methods along the lines
of the existing MessageBuilder methods (internally MessageBuilder
merely delegates to MessageHeaderAccessor). This class is extensible
with sub-classes expected to provide typed getter/setter methods for
specific categories of message headers.

NativeMessageHeaderAccessor is one specific sub-class that further
provides read/write access to headers from some external message
source (e.g. STOMP headers). Native headers are stored in a separate
MultiValueMap and kept under a specific key.
pull/286/merge
Rossen Stoyanchev 13 years ago
parent
commit
486b4101ec
  1. 2
      build.gradle
  2. 43
      spring-context/src/main/java/org/springframework/messaging/MessageHeaders.java
  3. 6
      spring-context/src/main/java/org/springframework/messaging/support/GenericMessage.java
  4. 247
      spring-context/src/main/java/org/springframework/messaging/support/MessageBuilder.java
  5. 219
      spring-context/src/main/java/org/springframework/messaging/support/MessageHeaderAccesssor.java
  6. 140
      spring-context/src/main/java/org/springframework/messaging/support/NativeMessageHeaderAccessor.java
  7. 6
      spring-websocket/src/main/java/org/springframework/web/messaging/service/AbstractPubSubMessageHandler.java
  8. 12
      spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java
  9. 4
      spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java
  10. 4
      spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageBodyArgumentResolver.java
  11. 8
      spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java
  12. 147
      spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompHeaderAccessor.java
  13. 4
      spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompMessageConverter.java
  14. 12
      spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java
  15. 12
      spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java
  16. 251
      spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubHeaderAccesssor.java
  17. 6
      spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubMessageBuilder.java
  18. 167
      spring-websocket/src/main/java/org/springframework/web/messaging/support/WebMessageHeaderAccesssor.java
  19. 6
      spring-websocket/src/test/java/org/springframework/web/messaging/stomp/support/StompMessageConverterTests.java

2
build.gradle

@ -490,7 +490,7 @@ project("spring-websocket") { @@ -490,7 +490,7 @@ project("spring-websocket") {
repositories {
maven { url "https://repository.apache.org/content/repositories/snapshots" } // tomcat-websocket-* snapshots
maven { url "https://maven.java.net/content/repositories/releases" } // javax.websocket, tyrus
mavenLocal() // temporary workaround for locally installed (latest) reactor
mavenLocal() // temporary workaround for locally installed (latest) reactor
maven { url 'http://repo.springsource.org/snapshot' } // reactor
}
}

43
spring-context/src/main/java/org/springframework/messaging/MessageHeaders.java

@ -35,8 +35,8 @@ import org.apache.commons.logging.LogFactory; @@ -35,8 +35,8 @@ import org.apache.commons.logging.LogFactory;
/**
* The headers for a {@link Message}.<br>
* IMPORTANT: MessageHeaders are immutable. Any mutating operation (e.g., put(..), putAll(..) etc.)
* will result in {@link UnsupportedOperationException}
* IMPORTANT: This class is immutable. Any mutating operation (e.g., put(..), putAll(..) etc.)
* will throw {@link UnsupportedOperationException}
*
* <p>To create MessageHeaders instance use fluent MessageBuilder API
* <pre>
@ -52,11 +52,10 @@ import org.apache.commons.logging.LogFactory; @@ -52,11 +52,10 @@ import org.apache.commons.logging.LogFactory;
*
* @author Arjen Poutsma
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Gary Russell
* @since 4.0
*/
public class MessageHeaders implements Map<String, Object>, Serializable {
public final class MessageHeaders implements Map<String, Object>, Serializable {
private static final long serialVersionUID = -4615750558355702881L;
@ -74,26 +73,12 @@ public class MessageHeaders implements Map<String, Object>, Serializable { @@ -74,26 +73,12 @@ public class MessageHeaders implements Map<String, Object>, Serializable {
public static final String TIMESTAMP = "timestamp";
public static final String CORRELATION_ID = "correlationId";
public static final String REPLY_CHANNEL = "replyChannel";
public static final String ERROR_CHANNEL = "errorChannel";
public static final String EXPIRATION_DATE = "expirationDate";
public static final String PRIORITY = "priority";
public static final String SEQUENCE_NUMBER = "sequenceNumber";
public static final String SEQUENCE_SIZE = "sequenceSize";
public static final String SEQUENCE_DETAILS = "sequenceDetails";
public static final String CONTENT_TYPE = "contentType";
public static final String POSTPROCESS_RESULT = "postProcessResult";
public static final List<String> HEADER_NAMES = Arrays.asList(ID, TIMESTAMP);
@ -121,28 +106,6 @@ public class MessageHeaders implements Map<String, Object>, Serializable { @@ -121,28 +106,6 @@ public class MessageHeaders implements Map<String, Object>, Serializable {
return this.get(TIMESTAMP, Long.class);
}
public Long getExpirationDate() {
return this.get(EXPIRATION_DATE, Long.class);
}
public Object getCorrelationId() {
return this.get(CORRELATION_ID);
}
public Integer getSequenceNumber() {
Integer sequenceNumber = this.get(SEQUENCE_NUMBER, Integer.class);
return (sequenceNumber != null ? sequenceNumber : 0);
}
public Integer getSequenceSize() {
Integer sequenceSize = this.get(SEQUENCE_SIZE, Integer.class);
return (sequenceSize != null ? sequenceSize : 0);
}
public Integer getPriority() {
return this.get(PRIORITY, Integer.class);
}
public Object getReplyChannel() {
return this.get(REPLY_CHANNEL);
}

6
spring-context/src/main/java/org/springframework/messaging/support/GenericMessage.java

@ -66,15 +66,11 @@ public class GenericMessage<T> implements Message<T>, Serializable { @@ -66,15 +66,11 @@ public class GenericMessage<T> implements Message<T>, Serializable {
else {
headers = new HashMap<String, Object>(headers);
}
this.headers = createMessageHeaders(headers);
this.headers = new MessageHeaders(headers);
this.payload = payload;
}
protected MessageHeaders createMessageHeaders(Map<String, Object> headers) {
return new MessageHeaders(headers);
}
public MessageHeaders getHeaders() {
return this.headers;
}

247
spring-context/src/main/java/org/springframework/messaging/support/MessageBuilder.java

@ -16,41 +16,28 @@ @@ -16,41 +16,28 @@
package org.springframework.messaging.support;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils;
/**
* TODO
* A builder for creating {@link GenericMessage} or {@link ErrorMessage} if the payload is
* {@link Throwable}.
*
* @author Arjen Poutsma
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Dave Syer
* @since 4.0
*/
public final class MessageBuilder<T> {
private final T payload;
private final Map<String, Object> headers = new HashMap<String, Object>();
private final MessageHeaderAccesssor headerAccessor;
private final Message<T> originalMessage;
private volatile boolean modified;
/**
* Private constructor to be invoked from the static factory methods only.
@ -59,15 +46,13 @@ public final class MessageBuilder<T> { @@ -59,15 +46,13 @@ public final class MessageBuilder<T> {
Assert.notNull(payload, "payload must not be null");
this.payload = payload;
this.originalMessage = originalMessage;
if (originalMessage != null) {
this.copyHeaders(originalMessage.getHeaders());
this.modified = (!this.payload.equals(originalMessage.getPayload()));
}
this.headerAccessor = new MessageHeaderAccesssor(originalMessage);
}
/**
* Create a builder for a new {@link Message} instance pre-populated with all of the headers copied from the
* provided message. The payload of the provided Message will also be used as the payload for the new message.
* Create a builder for a new {@link Message} instance pre-populated with all of the
* headers copied from the provided message. The payload of the provided Message will
* also be used as the payload for the new message.
*
* @param message the Message from which the payload and all headers will be copied
*/
@ -88,248 +73,88 @@ public final class MessageBuilder<T> { @@ -88,248 +73,88 @@ public final class MessageBuilder<T> {
}
/**
* Set the value for the given header name. If the provided value is <code>null</code>, the header will be removed.
* Set the value for the given header name. If the provided value is <code>null</code>
* , the header will be removed.
*/
public MessageBuilder<T> setHeader(String headerName, Object headerValue) {
Assert.isTrue(!this.isReadOnly(headerName), "The '" + headerName + "' header is read-only.");
if (StringUtils.hasLength(headerName) && !headerName.equals(MessageHeaders.ID)
&& !headerName.equals(MessageHeaders.TIMESTAMP)) {
this.verifyType(headerName, headerValue);
if (headerValue == null) {
Object removedValue = this.headers.remove(headerName);
if (removedValue != null) {
this.modified = true;
}
}
else {
Object replacedValue = this.headers.put(headerName, headerValue);
if (!headerValue.equals(replacedValue)) {
this.modified = true;
}
}
}
this.headerAccessor.setHeader(headerName, headerValue);
return this;
}
/**
* Set the value for the given header name only if the header name is not already associated with a value.
* Set the value for the given header name only if the header name is not already
* associated with a value.
*/
public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue) {
if (this.headers.get(headerName) == null) {
this.setHeader(headerName, headerValue);
}
this.headerAccessor.setHeaderIfAbsent(headerName, headerValue);
return this;
}
/**
* Removes all headers provided via array of 'headerPatterns'. As the name suggests the array
* may contain simple matching patterns for header names. Supported pattern styles are:
* "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
*
* @param headerPatterns
* Removes all headers provided via array of 'headerPatterns'. As the name suggests
* the array may contain simple matching patterns for header names. Supported pattern
* styles are: "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
*/
public MessageBuilder<T> removeHeaders(String... headerPatterns) {
List<String> headersToRemove = new ArrayList<String>();
for (String pattern : headerPatterns) {
if (StringUtils.hasLength(pattern)){
if (pattern.contains("*")){
for (String headerName : this.headers.keySet()) {
if (PatternMatchUtils.simpleMatch(pattern, headerName)){
headersToRemove.add(headerName);
}
}
}
else {
headersToRemove.add(pattern);
}
}
}
for (String headerToRemove : headersToRemove) {
this.removeHeader(headerToRemove);
}
this.headerAccessor.removeHeaders(headerPatterns);
return this;
}
/**
* Remove the value for the given header name.
*/
public MessageBuilder<T> removeHeader(String headerName) {
if (StringUtils.hasLength(headerName) && !headerName.equals(MessageHeaders.ID)
&& !headerName.equals(MessageHeaders.TIMESTAMP)) {
Object removedValue = this.headers.remove(headerName);
if (removedValue != null) {
this.modified = true;
}
}
this.headerAccessor.removeHeader(headerName);
return this;
}
/**
* Copy the name-value pairs from the provided Map. This operation will overwrite any existing values. Use {
* {@link #copyHeadersIfAbsent(Map)} to avoid overwriting values. Note that the 'id' and 'timestamp' header values
* will never be overwritten.
*
* @see MessageHeaders#ID
* @see MessageHeaders#TIMESTAMP
* Copy the name-value pairs from the provided Map. This operation will overwrite any
* existing values. Use { {@link #copyHeadersIfAbsent(Map)} to avoid overwriting
* values. Note that the 'id' and 'timestamp' header values will never be overwritten.
*/
public MessageBuilder<T> copyHeaders(Map<String, ?> headersToCopy) {
Set<String> keys = headersToCopy.keySet();
for (String key : keys) {
if (!this.isReadOnly(key)) {
this.setHeader(key, headersToCopy.get(key));
}
}
this.headerAccessor.copyHeaders(headersToCopy);
return this;
}
/**
* Copy the name-value pairs from the provided Map. This operation will <em>not</em> overwrite any existing values.
* Copy the name-value pairs from the provided Map. This operation will <em>not</em>
* overwrite any existing values.
*/
public MessageBuilder<T> copyHeadersIfAbsent(Map<String, ?> headersToCopy) {
Set<String> keys = headersToCopy.keySet();
for (String key : keys) {
if (!this.isReadOnly(key)) {
this.setHeaderIfAbsent(key, headersToCopy.get(key));
}
}
return this;
}
public MessageBuilder<T> setExpirationDate(Long expirationDate) {
return this.setHeader(MessageHeaders.EXPIRATION_DATE, expirationDate);
}
public MessageBuilder<T> setExpirationDate(Date expirationDate) {
if (expirationDate != null) {
return this.setHeader(MessageHeaders.EXPIRATION_DATE, expirationDate.getTime());
}
else {
return this.setHeader(MessageHeaders.EXPIRATION_DATE, null);
}
}
public MessageBuilder<T> setCorrelationId(Object correlationId) {
return this.setHeader(MessageHeaders.CORRELATION_ID, correlationId);
}
public MessageBuilder<T> pushSequenceDetails(Object correlationId, int sequenceNumber, int sequenceSize) {
Object incomingCorrelationId = headers.get(MessageHeaders.CORRELATION_ID);
@SuppressWarnings("unchecked")
List<List<Object>> incomingSequenceDetails = (List<List<Object>>) headers.get(MessageHeaders.SEQUENCE_DETAILS);
if (incomingCorrelationId != null) {
if (incomingSequenceDetails == null) {
incomingSequenceDetails = new ArrayList<List<Object>>();
}
else {
incomingSequenceDetails = new ArrayList<List<Object>>(incomingSequenceDetails);
}
incomingSequenceDetails.add(Arrays.asList(incomingCorrelationId,
headers.get(MessageHeaders.SEQUENCE_NUMBER), headers.get(MessageHeaders.SEQUENCE_SIZE)));
incomingSequenceDetails = Collections.unmodifiableList(incomingSequenceDetails);
}
if (incomingSequenceDetails != null) {
setHeader(MessageHeaders.SEQUENCE_DETAILS, incomingSequenceDetails);
}
return setCorrelationId(correlationId).setSequenceNumber(sequenceNumber).setSequenceSize(sequenceSize);
}
public MessageBuilder<T> popSequenceDetails() {
String key = MessageHeaders.SEQUENCE_DETAILS;
if (!headers.containsKey(key)) {
return this;
}
@SuppressWarnings("unchecked")
List<List<Object>> incomingSequenceDetails = new ArrayList<List<Object>>((List<List<Object>>) headers.get(key));
List<Object> sequenceDetails = incomingSequenceDetails.remove(incomingSequenceDetails.size() - 1);
Assert.state(sequenceDetails.size() == 3, "Wrong sequence details (not created by MessageBuilder?): "
+ sequenceDetails);
setCorrelationId(sequenceDetails.get(0));
Integer sequenceNumber = (Integer) sequenceDetails.get(1);
Integer sequenceSize = (Integer) sequenceDetails.get(2);
if (sequenceNumber != null) {
setSequenceNumber(sequenceNumber);
}
if (sequenceSize != null) {
setSequenceSize(sequenceSize);
}
if (!incomingSequenceDetails.isEmpty()) {
headers.put(MessageHeaders.SEQUENCE_DETAILS, incomingSequenceDetails);
}
else {
headers.remove(MessageHeaders.SEQUENCE_DETAILS);
}
this.headerAccessor.copyHeadersIfAbsent(headersToCopy);
return this;
}
public MessageBuilder<T> setReplyChannel(MessageChannel replyChannel) {
return this.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel);
this.headerAccessor.setReplyChannel(replyChannel);
return this;
}
public MessageBuilder<T> setReplyChannelName(String replyChannelName) {
return this.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName);
this.headerAccessor.setReplyChannelName(replyChannelName);
return this;
}
public MessageBuilder<T> setErrorChannel(MessageChannel errorChannel) {
return this.setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel);
this.headerAccessor.setErrorChannel(errorChannel);
return this;
}
public MessageBuilder<T> setErrorChannelName(String errorChannelName) {
return this.setHeader(MessageHeaders.ERROR_CHANNEL, errorChannelName);
}
public MessageBuilder<T> setSequenceNumber(Integer sequenceNumber) {
return this.setHeader(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber);
}
public MessageBuilder<T> setSequenceSize(Integer sequenceSize) {
return this.setHeader(MessageHeaders.SEQUENCE_SIZE, sequenceSize);
}
public MessageBuilder<T> setPriority(Integer priority) {
return this.setHeader(MessageHeaders.PRIORITY, priority);
this.headerAccessor.setErrorChannelName(errorChannelName);
return this;
}
@SuppressWarnings("unchecked")
public Message<T> build() {
if (!this.modified && this.originalMessage != null) {
if ((this.originalMessage != null) && !this.headerAccessor.isModified()) {
return this.originalMessage;
}
if (this.payload instanceof Throwable) {
return (Message<T>) new ErrorMessage((Throwable) this.payload, this.headers);
}
return new GenericMessage<T>(this.payload, this.headers);
}
private boolean isReadOnly(String headerName) {
return MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName);
}
private void verifyType(String headerName, Object headerValue) {
if (headerName != null && headerValue != null) {
if (MessageHeaders.ID.equals(headerName)) {
Assert.isTrue(headerValue instanceof UUID, "The '" + headerName + "' header value must be a UUID.");
}
else if (MessageHeaders.TIMESTAMP.equals(headerName)) {
Assert.isTrue(headerValue instanceof Long, "The '" + headerName + "' header value must be a Long.");
}
else if (MessageHeaders.EXPIRATION_DATE.equals(headerName)) {
Assert.isTrue(headerValue instanceof Date || headerValue instanceof Long, "The '" + headerName
+ "' header value must be a Date or Long.");
}
else if (MessageHeaders.ERROR_CHANNEL.equals(headerName)
|| MessageHeaders.REPLY_CHANNEL.endsWith(headerName)) {
Assert.isTrue(headerValue instanceof MessageChannel || headerValue instanceof String, "The '"
+ headerName + "' header value must be a MessageChannel or String.");
}
else if (MessageHeaders.SEQUENCE_NUMBER.equals(headerName)
|| MessageHeaders.SEQUENCE_SIZE.equals(headerName)) {
Assert.isTrue(Integer.class.isAssignableFrom(headerValue.getClass()), "The '" + headerName
+ "' header value must be an Integer.");
}
else if (MessageHeaders.PRIORITY.equals(headerName)) {
Assert.isTrue(Integer.class.isAssignableFrom(headerValue.getClass()), "The '" + headerName
+ "' header value must be an Integer.");
}
return (Message<T>) new ErrorMessage((Throwable) this.payload, this.headerAccessor.toMap());
}
return new GenericMessage<T>(this.payload, this.headerAccessor.toMap());
}
}

219
spring-context/src/main/java/org/springframework/messaging/support/MessageHeaderAccesssor.java

@ -0,0 +1,219 @@ @@ -0,0 +1,219 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils;
/**
* A base class for read/write access to {@link MessageHeaders}. Supports creation of new
* headers or modification of existing message headers.
* <p>
* Sub-classes can provide additinoal typed getters and setters for convenient access to
* specific headers. Getters and setters should delegate to {@link #getHeader(String)} or
* {@link #setHeader(String, Object)} respectively. At the end {@link #toMap()} can be
* used to obtain the resulting headers.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageHeaderAccesssor {
protected Log logger = LogFactory.getLog(getClass());
// wrapped read-only message headers
private final MessageHeaders originalHeaders;
// header updates
private final Map<String, Object> headers = new HashMap<String, Object>(4);
/**
* A constructor for creating new message headers.
*/
public MessageHeaderAccesssor() {
this.originalHeaders = null;
}
/**
* A constructor for accessing and modifying existing message headers.
*/
public MessageHeaderAccesssor(Message<?> message) {
this.originalHeaders = (message != null) ? message.getHeaders() : null;
}
/**
* Return a header map including original, wrapped headers (if any) plus additional
* header updates made through accessor methods.
*/
public Map<String, Object> toMap() {
Map<String, Object> result = new HashMap<String, Object>();
if (this.originalHeaders != null) {
result.putAll(this.originalHeaders);
}
for (String key : this.headers.keySet()) {
Object value = this.headers.get(key);
if (value == null) {
result.remove(key);
}
else {
result.put(key, value);
}
}
return result;
}
public boolean isModified() {
return (!this.headers.isEmpty());
}
public Object getHeader(String headerName) {
if (this.headers.containsKey(headerName)) {
return this.headers.get(headerName);
}
else if (this.originalHeaders != null) {
return this.originalHeaders.get(headerName);
}
return null;
}
/**
* Set the value for the given header name. If the provided value is {@code null} the
* header will be removed.
*/
public void setHeader(String name, Object value) {
Assert.isTrue(!isReadOnly(name), "The '" + name + "' header is read-only.");
if (!ObjectUtils.nullSafeEquals(value, getHeader(name))) {
this.headers.put(name, value);
}
}
protected boolean isReadOnly(String headerName) {
return MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName);
}
/**
* Set the value for the given header name only if the header name is not already associated with a value.
*/
public void setHeaderIfAbsent(String name, Object value) {
if (getHeader(name) == null) {
setHeader(name, value);
}
}
/**
* Removes all headers provided via array of 'headerPatterns'. As the name suggests
* the array may contain simple matching patterns for header names. Supported pattern
* styles are: "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
*/
public void removeHeaders(String... headerPatterns) {
List<String> headersToRemove = new ArrayList<String>();
for (String pattern : headerPatterns) {
if (StringUtils.hasLength(pattern)){
if (pattern.contains("*")){
for (String headerName : this.headers.keySet()) {
if (PatternMatchUtils.simpleMatch(pattern, headerName)){
headersToRemove.add(headerName);
}
}
}
else {
headersToRemove.add(pattern);
}
}
}
for (String headerToRemove : headersToRemove) {
removeHeader(headerToRemove);
}
}
/**
* Remove the value for the given header name.
*/
public void removeHeader(String headerName) {
if (StringUtils.hasLength(headerName) && !isReadOnly(headerName)) {
setHeader(headerName, null);
}
}
/**
* Copy the name-value pairs from the provided Map. This operation will overwrite any
* existing values. Use { {@link #copyHeadersIfAbsent(Map)} to avoid overwriting
* values.
*/
public void copyHeaders(Map<String, ?> headersToCopy) {
Set<String> keys = headersToCopy.keySet();
for (String key : keys) {
if (!isReadOnly(key)) {
setHeader(key, headersToCopy.get(key));
}
}
}
/**
* Copy the name-value pairs from the provided Map. This operation will <em>not</em>
* overwrite any existing values.
*/
public void copyHeadersIfAbsent(Map<String, ?> headersToCopy) {
Set<String> keys = headersToCopy.keySet();
for (String key : keys) {
if (!this.isReadOnly(key)) {
setHeaderIfAbsent(key, headersToCopy.get(key));
}
}
}
public void setReplyChannel(MessageChannel replyChannel) {
setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel);
}
public void setReplyChannelName(String replyChannelName) {
setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName);
}
public void setErrorChannel(MessageChannel errorChannel) {
setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel);
}
public void setErrorChannelName(String errorChannelName) {
setHeader(MessageHeaders.ERROR_CHANNEL, errorChannelName);
}
@Override
public String toString() {
return getClass().getSimpleName() + " [originalHeaders=" + this.originalHeaders
+ ", updated headers=" + this.headers + "]";
}
}

140
spring-context/src/main/java/org/springframework/messaging/support/NativeMessageHeaderAccessor.java

@ -0,0 +1,140 @@ @@ -0,0 +1,140 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.messaging.Message;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
/**
* An extension of {@link MessageHeaderAccesssor} that also provides read/write access to
* message headers from an external message source. Native message headers are kept
* in a {@link MultiValueMap} under the key {@link #NATIVE_HEADERS}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class NativeMessageHeaderAccessor extends MessageHeaderAccesssor {
public static final String NATIVE_HEADERS = "nativeHeaders";
// wrapped native headers
private final Map<String, List<String>> originalNativeHeaders;
// native header updates
private final MultiValueMap<String, String> nativeHeaders = new LinkedMultiValueMap<String, String>(4);
/**
* A constructor for creating new headers, accepting an optional native header map.
*/
public NativeMessageHeaderAccessor(Map<String, List<String>> nativeHeaders) {
super();
this.originalNativeHeaders = nativeHeaders;
}
/**
* A constructor for accessing and modifying existing message headers.
*/
public NativeMessageHeaderAccessor(Message<?> message) {
super(message);
this.originalNativeHeaders = initNativeHeaders(message);
}
private static Map<String, List<String>> initNativeHeaders(Message<?> message) {
if (message != null) {
@SuppressWarnings("unchecked")
Map<String, List<String>> headers = (Map<String, List<String>>) message.getHeaders().get(NATIVE_HEADERS);
if (headers != null) {
return headers;
}
}
return null;
}
@Override
public Map<String, Object> toMap() {
Map<String, Object> result = super.toMap();
result.put(NATIVE_HEADERS, toNativeHeaderMap());
return result;
}
@Override
public boolean isModified() {
return (super.isModified() || (!this.nativeHeaders.isEmpty()));
}
/**
* Return a map with native headers including original, wrapped headers (if any) plus
* additional header updates made through accessor methods.
*/
public Map<String, List<String>> toNativeHeaderMap() {
Map<String, List<String>> result = new HashMap<String, List<String>>();
if (this.originalNativeHeaders != null) {
result.putAll(this.originalNativeHeaders);
}
for (String key : this.nativeHeaders.keySet()) {
List<String> value = this.nativeHeaders.get(key);
if (value == null) {
result.remove(key);
}
else {
result.put(key, value);
}
}
return result;
}
protected List<String> getNativeHeader(String headerName) {
if (this.nativeHeaders.containsKey(headerName)) {
return this.nativeHeaders.get(headerName);
}
else if (this.originalNativeHeaders != null) {
return this.originalNativeHeaders.get(headerName);
}
return null;
}
protected String getFirstNativeHeader(String headerName) {
List<String> values = getNativeHeader(headerName);
return CollectionUtils.isEmpty(values) ? null : values.get(0);
}
/**
* Set the value for the given header name. If the provided value is {@code null} the
* header will be removed.
*/
protected void putNativeHeader(String name, List<String> value) {
if (!ObjectUtils.nullSafeEquals(value, getHeader(name))) {
this.nativeHeaders.put(name, value);
}
}
protected void setNativeHeader(String name, String value) {
this.nativeHeaders.set(name, value);
}
}

6
spring-websocket/src/main/java/org/springframework/web/messaging/service/AbstractPubSubMessageHandler.java

@ -30,7 +30,7 @@ import org.springframework.util.AntPathMatcher; @@ -30,7 +30,7 @@ import org.springframework.util.AntPathMatcher;
import org.springframework.util.CollectionUtils;
import org.springframework.util.PathMatcher;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
/**
@ -80,7 +80,7 @@ public abstract class AbstractPubSubMessageHandler implements MessageHandler { @@ -80,7 +80,7 @@ public abstract class AbstractPubSubMessageHandler implements MessageHandler {
protected boolean isDestinationAllowed(Message<?> message) {
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message);
String destination = headers.getDestination();
if (destination == null) {
@ -116,7 +116,7 @@ public abstract class AbstractPubSubMessageHandler implements MessageHandler { @@ -116,7 +116,7 @@ public abstract class AbstractPubSubMessageHandler implements MessageHandler {
@Override
public final void handleMessage(Message<?> message) throws MessagingException {
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message);
MessageType messageType = headers.getMessageType();
if (!canHandle(message, messageType)) {

12
spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java

@ -31,7 +31,7 @@ import org.springframework.web.messaging.MessageType; @@ -31,7 +31,7 @@ import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
import reactor.core.Reactor;
import reactor.fn.Consumer;
@ -78,7 +78,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler { @@ -78,7 +78,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler {
logger.debug("Subscribe " + message);
}
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message);
String subscriptionId = headers.getSubscriptionId();
BroadcastingConsumer consumer = new BroadcastingConsumer(subscriptionId);
@ -107,7 +107,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler { @@ -107,7 +107,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler {
try {
// Convert to byte[] payload before the fan-out
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message);
byte[] payload = payloadConverter.convertToPayload(message.getPayload(), headers.getContentType());
Message<?> m = MessageBuilder.withPayload(payload).copyHeaders(message.getHeaders()).build();
@ -120,7 +120,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler { @@ -120,7 +120,7 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler {
@Override
public void handleDisconnect(Message<?> message) {
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message);
removeSubscriptions(headers.getSessionId());
}
@ -149,11 +149,11 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler { @@ -149,11 +149,11 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler {
Message<?> sentMessage = event.getData();
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(sentMessage);
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(sentMessage);
headers.setSubscriptionId(this.subscriptionId);
Message<?> clientMessage = MessageBuilder.withPayload(
sentMessage.getPayload()).copyHeaders(headers.toHeaders()).build();
sentMessage.getPayload()).copyHeaders(headers.toMap()).build();
clientChannel.send(clientMessage);
}

4
spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java

@ -45,7 +45,7 @@ import org.springframework.web.messaging.annotation.UnsubscribeEvent; @@ -45,7 +45,7 @@ import org.springframework.web.messaging.annotation.UnsubscribeEvent;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.service.AbstractPubSubMessageHandler;
import org.springframework.web.messaging.support.MessageHolder;
import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.method.HandlerMethodSelector;
@ -185,7 +185,7 @@ public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler @@ -185,7 +185,7 @@ public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler
private void handleMessageInternal(final Message<?> message, Map<MappingInfo, HandlerMethod> handlerMethods) {
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message);
String destination = headers.getDestination();
HandlerMethod match = getHandlerMethod(destination, handlerMethods);

4
spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageBodyArgumentResolver.java

@ -25,7 +25,7 @@ import org.springframework.web.messaging.annotation.MessageBody; @@ -25,7 +25,7 @@ import org.springframework.web.messaging.annotation.MessageBody;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConversionException;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
/**
@ -52,7 +52,7 @@ public class MessageBodyArgumentResolver implements ArgumentResolver { @@ -52,7 +52,7 @@ public class MessageBodyArgumentResolver implements ArgumentResolver {
Object arg = null;
MessageBody annot = parameter.getParameterAnnotation(MessageBody.class);
MediaType contentType = (MediaType) message.getHeaders().get(PubSubHeaderAccesssor.CONTENT_TYPE);
MediaType contentType = (MediaType) message.getHeaders().get(WebMessageHeaderAccesssor.CONTENT_TYPE);
if (annot == null || annot.required()) {
Class<?> sourceType = message.getPayload().getClass();

8
spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java

@ -21,7 +21,7 @@ import org.springframework.messaging.Message; @@ -21,7 +21,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
/**
@ -66,10 +66,10 @@ public class MessageReturnValueHandler implements ReturnValueHandler { @@ -66,10 +66,10 @@ public class MessageReturnValueHandler implements ReturnValueHandler {
return;
}
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.wrap(message);
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message);
Assert.notNull(headers.getSubscriptionId(), "No subscription id: " + message);
PubSubHeaderAccesssor returnHeaders = PubSubHeaderAccesssor.wrap(returnMessage);
WebMessageHeaderAccesssor returnHeaders = WebMessageHeaderAccesssor.wrap(returnMessage);
returnHeaders.setSessionId(headers.getSessionId());
returnHeaders.setSubscriptionId(headers.getSubscriptionId());
@ -78,7 +78,7 @@ public class MessageReturnValueHandler implements ReturnValueHandler { @@ -78,7 +78,7 @@ public class MessageReturnValueHandler implements ReturnValueHandler {
}
returnMessage = MessageBuilder.withPayload(
returnMessage.getPayload()).copyHeaders(returnHeaders.toHeaders()).build();
returnMessage.getPayload()).copyHeaders(returnHeaders.toMap()).build();
this.clientChannel.send(returnMessage);
}

147
spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompHeaderAccessor.java

@ -16,8 +16,8 @@ @@ -16,8 +16,8 @@
package org.springframework.web.messaging.stomp.support;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -26,12 +26,9 @@ import java.util.concurrent.atomic.AtomicLong; @@ -26,12 +26,9 @@ import java.util.concurrent.atomic.AtomicLong;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
/**
@ -39,13 +36,13 @@ import org.springframework.web.messaging.support.PubSubHeaderAccesssor; @@ -39,13 +36,13 @@ import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
* STOMP-specific headers of an existing message.
* <p>
* Use one of the static factory method in this class, then call getters and setters, and
* at the end if necessary call {@link #toHeaders()} to obtain the updated headers
* or call {@link #toStompMessageHeaders()} to obtain only the STOMP-specific headers.
* at the end if necessary call {@link #toMap()} to obtain the updated headers
* or call {@link #toNativeHeaderMap()} to obtain only the STOMP-specific headers.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompHeaderAccessor extends PubSubHeaderAccesssor {
public class StompHeaderAccessor extends WebMessageHeaderAccesssor {
public static final String STOMP_ID = "id";
@ -80,55 +77,40 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor { @@ -80,55 +77,40 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor {
public static final String HEARTBEAT = "heart-beat";
private static final String STOMP_HEADERS = "stompHeaders";
private static final AtomicLong messageIdCounter = new AtomicLong();
private final Map<String, String> headers;
/**
* A constructor for creating new STOMP message headers.
* This constructor is private. See factory methods in this sub-classes.
*/
private StompHeaderAccessor(StompCommand command, Map<String, List<String>> externalSourceHeaders) {
super(command.getMessageType(), command, externalSourceHeaders);
this.headers = new HashMap<String, String>(4);
updateMessageHeaders();
initWebMessageHeaders();
}
private void updateMessageHeaders() {
if (getExternalSourceHeaders().isEmpty()) {
return;
}
String destination = getHeaderValue(DESTINATION);
private void initWebMessageHeaders() {
String destination = getFirstNativeHeader(DESTINATION);
if (destination != null) {
super.setDestination(destination);
}
String contentType = getHeaderValue(CONTENT_TYPE);
String contentType = getFirstNativeHeader(CONTENT_TYPE);
if (contentType != null) {
super.setContentType(MediaType.parseMediaType(contentType));
}
if (StompCommand.SUBSCRIBE.equals(getStompCommand())) {
if (getHeaderValue(STOMP_ID) != null) {
super.setSubscriptionId(getHeaderValue(STOMP_ID));
if (getFirstNativeHeader(STOMP_ID) != null) {
super.setSubscriptionId(getFirstNativeHeader(STOMP_ID));
}
}
}
/**
* A constructor for accessing and modifying existing message headers. This
* constructor is protected. See factory methods in this class.
* A constructor for accessing and modifying existing message headers.
*/
@SuppressWarnings("unchecked")
private StompHeaderAccessor(Message<?> message) {
super(message);
this.headers = (message.getHeaders() .get(STOMP_HEADERS) != null) ?
(Map<String, String>) message.getHeaders().get(STOMP_HEADERS) : new HashMap<String, String>(4);
}
/**
* Create {@link StompHeaderAccessor} for a new {@link Message}.
*/
@ -152,53 +134,35 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor { @@ -152,53 +134,35 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor {
/**
* Return the original, wrapped headers (i.e. unmodified) or a new Map including any
* updates made via setters.
* Return STOMP headers including original, wrapped STOMP headers (if any) plus
* additional header updates made through accessor methods.
*/
@Override
public Map<String, Object> toHeaders() {
Map<String, Object> result = super.toHeaders();
if (isModified()) {
result.put(STOMP_HEADERS, this.headers);
}
return result;
}
public Map<String, List<String>> toNativeHeaderMap() {
@Override
public boolean isModified() {
return (super.isModified() || !this.headers.isEmpty());
}
/**
* Return STOMP headers and any custom headers that may have been sent by
* a remote endpoint, if this message originated from outside.
*/
public Map<String, List<String>> toStompMessageHeaders() {
MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>();
result.putAll(getExternalSourceHeaders());
result.setAll(this.headers);
Map<String, List<String>> result = super.toNativeHeaderMap();
String destination = super.getDestination();
if (destination != null) {
result.set(DESTINATION, destination);
result.put(DESTINATION, Arrays.asList(destination));
}
MediaType contentType = getContentType();
if (contentType != null) {
result.set(CONTENT_TYPE, contentType.toString());
result.put(CONTENT_TYPE, Arrays.asList(contentType.toString()));
}
if (StompCommand.MESSAGE.equals(getStompCommand())) {
String subscriptionId = getSubscriptionId();
if (subscriptionId != null) {
result.set(SUBSCRIPTION, subscriptionId);
result.put(SUBSCRIPTION, Arrays.asList(subscriptionId));
}
else {
logger.warn("STOMP MESSAGE frame should have a subscription: " + this.toString());
}
if ((getMessageId() == null)) {
result.set(MESSAGE_ID, getSessionId() + "-" + messageIdCounter.getAndIncrement());
String messageId = getSessionId() + "-" + messageIdCounter.getAndIncrement();
result.put(MESSAGE_ID, Arrays.asList(messageId));
}
}
@ -216,42 +180,37 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor { @@ -216,42 +180,37 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor {
}
public Set<String> getAcceptVersion() {
String rawValue = getHeaderValue(ACCEPT_VERSION);
String rawValue = getFirstNativeHeader(ACCEPT_VERSION);
return (rawValue != null) ? StringUtils.commaDelimitedListToSet(rawValue) : Collections.<String>emptySet();
}
private String getHeaderValue(String headerName) {
List<String> values = getExternalSourceHeaders().get(headerName);
return !CollectionUtils.isEmpty(values) ? values.get(0) : this.headers.get(headerName);
}
public void setAcceptVersion(String acceptVersion) {
this.headers.put(ACCEPT_VERSION, acceptVersion);
setNativeHeader(ACCEPT_VERSION, acceptVersion);
}
public void setHost(String host) {
this.headers.put(HOST, host);
setNativeHeader(HOST, host);
}
public String getHost() {
return getHeaderValue(HOST);
return getFirstNativeHeader(HOST);
}
@Override
public void setDestination(String destination) {
super.setDestination(destination);
this.headers.put(DESTINATION, destination);
setNativeHeader(DESTINATION, destination);
}
@Override
public void setDestinations(List<String> destinations) {
Assert.isTrue((destinations != null) && (destinations.size() == 1), "STOMP allows one destination per message");
super.setDestinations(destinations);
this.headers.put(DESTINATION, destinations.get(0));
setNativeHeader(DESTINATION, destinations.get(0));
}
public long[] getHeartbeat() {
String rawValue = getHeaderValue(HEARTBEAT);
String rawValue = getFirstNativeHeader(HEARTBEAT);
if (!StringUtils.hasText(rawValue)) {
return null;
}
@ -263,99 +222,91 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor { @@ -263,99 +222,91 @@ public class StompHeaderAccessor extends PubSubHeaderAccesssor {
public void setContentType(MediaType mediaType) {
if (mediaType != null) {
super.setContentType(mediaType);
this.headers.put(CONTENT_TYPE, mediaType.toString());
setNativeHeader(CONTENT_TYPE, mediaType.toString());
}
}
public MediaType getContentType() {
String value = getHeaderValue(CONTENT_TYPE);
String value = getFirstNativeHeader(CONTENT_TYPE);
return (value != null) ? MediaType.parseMediaType(value) : null;
}
public Integer getContentLength() {
String contentLength = getHeaderValue(CONTENT_LENGTH);
String contentLength = getFirstNativeHeader(CONTENT_LENGTH);
return StringUtils.hasText(contentLength) ? new Integer(contentLength) : null;
}
public void setContentLength(int contentLength) {
this.headers.put(CONTENT_LENGTH, String.valueOf(contentLength));
setNativeHeader(CONTENT_LENGTH, String.valueOf(contentLength));
}
public void setHeartbeat(long cx, long cy) {
this.headers.put(HEARTBEAT, StringUtils.arrayToCommaDelimitedString(new Object[] {cx, cy}));
setNativeHeader(HEARTBEAT, StringUtils.arrayToCommaDelimitedString(new Object[] {cx, cy}));
}
public void setAck(String ack) {
this.headers.put(ACK, ack);
setNativeHeader(ACK, ack);
}
public String getAck() {
return getHeaderValue(ACK);
return getFirstNativeHeader(ACK);
}
public void setNack(String nack) {
this.headers.put(NACK, nack);
setNativeHeader(NACK, nack);
}
public String getNack() {
return getHeaderValue(NACK);
return getFirstNativeHeader(NACK);
}
public void setLogin(String login) {
this.headers.put(LOGIN, login);
setNativeHeader(LOGIN, login);
}
public String getLogin() {
return getHeaderValue(LOGIN);
return getFirstNativeHeader(LOGIN);
}
public void setPasscode(String passcode) {
this.headers.put(PASSCODE, passcode);
setNativeHeader(PASSCODE, passcode);
}
public String getPasscode() {
return getHeaderValue(PASSCODE);
return getFirstNativeHeader(PASSCODE);
}
public void setReceiptId(String receiptId) {
this.headers.put(RECEIPT_ID, receiptId);
setNativeHeader(RECEIPT_ID, receiptId);
}
public String getReceiptId() {
return getHeaderValue(RECEIPT_ID);
return getFirstNativeHeader(RECEIPT_ID);
}
public String getMessage() {
return getHeaderValue(MESSAGE);
return getFirstNativeHeader(MESSAGE);
}
public void setMessage(String content) {
this.headers.put(MESSAGE, content);
setNativeHeader(MESSAGE, content);
}
public String getMessageId() {
return getHeaderValue(MESSAGE_ID);
return getFirstNativeHeader(MESSAGE_ID);
}
public void setMessageId(String id) {
this.headers.put(MESSAGE_ID, id);
setNativeHeader(MESSAGE_ID, id);
}
public String getVersion() {
return getHeaderValue(VERSION);
return getFirstNativeHeader(VERSION);
}
public void setVersion(String version) {
this.headers.put(VERSION, version);
}
@Override
public String toString() {
return "StompHeaders [" + "messageType=" + getMessageType() + ", protocolMessageType="
+ getProtocolMessageType() + ", destination=" + getDestination()
+ ", subscriptionId=" + getSubscriptionId() + ", sessionId=" + getSessionId()
+ ", externalSourceHeaders=" + getExternalSourceHeaders() + ", headers=" + this.headers + "]";
setNativeHeader(VERSION, version);
}
}

4
spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompMessageConverter.java

@ -99,7 +99,7 @@ public class StompMessageConverter { @@ -99,7 +99,7 @@ public class StompMessageConverter {
byte[] payload = new byte[totalLength - payloadIndex];
System.arraycopy(byteContent, payloadIndex, payload, 0, totalLength - payloadIndex);
return MessageBuilder.withPayload(payload).copyHeaders(stompHeaders.toHeaders()).build();
return MessageBuilder.withPayload(payload).copyHeaders(stompHeaders.toMap()).build();
}
private int findIndexOfPayload(byte[] bytes) {
@ -146,7 +146,7 @@ public class StompMessageConverter { @@ -146,7 +146,7 @@ public class StompMessageConverter {
try {
out.write(stompHeaders.getStompCommand().toString().getBytes("UTF-8"));
out.write(LF);
for (Entry<String, List<String>> entry : stompHeaders.toStompMessageHeaders().entrySet()) {
for (Entry<String, List<String>> entry : stompHeaders.toNativeHeaderMap().entrySet()) {
String key = entry.getKey();
key = replaceAllOutbound(key);
for (String value : entry.getValue()) {

12
spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java

@ -39,7 +39,7 @@ import org.springframework.web.messaging.converter.CompositeMessageConverter; @@ -39,7 +39,7 @@ import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.service.AbstractPubSubMessageHandler;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
import reactor.core.Environment;
import reactor.core.Promise;
@ -120,7 +120,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler @@ -120,7 +120,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
this.tcpClient = new TcpClient.Spec<String, String>(NettyTcpClient.class)
.using(new Environment())
.codec(new DelimitedCodec<String, String>((byte) 0, true, StandardCodecs.STRING_CODEC))
.connect("127.0.0.1", 61613)
.connect("127.0.0.1", 61616)
.get();
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
@ -129,7 +129,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler @@ -129,7 +129,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
headers.setPasscode("guest");
headers.setHeartbeat(0, 0);
Message<?> message = MessageBuilder.withPayload(
new byte[0]).copyHeaders(headers.toStompMessageHeaders()).build();
new byte[0]).copyHeaders(headers.toNativeHeaderMap()).build();
RelaySession session = new RelaySession(message, headers) {
@Override
@ -206,7 +206,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler @@ -206,7 +206,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
@Override
public void handleOther(Message<?> message) {
StompCommand command = (StompCommand) message.getHeaders().get(PubSubHeaderAccesssor.PROTOCOL_MESSAGE_TYPE);
StompCommand command = (StompCommand) message.getHeaders().get(WebMessageHeaderAccesssor.PROTOCOL_MESSAGE_TYPE);
Assert.notNull(command, "Expected STOMP command: " + message.getHeaders());
forwardMessage(message, command);
}
@ -326,7 +326,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler @@ -326,7 +326,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
headers.setSessionId(sessionId);
headers.setMessage(errorText);
Message<?> errorMessage = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build();
Message<?> errorMessage = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toMap()).build();
sendMessageToClient(errorMessage);
}
@ -372,7 +372,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler @@ -372,7 +372,7 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
MediaType contentType = headers.getContentType();
byte[] payload = payloadConverter.convertToPayload(message.getPayload(), contentType);
Message<?> byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build();
Message<?> byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toMap()).build();
if (logger.isTraceEnabled()) {
logger.trace("Forwarding message " + byteMessage);

12
spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java

@ -38,7 +38,7 @@ import org.springframework.web.messaging.converter.CompositeMessageConverter; @@ -38,7 +38,7 @@ import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompCommand;
import org.springframework.web.messaging.stomp.StompConversionException;
import org.springframework.web.messaging.support.PubSubHeaderAccesssor;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
@ -159,7 +159,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -159,7 +159,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
// TODO: security
Message<?> connectedMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(
connectedHeaders.toHeaders()).build();
connectedHeaders.toMap()).build();
byte[] bytes = getStompMessageConverter().fromMessage(connectedMessage);
session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8"))));
}
@ -195,7 +195,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -195,7 +195,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
headers.setMessage(error.getMessage());
Message<?> message = MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(headers.toHeaders()).build();
Message<?> message = MessageBuilder.withPayload(EMPTY_PAYLOAD).copyHeaders(headers.toMap()).build();
byte[] bytes = this.stompMessageConverter.fromMessage(message);
try {
@ -209,9 +209,9 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -209,9 +209,9 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
this.sessionInfos.remove(session.getId());
PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.create(MessageType.DISCONNECT);
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.create(MessageType.DISCONNECT);
headers.setSessionId(session.getId());
Message<?> message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toHeaders()).build();
Message<?> message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toMap()).build();
this.outputChannel.send(message);
}
@ -264,7 +264,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -264,7 +264,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
}
try {
Message<?> byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toHeaders()).build();
Message<?> byteMessage = MessageBuilder.withPayload(payload).copyHeaders(headers.toMap()).build();
byte[] bytes = getStompMessageConverter().fromMessage(byteMessage);
session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8"))));
}

251
spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubHeaderAccesssor.java

@ -1,251 +0,0 @@ @@ -1,251 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.support;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.web.messaging.MessageType;
/**
* A base class for working with message headers in Web, messaging protocols that support
* the publish-subscribe message pattern. Provides uniform access to specific values
* common across protocols such as a destination, message type (publish,
* subscribe/unsubscribe), session id, and others.
* <p>
* This class can be used to prepare headers for a new pub-sub message, or to access
* and/or modify headers of an existing message.
* <p>
* Use one of the static factory method in this class, then call getters and setters, and
* at the end if necessary call {@link #toHeaders()} to obtain the updated headers.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class PubSubHeaderAccesssor {
protected Log logger = LogFactory.getLog(getClass());
public static final String DESTINATIONS = "destinations";
public static final String CONTENT_TYPE = "contentType";
public static final String MESSAGE_TYPE = "messageType";
public static final String PROTOCOL_MESSAGE_TYPE = "protocolMessageType";
public static final String SESSION_ID = "sessionId";
public static final String SUBSCRIPTION_ID = "subscriptionId";
public static final String EXTERNAL_SOURCE_HEADERS = "extSourceHeaders";
private static final Map<String, List<String>> emptyMultiValueMap =
Collections.unmodifiableMap(new LinkedMultiValueMap<String, String>(0));
// wrapped read-only message headers
private final MessageHeaders originalHeaders;
// header updates
private final Map<String, Object> headers = new HashMap<String, Object>(4);
// saved headers from a message from a remote source
private final Map<String, List<String>> externalSourceHeaders;
/**
* A constructor for creating new message headers.
* This constructor is protected. See factory methods in this and sub-classes.
*/
protected PubSubHeaderAccesssor(MessageType messageType, Object protocolMessageType,
Map<String, List<String>> externalSourceHeaders) {
this.originalHeaders = null;
Assert.notNull(messageType, "messageType is required");
this.headers.put(MESSAGE_TYPE, messageType);
if (protocolMessageType != null) {
this.headers.put(PROTOCOL_MESSAGE_TYPE, protocolMessageType);
}
if (externalSourceHeaders == null) {
this.externalSourceHeaders = emptyMultiValueMap;
}
else {
this.externalSourceHeaders = Collections.unmodifiableMap(externalSourceHeaders); // TODO: list values must also be read-only
this.headers.put(EXTERNAL_SOURCE_HEADERS, this.externalSourceHeaders);
}
}
/**
* A constructor for accessing and modifying existing message headers. This
* constructor is protected. See factory methods in this and sub-classes.
*/
@SuppressWarnings("unchecked")
protected PubSubHeaderAccesssor(Message<?> message) {
Assert.notNull(message, "message is required");
this.originalHeaders = message.getHeaders();
this.externalSourceHeaders = (this.originalHeaders.get(EXTERNAL_SOURCE_HEADERS) != null) ?
(Map<String, List<String>>) this.originalHeaders.get(EXTERNAL_SOURCE_HEADERS) : emptyMultiValueMap;
}
/**
* Create {@link PubSubHeaderAccesssor} for a new {@link Message} with
* {@link MessageType#MESSAGE}.
*/
public static PubSubHeaderAccesssor create() {
return new PubSubHeaderAccesssor(MessageType.MESSAGE, null, null);
}
/**
* Create {@link PubSubHeaderAccesssor} for a new {@link Message} of a specific type.
*/
public static PubSubHeaderAccesssor create(MessageType messageType) {
return new PubSubHeaderAccesssor(messageType, null, null);
}
/**
* Create {@link PubSubHeaderAccesssor} from the headers of an existing message.
*/
public static PubSubHeaderAccesssor wrap(Message<?> message) {
return new PubSubHeaderAccesssor(message);
}
/**
* Return the original, wrapped headers (i.e. unmodified) or a new Map including any
* updates made via setters.
*/
public Map<String, Object> toHeaders() {
if (!isModified()) {
return this.originalHeaders;
}
Map<String, Object> result = new HashMap<String, Object>();
if (this.originalHeaders != null) {
result.putAll(this.originalHeaders);
}
result.putAll(this.headers);
return result;
}
public boolean isModified() {
return ((this.originalHeaders == null) || !this.headers.isEmpty());
}
public MessageType getMessageType() {
return (MessageType) getHeaderValue(MESSAGE_TYPE);
}
private Object getHeaderValue(String headerName) {
if (this.headers.get(headerName) != null) {
return this.headers.get(headerName);
}
else if ((this.originalHeaders != null) && (this.originalHeaders.get(headerName) != null)) {
return this.originalHeaders.get(headerName);
}
return null;
}
protected void setProtocolMessageType(Object protocolMessageType) {
this.headers.put(PROTOCOL_MESSAGE_TYPE, protocolMessageType);
}
protected Object getProtocolMessageType() {
return getHeaderValue(PROTOCOL_MESSAGE_TYPE);
}
public void setDestination(String destination) {
Assert.notNull(destination, "destination is required");
this.headers.put(DESTINATIONS, Arrays.asList(destination));
}
@SuppressWarnings("unchecked")
public String getDestination() {
List<String> destinations = (List<String>) getHeaderValue(DESTINATIONS);
return CollectionUtils.isEmpty(destinations) ? null : destinations.get(0);
}
@SuppressWarnings("unchecked")
public List<String> getDestinations() {
List<String> destinations = (List<String>) getHeaderValue(DESTINATIONS);
return CollectionUtils.isEmpty(destinations) ? null : destinations;
}
public void setDestinations(List<String> destinations) {
Assert.notNull(destinations, "destinations are required");
this.headers.put(DESTINATIONS, destinations);
}
public MediaType getContentType() {
return (MediaType) getHeaderValue(CONTENT_TYPE);
}
public void setContentType(MediaType contentType) {
Assert.notNull(contentType, "contentType is required");
this.headers.put(CONTENT_TYPE, contentType);
}
public String getSubscriptionId() {
return (String) getHeaderValue(SUBSCRIPTION_ID);
}
public void setSubscriptionId(String subscriptionId) {
this.headers.put(SUBSCRIPTION_ID, subscriptionId);
}
public String getSessionId() {
return (String) getHeaderValue(SESSION_ID);
}
public void setSessionId(String sessionId) {
this.headers.put(SESSION_ID, sessionId);
}
/**
* Return a read-only map of headers originating from a message received by the
* application from an external source (e.g. from a remote WebSocket endpoint). The
* header names and values are exactly as they were, and are protocol specific but may
* also be custom application headers if the protocol allows that.
*/
public Map<String, List<String>> getExternalSourceHeaders() {
return this.externalSourceHeaders;
}
@Override
public String toString() {
return "PubSubHeaders [originalHeaders=" + this.originalHeaders + ", headers="
+ this.headers + ", externalSourceHeaders=" + this.externalSourceHeaders + "]";
}
}

6
spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubMessageBuilder.java

@ -29,7 +29,7 @@ import reactor.util.Assert; @@ -29,7 +29,7 @@ import reactor.util.Assert;
*/
public class PubSubMessageBuilder<T> {
private final PubSubHeaderAccesssor headers = PubSubHeaderAccesssor.create();
private final WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.create();
private final T payload;
@ -67,11 +67,11 @@ public class PubSubMessageBuilder<T> { @@ -67,11 +67,11 @@ public class PubSubMessageBuilder<T> {
Message<?> message = MessageHolder.getMessage();
if (message != null) {
String sessionId = PubSubHeaderAccesssor.wrap(message).getSessionId();
String sessionId = WebMessageHeaderAccesssor.wrap(message).getSessionId();
this.headers.setSessionId(sessionId);
}
return MessageBuilder.withPayload(this.payload).copyHeaders(this.headers.toHeaders()).build();
return MessageBuilder.withPayload(this.payload).copyHeaders(this.headers.toMap()).build();
}
}

167
spring-websocket/src/main/java/org/springframework/web/messaging/support/WebMessageHeaderAccesssor.java

@ -0,0 +1,167 @@ @@ -0,0 +1,167 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.support;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.NativeMessageHeaderAccessor;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.web.messaging.MessageType;
/**
* A base class for working with message headers in Web, messaging protocols that support
* the publish-subscribe message pattern. Provides uniform access to specific values
* common across protocols such as a destination, message type (publish,
* subscribe/unsubscribe), session id, and others.
* <p>
* Use one of the static factory method in this class, then call getters and setters, and
* at the end if necessary call {@link #toMap()} to obtain the updated headers.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class WebMessageHeaderAccesssor extends NativeMessageHeaderAccessor {
public static final String DESTINATIONS = "destinations";
public static final String CONTENT_TYPE = "contentType";
public static final String MESSAGE_TYPE = "messageType";
public static final String PROTOCOL_MESSAGE_TYPE = "protocolMessageType";
public static final String SESSION_ID = "sessionId";
public static final String SUBSCRIPTION_ID = "subscriptionId";
/**
* A constructor for creating new message headers.
* This constructor is protected. See factory methods in this and sub-classes.
*/
protected WebMessageHeaderAccesssor(MessageType messageType, Object protocolMessageType,
Map<String, List<String>> externalSourceHeaders) {
super(externalSourceHeaders);
Assert.notNull(messageType, "messageType is required");
setHeader(MESSAGE_TYPE, messageType);
if (protocolMessageType != null) {
setHeader(PROTOCOL_MESSAGE_TYPE, protocolMessageType);
}
}
/**
* A constructor for accessing and modifying existing message headers. This
* constructor is protected. See factory methods in this and sub-classes.
*/
protected WebMessageHeaderAccesssor(Message<?> message) {
super(message);
Assert.notNull(message, "message is required");
}
/**
* Create {@link WebMessageHeaderAccesssor} for a new {@link Message} with
* {@link MessageType#MESSAGE}.
*/
public static WebMessageHeaderAccesssor create() {
return new WebMessageHeaderAccesssor(MessageType.MESSAGE, null, null);
}
/**
* Create {@link WebMessageHeaderAccesssor} for a new {@link Message} of a specific type.
*/
public static WebMessageHeaderAccesssor create(MessageType messageType) {
return new WebMessageHeaderAccesssor(messageType, null, null);
}
/**
* Create {@link WebMessageHeaderAccesssor} from the headers of an existing message.
*/
public static WebMessageHeaderAccesssor wrap(Message<?> message) {
return new WebMessageHeaderAccesssor(message);
}
public MessageType getMessageType() {
return (MessageType) getHeader(MESSAGE_TYPE);
}
protected void setProtocolMessageType(Object protocolMessageType) {
setHeader(PROTOCOL_MESSAGE_TYPE, protocolMessageType);
}
protected Object getProtocolMessageType() {
return getHeader(PROTOCOL_MESSAGE_TYPE);
}
public void setDestination(String destination) {
Assert.notNull(destination, "destination is required");
setHeader(DESTINATIONS, Arrays.asList(destination));
}
@SuppressWarnings("unchecked")
public String getDestination() {
List<String> destinations = (List<String>) getHeader(DESTINATIONS);
return CollectionUtils.isEmpty(destinations) ? null : destinations.get(0);
}
@SuppressWarnings("unchecked")
public List<String> getDestinations() {
List<String> destinations = (List<String>) getHeader(DESTINATIONS);
return CollectionUtils.isEmpty(destinations) ? null : destinations;
}
public void setDestinations(List<String> destinations) {
Assert.notNull(destinations, "destinations are required");
setHeader(DESTINATIONS, destinations);
}
public MediaType getContentType() {
return (MediaType) getHeader(CONTENT_TYPE);
}
public void setContentType(MediaType contentType) {
Assert.notNull(contentType, "contentType is required");
setHeader(CONTENT_TYPE, contentType);
}
public String getSubscriptionId() {
return (String) getHeader(SUBSCRIPTION_ID);
}
public void setSubscriptionId(String subscriptionId) {
setHeader(SUBSCRIPTION_ID, subscriptionId);
}
public String getSessionId() {
return (String) getHeader(SESSION_ID);
}
public void setSessionId(String sessionId) {
setHeader(SESSION_ID, sessionId);
}
}

6
spring-websocket/src/test/java/org/springframework/web/messaging/stomp/support/StompMessageConverterTests.java

@ -53,7 +53,7 @@ public class StompMessageConverterTests { @@ -53,7 +53,7 @@ public class StompMessageConverterTests {
MessageHeaders headers = message.getHeaders();
StompHeaderAccessor stompHeaders = StompHeaderAccessor.wrap(message);
assertEquals(7, stompHeaders.toHeaders().size());
assertEquals(7, stompHeaders.toMap().size());
assertEquals(Collections.singleton("1.1"), stompHeaders.getAcceptVersion());
assertEquals("github.org", stompHeaders.getHost());
@ -84,7 +84,7 @@ public class StompMessageConverterTests { @@ -84,7 +84,7 @@ public class StompMessageConverterTests {
StompHeaderAccessor stompHeaders = StompHeaderAccessor.wrap(message);
assertEquals(Collections.singleton("1.1"), stompHeaders.getAcceptVersion());
assertEquals("st\nomp.gi:thu\\b.org", stompHeaders.getExternalSourceHeaders().get("ho:\ns\rt").get(0));
assertEquals("st\nomp.gi:thu\\b.org", stompHeaders.toNativeHeaderMap().get("ho:\ns\rt").get(0));
String convertedBack = new String(this.converter.fromMessage(message), "UTF-8");
@ -128,7 +128,7 @@ public class StompMessageConverterTests { @@ -128,7 +128,7 @@ public class StompMessageConverterTests {
StompHeaderAccessor stompHeaders = StompHeaderAccessor.wrap(message);
assertEquals(Collections.singleton("1.1"), stompHeaders.getAcceptVersion());
assertEquals("st\nomp.gi:thu\\b.org", stompHeaders.getExternalSourceHeaders().get("ho:\ns\rt").get(0));
assertEquals("st\nomp.gi:thu\\b.org", stompHeaders.toNativeHeaderMap().get("ho:\ns\rt").get(0));
String convertedBack = new String(this.converter.fromMessage(message), "UTF-8");

Loading…
Cancel
Save