@ -18,6 +18,7 @@ package org.springframework.web.socket.handler;
import org.apache.commons.logging.Log ;
import org.apache.commons.logging.Log ;
import org.apache.commons.logging.LogFactory ;
import org.apache.commons.logging.LogFactory ;
import org.springframework.web.socket.CloseStatus ;
import org.springframework.web.socket.WebSocketMessage ;
import org.springframework.web.socket.WebSocketMessage ;
import org.springframework.web.socket.WebSocketSession ;
import org.springframework.web.socket.WebSocketSession ;
@ -58,7 +59,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
private final int sendTimeLimit ;
private final int sendTimeLimit ;
private volatile boolean sessionL imitExceeded;
private volatile boolean l imitExceeded;
private final Lock flushLock = new ReentrantLock ( ) ;
private final Lock flushLock = new ReentrantLock ( ) ;
@ -85,7 +86,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
public void sendMessage ( WebSocketMessage < ? > message ) throws IOException {
public void sendMessage ( WebSocketMessage < ? > message ) throws IOException {
if ( this . sessionL imitExceeded) {
if ( this . l imitExceeded) {
return ;
return ;
}
}
@ -94,8 +95,8 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
do {
do {
if ( ! tryFlushMessageBuffer ( ) ) {
if ( ! tryFlushMessageBuffer ( ) ) {
if ( logger . isDebug Enabled ( ) ) {
if ( logger . isTrace Enabled ( ) ) {
logger . debug ( "Another send already in progress, session id '" +
logger . trace ( "Another send already in progress, session id '" +
getId ( ) + "'" + ", in-progress send time " + getTimeSinceSendStarted ( ) +
getId ( ) + "'" + ", in-progress send time " + getTimeSinceSendStarted ( ) +
" (ms)" + ", buffer size " + this . bufferSize + " bytes" ) ;
" (ms)" + ", buffer size " + this . bufferSize + " bytes" ) ;
}
}
@ -107,7 +108,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
}
}
private boolean tryFlushMessageBuffer ( ) throws IOException {
private boolean tryFlushMessageBuffer ( ) throws IOException {
if ( this . flushLock . tryLock ( ) & & ! this . sessionL imitExceeded) {
if ( this . flushLock . tryLock ( ) & & ! this . l imitExceeded) {
try {
try {
while ( true ) {
while ( true ) {
WebSocketMessage < ? > messageToSend = this . buffer . poll ( ) ;
WebSocketMessage < ? > messageToSend = this . buffer . poll ( ) ;
@ -130,17 +131,22 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
}
}
private void checkSessionLimits ( ) throws IOException {
private void checkSessionLimits ( ) throws IOException {
if ( this . closeLock . tryLock ( ) & & ! this . sessionL imitExceeded) {
if ( this . closeLock . tryLock ( ) & & ! this . l imitExceeded) {
try {
try {
if ( getTimeSinceSendStarted ( ) > this . sendTimeLimit ) {
if ( getTimeSinceSendStarted ( ) > this . sendTimeLimit ) {
sessionLimitReached (
"Message send time " + getTimeSinceSendStarted ( ) +
String errorMessage = "Message send time " + getTimeSinceSendStarted ( ) +
" (ms) exceeded the allowed limit " + this . sendTimeLimit ) ;
" (ms) exceeded the allowed limit " + this . sendTimeLimit ;
sessionLimitReached ( errorMessage , CloseStatus . SESSION_NOT_RELIABLE ) ;
}
}
else if ( this . bufferSize . get ( ) > this . bufferSizeLimit ) {
else if ( this . bufferSize . get ( ) > this . bufferSizeLimit ) {
sessionLimitReached (
"The send buffer size " + this . bufferSize . get ( ) + " bytes for " +
String errorMessage = "The send buffer size " + this . bufferSize . get ( ) + " bytes for " +
"session '" + getId ( ) + " exceeded the allowed limit " + this . bufferSizeLimit ) ;
"session '" + getId ( ) + " exceeded the allowed limit " + this . bufferSizeLimit ;
sessionLimitReached ( errorMessage ,
( getTimeSinceSendStarted ( ) > = 10000 ? CloseStatus . SESSION_NOT_RELIABLE : null ) ) ;
}
}
}
}
finally {
finally {
@ -149,9 +155,9 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
}
}
}
}
private void sessionLimitReached ( String reason ) {
private void sessionLimitReached ( String reason , CloseStatus status ) {
this . sessionL imitExceeded = true ;
this . l imitExceeded = true ;
throw new SessionLimitExceededException ( reason ) ;
throw new SessionLimitExceededException ( reason , status ) ;
}
}
}
}