Browse Source

Write prelude on successive SockJS streaming requests

sockjs-client expects a prelude to be written on every request with
streaming transports. The protocol tests don't make this clear and
don't expose this issue.

The test case for SPR-11183 (writing 20K messages in succession) did
expose the issue and this commit addresses it.

Issue: SPR-11183
pull/443/head
Rossen Stoyanchev 12 years ago
parent
commit
c1f3da082c
  1. 6
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsMessageDeliveryException.java
  2. 2
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java
  3. 2
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/HtmlFileTransportHandler.java
  4. 119
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java
  5. 4
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java
  6. 2
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java
  7. 280
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSessionTests.java
  8. 74
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/BaseAbstractSockJsSessionTests.java
  9. 46
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java
  10. 299
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java
  11. 4
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSessionTests.java

6
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsMessageDeliveryException.java

@ -40,6 +40,12 @@ public class SockJsMessageDeliveryException extends SockJsException {
this.undeliveredMessages = undeliveredMessages; this.undeliveredMessages = undeliveredMessages;
} }
public SockJsMessageDeliveryException(String sessionId, List<String> undeliveredMessages, String message) {
super("Failed to deliver message(s) " + undeliveredMessages + " for session "
+ sessionId + ": " + message, sessionId, null);
this.undeliveredMessages = undeliveredMessages;
}
public List<String> getUndeliveredMessages() { public List<String> getUndeliveredMessages() {
return this.undeliveredMessages; return this.undeliveredMessages;
} }

2
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java

