From 309ffc6d0d2e62c85c09ad0ed0eca86eedad8df8 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 10 Aug 2018 16:14:22 +0300 Subject: [PATCH] OverflowStrategy in ConcurrentWebSocketSessionDecorator Issue: SPR-17140 --- .../ConcurrentWebSocketSessionDecorator.java | 70 +++++++++++++++++-- ...currentWebSocketSessionDecoratorTests.java | 34 ++++++++- 2 files changed, 95 insertions(+), 9 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java b/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java index 43e80875e25..6e0cfe27406 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecorator.java @@ -52,6 +52,8 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat private final int bufferSizeLimit; + private final OverflowStrategy overflowStrategy; + private final Queue> buffer = new LinkedBlockingQueue<>(); private final AtomicInteger bufferSize = new AtomicInteger(); @@ -68,15 +70,31 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat /** - * Create a new {@code ConcurrentWebSocketSessionDecorator}. + * Basic constructor. * @param delegate the {@code WebSocketSession} to delegate to * @param sendTimeLimit the send-time limit (milliseconds) * @param bufferSizeLimit the buffer-size limit (number of bytes) */ public ConcurrentWebSocketSessionDecorator(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit) { + this(delegate, sendTimeLimit, bufferSizeLimit, OverflowStrategy.TERMINATE); + } + + /** + * Constructor that also specifies the overflow strategy to use. + * @param delegate the {@code WebSocketSession} to delegate to + * @param sendTimeLimit the send-time limit (milliseconds) + * @param bufferSizeLimit the buffer-size limit (number of bytes) + * @param overflowStrategy the overflow strategy to use; by default the + * session is terminated. + * @since 5.1 + */ + public ConcurrentWebSocketSessionDecorator( + WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit, OverflowStrategy overflowStrategy) { + super(delegate); this.sendTimeLimit = sendTimeLimit; this.bufferSizeLimit = bufferSizeLimit; + this.overflowStrategy = overflowStrategy; } @@ -148,7 +166,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat if (message == null || shouldNotSend()) { break; } - this.bufferSize.addAndGet(message.getPayloadLength() * -1); + this.bufferSize.addAndGet(-message.getPayloadLength()); this.sendStartTime = System.currentTimeMillis(); getDelegate().sendMessage(message); this.sendStartTime = 0; @@ -167,14 +185,35 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat if (!shouldNotSend() && this.closeLock.tryLock()) { try { if (getTimeSinceSendStarted() > getSendTimeLimit()) { - String format = "Message send time %d (ms) for session '%s' exceeded the allowed limit %d"; + String format = "Send time %d (ms) for session '%s' exceeded the allowed limit %d"; String reason = String.format(format, getTimeSinceSendStarted(), getId(), getSendTimeLimit()); limitExceeded(reason); } else if (getBufferSize() > getBufferSizeLimit()) { - String format = "The send buffer size %d bytes for session '%s' exceeded the allowed limit %d"; - String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit()); - limitExceeded(reason); + switch (this.overflowStrategy) { + case TERMINATE: + String format = "Buffer size %d bytes for session '%s' exceeds the allowed limit %d"; + String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit()); + limitExceeded(reason); + break; + case DROP: + int i = 0; + while (getBufferSize() > getBufferSizeLimit()) { + WebSocketMessage message = this.buffer.poll(); + if (message == null) { + break; + } + this.bufferSize.addAndGet(-message.getPayloadLength()); + i++; + } + if (logger.isDebugEnabled()) { + logger.debug("Dropped " + i + " messages, buffer size: " + getBufferSize()); + } + break; + default: + // Should never happen.. + throw new IllegalStateException("Unexpected OverflowStrategy: " + this.overflowStrategy); + } } } finally { @@ -223,4 +262,23 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat return getDelegate().toString(); } + + /** + * Enum for options of what to do when the buffer fills up. + * @since 5.1 + */ + public enum OverflowStrategy { + + /** + * Throw {@link SessionLimitExceededException} that would will result + * in the session being terminated. + */ + TERMINATE, + + /** + * Drop the oldest messages from the buffer. + */ + DROP + } + } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java index e790ace4ce1..eed231bd244 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/handler/ConcurrentWebSocketSessionDecoratorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.OverflowStrategy; import static org.junit.Assert.*; @@ -104,7 +105,7 @@ public class ConcurrentWebSocketSessionDecoratorTests { } catch (SessionLimitExceededException ex) { String actual = ex.getMessage(); - String regex = "Message send time [\\d]+ \\(ms\\) for session '123' exceeded the allowed limit 100"; + String regex = "Send time [\\d]+ \\(ms\\) for session '123' exceeded the allowed limit 100"; assertTrue("Unexpected message: " + actual, actual.matches(regex)); assertEquals(CloseStatus.SESSION_NOT_RELIABLE, ex.getStatus()); } @@ -139,12 +140,39 @@ public class ConcurrentWebSocketSessionDecoratorTests { } catch (SessionLimitExceededException ex) { String actual = ex.getMessage(); - String regex = "The send buffer size [\\d]+ bytes for session '123' exceeded the allowed limit 1024"; + String regex = "Buffer size [\\d]+ bytes for session '123' exceeds the allowed limit 1024"; assertTrue("Unexpected message: " + actual, actual.matches(regex)); assertEquals(CloseStatus.SESSION_NOT_RELIABLE, ex.getStatus()); } } + @Test // SPR-17140 + public void overflowStrategyDrop() throws IOException, InterruptedException { + + BlockingSession session = new BlockingSession(); + session.setId("123"); + session.setOpen(true); + + final ConcurrentWebSocketSessionDecorator decorator = + new ConcurrentWebSocketSessionDecorator(session, 10*1000, 1024, OverflowStrategy.DROP); + + sendBlockingMessage(decorator); + + StringBuilder sb = new StringBuilder(); + for (int i = 0 ; i < 1023; i++) { + sb.append("a"); + } + + for (int i=0; i < 5; i++) { + TextMessage message = new TextMessage(sb.toString()); + decorator.sendMessage(message); + } + + assertEquals(1023, decorator.getBufferSize()); + assertTrue(session.isOpen()); + + } + @Test public void closeStatusNormal() throws Exception {