diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java index 0f304e6603f..0fdcce0a377 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -211,20 +211,20 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { @Override public Receiptable send(String destination, Object payload) { - StompHeaders stompHeaders = new StompHeaders(); - stompHeaders.setDestination(destination); - return send(stompHeaders, payload); + StompHeaders headers = new StompHeaders(); + headers.setDestination(destination); + return send(headers, payload); } @Override - public Receiptable send(StompHeaders stompHeaders, Object payload) { - Assert.hasText(stompHeaders.getDestination(), "Destination header is required"); + public Receiptable send(StompHeaders headers, Object payload) { + Assert.hasText(headers.getDestination(), "Destination header is required"); - String receiptId = checkOrAddReceipt(stompHeaders); + String receiptId = checkOrAddReceipt(headers); Receiptable receiptable = new ReceiptHandler(receiptId); StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SEND); - accessor.addNativeHeaders(stompHeaders); + accessor.addNativeHeaders(headers); Message message = createMessage(accessor, payload); execute(message); @@ -232,11 +232,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { } @Nullable - private String checkOrAddReceipt(StompHeaders stompHeaders) { - String receiptId = stompHeaders.getReceipt(); + private String checkOrAddReceipt(StompHeaders headers) { + String receiptId = headers.getReceipt(); if (isAutoReceiptEnabled() && receiptId == null) { receiptId = String.valueOf(DefaultStompSession.this.receiptIndex.getAndIncrement()); - stompHeaders.setReceipt(receiptId); + headers.setReceipt(receiptId); } return receiptId; } @@ -292,26 +292,26 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { @Override public Subscription subscribe(String destination, StompFrameHandler handler) { - StompHeaders stompHeaders = new StompHeaders(); - stompHeaders.setDestination(destination); - return subscribe(stompHeaders, handler); + StompHeaders headers = new StompHeaders(); + headers.setDestination(destination); + return subscribe(headers, handler); } @Override - public Subscription subscribe(StompHeaders stompHeaders, StompFrameHandler handler) { - Assert.hasText(stompHeaders.getDestination(), "Destination header is required"); + public Subscription subscribe(StompHeaders headers, StompFrameHandler handler) { + Assert.hasText(headers.getDestination(), "Destination header is required"); Assert.notNull(handler, "StompFrameHandler must not be null"); - String subscriptionId = stompHeaders.getId(); + String subscriptionId = headers.getId(); if (!StringUtils.hasText(subscriptionId)) { subscriptionId = String.valueOf(DefaultStompSession.this.subscriptionIndex.getAndIncrement()); - stompHeaders.setId(subscriptionId); + headers.setId(subscriptionId); } - checkOrAddReceipt(stompHeaders); - Subscription subscription = new DefaultSubscription(stompHeaders, handler); + checkOrAddReceipt(headers); + Subscription subscription = new DefaultSubscription(headers, handler); StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE); - accessor.addNativeHeaders(stompHeaders); + accessor.addNativeHeaders(headers); Message message = createMessage(accessor, EMPTY_PAYLOAD); execute(message); @@ -320,17 +320,14 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { @Override public Receiptable acknowledge(String messageId, boolean consumed) { - StompHeaders stompHeaders = new StompHeaders(); + StompHeaders headers = new StompHeaders(); if ("1.1".equals(this.version)) { - stompHeaders.setMessageId(messageId); + headers.setMessageId(messageId); } else { - stompHeaders.setId(messageId); + headers.setId(messageId); } - - Receiptable receiptable = acknowledge(stompHeaders, consumed); - - return receiptable; + return acknowledge(headers, consumed); } @Override @@ -347,10 +344,10 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { return receiptable; } - private void unsubscribe(String id, @Nullable StompHeaders stompHeaders) { + private void unsubscribe(String id, @Nullable StompHeaders headers) { StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE); - if (stompHeaders != null) { - accessor.addNativeHeaders(stompHeaders); + if (headers != null) { + accessor.addNativeHeaders(headers); } accessor.setSubscriptionId(id); Message message = createMessage(accessor, EMPTY_PAYLOAD); @@ -403,7 +400,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { accessor.setSessionId(this.sessionId); StompCommand command = accessor.getCommand(); Map> nativeHeaders = accessor.getNativeHeaders(); - StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(nativeHeaders); + StompHeaders headers = StompHeaders.readOnlyStompHeaders(nativeHeaders); boolean isHeartbeat = accessor.isHeartbeat(); if (logger.isTraceEnabled()) { logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload())); @@ -411,9 +408,9 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { try { if (StompCommand.MESSAGE.equals(command)) { - DefaultSubscription subscription = this.subscriptions.get(stompHeaders.getSubscription()); + DefaultSubscription subscription = this.subscriptions.get(headers.getSubscription()); if (subscription != null) { - invokeHandler(subscription.getHandler(), message, stompHeaders); + invokeHandler(subscription.getHandler(), message, headers); } else if (logger.isDebugEnabled()) { logger.debug("No handler for: " + accessor.getDetailedLogMessage(message.getPayload()) + @@ -422,7 +419,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { } else { if (StompCommand.RECEIPT.equals(command)) { - String receiptId = stompHeaders.getReceiptId(); + String receiptId = headers.getReceiptId(); ReceiptHandler handler = this.receiptHandlers.get(receiptId); if (handler != null) { handler.handleReceiptReceived(); @@ -432,13 +429,13 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { } } else if (StompCommand.CONNECTED.equals(command)) { - initHeartbeatTasks(stompHeaders); - this.version = stompHeaders.getFirst("version"); + initHeartbeatTasks(headers); + this.version = headers.getFirst("version"); this.sessionFuture.set(this); - this.sessionHandler.afterConnected(this, stompHeaders); + this.sessionHandler.afterConnected(this, headers); } else if (StompCommand.ERROR.equals(command)) { - invokeHandler(this.sessionHandler, message, stompHeaders); + invokeHandler(this.sessionHandler, message, headers); } else if (!isHeartbeat && logger.isTraceEnabled()) { logger.trace("Message not handled."); @@ -446,16 +443,16 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { } } catch (Throwable ex) { - this.sessionHandler.handleException(this, command, stompHeaders, message.getPayload(), ex); + this.sessionHandler.handleException(this, command, headers, message.getPayload(), ex); } } - private void invokeHandler(StompFrameHandler handler, Message message, StompHeaders stompHeaders) { + private void invokeHandler(StompFrameHandler handler, Message message, StompHeaders headers) { if (message.getPayload().length == 0) { - handler.handleFrame(stompHeaders, null); + handler.handleFrame(headers, null); return; } - Type payloadType = handler.getPayloadType(stompHeaders); + Type payloadType = handler.getPayloadType(headers); Class resolvedType = ResolvableType.forType(payloadType).resolve(); if (resolvedType == null) { throw new MessageConversionException("Unresolvable payload type [" + payloadType + @@ -466,7 +463,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { throw new MessageConversionException("No suitable converter for payload type [" + payloadType + "] from handler type [" + handler.getClass() + "]"); } - handler.handleFrame(stompHeaders, object); + handler.handleFrame(headers, object); } private void initHeartbeatTasks(StompHeaders connectedHeaders) { @@ -659,11 +656,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { } @Override - public void unsubscribe(@Nullable StompHeaders stompHeaders) { + public void unsubscribe(@Nullable StompHeaders headers) { String id = this.headers.getId(); if (id != null) { DefaultStompSession.this.subscriptions.remove(id); - DefaultStompSession.this.unsubscribe(id, stompHeaders); + DefaultStompSession.this.unsubscribe(id, headers); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java index 6882c681100..833e3980a99 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,10 +39,10 @@ public interface StompSession { /** * When enabled, a receipt header is automatically added to future - * {@code send} and {@code subscribe} operations on this session, which causes - * the server to return a RECEIPT. An application can then use the - * {@link StompSession.Receiptable - * Receiptable} returned from the operation to track the receipt. + * {@code send} and {@code subscribe} operations on this session, which + * causes the server to return a RECEIPT. An application can then use + * the {@link StompSession.Receiptable Receiptable} returned from the + * operation to track the receipt. *

A receipt header can also be added manually through the overloaded * methods that accept {@code StompHeaders}. */ @@ -59,11 +59,11 @@ public interface StompSession { Receiptable send(String destination, Object payload); /** - * An overloaded version of {@link #send(String, Object)} that accepts - * full {@link StompHeaders} instead of a destination. The headers must + * An overloaded version of {@link #send(String, Object)} with full + * {@link StompHeaders} instead of just a destination. The headers must * contain a destination and may also have other headers such as - * "content-type" or custom headers for the broker to propagate to subscribers, - * or broker-specific, non-standard headers.. + * "content-type" or custom headers for the broker to propagate to + * subscribers, or broker-specific, non-standard headers.. * @param headers the message headers * @param payload the message payload * @return a Receiptable for tracking receipts @@ -81,7 +81,7 @@ public interface StompSession { /** * An overloaded version of {@link #subscribe(String, StompFrameHandler)} - * that accepts full {@link StompHeaders} rather instead of a destination. + * with full {@link StompHeaders} instead of just a destination. * @param headers the headers for the subscribe message frame * @param handler the handler for received messages * @return a handle to use to unsubscribe and/or track receipts @@ -102,15 +102,12 @@ public interface StompSession { Receiptable acknowledge(String messageId, boolean consumed); /** - * Send an acknowledgement whether a message was consumed or not resulting - * in an ACK or NACK frame respectively. - *

Note: to use this when subscribing you must set the - * {@link StompHeaders#setAck(String) ack} header to "client" or - * "client-individual" in order ot use this. - * @param headers the headers for the ACK or NACK frame + * An overloaded version of {@link #acknowledge(String, boolean)} with + * full {@link StompHeaders} instead of just a {@code messageId}. + * @param headers the headers for the ACK or NACK message frame * @param consumed whether the message was consumed or not * @return a Receiptable for tracking receipts - * @since 5.1 + * @since 5.0.5 */ Receiptable acknowledge(StompHeaders headers, boolean consumed);