@ -78,7 +78,7 @@ public abstract class AbstractHttpSendingTransportHandler extends AbstractTransp
} }
else if (!sockJsSession.isActive()) { else if (!sockJsSession.isActive()) {
logger.debug("starting " + getTransportType() + " async request"); logger.debug("starting " + getTransportType() + " async request");
sockJsSession.startLongPollingRequest(request, response, getFrameFormat(request)); sockJsSession.handleSuccessiveRequest(request, response, getFrameFormat(request));
} }
else { else {
logger.debug("another " + getTransportType() + " connection still open: " + sockJsSession); logger.debug("another " + getTransportType() + " connection still open: " + sockJsSession);

2
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/HtmlFileTransportHandler.java

@ -133,7 +133,7 @@ public class HtmlFileTransportHandler extends AbstractHttpSendingTransportHandle
} }
@Override @Override
protected void afterRequestUpdated() { protected void writePrelude() {
// we already validated the parameter above.. // we already validated the parameter above..
String callback = getCallbackParam(getRequest()); String callback = getCallbackParam(getRequest());

119
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2014 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -127,10 +127,40 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
} }
/**
* Handle the first HTTP request, i.e. the one that starts a SockJS session.
* Write a prelude to the response (if needed), send the SockJS "open" frame
* to indicate to the client the session is opened, and invoke the
* delegate WebSocketHandler to provide it with the newly opened session.
* <p>
* The "xhr" and "jsonp" (polling-based) transports completes the initial request
* as soon as the open frame is sent. Following that the client should start a
* successive polling request within the same SockJS session.
* <p>
* The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming
* based and will leave the initial request open in order to stream one or
* more messages. However, even streaming based transports eventually recycle
* the long running request, after a certain number of bytes have been streamed
* (128K by default), and allow the client to start a successive request within
* the same SockJS session.
*
* @param request the current request
* @param response the current response
* @param frameFormat the transport-specific SocksJS frame format to use
*
* @see #handleSuccessiveRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat)
*/
public synchronized void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response, public synchronized void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response,
SockJsFrameFormat frameFormat) throws SockJsException { SockJsFrameFormat frameFormat) throws SockJsException {
updateRequest(request, response, frameFormat); initRequest(request, response, frameFormat);
this.uri = request.getURI();
this.handshakeHeaders = request.getHeaders();
this.principal = request.getPrincipal();
this.localAddress = request.getLocalAddress();
this.remoteAddress = request.getRemoteAddress();
try { try {
writePrelude(); writePrelude();
writeFrame(SockJsFrame.openFrame()); writeFrame(SockJsFrame.openFrame());
@ -140,12 +170,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex); throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex);
} }
this.uri = request.getURI();
this.handshakeHeaders = request.getHeaders();
this.principal = request.getPrincipal();
this.localAddress = request.getLocalAddress();
this.remoteAddress = request.getRemoteAddress();
try { try {
delegateConnectionEstablished(); delegateConnectionEstablished();
} }
@ -154,13 +178,60 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
} }
} }
private void initRequest(ServerHttpRequest request, ServerHttpResponse response,
SockJsFrameFormat frameFormat) {
Assert.notNull(request, "Request must not be null");
Assert.notNull(response, "Response must not be null");
Assert.notNull(frameFormat, "SockJsFrameFormat must not be null");
this.request = request;
this.response = response;
this.asyncRequestControl = request.getAsyncRequestControl(response);
this.frameFormat = frameFormat;
}
protected void writePrelude() throws IOException { protected void writePrelude() throws IOException {
} }
public synchronized void startLongPollingRequest(ServerHttpRequest request, /**
* Handle all HTTP requests part of the same SockJS session except for the very
* first, initial request. Write a prelude (if needed) and keep the request
* open and ready to send a message from the server to the client.
* <p>
* The "xhr" and "jsonp" (polling-based) transports completes the request when
* the next message is sent, which could be an array of messages cached during
* the time between successive requests, or it could be a heartbeat message
* sent if no other messages were sent (by default within 25 seconds).
* <p>
* The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming
* based and will leave the request open longer in order to stream messages over
* a period of time. However, even streaming based transports eventually recycle
* the long running request, after a certain number of bytes have been streamed
* (128K by default), and allow the client to start a successive request within
* the same SockJS session.
*
* @param request the current request
* @param response the current response
* @param frameFormat the transport-specific SocksJS frame format to use
*
* @see #handleInitialRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat)
*/
public synchronized void handleSuccessiveRequest(ServerHttpRequest request,
ServerHttpResponse response, SockJsFrameFormat frameFormat) throws SockJsException { ServerHttpResponse response, SockJsFrameFormat frameFormat) throws SockJsException {
updateRequest(request, response, frameFormat); initRequest(request, response, frameFormat);
try {
writePrelude();
}
catch (Throwable ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex);
}
startAsyncRequest();
}
protected void startAsyncRequest() throws SockJsException {
try { try {
this.asyncRequestControl.start(-1); this.asyncRequestControl.start(-1);
scheduleHeartbeat(); scheduleHeartbeat();
@ -172,20 +243,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
} }
} }
private void updateRequest(ServerHttpRequest request, ServerHttpResponse response, SockJsFrameFormat frameFormat) {
Assert.notNull(request, "Request must not be null");
Assert.notNull(response, "Response must not be null");
Assert.notNull(frameFormat, "SockJsFrameFormat must not be null");
this.request = request;
this.response = response;
this.asyncRequestControl = request.getAsyncRequestControl(response);
this.frameFormat = frameFormat;
afterRequestUpdated();
}
protected void afterRequestUpdated() {
}
@Override @Override
public synchronized boolean isActive() { public synchronized boolean isActive() {
return (this.asyncRequestControl != null && !this.asyncRequestControl.isCompleted()); return (this.asyncRequestControl != null && !this.asyncRequestControl.isCompleted());
@ -210,10 +267,22 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
} }
private void tryFlushCache() throws SockJsTransportFailureException { private void tryFlushCache() throws SockJsTransportFailureException {
if (isActive() && !getMessageCache().isEmpty()) { if (this.messageCache.isEmpty()) {
logger.trace("Nothing to flush");
return;
}
if (logger.isTraceEnabled()) {
logger.trace(this.messageCache.size() + " message(s) to flush");
}
if (isActive()) {
logger.trace("Flushing messages"); logger.trace("Flushing messages");
flushCache(); flushCache();
} }
else {
if (logger.isTraceEnabled()) {
logger.trace("Not ready to flush");
}
}
} }
/** /**

4
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java

@ -16,9 +16,7 @@
package org.springframework.web.socket.sockjs.transport.session; package org.springframework.web.socket.sockjs.transport.session;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
@ -149,7 +147,7 @@ public abstract class AbstractSockJsSession implements SockJsSession {
for (String message : messages) { for (String message : messages) {
try { try {
if (isClosed()) { if (isClosed()) {
throw new SockJsMessageDeliveryException(this.id, undelivered, null); throw new SockJsMessageDeliveryException(this.id, undelivered, "Session closed");
} }
else { else {
this.handler.handleMessage(this, new TextMessage(message)); this.handler.handleMessage(this, new TextMessage(message));

2
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java

@ -55,7 +55,7 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession {
// the WebSocketHandler delegate may have closed the session // the WebSocketHandler delegate may have closed the session
if (!isClosed()) { if (!isClosed()) {
super.startLongPollingRequest(request, response, frameFormat); super.startAsyncRequest();
} }
} }

280
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSessionTests.java

@ -16,285 +16,59 @@
package org.springframework.web.socket.sockjs.transport.session; package org.springframework.web.socket.sockjs.transport.session;
import java.io.IOException; import org.junit.Before;
import java.sql.Date; import org.springframework.scheduling.TaskScheduler;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledFuture;
import org.junit.Test;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException; import org.springframework.web.socket.sockjs.transport.session.AbstractSockJsSession;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
/** /**
* Test fixture for {@link AbstractSockJsSession}. * Base class for SockJS Session tests classes.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
*/ */
public class AbstractSockJsSessionTests extends BaseAbstractSockJsSessionTests<TestSockJsSession> { public abstract class AbstractSockJsSessionTests<S extends AbstractSockJsSession> {
@Override
protected TestSockJsSession initSockJsSession() {
return new TestSockJsSession("1", this.sockJsConfig, this.webSocketHandler,
Collections.<String, Object>emptyMap());
}
@Test
public void getTimeSinceLastActive() throws Exception {
Thread.sleep(1);
long time1 = this.session.getTimeSinceLastActive();
assertTrue(time1 > 0);
Thread.sleep(1);
long time2 = this.session.getTimeSinceLastActive();
assertTrue(time2 > time1);
this.session.delegateConnectionEstablished();
Thread.sleep(1);
this.session.setActive(false);
assertTrue(this.session.getTimeSinceLastActive() > 0);
this.session.setActive(true);
assertEquals(0, this.session.getTimeSinceLastActive());
}
@Test
public void delegateConnectionEstablished() throws Exception {
assertNew();
this.session.delegateConnectionEstablished();
assertOpen();
verify(this.webSocketHandler).afterConnectionEstablished(this.session);
}
@Test
public void delegateError() throws Exception {
Exception ex = new Exception();
this.session.delegateError(ex);
verify(this.webSocketHandler).handleTransportError(this.session, ex);
}
@Test
public void delegateMessages() throws Exception {
String msg1 = "message 1";
String msg2 = "message 2";
this.session.delegateMessages(new String[] { msg1, msg2 });
verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg1));
verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg2));
verifyNoMoreInteractions(this.webSocketHandler);
}
@Test
public void delegateMessagesWithErrorAndConnectionClosing() throws Exception {
WebSocketHandler wsHandler = new ExceptionWebSocketHandlerDecorator(this.webSocketHandler);
TestSockJsSession sockJsSession = new TestSockJsSession("1", this.sockJsConfig,
wsHandler, Collections.<String, Object>emptyMap());
String msg1 = "message 1";
String msg2 = "message 2";
String msg3 = "message 3";
doThrow(new IOException()).when(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2));
sockJsSession.delegateConnectionEstablished();
try {
sockJsSession.delegateMessages(new String[] { msg1, msg2, msg3 });
fail("expected exception");
}
catch (SockJsMessageDeliveryException ex) {
assertEquals(Arrays.asList(msg3), ex.getUndeliveredMessages());
verify(this.webSocketHandler).afterConnectionEstablished(sockJsSession);
verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg1));
verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2));
verify(this.webSocketHandler).afterConnectionClosed(sockJsSession, CloseStatus.SERVER_ERROR);
verifyNoMoreInteractions(this.webSocketHandler);
}
}
@Test
public void delegateConnectionClosed() throws Exception {
this.session.delegateConnectionEstablished();
this.session.delegateConnectionClosed(CloseStatus.GOING_AWAY);
assertClosed();
assertEquals(1, this.session.getNumberOfLastActiveTimeUpdates());
assertTrue(this.session.didCancelHeartbeat());
verify(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.GOING_AWAY);
}
@Test
public void closeWhenNotOpen() throws Exception {
assertNew();
this.session.close();
assertNull("Close not ignored for a new session", this.session.getCloseStatus());
this.session.delegateConnectionEstablished();
assertOpen();
this.session.close(); protected WebSocketHandler webSocketHandler;
assertClosed();
assertEquals(3000, this.session.getCloseStatus().getCode());
this.session.close(CloseStatus.SERVER_ERROR); protected StubSockJsServiceConfig sockJsConfig;
assertEquals("Close should be ignored if already closed", 3000, this.session.getCloseStatus().getCode());
}
@Test
public void closeWhenNotActive() throws Exception {
this.session.delegateConnectionEstablished();
assertOpen();
this.session.setActive(false);
this.session.close();
assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten());
}
@Test
public void close() throws Exception {
this.session.delegateConnectionEstablished();
assertOpen();
this.session.setActive(true);
this.session.close();
assertEquals(1, this.session.getSockJsFramesWritten().size());
assertEquals(SockJsFrame.closeFrameGoAway(), this.session.getSockJsFramesWritten().get(0));
assertEquals(1, this.session.getNumberOfLastActiveTimeUpdates());
assertTrue(this.session.didCancelHeartbeat());
assertEquals(new CloseStatus(3000, "Go away!"), this.session.getCloseStatus());
assertClosed();
verify(this.webSocketHandler).afterConnectionClosed(this.session, new CloseStatus(3000, "Go away!"));
}
@Test
public void closeWithWriteFrameExceptions() throws Exception {
this.session.setExceptionOnWrite(new IOException());
this.session.delegateConnectionEstablished();
this.session.setActive(true);
this.session.close();
assertEquals(new CloseStatus(3000, "Go away!"), this.session.getCloseStatus());
assertClosed();
}
@Test
public void closeWithWebSocketHandlerExceptions() throws Exception {
doThrow(new Exception()).when(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.NORMAL); protected TaskScheduler taskScheduler;
this.session.delegateConnectionEstablished(); protected S session;
this.session.setActive(true);
this.session.close(CloseStatus.NORMAL);
assertEquals(CloseStatus.NORMAL, this.session.getCloseStatus());
assertClosed();
}
@Test @Before
public void tryCloseWithWebSocketHandlerExceptions() throws Exception { public void setUp() {
this.webSocketHandler = mock(WebSocketHandler.class);
this.taskScheduler = mock(TaskScheduler.class);
this.session.delegateConnectionEstablished(); this.sockJsConfig = new StubSockJsServiceConfig();
this.session.setActive(true); this.sockJsConfig.setTaskScheduler(this.taskScheduler);
this.session.tryCloseWithSockJsTransportError(new Exception(), CloseStatus.BAD_DATA);
assertEquals(CloseStatus.BAD_DATA, this.session.getCloseStatus()); this.session = initSockJsSession();
assertClosed();
} }
@Test protected abstract S initSockJsSession();
public void writeFrame() throws Exception {
this.session.writeFrame(SockJsFrame.openFrame());
assertEquals(1, this.session.getSockJsFramesWritten().size()); protected void assertNew() {
assertEquals(SockJsFrame.openFrame(), this.session.getSockJsFramesWritten().get(0)); assertState(true, false, false);
} }
@Test protected void assertOpen() {
public void writeFrameIoException() throws Exception { assertState(false, true, false);
this.session.setExceptionOnWrite(new IOException());
this.session.delegateConnectionEstablished();
try {
this.session.writeFrame(SockJsFrame.openFrame());
fail("expected exception");
}
catch (SockJsTransportFailureException ex) {
assertEquals(CloseStatus.SERVER_ERROR, this.session.getCloseStatus());
verify(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.SERVER_ERROR);
}
} }
@Test protected void assertClosed() {
public void sendHeartbeatWhenNotActive() throws Exception { assertState(false, false, true);
this.session.setActive(false);
this.session.sendHeartbeat();
assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten());
} }
@Test private void assertState(boolean isNew, boolean isOpen, boolean isClosed) {
public void sendHeartbeat() throws Exception { assertEquals(isNew, this.session.isNew());
this.session.setActive(true); assertEquals(isOpen, this.session.isOpen());
this.session.sendHeartbeat(); assertEquals(isClosed, this.session.isClosed());
assertEquals(1, this.session.getSockJsFramesWritten().size());
assertEquals(SockJsFrame.heartbeatFrame(), this.session.getSockJsFramesWritten().get(0));
verify(this.taskScheduler).schedule(any(Runnable.class), any(Date.class));
verifyNoMoreInteractions(this.taskScheduler);
}
@Test
public void scheduleHeartbeatNotActive() throws Exception {
this.session.setActive(false);
this.session.scheduleHeartbeat();
verifyNoMoreInteractions(this.taskScheduler);
}
@Test
public void scheduleAndCancelHeartbeat() throws Exception {
ScheduledFuture<?> task = mock(ScheduledFuture.class);
doReturn(task).when(this.taskScheduler).schedule(any(Runnable.class), any(Date.class));
this.session.setActive(true);
this.session.scheduleHeartbeat();
verify(this.taskScheduler).schedule(any(Runnable.class), any(Date.class));
verifyNoMoreInteractions(this.taskScheduler);
doReturn(false).when(task).isDone();
this.session.cancelHeartbeat();
verify(task).isDone();
verify(task).cancel(false);
verifyNoMoreInteractions(task);
} }
} }

