|
|
|
@ -664,7 +664,7 @@ public class DefaultStompSessionTests { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
|
void unsubscribeWithReceipt() { |
|
|
|
void unsubscribeWithoutReceipt() { |
|
|
|
this.session.afterConnected(this.connection); |
|
|
|
this.session.afterConnected(this.connection); |
|
|
|
assertThat(this.session.isConnected()).isTrue(); |
|
|
|
assertThat(this.session.isConnected()).isTrue(); |
|
|
|
Subscription subscription = this.session.subscribe("/topic/foo", mock()); |
|
|
|
Subscription subscription = this.session.subscribe("/topic/foo", mock()); |
|
|
|
@ -672,62 +672,40 @@ public class DefaultStompSessionTests { |
|
|
|
Receiptable receipt = subscription.unsubscribe(); |
|
|
|
Receiptable receipt = subscription.unsubscribe(); |
|
|
|
assertThat(receipt).isNotNull(); |
|
|
|
assertThat(receipt).isNotNull(); |
|
|
|
assertThat(receipt.getReceiptId()).isNull(); |
|
|
|
assertThat(receipt.getReceiptId()).isNull(); |
|
|
|
|
|
|
|
|
|
|
|
Message<byte[]> message = this.messageCaptor.getValue(); |
|
|
|
|
|
|
|
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
|
|
|
|
|
|
|
assertThat(accessor.getCommand()).isEqualTo(StompCommand.UNSUBSCRIBE); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); |
|
|
|
|
|
|
|
assertThat(stompHeaders).hasSize(1); |
|
|
|
|
|
|
|
assertThat(stompHeaders.getId()).isEqualTo(subscription.getSubscriptionId()); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
|
void unsubscribeWithCustomHeaderAndReceipt() { |
|
|
|
void unsubscribeWithReceipt() { |
|
|
|
this.session.afterConnected(this.connection); |
|
|
|
this.session.afterConnected(this.connection); |
|
|
|
this.session.setTaskScheduler(mock()); |
|
|
|
this.session.setTaskScheduler(mock()); |
|
|
|
this.session.setAutoReceipt(true); |
|
|
|
this.session.setAutoReceipt(true); |
|
|
|
|
|
|
|
Subscription subscription = this.session.subscribe("/topic/foo", mock()); |
|
|
|
|
|
|
|
|
|
|
|
StompHeaders subHeaders = new StompHeaders(); |
|
|
|
Receiptable receipt = subscription.unsubscribe(); |
|
|
|
subHeaders.setDestination("/topic/foo"); |
|
|
|
|
|
|
|
Subscription subscription = this.session.subscribe(subHeaders, mock()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StompHeaders custom = new StompHeaders(); |
|
|
|
|
|
|
|
custom.set("x-cust", "value"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Receiptable receipt = subscription.unsubscribe(custom); |
|
|
|
|
|
|
|
assertThat(receipt).isNotNull(); |
|
|
|
assertThat(receipt).isNotNull(); |
|
|
|
assertThat(receipt.getReceiptId()).isNotNull(); |
|
|
|
assertThat(receipt.getReceiptId()).isNotNull(); |
|
|
|
|
|
|
|
|
|
|
|
Message<byte[]> message = this.messageCaptor.getValue(); |
|
|
|
Message<byte[]> message = this.messageCaptor.getValue(); |
|
|
|
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
|
|
|
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
|
|
|
assertThat(accessor.getCommand()).isEqualTo(StompCommand.UNSUBSCRIBE); |
|
|
|
assertThat(accessor.getReceipt()).isEqualTo(receipt.getReceiptId()); |
|
|
|
|
|
|
|
|
|
|
|
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); |
|
|
|
|
|
|
|
assertThat(stompHeaders.getId()).isEqualTo(subscription.getSubscriptionId()); |
|
|
|
|
|
|
|
assertThat(stompHeaders.get("x-cust")).containsExactly("value"); |
|
|
|
|
|
|
|
assertThat(stompHeaders.getReceipt()).isEqualTo(receipt.getReceiptId()); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
|
void receiptReceivedOnUnsubscribe() { |
|
|
|
void receiptReceivedOnUnsubscribe() { |
|
|
|
this.session.afterConnected(this.connection); |
|
|
|
this.session.afterConnected(this.connection); |
|
|
|
TaskScheduler scheduler = mock(); |
|
|
|
this.session.setTaskScheduler(mock()); |
|
|
|
this.session.setTaskScheduler(scheduler); |
|
|
|
|
|
|
|
this.session.setAutoReceipt(true); |
|
|
|
this.session.setAutoReceipt(true); |
|
|
|
|
|
|
|
|
|
|
|
Subscription subscription = this.session.subscribe("/topic/foo", mock()); |
|
|
|
Subscription subscription = this.session.subscribe("/topic/foo", mock()); |
|
|
|
Receiptable receipt = subscription.unsubscribe(); |
|
|
|
Receiptable receipt = subscription.unsubscribe(); |
|
|
|
|
|
|
|
|
|
|
|
StompHeaderAccessor ack = StompHeaderAccessor.create(StompCommand.RECEIPT); |
|
|
|
|
|
|
|
ack.setReceiptId(receipt.getReceiptId()); |
|
|
|
|
|
|
|
ack.setLeaveMutable(true); |
|
|
|
|
|
|
|
Message<byte[]> receiptMessage = MessageBuilder.createMessage(new byte[0], ack.getMessageHeaders()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
AtomicBoolean called = new AtomicBoolean(false); |
|
|
|
AtomicBoolean called = new AtomicBoolean(false); |
|
|
|
receipt.addReceiptTask(() -> called.set(true)); |
|
|
|
receipt.addReceiptTask(() -> called.set(true)); |
|
|
|
|
|
|
|
|
|
|
|
this.session.handleMessage(receiptMessage); |
|
|
|
StompHeaderAccessor ack = StompHeaderAccessor.create(StompCommand.RECEIPT); |
|
|
|
|
|
|
|
ack.setReceiptId(receipt.getReceiptId()); |
|
|
|
|
|
|
|
ack.setLeaveMutable(true); |
|
|
|
|
|
|
|
this.session.handleMessage(MessageBuilder.createMessage(new byte[0], ack.getMessageHeaders())); |
|
|
|
|
|
|
|
|
|
|
|
assertThat(called.get()).isTrue(); |
|
|
|
assertThat(called.get()).isTrue(); |
|
|
|
} |
|
|
|
} |
|
|
|
|