From c1f3da082c3b0fb470cee991d7ae96264d52f5bb Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 16 Jan 2014 12:15:04 -0500 Subject: [PATCH] Write prelude on successive SockJS streaming requests sockjs-client expects a prelude to be written on every request with streaming transports. The protocol tests don't make this clear and don't expose this issue. The test case for SPR-11183 (writing 20K messages in succession) did expose the issue and this commit addresses it. Issue: SPR-11183 --- .../SockJsMessageDeliveryException.java | 6 + .../AbstractHttpSendingTransportHandler.java | 2 +- .../handler/HtmlFileTransportHandler.java | 2 +- .../session/AbstractHttpSockJsSession.java | 119 +++++-- .../session/AbstractSockJsSession.java | 4 +- .../session/StreamingSockJsSession.java | 2 +- .../session/AbstractSockJsSessionTests.java | 280 ++-------------- .../BaseAbstractSockJsSessionTests.java | 74 ----- ...Tests.java => HttpSockJsSessionTests.java} | 46 ++- .../transport/session/SockJsSessionTests.java | 299 ++++++++++++++++++ .../WebSocketServerSockJsSessionTests.java | 4 +- 11 files changed, 452 insertions(+), 386 deletions(-) delete mode 100644 spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/BaseAbstractSockJsSessionTests.java rename spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/{AbstractHttpSockJsSessionTests.java => HttpSockJsSessionTests.java} (80%) create mode 100644 spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsMessageDeliveryException.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsMessageDeliveryException.java index 0de584e78d4..8d7016093e7 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsMessageDeliveryException.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsMessageDeliveryException.java @@ -40,6 +40,12 @@ public class SockJsMessageDeliveryException extends SockJsException { this.undeliveredMessages = undeliveredMessages; } + public SockJsMessageDeliveryException(String sessionId, List undeliveredMessages, String message) { + super("Failed to deliver message(s) " + undeliveredMessages + " for session " + + sessionId + ": " + message, sessionId, null); + this.undeliveredMessages = undeliveredMessages; + } + public List getUndeliveredMessages() { return this.undeliveredMessages; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java index 6e373b4599c..0bba697e103 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java @@ -78,7 +78,7 @@ public abstract class AbstractHttpSendingTransportHandler extends AbstractTransp } else if (!sockJsSession.isActive()) { logger.debug("starting " + getTransportType() + " async request"); - sockJsSession.startLongPollingRequest(request, response, getFrameFormat(request)); + sockJsSession.handleSuccessiveRequest(request, response, getFrameFormat(request)); } else { logger.debug("another " + getTransportType() + " connection still open: " + sockJsSession); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/HtmlFileTransportHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/HtmlFileTransportHandler.java index d9d3e9dbe3a..276ea2bda69 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/HtmlFileTransportHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/HtmlFileTransportHandler.java @@ -133,7 +133,7 @@ public class HtmlFileTransportHandler extends AbstractHttpSendingTransportHandle } @Override - protected void afterRequestUpdated() { + protected void writePrelude() { // we already validated the parameter above.. String callback = getCallbackParam(getRequest()); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java index 3d3315dba3e..6a95305bf38 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -127,10 +127,40 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } + /** + * Handle the first HTTP request, i.e. the one that starts a SockJS session. + * Write a prelude to the response (if needed), send the SockJS "open" frame + * to indicate to the client the session is opened, and invoke the + * delegate WebSocketHandler to provide it with the newly opened session. + *

+ * The "xhr" and "jsonp" (polling-based) transports completes the initial request + * as soon as the open frame is sent. Following that the client should start a + * successive polling request within the same SockJS session. + *

+ * The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming + * based and will leave the initial request open in order to stream one or + * more messages. However, even streaming based transports eventually recycle + * the long running request, after a certain number of bytes have been streamed + * (128K by default), and allow the client to start a successive request within + * the same SockJS session. + * + * @param request the current request + * @param response the current response + * @param frameFormat the transport-specific SocksJS frame format to use + * + * @see #handleSuccessiveRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat) + */ public synchronized void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsFrameFormat frameFormat) throws SockJsException { - updateRequest(request, response, frameFormat); + initRequest(request, response, frameFormat); + + this.uri = request.getURI(); + this.handshakeHeaders = request.getHeaders(); + this.principal = request.getPrincipal(); + this.localAddress = request.getLocalAddress(); + this.remoteAddress = request.getRemoteAddress(); + try { writePrelude(); writeFrame(SockJsFrame.openFrame()); @@ -140,12 +170,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex); } - this.uri = request.getURI(); - this.handshakeHeaders = request.getHeaders(); - this.principal = request.getPrincipal(); - this.localAddress = request.getLocalAddress(); - this.remoteAddress = request.getRemoteAddress(); - try { delegateConnectionEstablished(); } @@ -154,13 +178,60 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } } + private void initRequest(ServerHttpRequest request, ServerHttpResponse response, + SockJsFrameFormat frameFormat) { + + Assert.notNull(request, "Request must not be null"); + Assert.notNull(response, "Response must not be null"); + Assert.notNull(frameFormat, "SockJsFrameFormat must not be null"); + + this.request = request; + this.response = response; + this.asyncRequestControl = request.getAsyncRequestControl(response); + this.frameFormat = frameFormat; + } + protected void writePrelude() throws IOException { } - public synchronized void startLongPollingRequest(ServerHttpRequest request, + /** + * Handle all HTTP requests part of the same SockJS session except for the very + * first, initial request. Write a prelude (if needed) and keep the request + * open and ready to send a message from the server to the client. + *

+ * The "xhr" and "jsonp" (polling-based) transports completes the request when + * the next message is sent, which could be an array of messages cached during + * the time between successive requests, or it could be a heartbeat message + * sent if no other messages were sent (by default within 25 seconds). + *

+ * The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming + * based and will leave the request open longer in order to stream messages over + * a period of time. However, even streaming based transports eventually recycle + * the long running request, after a certain number of bytes have been streamed + * (128K by default), and allow the client to start a successive request within + * the same SockJS session. + * + * @param request the current request + * @param response the current response + * @param frameFormat the transport-specific SocksJS frame format to use + * + * @see #handleInitialRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat) + */ + public synchronized void handleSuccessiveRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsFrameFormat frameFormat) throws SockJsException { - updateRequest(request, response, frameFormat); + initRequest(request, response, frameFormat); + try { + writePrelude(); + } + catch (Throwable ex) { + tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); + throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex); + } + startAsyncRequest(); + } + + protected void startAsyncRequest() throws SockJsException { try { this.asyncRequestControl.start(-1); scheduleHeartbeat(); @@ -172,20 +243,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } } - private void updateRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsFrameFormat frameFormat) { - Assert.notNull(request, "Request must not be null"); - Assert.notNull(response, "Response must not be null"); - Assert.notNull(frameFormat, "SockJsFrameFormat must not be null"); - this.request = request; - this.response = response; - this.asyncRequestControl = request.getAsyncRequestControl(response); - this.frameFormat = frameFormat; - afterRequestUpdated(); - } - - protected void afterRequestUpdated() { - } - @Override public synchronized boolean isActive() { return (this.asyncRequestControl != null && !this.asyncRequestControl.isCompleted()); @@ -210,10 +267,22 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { } private void tryFlushCache() throws SockJsTransportFailureException { - if (isActive() && !getMessageCache().isEmpty()) { + if (this.messageCache.isEmpty()) { + logger.trace("Nothing to flush"); + return; + } + if (logger.isTraceEnabled()) { + logger.trace(this.messageCache.size() + " message(s) to flush"); + } + if (isActive()) { logger.trace("Flushing messages"); flushCache(); } + else { + if (logger.isTraceEnabled()) { + logger.trace("Not ready to flush"); + } + } } /** diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java index cf6718c67c7..97aa048d74b 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java @@ -16,9 +16,7 @@ package org.springframework.web.socket.sockjs.transport.session; -import java.io.EOFException; import java.io.IOException; -import java.net.SocketException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -149,7 +147,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { for (String message : messages) { try { if (isClosed()) { - throw new SockJsMessageDeliveryException(this.id, undelivered, null); + throw new SockJsMessageDeliveryException(this.id, undelivered, "Session closed"); } else { this.handler.handleMessage(this, new TextMessage(message)); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java index 381142ce6c3..7af49fb7695 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java @@ -55,7 +55,7 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession { // the WebSocketHandler delegate may have closed the session if (!isClosed()) { - super.startLongPollingRequest(request, response, frameFormat); + super.startAsyncRequest(); } } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSessionTests.java index acffb113652..f80e249dca6 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSessionTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSessionTests.java @@ -16,285 +16,59 @@ package org.springframework.web.socket.sockjs.transport.session; -import java.io.IOException; -import java.sql.Date; -import java.util.Arrays; -import java.util.Collections; -import java.util.concurrent.ScheduledFuture; - -import org.junit.Test; -import org.springframework.web.socket.CloseStatus; -import org.springframework.web.socket.TextMessage; +import org.junit.Before; +import org.springframework.scheduling.TaskScheduler; import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException; -import org.springframework.web.socket.sockjs.SockJsTransportFailureException; -import org.springframework.web.socket.sockjs.frame.SockJsFrame; -import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator; +import org.springframework.web.socket.sockjs.transport.session.AbstractSockJsSession; import static org.junit.Assert.*; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; /** - * Test fixture for {@link AbstractSockJsSession}. + * Base class for SockJS Session tests classes. * * @author Rossen Stoyanchev */ -public class AbstractSockJsSessionTests extends BaseAbstractSockJsSessionTests { - - - @Override - protected TestSockJsSession initSockJsSession() { - return new TestSockJsSession("1", this.sockJsConfig, this.webSocketHandler, - Collections.emptyMap()); - } - - @Test - public void getTimeSinceLastActive() throws Exception { - - Thread.sleep(1); - - long time1 = this.session.getTimeSinceLastActive(); - assertTrue(time1 > 0); - - Thread.sleep(1); - - long time2 = this.session.getTimeSinceLastActive(); - assertTrue(time2 > time1); - - this.session.delegateConnectionEstablished(); - - Thread.sleep(1); - - this.session.setActive(false); - assertTrue(this.session.getTimeSinceLastActive() > 0); - - this.session.setActive(true); - assertEquals(0, this.session.getTimeSinceLastActive()); - } - - @Test - public void delegateConnectionEstablished() throws Exception { - assertNew(); - this.session.delegateConnectionEstablished(); - assertOpen(); - verify(this.webSocketHandler).afterConnectionEstablished(this.session); - } - - @Test - public void delegateError() throws Exception { - Exception ex = new Exception(); - this.session.delegateError(ex); - verify(this.webSocketHandler).handleTransportError(this.session, ex); - } - - @Test - public void delegateMessages() throws Exception { - String msg1 = "message 1"; - String msg2 = "message 2"; - this.session.delegateMessages(new String[] { msg1, msg2 }); - - verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg1)); - verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg2)); - verifyNoMoreInteractions(this.webSocketHandler); - } - - @Test - public void delegateMessagesWithErrorAndConnectionClosing() throws Exception { - - WebSocketHandler wsHandler = new ExceptionWebSocketHandlerDecorator(this.webSocketHandler); - TestSockJsSession sockJsSession = new TestSockJsSession("1", this.sockJsConfig, - wsHandler, Collections.emptyMap()); - - String msg1 = "message 1"; - String msg2 = "message 2"; - String msg3 = "message 3"; - - doThrow(new IOException()).when(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2)); - - sockJsSession.delegateConnectionEstablished(); - try { - sockJsSession.delegateMessages(new String[] { msg1, msg2, msg3 }); - fail("expected exception"); - } - catch (SockJsMessageDeliveryException ex) { - assertEquals(Arrays.asList(msg3), ex.getUndeliveredMessages()); - verify(this.webSocketHandler).afterConnectionEstablished(sockJsSession); - verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg1)); - verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2)); - verify(this.webSocketHandler).afterConnectionClosed(sockJsSession, CloseStatus.SERVER_ERROR); - verifyNoMoreInteractions(this.webSocketHandler); - } - } - - @Test - public void delegateConnectionClosed() throws Exception { - this.session.delegateConnectionEstablished(); - this.session.delegateConnectionClosed(CloseStatus.GOING_AWAY); - - assertClosed(); - assertEquals(1, this.session.getNumberOfLastActiveTimeUpdates()); - assertTrue(this.session.didCancelHeartbeat()); - verify(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.GOING_AWAY); - } - - @Test - public void closeWhenNotOpen() throws Exception { - - assertNew(); - - this.session.close(); - assertNull("Close not ignored for a new session", this.session.getCloseStatus()); - - this.session.delegateConnectionEstablished(); - assertOpen(); +public abstract class AbstractSockJsSessionTests { - this.session.close(); - assertClosed(); - assertEquals(3000, this.session.getCloseStatus().getCode()); + protected WebSocketHandler webSocketHandler; - this.session.close(CloseStatus.SERVER_ERROR); - assertEquals("Close should be ignored if already closed", 3000, this.session.getCloseStatus().getCode()); - } - - @Test - public void closeWhenNotActive() throws Exception { - - this.session.delegateConnectionEstablished(); - assertOpen(); - - this.session.setActive(false); - this.session.close(); - - assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten()); - } - - @Test - public void close() throws Exception { - - this.session.delegateConnectionEstablished(); - assertOpen(); - - this.session.setActive(true); - this.session.close(); - - assertEquals(1, this.session.getSockJsFramesWritten().size()); - assertEquals(SockJsFrame.closeFrameGoAway(), this.session.getSockJsFramesWritten().get(0)); - - assertEquals(1, this.session.getNumberOfLastActiveTimeUpdates()); - assertTrue(this.session.didCancelHeartbeat()); - - assertEquals(new CloseStatus(3000, "Go away!"), this.session.getCloseStatus()); - assertClosed(); - verify(this.webSocketHandler).afterConnectionClosed(this.session, new CloseStatus(3000, "Go away!")); - } - - @Test - public void closeWithWriteFrameExceptions() throws Exception { - - this.session.setExceptionOnWrite(new IOException()); - - this.session.delegateConnectionEstablished(); - this.session.setActive(true); - this.session.close(); - - assertEquals(new CloseStatus(3000, "Go away!"), this.session.getCloseStatus()); - assertClosed(); - } - - @Test - public void closeWithWebSocketHandlerExceptions() throws Exception { + protected StubSockJsServiceConfig sockJsConfig; - doThrow(new Exception()).when(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.NORMAL); + protected TaskScheduler taskScheduler; - this.session.delegateConnectionEstablished(); - this.session.setActive(true); - this.session.close(CloseStatus.NORMAL); + protected S session; - assertEquals(CloseStatus.NORMAL, this.session.getCloseStatus()); - assertClosed(); - } - @Test - public void tryCloseWithWebSocketHandlerExceptions() throws Exception { + @Before + public void setUp() { + this.webSocketHandler = mock(WebSocketHandler.class); + this.taskScheduler = mock(TaskScheduler.class); - this.session.delegateConnectionEstablished(); - this.session.setActive(true); - this.session.tryCloseWithSockJsTransportError(new Exception(), CloseStatus.BAD_DATA); + this.sockJsConfig = new StubSockJsServiceConfig(); + this.sockJsConfig.setTaskScheduler(this.taskScheduler); - assertEquals(CloseStatus.BAD_DATA, this.session.getCloseStatus()); - assertClosed(); + this.session = initSockJsSession(); } - @Test - public void writeFrame() throws Exception { - this.session.writeFrame(SockJsFrame.openFrame()); + protected abstract S initSockJsSession(); - assertEquals(1, this.session.getSockJsFramesWritten().size()); - assertEquals(SockJsFrame.openFrame(), this.session.getSockJsFramesWritten().get(0)); + protected void assertNew() { + assertState(true, false, false); } - @Test - public void writeFrameIoException() throws Exception { - this.session.setExceptionOnWrite(new IOException()); - this.session.delegateConnectionEstablished(); - try { - this.session.writeFrame(SockJsFrame.openFrame()); - fail("expected exception"); - } - catch (SockJsTransportFailureException ex) { - assertEquals(CloseStatus.SERVER_ERROR, this.session.getCloseStatus()); - verify(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.SERVER_ERROR); - } + protected void assertOpen() { + assertState(false, true, false); } - @Test - public void sendHeartbeatWhenNotActive() throws Exception { - this.session.setActive(false); - this.session.sendHeartbeat(); - - assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten()); + protected void assertClosed() { + assertState(false, false, true); } - @Test - public void sendHeartbeat() throws Exception { - this.session.setActive(true); - this.session.sendHeartbeat(); - - assertEquals(1, this.session.getSockJsFramesWritten().size()); - assertEquals(SockJsFrame.heartbeatFrame(), this.session.getSockJsFramesWritten().get(0)); - - verify(this.taskScheduler).schedule(any(Runnable.class), any(Date.class)); - verifyNoMoreInteractions(this.taskScheduler); - } - - @Test - public void scheduleHeartbeatNotActive() throws Exception { - this.session.setActive(false); - this.session.scheduleHeartbeat(); - - verifyNoMoreInteractions(this.taskScheduler); - } - - @Test - public void scheduleAndCancelHeartbeat() throws Exception { - - ScheduledFuture task = mock(ScheduledFuture.class); - doReturn(task).when(this.taskScheduler).schedule(any(Runnable.class), any(Date.class)); - - this.session.setActive(true); - this.session.scheduleHeartbeat(); - - verify(this.taskScheduler).schedule(any(Runnable.class), any(Date.class)); - verifyNoMoreInteractions(this.taskScheduler); - - doReturn(false).when(task).isDone(); - - this.session.cancelHeartbeat(); - - verify(task).isDone(); - verify(task).cancel(false); - verifyNoMoreInteractions(task); + private void assertState(boolean isNew, boolean isOpen, boolean isClosed) { + assertEquals(isNew, this.session.isNew()); + assertEquals(isOpen, this.session.isOpen()); + assertEquals(isClosed, this.session.isClosed()); } } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/BaseAbstractSockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/BaseAbstractSockJsSessionTests.java deleted file mode 100644 index a4022104cfb..00000000000 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/BaseAbstractSockJsSessionTests.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.web.socket.sockjs.transport.session; - -import org.junit.Before; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.sockjs.transport.session.AbstractSockJsSession; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -/** - * Base class for {@link AbstractSockJsSession} classes. - * - * @author Rossen Stoyanchev - */ -public abstract class BaseAbstractSockJsSessionTests { - - protected WebSocketHandler webSocketHandler; - - protected StubSockJsServiceConfig sockJsConfig; - - protected TaskScheduler taskScheduler; - - protected S session; - - - @Before - public void setUp() { - this.webSocketHandler = mock(WebSocketHandler.class); - this.taskScheduler = mock(TaskScheduler.class); - - this.sockJsConfig = new StubSockJsServiceConfig(); - this.sockJsConfig.setTaskScheduler(this.taskScheduler); - - this.session = initSockJsSession(); - } - - protected abstract S initSockJsSession(); - - protected void assertNew() { - assertState(true, false, false); - } - - protected void assertOpen() { - assertState(false, true, false); - } - - protected void assertClosed() { - assertState(false, false, true); - } - - private void assertState(boolean isNew, boolean isOpen, boolean isClosed) { - assertEquals(isNew, this.session.isNew()); - assertEquals(isOpen, this.session.isOpen()); - assertEquals(isClosed, this.session.isClosed()); - } - -} diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java similarity index 80% rename from spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSessionTests.java rename to spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java index 8c8f803349e..e5bbe43aee9 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSessionTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,23 +28,22 @@ import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.http.server.ServletServerHttpResponse; import org.springframework.mock.web.test.MockHttpServletRequest; import org.springframework.mock.web.test.MockHttpServletResponse; -import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.sockjs.frame.DefaultSockJsFrameFormat; import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat; import org.springframework.web.socket.sockjs.frame.SockJsFrame; import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; -import org.springframework.web.socket.sockjs.transport.session.AbstractHttpSockJsSessionTests.TestAbstractHttpSockJsSession; +import org.springframework.web.socket.sockjs.transport.session.HttpSockJsSessionTests.TestAbstractHttpSockJsSession; import static org.junit.Assert.*; import static org.mockito.Mockito.*; /** - * Test fixture for {@link AbstractHttpSockJsSession}. + * Unit tests for {@link AbstractHttpSockJsSession}. * * @author Rossen Stoyanchev */ -public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTests { +public class HttpSockJsSessionTests extends AbstractSockJsSessionTests { protected ServerHttpRequest request; @@ -57,6 +56,11 @@ public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTes private SockJsFrameFormat frameFormat; + @Override + protected TestAbstractHttpSockJsSession initSockJsSession() { + return new TestAbstractHttpSockJsSession(this.sockJsConfig, this.webSocketHandler, null); + } + @Before public void setup() { @@ -72,30 +76,25 @@ public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTes this.request = new ServletServerHttpRequest(this.servletRequest); } - @Override - protected TestAbstractHttpSockJsSession initSockJsSession() { - return new TestAbstractHttpSockJsSession(this.sockJsConfig, this.webSocketHandler, null); - } - @Test - public void setInitialRequest() throws Exception { + public void handleInitialRequest() throws Exception { this.session.handleInitialRequest(this.request, this.response, this.frameFormat); assertTrue(this.session.hasRequest()); assertTrue(this.session.hasResponse()); - assertEquals("o", this.servletResponse.getContentAsString()); + assertEquals("hhh\no", this.servletResponse.getContentAsString()); assertFalse(this.servletRequest.isAsyncStarted()); verify(this.webSocketHandler).afterConnectionEstablished(this.session); } @Test - public void setLongPollingRequest() throws Exception { + public void handleSuccessiveRequest() throws Exception { this.session.getMessageCache().add("x"); - this.session.startLongPollingRequest(this.request, this.response, this.frameFormat); + this.session.handleSuccessiveRequest(this.request, this.response, this.frameFormat); assertTrue(this.session.hasRequest()); assertTrue(this.session.hasResponse()); @@ -104,19 +103,9 @@ public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTes assertTrue(this.session.wasHeartbeatScheduled()); assertTrue(this.session.wasCacheFlushed()); - verifyNoMoreInteractions(this.webSocketHandler); - } - - @Test - public void setLongPollingRequestWhenClosed() throws Exception { - - this.session.delegateConnectionClosed(CloseStatus.NORMAL); - assertClosed(); - - this.session.startLongPollingRequest(this.request, this.response, this.frameFormat); + assertEquals("hhh\n", this.servletResponse.getContentAsString()); - assertEquals("c[3000,\"Go away!\"]", this.servletResponse.getContentAsString()); - assertFalse(this.servletRequest.isAsyncStarted()); + verifyNoMoreInteractions(this.webSocketHandler); } @@ -135,6 +124,11 @@ public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTes super("1", config, handler, attributes); } + @Override + protected void writePrelude() throws IOException { + getResponse().getBody().write("hhh\n".getBytes()); + } + public boolean wasCacheFlushed() { return this.cacheFlushed; } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java new file mode 100644 index 00000000000..b4f5aec93fd --- /dev/null +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java @@ -0,0 +1,299 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.sockjs.transport.session; + +import java.io.IOException; +import java.sql.Date; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.ScheduledFuture; + +import org.junit.Test; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException; +import org.springframework.web.socket.sockjs.SockJsTransportFailureException; +import org.springframework.web.socket.sockjs.frame.SockJsFrame; +import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +/** + * Test fixture for {@link AbstractSockJsSession}. + * + * @author Rossen Stoyanchev + */ +public class SockJsSessionTests extends AbstractSockJsSessionTests { + + + @Override + protected TestSockJsSession initSockJsSession() { + return new TestSockJsSession("1", this.sockJsConfig, this.webSocketHandler, Collections.emptyMap()); + } + + @Test + public void getTimeSinceLastActive() throws Exception { + + Thread.sleep(1); + + long time1 = this.session.getTimeSinceLastActive(); + assertTrue(time1 > 0); + + Thread.sleep(1); + + long time2 = this.session.getTimeSinceLastActive(); + assertTrue(time2 > time1); + + this.session.delegateConnectionEstablished(); + + Thread.sleep(1); + + this.session.setActive(false); + assertTrue(this.session.getTimeSinceLastActive() > 0); + + this.session.setActive(true); + assertEquals(0, this.session.getTimeSinceLastActive()); + } + + @Test + public void delegateConnectionEstablished() throws Exception { + assertNew(); + this.session.delegateConnectionEstablished(); + assertOpen(); + verify(this.webSocketHandler).afterConnectionEstablished(this.session); + } + + @Test + public void delegateError() throws Exception { + Exception ex = new Exception(); + this.session.delegateError(ex); + verify(this.webSocketHandler).handleTransportError(this.session, ex); + } + + @Test + public void delegateMessages() throws Exception { + String msg1 = "message 1"; + String msg2 = "message 2"; + this.session.delegateMessages(new String[] { msg1, msg2 }); + + verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg1)); + verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg2)); + verifyNoMoreInteractions(this.webSocketHandler); + } + + @Test + public void delegateMessagesWithErrorAndConnectionClosing() throws Exception { + + WebSocketHandler wsHandler = new ExceptionWebSocketHandlerDecorator(this.webSocketHandler); + TestSockJsSession sockJsSession = new TestSockJsSession("1", this.sockJsConfig, + wsHandler, Collections.emptyMap()); + + String msg1 = "message 1"; + String msg2 = "message 2"; + String msg3 = "message 3"; + + doThrow(new IOException()).when(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2)); + + sockJsSession.delegateConnectionEstablished(); + try { + sockJsSession.delegateMessages(new String[] { msg1, msg2, msg3 }); + fail("expected exception"); + } + catch (SockJsMessageDeliveryException ex) { + assertEquals(Arrays.asList(msg3), ex.getUndeliveredMessages()); + verify(this.webSocketHandler).afterConnectionEstablished(sockJsSession); + verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg1)); + verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2)); + verify(this.webSocketHandler).afterConnectionClosed(sockJsSession, CloseStatus.SERVER_ERROR); + verifyNoMoreInteractions(this.webSocketHandler); + } + } + + @Test + public void delegateConnectionClosed() throws Exception { + this.session.delegateConnectionEstablished(); + this.session.delegateConnectionClosed(CloseStatus.GOING_AWAY); + + assertClosed(); + assertEquals(1, this.session.getNumberOfLastActiveTimeUpdates()); + assertTrue(this.session.didCancelHeartbeat()); + verify(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.GOING_AWAY); + } + + @Test + public void closeWhenNotOpen() throws Exception { + + assertNew(); + + this.session.close(); + assertNull("Close not ignored for a new session", this.session.getCloseStatus()); + + this.session.delegateConnectionEstablished(); + assertOpen(); + + this.session.close(); + assertClosed(); + assertEquals(3000, this.session.getCloseStatus().getCode()); + + this.session.close(CloseStatus.SERVER_ERROR); + assertEquals("Close should be ignored if already closed", 3000, this.session.getCloseStatus().getCode()); + } + + @Test + public void closeWhenNotActive() throws Exception { + + this.session.delegateConnectionEstablished(); + assertOpen(); + + this.session.setActive(false); + this.session.close(); + + assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten()); + } + + @Test + public void close() throws Exception { + + this.session.delegateConnectionEstablished(); + assertOpen(); + + this.session.setActive(true); + this.session.close(); + + assertEquals(1, this.session.getSockJsFramesWritten().size()); + assertEquals(SockJsFrame.closeFrameGoAway(), this.session.getSockJsFramesWritten().get(0)); + + assertEquals(1, this.session.getNumberOfLastActiveTimeUpdates()); + assertTrue(this.session.didCancelHeartbeat()); + + assertEquals(new CloseStatus(3000, "Go away!"), this.session.getCloseStatus()); + assertClosed(); + verify(this.webSocketHandler).afterConnectionClosed(this.session, new CloseStatus(3000, "Go away!")); + } + + @Test + public void closeWithWriteFrameExceptions() throws Exception { + + this.session.setExceptionOnWrite(new IOException()); + + this.session.delegateConnectionEstablished(); + this.session.setActive(true); + this.session.close(); + + assertEquals(new CloseStatus(3000, "Go away!"), this.session.getCloseStatus()); + assertClosed(); + } + + @Test + public void closeWithWebSocketHandlerExceptions() throws Exception { + + doThrow(new Exception()).when(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.NORMAL); + + this.session.delegateConnectionEstablished(); + this.session.setActive(true); + this.session.close(CloseStatus.NORMAL); + + assertEquals(CloseStatus.NORMAL, this.session.getCloseStatus()); + assertClosed(); + } + + @Test + public void tryCloseWithWebSocketHandlerExceptions() throws Exception { + + this.session.delegateConnectionEstablished(); + this.session.setActive(true); + this.session.tryCloseWithSockJsTransportError(new Exception(), CloseStatus.BAD_DATA); + + assertEquals(CloseStatus.BAD_DATA, this.session.getCloseStatus()); + assertClosed(); + } + + @Test + public void writeFrame() throws Exception { + this.session.writeFrame(SockJsFrame.openFrame()); + + assertEquals(1, this.session.getSockJsFramesWritten().size()); + assertEquals(SockJsFrame.openFrame(), this.session.getSockJsFramesWritten().get(0)); + } + + @Test + public void writeFrameIoException() throws Exception { + this.session.setExceptionOnWrite(new IOException()); + this.session.delegateConnectionEstablished(); + try { + this.session.writeFrame(SockJsFrame.openFrame()); + fail("expected exception"); + } + catch (SockJsTransportFailureException ex) { + assertEquals(CloseStatus.SERVER_ERROR, this.session.getCloseStatus()); + verify(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.SERVER_ERROR); + } + } + + @Test + public void sendHeartbeatWhenNotActive() throws Exception { + this.session.setActive(false); + this.session.sendHeartbeat(); + + assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten()); + } + + @Test + public void sendHeartbeat() throws Exception { + this.session.setActive(true); + this.session.sendHeartbeat(); + + assertEquals(1, this.session.getSockJsFramesWritten().size()); + assertEquals(SockJsFrame.heartbeatFrame(), this.session.getSockJsFramesWritten().get(0)); + + verify(this.taskScheduler).schedule(any(Runnable.class), any(Date.class)); + verifyNoMoreInteractions(this.taskScheduler); + } + + @Test + public void scheduleHeartbeatNotActive() throws Exception { + this.session.setActive(false); + this.session.scheduleHeartbeat(); + + verifyNoMoreInteractions(this.taskScheduler); + } + + @Test + public void scheduleAndCancelHeartbeat() throws Exception { + + ScheduledFuture task = mock(ScheduledFuture.class); + doReturn(task).when(this.taskScheduler).schedule(any(Runnable.class), any(Date.class)); + + this.session.setActive(true); + this.session.scheduleHeartbeat(); + + verify(this.taskScheduler).schedule(any(Runnable.class), any(Date.class)); + verifyNoMoreInteractions(this.taskScheduler); + + doReturn(false).when(task).isDone(); + + this.session.cancelHeartbeat(); + + verify(task).isDone(); + verify(task).cancel(false); + verifyNoMoreInteractions(task); + } + +} diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSessionTests.java index 85620c85862..3a80b135615 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSessionTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSessionTests.java @@ -37,11 +37,11 @@ import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; /** - * Test fixture for {@link WebSocketServerSockJsSession}. + * Unit tests for {@link WebSocketServerSockJsSession}. * * @author Rossen Stoyanchev */ -public class WebSocketServerSockJsSessionTests extends BaseAbstractSockJsSessionTests { +public class WebSocketServerSockJsSessionTests extends AbstractSockJsSessionTests { private TestWebSocketSession webSocketSession;