From f053cdec5158ab7cbf01526f967c42aefa924e11 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 4 Feb 2016 17:22:36 -0500 Subject: [PATCH] Improve close in ConcurrentWebSocketSessionDecorator Before this commit the concurrent session wrapper mainly protected the sending of messages. The close itself however may also cause a message to be sent as is the case of the SockJS protocol. This change protects the close and checks if the session has exceeded send time or buffer limits in which case the close status is changed to SESSION_NOT_RELIABLE (introduced in commit cbd5af3a) which in turn signals that extra care should be exercised when closing the session. Issue: SPR-13904 --- .../ConcurrentWebSocketSessionDecorator.java | 29 +++++- ...currentWebSocketSessionDecoratorTests.java | 99 ++++++++++--------- 2 files changed, 81 insertions(+), 47 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java b/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java index b0efe1753d4..16de9aa4ccd 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -169,8 +169,31 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat @Override public void close(CloseStatus status) throws IOException { - this.shutdownInProgress = true; - super.close(status); + this.closeLock.lock(); + try { + if (this.shutdownInProgress) { + return; + } + if (!CloseStatus.SESSION_NOT_RELIABLE.equals(status)) { + try { + checkSessionLimits(); + } + catch (SessionLimitExceededException ex) { + // Ignore + } + if (this.limitExceeded) { + if (logger.isDebugEnabled()) { + logger.debug("Changing close status " + status + " to SESSION_NOT_RELIABLE."); + } + status = CloseStatus.SESSION_NOT_RELIABLE; + } + } + this.shutdownInProgress = true; + super.close(status); + } + finally { + this.closeLock.unlock(); + } } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java index 7e73979ab39..c4cdb02a426 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import org.junit.Test; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; import static org.junit.Assert.*; @@ -186,6 +187,59 @@ public class ConcurrentWebSocketSessionDecoratorTests { } } + @Test + public void closeStatusNormal() throws Exception { + + BlockingSession delegate = new BlockingSession(); + delegate.setOpen(true); + WebSocketSession decorator = new ConcurrentWebSocketSessionDecorator(delegate, 10 * 1000, 1024); + + decorator.close(CloseStatus.PROTOCOL_ERROR); + assertEquals(CloseStatus.PROTOCOL_ERROR, delegate.getCloseStatus()); + + decorator.close(CloseStatus.SERVER_ERROR); + assertEquals("Should have been ignored", CloseStatus.PROTOCOL_ERROR, delegate.getCloseStatus()); + } + + @Test + public void closeStatusChangesToSessionNotReliable() throws Exception { + + BlockingSession blockingSession = new BlockingSession(); + blockingSession.setId("123"); + blockingSession.setOpen(true); + CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch(); + + int sendTimeLimit = 100; + int bufferSizeLimit = 1024; + + final ConcurrentWebSocketSessionDecorator concurrentSession = + new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit); + + Executors.newSingleThreadExecutor().submit(new Runnable() { + @Override + public void run() { + TextMessage message = new TextMessage("slow message"); + try { + concurrentSession.sendMessage(message); + } + catch (IOException e) { + e.printStackTrace(); + } + } + }); + + assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS)); + + // ensure some send time elapses + Thread.sleep(sendTimeLimit + 100); + + concurrentSession.close(CloseStatus.PROTOCOL_ERROR); + + assertEquals("CloseStatus should have changed to SESSION_NOT_RELIABLE", + CloseStatus.SESSION_NOT_RELIABLE, blockingSession.getCloseStatus()); + } + + private static class BlockingSession extends TestWebSocketSession { @@ -220,47 +274,4 @@ public class ConcurrentWebSocketSessionDecoratorTests { } -// @Test -// public void sendSessionLimitException() throws IOException, InterruptedException { -// -// BlockingSession blockingSession = new BlockingSession(); -// blockingSession.setOpen(true); -// CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch(); -// -// int sendTimeLimit = 10 * 1000; -// int bufferSizeLimit = 1024; -// -// final ConcurrentWebSocketSessionDecorator concurrentSession = -// new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit); -// -// Executors.newSingleThreadExecutor().submit(new Runnable() { -// @Override -// public void run() { -// TextMessage textMessage = new TextMessage("slow message"); -// try { -// concurrentSession.sendMessage(textMessage); -// } -// catch (IOException e) { -// e.printStackTrace(); -// } -// } -// }); -// -// assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS)); -// -// StringBuilder sb = new StringBuilder(); -// for (int i=0 ; i < 1023; i++) { -// sb.append("a"); -// } -// -// TextMessage message = new TextMessage(sb.toString()); -// concurrentSession.sendMessage(message); -// -// assertEquals(1023, concurrentSession.getBufferSize()); -// assertTrue(blockingSession.isOpen()); -// -// concurrentSession.sendMessage(message); -// assertFalse(blockingSession.isOpen()); -// } - }