@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
/ *
* Copyright 2002 - 2017 the original author or authors .
* Copyright 2002 - 2018 the original author or authors .
*
* Licensed under the Apache License , Version 2 . 0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
@ -211,20 +211,20 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -211,20 +211,20 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override
public Receiptable send ( String destination , Object payload ) {
StompHeaders stompH eaders = new StompHeaders ( ) ;
stompH eaders. setDestination ( destination ) ;
return send ( stompH eaders, payload ) ;
StompHeaders h eaders = new StompHeaders ( ) ;
h eaders. setDestination ( destination ) ;
return send ( h eaders, payload ) ;
}
@Override
public Receiptable send ( StompHeaders stompH eaders, Object payload ) {
Assert . hasText ( stompH eaders. getDestination ( ) , "Destination header is required" ) ;
public Receiptable send ( StompHeaders h eaders, Object payload ) {
Assert . hasText ( h eaders. getDestination ( ) , "Destination header is required" ) ;
String receiptId = checkOrAddReceipt ( stompH eaders) ;
String receiptId = checkOrAddReceipt ( h eaders) ;
Receiptable receiptable = new ReceiptHandler ( receiptId ) ;
StompHeaderAccessor accessor = createHeaderAccessor ( StompCommand . SEND ) ;
accessor . addNativeHeaders ( stompH eaders) ;
accessor . addNativeHeaders ( h eaders) ;
Message < byte [ ] > message = createMessage ( accessor , payload ) ;
execute ( message ) ;
@ -232,11 +232,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -232,11 +232,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
@Nullable
private String checkOrAddReceipt ( StompHeaders stompH eaders) {
String receiptId = stompH eaders. getReceipt ( ) ;
private String checkOrAddReceipt ( StompHeaders h eaders) {
String receiptId = h eaders. getReceipt ( ) ;
if ( isAutoReceiptEnabled ( ) & & receiptId = = null ) {
receiptId = String . valueOf ( DefaultStompSession . this . receiptIndex . getAndIncrement ( ) ) ;
stompH eaders. setReceipt ( receiptId ) ;
h eaders. setReceipt ( receiptId ) ;
}
return receiptId ;
}
@ -292,26 +292,26 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -292,26 +292,26 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override
public Subscription subscribe ( String destination , StompFrameHandler handler ) {
StompHeaders stompH eaders = new StompHeaders ( ) ;
stompH eaders. setDestination ( destination ) ;
return subscribe ( stompH eaders, handler ) ;
StompHeaders h eaders = new StompHeaders ( ) ;
h eaders. setDestination ( destination ) ;
return subscribe ( h eaders, handler ) ;
}
@Override
public Subscription subscribe ( StompHeaders stompH eaders, StompFrameHandler handler ) {
Assert . hasText ( stompH eaders. getDestination ( ) , "Destination header is required" ) ;
public Subscription subscribe ( StompHeaders h eaders, StompFrameHandler handler ) {
Assert . hasText ( h eaders. getDestination ( ) , "Destination header is required" ) ;
Assert . notNull ( handler , "StompFrameHandler must not be null" ) ;
String subscriptionId = stompH eaders. getId ( ) ;
String subscriptionId = h eaders. getId ( ) ;
if ( ! StringUtils . hasText ( subscriptionId ) ) {
subscriptionId = String . valueOf ( DefaultStompSession . this . subscriptionIndex . getAndIncrement ( ) ) ;
stompH eaders. setId ( subscriptionId ) ;
h eaders. setId ( subscriptionId ) ;
}
checkOrAddReceipt ( stompH eaders) ;
Subscription subscription = new DefaultSubscription ( stompH eaders, handler ) ;
checkOrAddReceipt ( h eaders) ;
Subscription subscription = new DefaultSubscription ( h eaders, handler ) ;
StompHeaderAccessor accessor = createHeaderAccessor ( StompCommand . SUBSCRIBE ) ;
accessor . addNativeHeaders ( stompH eaders) ;
accessor . addNativeHeaders ( h eaders) ;
Message < byte [ ] > message = createMessage ( accessor , EMPTY_PAYLOAD ) ;
execute ( message ) ;
@ -320,30 +320,34 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -320,30 +320,34 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override
public Receiptable acknowledge ( String messageId , boolean consumed ) {
StompHeaders stompH eaders = new StompHeaders ( ) ;
StompHeaders h eaders = new StompHeaders ( ) ;
if ( "1.1" . equals ( this . version ) ) {
stompH eaders. setMessageId ( messageId ) ;
h eaders. setMessageId ( messageId ) ;
}
else {
stompH eaders. setId ( messageId ) ;
h eaders. setId ( messageId ) ;
}
return acknowledge ( headers , consumed ) ;
}
String receiptId = checkOrAddReceipt ( stompHeaders ) ;
@Override
public Receiptable acknowledge ( StompHeaders headers , boolean consumed ) {
String receiptId = checkOrAddReceipt ( headers ) ;
Receiptable receiptable = new ReceiptHandler ( receiptId ) ;
StompCommand command = ( consumed ? StompCommand . ACK : StompCommand . NACK ) ;
StompHeaderAccessor accessor = createHeaderAccessor ( command ) ;
accessor . addNativeHeaders ( stompH eaders) ;
accessor . addNativeHeaders ( h eaders) ;
Message < byte [ ] > message = createMessage ( accessor , null ) ;
execute ( message ) ;
return receiptable ;
}
private void unsubscribe ( String id , @Nullable StompHeaders stompH eaders) {
private void unsubscribe ( String id , @Nullable StompHeaders h eaders) {
StompHeaderAccessor accessor = createHeaderAccessor ( StompCommand . UNSUBSCRIBE ) ;
if ( stompH eaders ! = null ) {
accessor . addNativeHeaders ( stompH eaders) ;
if ( h eaders ! = null ) {
accessor . addNativeHeaders ( h eaders) ;
}
accessor . setSubscriptionId ( id ) ;
Message < byte [ ] > message = createMessage ( accessor , EMPTY_PAYLOAD ) ;
@ -396,7 +400,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -396,7 +400,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
accessor . setSessionId ( this . sessionId ) ;
StompCommand command = accessor . getCommand ( ) ;
Map < String , List < String > > nativeHeaders = accessor . getNativeHeaders ( ) ;
StompHeaders stompH eaders = StompHeaders . readOnlyStompHeaders ( nativeHeaders ) ;
StompHeaders h eaders = StompHeaders . readOnlyStompHeaders ( nativeHeaders ) ;
boolean isHeartbeat = accessor . isHeartbeat ( ) ;
if ( logger . isTraceEnabled ( ) ) {
logger . trace ( "Received " + accessor . getDetailedLogMessage ( message . getPayload ( ) ) ) ;
@ -404,9 +408,9 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -404,9 +408,9 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
try {
if ( StompCommand . MESSAGE . equals ( command ) ) {
DefaultSubscription subscription = this . subscriptions . get ( stompH eaders. getSubscription ( ) ) ;
DefaultSubscription subscription = this . subscriptions . get ( h eaders. getSubscription ( ) ) ;
if ( subscription ! = null ) {
invokeHandler ( subscription . getHandler ( ) , message , stompH eaders) ;
invokeHandler ( subscription . getHandler ( ) , message , h eaders) ;
}
else if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "No handler for: " + accessor . getDetailedLogMessage ( message . getPayload ( ) ) +
@ -415,7 +419,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -415,7 +419,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
else {
if ( StompCommand . RECEIPT . equals ( command ) ) {
String receiptId = stompH eaders. getReceiptId ( ) ;
String receiptId = h eaders. getReceiptId ( ) ;
ReceiptHandler handler = this . receiptHandlers . get ( receiptId ) ;
if ( handler ! = null ) {
handler . handleReceiptReceived ( ) ;
@ -425,13 +429,13 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -425,13 +429,13 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
}
else if ( StompCommand . CONNECTED . equals ( command ) ) {
initHeartbeatTasks ( stompH eaders) ;
this . version = stompH eaders. getFirst ( "version" ) ;
initHeartbeatTasks ( h eaders) ;
this . version = h eaders. getFirst ( "version" ) ;
this . sessionFuture . set ( this ) ;
this . sessionHandler . afterConnected ( this , stompH eaders) ;
this . sessionHandler . afterConnected ( this , h eaders) ;
}
else if ( StompCommand . ERROR . equals ( command ) ) {
invokeHandler ( this . sessionHandler , message , stompH eaders) ;
invokeHandler ( this . sessionHandler , message , h eaders) ;
}
else if ( ! isHeartbeat & & logger . isTraceEnabled ( ) ) {
logger . trace ( "Message not handled." ) ;
@ -439,16 +443,16 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -439,16 +443,16 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
}
catch ( Throwable ex ) {
this . sessionHandler . handleException ( this , command , stompH eaders, message . getPayload ( ) , ex ) ;
this . sessionHandler . handleException ( this , command , h eaders, message . getPayload ( ) , ex ) ;
}
}
private void invokeHandler ( StompFrameHandler handler , Message < byte [ ] > message , StompHeaders stompH eaders) {
private void invokeHandler ( StompFrameHandler handler , Message < byte [ ] > message , StompHeaders h eaders) {
if ( message . getPayload ( ) . length = = 0 ) {
handler . handleFrame ( stompH eaders, null ) ;
handler . handleFrame ( h eaders, null ) ;
return ;
}
Type payloadType = handler . getPayloadType ( stompH eaders) ;
Type payloadType = handler . getPayloadType ( h eaders) ;
Class < ? > resolvedType = ResolvableType . forType ( payloadType ) . resolve ( ) ;
if ( resolvedType = = null ) {
throw new MessageConversionException ( "Unresolvable payload type [" + payloadType +
@ -459,7 +463,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -459,7 +463,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
throw new MessageConversionException ( "No suitable converter for payload type [" + payloadType +
"] from handler type [" + handler . getClass ( ) + "]" ) ;
}
handler . handleFrame ( stompH eaders, object ) ;
handler . handleFrame ( h eaders, object ) ;
}
private void initHeartbeatTasks ( StompHeaders connectedHeaders ) {
@ -652,11 +656,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@@ -652,11 +656,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
@Override
public void unsubscribe ( @Nullable StompHeaders stompH eaders) {
public void unsubscribe ( @Nullable StompHeaders h eaders) {
String id = this . headers . getId ( ) ;
if ( id ! = null ) {
DefaultStompSession . this . subscriptions . remove ( id ) ;
DefaultStompSession . this . unsubscribe ( id , stompH eaders) ;
DefaultStompSession . this . unsubscribe ( id , h eaders) ;
}
}