Browse Source

Polish

Issue: SPR-16519
pull/1708/head
Rossen Stoyanchev 8 years ago
parent
commit
356ef5199e
  1. 89
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java
  2. 31
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java

89
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2017 the original author or authors. * Copyright 2002-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -211,20 +211,20 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override @Override
public Receiptable send(String destination, Object payload) { public Receiptable send(String destination, Object payload) {
StompHeaders stompHeaders = new StompHeaders(); StompHeaders headers = new StompHeaders();
stompHeaders.setDestination(destination); headers.setDestination(destination);
return send(stompHeaders, payload); return send(headers, payload);
} }
@Override @Override
public Receiptable send(StompHeaders stompHeaders, Object payload) { public Receiptable send(StompHeaders headers, Object payload) {
Assert.hasText(stompHeaders.getDestination(), "Destination header is required"); Assert.hasText(headers.getDestination(), "Destination header is required");
String receiptId = checkOrAddReceipt(stompHeaders); String receiptId = checkOrAddReceipt(headers);
Receiptable receiptable = new ReceiptHandler(receiptId); Receiptable receiptable = new ReceiptHandler(receiptId);
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SEND); StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SEND);
accessor.addNativeHeaders(stompHeaders); accessor.addNativeHeaders(headers);
Message<byte[]> message = createMessage(accessor, payload); Message<byte[]> message = createMessage(accessor, payload);
execute(message); execute(message);
@ -232,11 +232,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
} }
@Nullable @Nullable
private String checkOrAddReceipt(StompHeaders stompHeaders) { private String checkOrAddReceipt(StompHeaders headers) {
String receiptId = stompHeaders.getReceipt(); String receiptId = headers.getReceipt();
if (isAutoReceiptEnabled() && receiptId == null) { if (isAutoReceiptEnabled() && receiptId == null) {
receiptId = String.valueOf(DefaultStompSession.this.receiptIndex.getAndIncrement()); receiptId = String.valueOf(DefaultStompSession.this.receiptIndex.getAndIncrement());
stompHeaders.setReceipt(receiptId); headers.setReceipt(receiptId);
} }
return receiptId; return receiptId;
} }
@ -292,26 +292,26 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override @Override
public Subscription subscribe(String destination, StompFrameHandler handler) { public Subscription subscribe(String destination, StompFrameHandler handler) {
StompHeaders stompHeaders = new StompHeaders(); StompHeaders headers = new StompHeaders();
stompHeaders.setDestination(destination); headers.setDestination(destination);
return subscribe(stompHeaders, handler); return subscribe(headers, handler);
} }
@Override @Override
public Subscription subscribe(StompHeaders stompHeaders, StompFrameHandler handler) { public Subscription subscribe(StompHeaders headers, StompFrameHandler handler) {
Assert.hasText(stompHeaders.getDestination(), "Destination header is required"); Assert.hasText(headers.getDestination(), "Destination header is required");
Assert.notNull(handler, "StompFrameHandler must not be null"); Assert.notNull(handler, "StompFrameHandler must not be null");
String subscriptionId = stompHeaders.getId(); String subscriptionId = headers.getId();
if (!StringUtils.hasText(subscriptionId)) { if (!StringUtils.hasText(subscriptionId)) {
subscriptionId = String.valueOf(DefaultStompSession.this.subscriptionIndex.getAndIncrement()); subscriptionId = String.valueOf(DefaultStompSession.this.subscriptionIndex.getAndIncrement());
stompHeaders.setId(subscriptionId); headers.setId(subscriptionId);
} }
checkOrAddReceipt(stompHeaders); checkOrAddReceipt(headers);
Subscription subscription = new DefaultSubscription(stompHeaders, handler); Subscription subscription = new DefaultSubscription(headers, handler);
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE); StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE);
accessor.addNativeHeaders(stompHeaders); accessor.addNativeHeaders(headers);
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD); Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
execute(message); execute(message);
@ -320,17 +320,14 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override @Override
public Receiptable acknowledge(String messageId, boolean consumed) { public Receiptable acknowledge(String messageId, boolean consumed) {
StompHeaders stompHeaders = new StompHeaders(); StompHeaders headers = new StompHeaders();
if ("1.1".equals(this.version)) { if ("1.1".equals(this.version)) {
stompHeaders.setMessageId(messageId); headers.setMessageId(messageId);
} }
else { else {
stompHeaders.setId(messageId); headers.setId(messageId);
} }
return acknowledge(headers, consumed);
Receiptable receiptable = acknowledge(stompHeaders, consumed);
return receiptable;
} }
@Override @Override
@ -347,10 +344,10 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
return receiptable; return receiptable;
} }
private void unsubscribe(String id, @Nullable StompHeaders stompHeaders) { private void unsubscribe(String id, @Nullable StompHeaders headers) {
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE); StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
if (stompHeaders != null) { if (headers != null) {
accessor.addNativeHeaders(stompHeaders); accessor.addNativeHeaders(headers);
} }
accessor.setSubscriptionId(id); accessor.setSubscriptionId(id);
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD); Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
@ -403,7 +400,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
accessor.setSessionId(this.sessionId); accessor.setSessionId(this.sessionId);
StompCommand command = accessor.getCommand(); StompCommand command = accessor.getCommand();
Map<String, List<String>> nativeHeaders = accessor.getNativeHeaders(); Map<String, List<String>> nativeHeaders = accessor.getNativeHeaders();
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(nativeHeaders); StompHeaders headers = StompHeaders.readOnlyStompHeaders(nativeHeaders);
boolean isHeartbeat = accessor.isHeartbeat(); boolean isHeartbeat = accessor.isHeartbeat();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload())); logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
@ -411,9 +408,9 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
try { try {
if (StompCommand.MESSAGE.equals(command)) { if (StompCommand.MESSAGE.equals(command)) {
DefaultSubscription subscription = this.subscriptions.get(stompHeaders.getSubscription()); DefaultSubscription subscription = this.subscriptions.get(headers.getSubscription());
if (subscription != null) { if (subscription != null) {
invokeHandler(subscription.getHandler(), message, stompHeaders); invokeHandler(subscription.getHandler(), message, headers);
} }
else if (logger.isDebugEnabled()) { else if (logger.isDebugEnabled()) {
logger.debug("No handler for: " + accessor.getDetailedLogMessage(message.getPayload()) + logger.debug("No handler for: " + accessor.getDetailedLogMessage(message.getPayload()) +
@ -422,7 +419,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
} }
else { else {
if (StompCommand.RECEIPT.equals(command)) { if (StompCommand.RECEIPT.equals(command)) {
String receiptId = stompHeaders.getReceiptId(); String receiptId = headers.getReceiptId();
ReceiptHandler handler = this.receiptHandlers.get(receiptId); ReceiptHandler handler = this.receiptHandlers.get(receiptId);
if (handler != null) { if (handler != null) {
handler.handleReceiptReceived(); handler.handleReceiptReceived();
@ -432,13 +429,13 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
} }
} }
else if (StompCommand.CONNECTED.equals(command)) { else if (StompCommand.CONNECTED.equals(command)) {
initHeartbeatTasks(stompHeaders); initHeartbeatTasks(headers);
this.version = stompHeaders.getFirst("version"); this.version = headers.getFirst("version");
this.sessionFuture.set(this); this.sessionFuture.set(this);
this.sessionHandler.afterConnected(this, stompHeaders); this.sessionHandler.afterConnected(this, headers);
} }
else if (StompCommand.ERROR.equals(command)) { else if (StompCommand.ERROR.equals(command)) {
invokeHandler(this.sessionHandler, message, stompHeaders); invokeHandler(this.sessionHandler, message, headers);
} }
else if (!isHeartbeat && logger.isTraceEnabled()) { else if (!isHeartbeat && logger.isTraceEnabled()) {
logger.trace("Message not handled."); logger.trace("Message not handled.");
@ -446,16 +443,16 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
} }
} }
catch (Throwable ex) { catch (Throwable ex) {
this.sessionHandler.handleException(this, command, stompHeaders, message.getPayload(), ex); this.sessionHandler.handleException(this, command, headers, message.getPayload(), ex);
} }
} }
private void invokeHandler(StompFrameHandler handler, Message<byte[]> message, StompHeaders stompHeaders) { private void invokeHandler(StompFrameHandler handler, Message<byte[]> message, StompHeaders headers) {
if (message.getPayload().length == 0) { if (message.getPayload().length == 0) {
handler.handleFrame(stompHeaders, null); handler.handleFrame(headers, null);
return; return;
} }
Type payloadType = handler.getPayloadType(stompHeaders); Type payloadType = handler.getPayloadType(headers);
Class<?> resolvedType = ResolvableType.forType(payloadType).resolve(); Class<?> resolvedType = ResolvableType.forType(payloadType).resolve();
if (resolvedType == null) { if (resolvedType == null) {
throw new MessageConversionException("Unresolvable payload type [" + payloadType + throw new MessageConversionException("Unresolvable payload type [" + payloadType +
@ -466,7 +463,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
throw new MessageConversionException("No suitable converter for payload type [" + payloadType + throw new MessageConversionException("No suitable converter for payload type [" + payloadType +
"] from handler type [" + handler.getClass() + "]"); "] from handler type [" + handler.getClass() + "]");
} }
handler.handleFrame(stompHeaders, object); handler.handleFrame(headers, object);
} }
private void initHeartbeatTasks(StompHeaders connectedHeaders) { private void initHeartbeatTasks(StompHeaders connectedHeaders) {
@ -659,11 +656,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
} }
@Override @Override
public void unsubscribe(@Nullable StompHeaders stompHeaders) { public void unsubscribe(@Nullable StompHeaders headers) {
String id = this.headers.getId(); String id = this.headers.getId();
if (id != null) { if (id != null) {
DefaultStompSession.this.subscriptions.remove(id); DefaultStompSession.this.subscriptions.remove(id);
DefaultStompSession.this.unsubscribe(id, stompHeaders); DefaultStompSession.this.unsubscribe(id, headers);
} }
} }

31
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2017 the original author or authors. * Copyright 2002-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -39,10 +39,10 @@ public interface StompSession {
/** /**
* When enabled, a receipt header is automatically added to future * When enabled, a receipt header is automatically added to future
* {@code send} and {@code subscribe} operations on this session, which causes * {@code send} and {@code subscribe} operations on this session, which
* the server to return a RECEIPT. An application can then use the * causes the server to return a RECEIPT. An application can then use
* {@link StompSession.Receiptable * the {@link StompSession.Receiptable Receiptable} returned from the
* Receiptable} returned from the operation to track the receipt. * operation to track the receipt.
* <p>A receipt header can also be added manually through the overloaded * <p>A receipt header can also be added manually through the overloaded
* methods that accept {@code StompHeaders}. * methods that accept {@code StompHeaders}.
*/ */
@ -59,11 +59,11 @@ public interface StompSession {
Receiptable send(String destination, Object payload); Receiptable send(String destination, Object payload);
/** /**
* An overloaded version of {@link #send(String, Object)} that accepts * An overloaded version of {@link #send(String, Object)} with full
* full {@link StompHeaders} instead of a destination. The headers must * {@link StompHeaders} instead of just a destination. The headers must
* contain a destination and may also have other headers such as * contain a destination and may also have other headers such as
* "content-type" or custom headers for the broker to propagate to subscribers, * "content-type" or custom headers for the broker to propagate to
* or broker-specific, non-standard headers.. * subscribers, or broker-specific, non-standard headers..
* @param headers the message headers * @param headers the message headers
* @param payload the message payload * @param payload the message payload
* @return a Receiptable for tracking receipts * @return a Receiptable for tracking receipts
@ -81,7 +81,7 @@ public interface StompSession {
/** /**
* An overloaded version of {@link #subscribe(String, StompFrameHandler)} * An overloaded version of {@link #subscribe(String, StompFrameHandler)}
* that accepts full {@link StompHeaders} rather instead of a destination. * with full {@link StompHeaders} instead of just a destination.
* @param headers the headers for the subscribe message frame * @param headers the headers for the subscribe message frame
* @param handler the handler for received messages * @param handler the handler for received messages
* @return a handle to use to unsubscribe and/or track receipts * @return a handle to use to unsubscribe and/or track receipts
@ -102,15 +102,12 @@ public interface StompSession {
Receiptable acknowledge(String messageId, boolean consumed); Receiptable acknowledge(String messageId, boolean consumed);
/** /**
* Send an acknowledgement whether a message was consumed or not resulting * An overloaded version of {@link #acknowledge(String, boolean)} with
* in an ACK or NACK frame respectively. * full {@link StompHeaders} instead of just a {@code messageId}.
* <p><strong>Note:</strong> to use this when subscribing you must set the * @param headers the headers for the ACK or NACK message frame
* {@link StompHeaders#setAck(String) ack} header to "client" or
* "client-individual" in order ot use this.
* @param headers the headers for the ACK or NACK frame
* @param consumed whether the message was consumed or not * @param consumed whether the message was consumed or not
* @return a Receiptable for tracking receipts * @return a Receiptable for tracking receipts
* @since 5.1 * @since 5.0.5
*/ */
Receiptable acknowledge(StompHeaders headers, boolean consumed); Receiptable acknowledge(StompHeaders headers, boolean consumed);

Loading…
Cancel
Save