74
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/BaseAbstractSockJsSessionTests.java

@ -1,74 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.sockjs.transport.session;
import org.junit.Before;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.transport.session.AbstractSockJsSession;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Base class for {@link AbstractSockJsSession} classes.
*
* @author Rossen Stoyanchev
*/
public abstract class BaseAbstractSockJsSessionTests<S extends AbstractSockJsSession> {
protected WebSocketHandler webSocketHandler;
protected StubSockJsServiceConfig sockJsConfig;
protected TaskScheduler taskScheduler;
protected S session;
@Before
public void setUp() {
this.webSocketHandler = mock(WebSocketHandler.class);
this.taskScheduler = mock(TaskScheduler.class);
this.sockJsConfig = new StubSockJsServiceConfig();
this.sockJsConfig.setTaskScheduler(this.taskScheduler);
this.session = initSockJsSession();
}
protected abstract S initSockJsSession();
protected void assertNew() {
assertState(true, false, false);
}
protected void assertOpen() {
assertState(false, true, false);
}
protected void assertClosed() {
assertState(false, false, true);
}
private void assertState(boolean isNew, boolean isOpen, boolean isClosed) {
assertEquals(isNew, this.session.isNew());
assertEquals(isOpen, this.session.isOpen());
assertEquals(isClosed, this.session.isClosed());
}
}

