Browse Source

Improve close in ConcurrentWebSocketSessionDecorator

Before this commit the concurrent session wrapper mainly protected the
sending of messages. The close itself however may also cause a message
to be sent as is the case of the SockJS protocol.

This change protects the close and checks if the session has exceeded
send time or buffer limits in which case the close status is changed
to SESSION_NOT_RELIABLE (introduced in commit cbd5af3a) which in turn
signals that extra care should be exercised when closing the session.

Issue: SPR-13904
pull/966/head
Rossen Stoyanchev 10 years ago
parent
commit
f053cdec51
  1. 29
      spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java
  2. 99
      spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java

29
spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -169,8 +169,31 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat @@ -169,8 +169,31 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
@Override
public void close(CloseStatus status) throws IOException {
this.shutdownInProgress = true;
super.close(status);
this.closeLock.lock();
try {
if (this.shutdownInProgress) {
return;
}
if (!CloseStatus.SESSION_NOT_RELIABLE.equals(status)) {
try {
checkSessionLimits();
}
catch (SessionLimitExceededException ex) {
// Ignore
}
if (this.limitExceeded) {
if (logger.isDebugEnabled()) {
logger.debug("Changing close status " + status + " to SESSION_NOT_RELIABLE.");
}
status = CloseStatus.SESSION_NOT_RELIABLE;
}
}
this.shutdownInProgress = true;
super.close(status);
}
finally {
this.closeLock.unlock();
}
}

99
spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,6 +27,7 @@ import org.junit.Test; @@ -27,6 +27,7 @@ import org.junit.Test;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import static org.junit.Assert.*;
@ -186,6 +187,59 @@ public class ConcurrentWebSocketSessionDecoratorTests { @@ -186,6 +187,59 @@ public class ConcurrentWebSocketSessionDecoratorTests {
}
}
@Test
public void closeStatusNormal() throws Exception {
BlockingSession delegate = new BlockingSession();
delegate.setOpen(true);
WebSocketSession decorator = new ConcurrentWebSocketSessionDecorator(delegate, 10 * 1000, 1024);
decorator.close(CloseStatus.PROTOCOL_ERROR);
assertEquals(CloseStatus.PROTOCOL_ERROR, delegate.getCloseStatus());
decorator.close(CloseStatus.SERVER_ERROR);
assertEquals("Should have been ignored", CloseStatus.PROTOCOL_ERROR, delegate.getCloseStatus());
}
@Test
public void closeStatusChangesToSessionNotReliable() throws Exception {
BlockingSession blockingSession = new BlockingSession();
blockingSession.setId("123");
blockingSession.setOpen(true);
CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch();
int sendTimeLimit = 100;
int bufferSizeLimit = 1024;
final ConcurrentWebSocketSessionDecorator concurrentSession =
new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit);
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
TextMessage message = new TextMessage("slow message");
try {
concurrentSession.sendMessage(message);
}
catch (IOException e) {
e.printStackTrace();
}
}
});
assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS));
// ensure some send time elapses
Thread.sleep(sendTimeLimit + 100);
concurrentSession.close(CloseStatus.PROTOCOL_ERROR);
assertEquals("CloseStatus should have changed to SESSION_NOT_RELIABLE",
CloseStatus.SESSION_NOT_RELIABLE, blockingSession.getCloseStatus());
}
private static class BlockingSession extends TestWebSocketSession {
@ -220,47 +274,4 @@ public class ConcurrentWebSocketSessionDecoratorTests { @@ -220,47 +274,4 @@ public class ConcurrentWebSocketSessionDecoratorTests {
}
// @Test
// public void sendSessionLimitException() throws IOException, InterruptedException {
//
// BlockingSession blockingSession = new BlockingSession();
// blockingSession.setOpen(true);
// CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch();
//
// int sendTimeLimit = 10 * 1000;
// int bufferSizeLimit = 1024;
//
// final ConcurrentWebSocketSessionDecorator concurrentSession =
// new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit);
//
// Executors.newSingleThreadExecutor().submit(new Runnable() {
// @Override
// public void run() {
// TextMessage textMessage = new TextMessage("slow message");
// try {
// concurrentSession.sendMessage(textMessage);
// }
// catch (IOException e) {
// e.printStackTrace();
// }
// }
// });
//
// assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS));
//
// StringBuilder sb = new StringBuilder();
// for (int i=0 ; i < 1023; i++) {
// sb.append("a");
// }
//
// TextMessage message = new TextMessage(sb.toString());
// concurrentSession.sendMessage(message);
//
// assertEquals(1023, concurrentSession.getBufferSize());
// assertTrue(blockingSession.isOpen());
//
// concurrentSession.sendMessage(message);
// assertFalse(blockingSession.isOpen());
// }
}

Loading…
Cancel
Save