|
|
|
@ -18,7 +18,7 @@ package org.springframework.messaging.simp.stomp; |
|
|
|
|
|
|
|
|
|
|
|
import java.lang.reflect.Type; |
|
|
|
import java.lang.reflect.Type; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Arrays; |
|
|
|
import java.util.Collections; |
|
|
|
import java.util.Date; |
|
|
|
import java.util.Date; |
|
|
|
import java.util.List; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Map; |
|
|
|
@ -112,8 +112,8 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
* @param connectHeaders headers for the STOMP CONNECT frame |
|
|
|
* @param connectHeaders headers for the STOMP CONNECT frame |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public DefaultStompSession(StompSessionHandler sessionHandler, StompHeaders connectHeaders) { |
|
|
|
public DefaultStompSession(StompSessionHandler sessionHandler, StompHeaders connectHeaders) { |
|
|
|
Assert.notNull(sessionHandler, "'sessionHandler' is required."); |
|
|
|
Assert.notNull(sessionHandler, "StompSessionHandler must not be null"); |
|
|
|
Assert.notNull(connectHeaders, "'connectHeaders' is required."); |
|
|
|
Assert.notNull(connectHeaders, "StompHeaders must not be null"); |
|
|
|
this.sessionId = idGenerator.generateId().toString(); |
|
|
|
this.sessionId = idGenerator.generateId().toString(); |
|
|
|
this.sessionHandler = sessionHandler; |
|
|
|
this.sessionHandler = sessionHandler; |
|
|
|
this.connectHeaders = connectHeaders; |
|
|
|
this.connectHeaders = connectHeaders; |
|
|
|
@ -145,7 +145,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
* @param messageConverter the message converter to use |
|
|
|
* @param messageConverter the message converter to use |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public void setMessageConverter(MessageConverter messageConverter) { |
|
|
|
public void setMessageConverter(MessageConverter messageConverter) { |
|
|
|
Assert.notNull(messageConverter, "'messageConverter' must not be null"); |
|
|
|
Assert.notNull(messageConverter, "MessageConverter must not be null"); |
|
|
|
this.converter = messageConverter; |
|
|
|
this.converter = messageConverter; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -213,7 +213,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Receiptable send(StompHeaders stompHeaders, Object payload) { |
|
|
|
public Receiptable send(StompHeaders stompHeaders, Object payload) { |
|
|
|
Assert.hasText(stompHeaders.getDestination(), "'destination' header is required"); |
|
|
|
Assert.hasText(stompHeaders.getDestination(), "Destination header is required"); |
|
|
|
|
|
|
|
|
|
|
|
String receiptId = checkOrAddReceipt(stompHeaders); |
|
|
|
String receiptId = checkOrAddReceipt(stompHeaders); |
|
|
|
Receiptable receiptable = new ReceiptHandler(receiptId); |
|
|
|
Receiptable receiptable = new ReceiptHandler(receiptId); |
|
|
|
@ -292,8 +292,8 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Subscription subscribe(StompHeaders stompHeaders, StompFrameHandler handler) { |
|
|
|
public Subscription subscribe(StompHeaders stompHeaders, StompFrameHandler handler) { |
|
|
|
String destination = stompHeaders.getDestination(); |
|
|
|
String destination = stompHeaders.getDestination(); |
|
|
|
Assert.hasText(destination, "'destination' is required"); |
|
|
|
Assert.hasText(destination, "Destination header is required"); |
|
|
|
Assert.notNull(handler, "'handler' is required"); |
|
|
|
Assert.notNull(handler, "StompFrameHandler must not be null"); |
|
|
|
|
|
|
|
|
|
|
|
String subscriptionId = stompHeaders.getId(); |
|
|
|
String subscriptionId = stompHeaders.getId(); |
|
|
|
if (!StringUtils.hasText(subscriptionId)) { |
|
|
|
if (!StringUtils.hasText(subscriptionId)) { |
|
|
|
@ -397,7 +397,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
} |
|
|
|
} |
|
|
|
else if (logger.isDebugEnabled()) { |
|
|
|
else if (logger.isDebugEnabled()) { |
|
|
|
logger.debug("No handler for: " + accessor.getDetailedLogMessage(message.getPayload()) + |
|
|
|
logger.debug("No handler for: " + accessor.getDetailedLogMessage(message.getPayload()) + |
|
|
|
". Perhaps just unscubscribed?"); |
|
|
|
". Perhaps just unsubscribed?"); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
@ -464,7 +464,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void handleFailure(Throwable ex) { |
|
|
|
public void handleFailure(Throwable ex) { |
|
|
|
try { |
|
|
|
try { |
|
|
|
this.sessionFuture.setException(ex); // no-op if already set
|
|
|
|
this.sessionFuture.setException(ex); // no-op if already set
|
|
|
|
this.sessionHandler.handleTransportError(this, ex); |
|
|
|
this.sessionHandler.handleTransportError(this, ex); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Throwable ex2) { |
|
|
|
catch (Throwable ex2) { |
|
|
|
@ -477,7 +477,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void afterConnectionClosed() { |
|
|
|
public void afterConnectionClosed() { |
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
logger.debug("Connection closed session id=" + this.sessionId); |
|
|
|
logger.debug("Connection closed in session id=" + this.sessionId); |
|
|
|
} |
|
|
|
} |
|
|
|
if (!this.closing) { |
|
|
|
if (!this.closing) { |
|
|
|
resetConnection(); |
|
|
|
resetConnection(); |
|
|
|
@ -493,7 +493,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
conn.close(); |
|
|
|
conn.close(); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Throwable ex) { |
|
|
|
catch (Throwable ex) { |
|
|
|
// Ignore
|
|
|
|
// ignore
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -519,7 +519,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void initReceiptHandling() { |
|
|
|
private void initReceiptHandling() { |
|
|
|
Assert.notNull(getTaskScheduler(), "To track receipts a TaskScheduler must be configured"); |
|
|
|
Assert.notNull(getTaskScheduler(), "To track receipts, a TaskScheduler must be configured"); |
|
|
|
DefaultStompSession.this.receiptHandlers.put(this.receiptId, this); |
|
|
|
DefaultStompSession.this.receiptHandlers.put(this.receiptId, this); |
|
|
|
Date startTime = new Date(System.currentTimeMillis() + getReceiptTimeLimit()); |
|
|
|
Date startTime = new Date(System.currentTimeMillis() + getReceiptTimeLimit()); |
|
|
|
this.future = getTaskScheduler().schedule(new Runnable() { |
|
|
|
this.future = getTaskScheduler().schedule(new Runnable() { |
|
|
|
@ -546,10 +546,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void addTask(Runnable task, boolean successTask) { |
|
|
|
private void addTask(Runnable task, boolean successTask) { |
|
|
|
Assert.notNull(this.receiptId, "To track receipts, set autoReceiptEnabled=true or add 'receiptId' header"); |
|
|
|
Assert.notNull(this.receiptId, |
|
|
|
|
|
|
|
"To track receipts, set autoReceiptEnabled=true or add 'receiptId' header"); |
|
|
|
synchronized (this) { |
|
|
|
synchronized (this) { |
|
|
|
if (this.result != null && this.result == successTask) { |
|
|
|
if (this.result != null && this.result == successTask) { |
|
|
|
invoke(Arrays.asList(task)); |
|
|
|
invoke(Collections.singletonList(task)); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
if (successTask) { |
|
|
|
if (successTask) { |
|
|
|
@ -568,7 +569,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
runnable.run(); |
|
|
|
runnable.run(); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (Throwable ex) { |
|
|
|
catch (Throwable ex) { |
|
|
|
// Ignore
|
|
|
|
// ignore
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -607,12 +608,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { |
|
|
|
|
|
|
|
|
|
|
|
public DefaultSubscription(String id, String destination, String receiptId, StompFrameHandler handler) { |
|
|
|
public DefaultSubscription(String id, String destination, String receiptId, StompFrameHandler handler) { |
|
|
|
super(receiptId); |
|
|
|
super(receiptId); |
|
|
|
Assert.notNull(destination, "'destination' is required"); |
|
|
|
Assert.notNull(destination, "Destination must not be null"); |
|
|
|
Assert.notNull(handler, "'handler' handler is required"); |
|
|
|
Assert.notNull(handler, "StompFrameHandler must not be null"); |
|
|
|
this.id = id; |
|
|
|
this.id = id; |
|
|
|
this.destination = destination; |
|
|
|
this.destination = destination; |
|
|
|
this.handler = handler; |
|
|
|
this.handler = handler; |
|
|
|
|
|
|
|
|
|
|
|
DefaultStompSession.this.subscriptions.put(id, this); |
|
|
|
DefaultStompSession.this.subscriptions.put(id, this); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|