46
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSessionTests.java → spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2014 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -28,23 +28,22 @@ import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.http.server.ServletServerHttpResponse; import org.springframework.http.server.ServletServerHttpResponse;
import org.springframework.mock.web.test.MockHttpServletRequest; import org.springframework.mock.web.test.MockHttpServletRequest;
import org.springframework.mock.web.test.MockHttpServletResponse; import org.springframework.mock.web.test.MockHttpServletResponse;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.frame.DefaultSockJsFrameFormat; import org.springframework.web.socket.sockjs.frame.DefaultSockJsFrameFormat;
import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat; import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.frame.SockJsFrame; import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
import org.springframework.web.socket.sockjs.transport.session.AbstractHttpSockJsSessionTests.TestAbstractHttpSockJsSession; import org.springframework.web.socket.sockjs.transport.session.HttpSockJsSessionTests.TestAbstractHttpSockJsSession;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
/** /**
* Test fixture for {@link AbstractHttpSockJsSession}. * Unit tests for {@link AbstractHttpSockJsSession}.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
*/ */
public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTests<TestAbstractHttpSockJsSession> { public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstractHttpSockJsSession> {
protected ServerHttpRequest request; protected ServerHttpRequest request;
@ -57,6 +56,11 @@ public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTes
private SockJsFrameFormat frameFormat; private SockJsFrameFormat frameFormat;
@Override
protected TestAbstractHttpSockJsSession initSockJsSession() {
return new TestAbstractHttpSockJsSession(this.sockJsConfig, this.webSocketHandler, null);
}
@Before @Before
public void setup() { public void setup() {
@ -72,30 +76,25 @@ public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTes
this.request = new ServletServerHttpRequest(this.servletRequest); this.request = new ServletServerHttpRequest(this.servletRequest);
} }
@Override
protected TestAbstractHttpSockJsSession initSockJsSession() {
return new TestAbstractHttpSockJsSession(this.sockJsConfig, this.webSocketHandler, null);
}
@Test @Test
public void setInitialRequest() throws Exception { public void handleInitialRequest() throws Exception {
this.session.handleInitialRequest(this.request, this.response, this.frameFormat); this.session.handleInitialRequest(this.request, this.response, this.frameFormat);
assertTrue(this.session.hasRequest()); assertTrue(this.session.hasRequest());
assertTrue(this.session.hasResponse()); assertTrue(this.session.hasResponse());
assertEquals("o", this.servletResponse.getContentAsString()); assertEquals("hhh\no", this.servletResponse.getContentAsString());
assertFalse(this.servletRequest.isAsyncStarted()); assertFalse(this.servletRequest.isAsyncStarted());
verify(this.webSocketHandler).afterConnectionEstablished(this.session); verify(this.webSocketHandler).afterConnectionEstablished(this.session);
} }
@Test @Test
public void setLongPollingRequest() throws Exception { public void handleSuccessiveRequest() throws Exception {
this.session.getMessageCache().add("x"); this.session.getMessageCache().add("x");
this.session.startLongPollingRequest(this.request, this.response, this.frameFormat); this.session.handleSuccessiveRequest(this.request, this.response, this.frameFormat);
assertTrue(this.session.hasRequest()); assertTrue(this.session.hasRequest());
assertTrue(this.session.hasResponse()); assertTrue(this.session.hasResponse());
@ -104,19 +103,9 @@ public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTes
assertTrue(this.session.wasHeartbeatScheduled()); assertTrue(this.session.wasHeartbeatScheduled());
assertTrue(this.session.wasCacheFlushed()); assertTrue(this.session.wasCacheFlushed());
verifyNoMoreInteractions(this.webSocketHandler); assertEquals("hhh\n", this.servletResponse.getContentAsString());
}
@Test
public void setLongPollingRequestWhenClosed() throws Exception {
this.session.delegateConnectionClosed(CloseStatus.NORMAL);
assertClosed();
this.session.startLongPollingRequest(this.request, this.response, this.frameFormat);
assertEquals("c[3000,\"Go away!\"]", this.servletResponse.getContentAsString()); verifyNoMoreInteractions(this.webSocketHandler);
assertFalse(this.servletRequest.isAsyncStarted());
} }
@ -135,6 +124,11 @@ public class AbstractHttpSockJsSessionTests extends BaseAbstractSockJsSessionTes
super("1", config, handler, attributes); super("1", config, handler, attributes);
} }
@Override
protected void writePrelude() throws IOException {
getResponse().getBody().write("hhh\n".getBytes());
}
public boolean wasCacheFlushed() { public boolean wasCacheFlushed() {
return this.cacheFlushed; return this.cacheFlushed;
} }

299
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java

@ -0,0 +1,299 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.sockjs.transport.session;
import java.io.IOException;
import java.sql.Date;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledFuture;
import org.junit.Test;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
/**
* Test fixture for {@link AbstractSockJsSession}.
*
* @author Rossen Stoyanchev
*/
public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSession> {
@Override
protected TestSockJsSession initSockJsSession() {
return new TestSockJsSession("1", this.sockJsConfig, this.webSocketHandler, Collections.<String, Object>emptyMap());
}
@Test
public void getTimeSinceLastActive() throws Exception {
Thread.sleep(1);
long time1 = this.session.getTimeSinceLastActive();
assertTrue(time1 > 0);
Thread.sleep(1);
long time2 = this.session.getTimeSinceLastActive();
assertTrue(time2 > time1);
this.session.delegateConnectionEstablished();
Thread.sleep(1);
this.session.setActive(false);
assertTrue(this.session.getTimeSinceLastActive() > 0);
this.session.setActive(true);
assertEquals(0, this.session.getTimeSinceLastActive());
}
@Test
public void delegateConnectionEstablished() throws Exception {
assertNew();
this.session.delegateConnectionEstablished();
assertOpen();
verify(this.webSocketHandler).afterConnectionEstablished(this.session);
}
@Test
public void delegateError() throws Exception {
Exception ex = new Exception();
this.session.delegateError(ex);
verify(this.webSocketHandler).handleTransportError(this.session, ex);
}
@Test
public void delegateMessages() throws Exception {
String msg1 = "message 1";
String msg2 = "message 2";
this.session.delegateMessages(new String[] { msg1, msg2 });
verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg1));
verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg2));
verifyNoMoreInteractions(this.webSocketHandler);
}
@Test
public void delegateMessagesWithErrorAndConnectionClosing() throws Exception {
WebSocketHandler wsHandler = new ExceptionWebSocketHandlerDecorator(this.webSocketHandler);
TestSockJsSession sockJsSession = new TestSockJsSession("1", this.sockJsConfig,
wsHandler, Collections.<String, Object>emptyMap());
String msg1 = "message 1";
String msg2 = "message 2";
String msg3 = "message 3";
doThrow(new IOException()).when(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2));
sockJsSession.delegateConnectionEstablished();
try {
sockJsSession.delegateMessages(new String[] { msg1, msg2, msg3 });
fail("expected exception");
}
catch (SockJsMessageDeliveryException ex) {
assertEquals(Arrays.asList(msg3), ex.getUndeliveredMessages());
verify(this.webSocketHandler).afterConnectionEstablished(sockJsSession);
verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg1));
verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2));
verify(this.webSocketHandler).afterConnectionClosed(sockJsSession, CloseStatus.SERVER_ERROR);
verifyNoMoreInteractions(this.webSocketHandler);
}
}
@Test
public void delegateConnectionClosed() throws Exception {
this.session.delegateConnectionEstablished();
this.session.delegateConnectionClosed(CloseStatus.GOING_AWAY);
assertClosed();
assertEquals(1, this.session.getNumberOfLastActiveTimeUpdates());
assertTrue(this.session.didCancelHeartbeat());
verify(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.GOING_AWAY);
}
@Test
public void closeWhenNotOpen() throws Exception {
assertNew();
this.session.close();
assertNull("Close not ignored for a new session", this.session.getCloseStatus());
this.session.delegateConnectionEstablished();
assertOpen();
this.session.close();
assertClosed();
assertEquals(3000, this.session.getCloseStatus().getCode());
this.session.close(CloseStatus.SERVER_ERROR);
assertEquals("Close should be ignored if already closed", 3000, this.session.getCloseStatus().getCode());
}
@Test
public void closeWhenNotActive() throws Exception {
this.session.delegateConnectionEstablished();
assertOpen();
this.session.setActive(false);
this.session.close();
assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten());
}
@Test
public void close() throws Exception {
this.session.delegateConnectionEstablished();
assertOpen();
this.session.setActive(true);
this.session.close();
assertEquals(1, this.session.getSockJsFramesWritten().size());
assertEquals(SockJsFrame.closeFrameGoAway(), this.session.getSockJsFramesWritten().get(0));
assertEquals(1, this.session.getNumberOfLastActiveTimeUpdates());
assertTrue(this.session.didCancelHeartbeat());
assertEquals(new CloseStatus(3000, "Go away!"), this.session.getCloseStatus());
assertClosed();
verify(this.webSocketHandler).afterConnectionClosed(this.session, new CloseStatus(3000, "Go away!"));
}
@Test
public void closeWithWriteFrameExceptions() throws Exception {
this.session.setExceptionOnWrite(new IOException());
this.session.delegateConnectionEstablished();
this.session.setActive(true);
this.session.close();
assertEquals(new CloseStatus(3000, "Go away!"), this.session.getCloseStatus());
assertClosed();
}
@Test
public void closeWithWebSocketHandlerExceptions() throws Exception {
doThrow(new Exception()).when(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.NORMAL);
this.session.delegateConnectionEstablished();
this.session.setActive(true);
this.session.close(CloseStatus.NORMAL);
assertEquals(CloseStatus.NORMAL, this.session.getCloseStatus());
assertClosed();
}
@Test
public void tryCloseWithWebSocketHandlerExceptions() throws Exception {
this.session.delegateConnectionEstablished();
this.session.setActive(true);
this.session.tryCloseWithSockJsTransportError(new Exception(), CloseStatus.BAD_DATA);
assertEquals(CloseStatus.BAD_DATA, this.session.getCloseStatus());
assertClosed();
}
@Test
public void writeFrame() throws Exception {
this.session.writeFrame(SockJsFrame.openFrame());
assertEquals(1, this.session.getSockJsFramesWritten().size());
assertEquals(SockJsFrame.openFrame(), this.session.getSockJsFramesWritten().get(0));
}
@Test
public void writeFrameIoException() throws Exception {
this.session.setExceptionOnWrite(new IOException());
this.session.delegateConnectionEstablished();
try {
this.session.writeFrame(SockJsFrame.openFrame());
fail("expected exception");
}
catch (SockJsTransportFailureException ex) {
assertEquals(CloseStatus.SERVER_ERROR, this.session.getCloseStatus());
verify(this.webSocketHandler).afterConnectionClosed(this.session, CloseStatus.SERVER_ERROR);
}
}
@Test
public void sendHeartbeatWhenNotActive() throws Exception {
this.session.setActive(false);
this.session.sendHeartbeat();
assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten());
}
@Test
public void sendHeartbeat() throws Exception {
this.session.setActive(true);
this.session.sendHeartbeat();
assertEquals(1, this.session.getSockJsFramesWritten().size());
assertEquals(SockJsFrame.heartbeatFrame(), this.session.getSockJsFramesWritten().get(0));
verify(this.taskScheduler).schedule(any(Runnable.class), any(Date.class));
verifyNoMoreInteractions(this.taskScheduler);
}
@Test
public void scheduleHeartbeatNotActive() throws Exception {
this.session.setActive(false);
this.session.scheduleHeartbeat();
verifyNoMoreInteractions(this.taskScheduler);
}
@Test
public void scheduleAndCancelHeartbeat() throws Exception {
ScheduledFuture<?> task = mock(ScheduledFuture.class);
doReturn(task).when(this.taskScheduler).schedule(any(Runnable.class), any(Date.class));
this.session.setActive(true);
this.session.scheduleHeartbeat();
verify(this.taskScheduler).schedule(any(Runnable.class), any(Date.class));
verifyNoMoreInteractions(this.taskScheduler);
doReturn(false).when(task).isDone();
this.session.cancelHeartbeat();
verify(task).isDone();
verify(task).cancel(false);
verifyNoMoreInteractions(task);
}
}

4
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSessionTests.java

@ -37,11 +37,11 @@ import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
/** /**
* Test fixture for {@link WebSocketServerSockJsSession}. * Unit tests for {@link WebSocketServerSockJsSession}.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
*/ */
public class WebSocketServerSockJsSessionTests extends BaseAbstractSockJsSessionTests<TestWebSocketServerSockJsSession> { public class WebSocketServerSockJsSessionTests extends AbstractSockJsSessionTests<TestWebSocketServerSockJsSession> {
private TestWebSocketSession webSocketSession; private TestWebSocketSession webSocketSession;

Loading…
Cancel
Save