From 2794224b28430190c53c7cd284ccfe33c8eb5d1c Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 17 Apr 2013 11:19:28 -0400 Subject: [PATCH] Add onClosed to SockJsSessionSupport sub-classes As opposed to close(), which actively closes the session, the onClosed method is called when the underlying connection has been closed or disconnected. --- .../springframework/sockjs/SockJsSession.java | 2 + .../sockjs/SockJsSessionSupport.java | 5 + .../sockjs/server/AbstractServerSession.java | 12 +- .../sockjs/server/AbstractSockJsService.java | 33 ++---- .../sockjs/server/SockJsService.java | 2 - .../server/support/DefaultSockJsService.java | 14 +-- .../support/SockJsHttpRequestHandler.java | 42 +++++-- .../transport/AbstractHttpServerSession.java | 7 ++ .../AbstractSockJsWebSocketHandler.java | 108 ------------------ .../transport/SockJsWebSocketHandler.java | 75 ++++++++++-- .../WebSocketSockJsHandlerAdapter.java | 81 ++++++++++++- .../websocket/WebSocketSession.java | 2 + .../endpoint/StandardWebSocketSession.java | 5 + .../endpoint/WebSocketHandlerEndpoint.java | 47 ++++---- 14 files changed, 253 insertions(+), 182 deletions(-) delete mode 100644 spring-websocket/src/main/java/org/springframework/sockjs/server/transport/AbstractSockJsWebSocketHandler.java diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/SockJsSession.java b/spring-websocket/src/main/java/org/springframework/sockjs/SockJsSession.java index 0dab8e17f52..e7cd75bf4b1 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/SockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/SockJsSession.java @@ -27,6 +27,8 @@ import java.io.IOException; */ public interface SockJsSession { + String getId(); + void sendMessage(String text) throws IOException; void close(); diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/SockJsSessionSupport.java b/spring-websocket/src/main/java/org/springframework/sockjs/SockJsSessionSupport.java index 8c9747dc0a6..0dac36118db 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/SockJsSessionSupport.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/SockJsSessionSupport.java @@ -114,6 +114,11 @@ public abstract class SockJsSessionSupport implements SockJsSession { this.sockJsHandler.handleException(this, ex); } + public void connectionClosed() { + this.state = State.CLOSED; + this.sockJsHandler.sessionClosed(this); + } + public void close() { this.state = State.CLOSED; this.sockJsHandler.sessionClosed(this); diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/server/AbstractServerSession.java b/spring-websocket/src/main/java/org/springframework/sockjs/server/AbstractServerSession.java index caa436512b2..43473eb9972 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/server/AbstractServerSession.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/server/AbstractServerSession.java @@ -58,10 +58,18 @@ public abstract class AbstractServerSession extends SockJsSessionSupport { protected abstract void sendMessageInternal(String message) throws IOException; + + @Override + public void connectionClosed() { + logger.debug("Session closed"); + super.close(); + cancelHeartbeat(); + } + + @Override public final synchronized void close() { if (!isClosed()) { logger.debug("Closing session"); - if (isActive()) { // deliver messages "in flight" before sending close frame try { @@ -71,9 +79,7 @@ public abstract class AbstractServerSession extends SockJsSessionSupport { // ignore } } - super.close(); - cancelHeartbeat(); closeInternal(); } diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/server/AbstractSockJsService.java b/spring-websocket/src/main/java/org/springframework/sockjs/server/AbstractSockJsService.java index 16740af5562..399bf5d5d66 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/server/AbstractSockJsService.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/server/AbstractSockJsService.java @@ -40,7 +40,6 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.DigestUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; -import org.springframework.web.util.UriUtils; /** @@ -57,7 +56,7 @@ public abstract class AbstractSockJsService private static final int ONE_YEAR = 365 * 24 * 60 * 60; - private final String prefix; + private String name = getClass().getSimpleName() + "@" + Integer.toHexString(hashCode()); private String clientLibraryUrl = "https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js"; @@ -74,31 +73,25 @@ public abstract class AbstractSockJsService private final TaskSchedulerHolder heartbeatSchedulerHolder; - /** - * Class constructor... - * - * @param prefix the path prefix for the SockJS service. All requests with a path - * that begins with the specified prefix will be handled by this service. In a - * Servlet container this is the path within the current servlet mapping. - */ - public AbstractSockJsService(String prefix) { - Assert.hasText(prefix, "prefix is required"); - this.prefix = prefix; + + public AbstractSockJsService() { this.heartbeatSchedulerHolder = new TaskSchedulerHolder("SockJs-heartbeat-"); } - public AbstractSockJsService(String prefix, TaskScheduler heartbeatScheduler) { - Assert.hasText(prefix, "prefix is required"); + public AbstractSockJsService(TaskScheduler heartbeatScheduler) { Assert.notNull(heartbeatScheduler, "heartbeatScheduler is required"); - this.prefix = prefix; this.heartbeatSchedulerHolder = new TaskSchedulerHolder(heartbeatScheduler); } /** - * The path prefix to which the SockJS service is mapped. + * A unique name for the service mainly for logging purposes. */ - public String getPrefix() { - return this.prefix; + public void setName(String name) { + this.name = name; + } + + public String getName() { + return this.name; } /** @@ -236,10 +229,6 @@ public abstract class AbstractSockJsService // Ignore invalid Content-Type (TODO) } - String path = UriUtils.decode(request.getURI().getPath(), "URF-8"); - int index = path.indexOf(this.prefix); - sockJsPath = path.substring(index + this.prefix.length()); - try { if (sockJsPath.equals("") || sockJsPath.equals("/")) { response.getHeaders().setContentType(new MediaType("text", "plain", Charset.forName("UTF-8"))); diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/server/SockJsService.java b/spring-websocket/src/main/java/org/springframework/sockjs/server/SockJsService.java index dfb1e002d46..23c79be7d21 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/server/SockJsService.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/server/SockJsService.java @@ -31,8 +31,6 @@ import org.springframework.websocket.WebSocketHandler; */ public interface SockJsService { - String getPrefix(); - /** * Pre-register {@link SockJsHandler} instances so they can be adapted to * {@link WebSocketHandler} and hence re-used at runtime when diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/server/support/DefaultSockJsService.java b/spring-websocket/src/main/java/org/springframework/sockjs/server/support/DefaultSockJsService.java index b0f311ec64f..60f8143f706 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/server/support/DefaultSockJsService.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/server/support/DefaultSockJsService.java @@ -70,13 +70,11 @@ public class DefaultSockJsService extends AbstractSockJsService implements Initi private final Map sockJsHandlers = new HashMap(); - public DefaultSockJsService(String prefix) { - super(prefix); + public DefaultSockJsService() { this.sessionTimeoutSchedulerHolder = new TaskSchedulerHolder("SockJs-sessionTimeout-"); } - public DefaultSockJsService(String prefix, TaskScheduler heartbeatScheduler, TaskScheduler sessionTimeoutScheduler) { - super(prefix, heartbeatScheduler); + public DefaultSockJsService(TaskScheduler heartbeatScheduler, TaskScheduler sessionTimeoutScheduler) { Assert.notNull(sessionTimeoutScheduler, "sessionTimeoutScheduler is required"); this.sessionTimeoutSchedulerHolder = new TaskSchedulerHolder(sessionTimeoutScheduler); } @@ -146,23 +144,23 @@ public class DefaultSockJsService extends AbstractSockJsService implements Initi try { int count = sessions.size(); if (logger.isTraceEnabled() && (count != 0)) { - logger.trace("Checking " + count + " session(s) for timeouts [" + getPrefix() + "]"); + logger.trace("Checking " + count + " session(s) for timeouts [" + getName() + "]"); } for (SockJsSessionSupport session : sessions.values()) { if (session.getTimeSinceLastActive() > getDisconnectDelay()) { if (logger.isTraceEnabled()) { - logger.trace("Removing " + session + " for [" + getPrefix() + "]"); + logger.trace("Removing " + session + " for [" + getName() + "]"); } session.close(); sessions.remove(session.getId()); } } if (logger.isTraceEnabled() && (count != 0)) { - logger.trace(sessions.size() + " remaining session(s) [" + getPrefix() + "]"); + logger.trace(sessions.size() + " remaining session(s) [" + getName() + "]"); } } catch (Throwable t) { - logger.error("Failed to complete session timeout checks for [" + getPrefix() + "]", t); + logger.error("Failed to complete session timeout checks for [" + getName() + "]", t); } } }, getDisconnectDelay()); diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/server/support/SockJsHttpRequestHandler.java b/spring-websocket/src/main/java/org/springframework/sockjs/server/support/SockJsHttpRequestHandler.java index 977f9eb8b9d..22d058e0eb9 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/server/support/SockJsHttpRequestHandler.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/server/support/SockJsHttpRequestHandler.java @@ -46,6 +46,8 @@ import org.springframework.websocket.HandlerProvider; */ public class SockJsHttpRequestHandler implements HttpRequestHandler, BeanFactoryAware { + private final String prefix; + private final SockJsService sockJsService; private final HandlerProvider handlerProvider; @@ -53,23 +55,50 @@ public class SockJsHttpRequestHandler implements HttpRequestHandler, BeanFactory private final UrlPathHelper urlPathHelper = new UrlPathHelper(); - public SockJsHttpRequestHandler(SockJsService sockJsService, SockJsHandler sockJsHandler) { + /** + * Class constructor with {@link SockJsHandler} instance ... + * + * @param prefix the path prefix for the SockJS service. All requests with a path + * that begins with the specified prefix will be handled by this service. In a + * Servlet container this is the path within the current servlet mapping. + */ + public SockJsHttpRequestHandler(String prefix, SockJsService sockJsService, SockJsHandler sockJsHandler) { + + Assert.hasText(prefix, "prefix is required"); Assert.notNull(sockJsService, "sockJsService is required"); Assert.notNull(sockJsHandler, "sockJsHandler is required"); + + this.prefix = prefix; this.sockJsService = sockJsService; this.sockJsService.registerSockJsHandlers(Collections.singleton(sockJsHandler)); this.handlerProvider = new HandlerProvider(sockJsHandler); } - public SockJsHttpRequestHandler(SockJsService sockJsService, Class sockJsHandlerClass) { + /** + * Class constructor with {@link SockJsHandler} type (per request) ... + * + * @param prefix the path prefix for the SockJS service. All requests with a path + * that begins with the specified prefix will be handled by this service. In a + * Servlet container this is the path within the current servlet mapping. + */ + public SockJsHttpRequestHandler(String prefix, SockJsService sockJsService, + Class sockJsHandlerClass) { + + Assert.hasText(prefix, "prefix is required"); Assert.notNull(sockJsService, "sockJsService is required"); Assert.notNull(sockJsHandlerClass, "sockJsHandlerClass is required"); + + this.prefix = prefix; this.sockJsService = sockJsService; this.handlerProvider = new HandlerProvider(sockJsHandlerClass); } - public String getMappingPattern() { - return this.sockJsService.getPrefix() + "/**"; + public String getPrefix() { + return this.prefix; + } + + public String getPattern() { + return this.prefix + "/**"; } @Override @@ -82,10 +111,9 @@ public class SockJsHttpRequestHandler implements HttpRequestHandler, BeanFactory throws ServletException, IOException { String lookupPath = this.urlPathHelper.getLookupPathForRequest(request); - String prefix = this.sockJsService.getPrefix(); - Assert.isTrue(lookupPath.startsWith(prefix), - "Request path does not match the prefix of the SockJsService " + prefix); + Assert.isTrue(lookupPath.startsWith(this.prefix), + "Request path does not match the prefix of the SockJsService " + this.prefix); String sockJsPath = lookupPath.substring(prefix.length()); diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/AbstractHttpServerSession.java b/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/AbstractHttpServerSession.java index e03fb4a4c97..47efdea55b8 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/AbstractHttpServerSession.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/AbstractHttpServerSession.java @@ -108,6 +108,13 @@ public abstract class AbstractHttpServerSession extends AbstractServerSession { */ protected abstract void flushCache() throws IOException; + @Override + public void connectionClosed() { + super.connectionClosed(); + resetRequest(); + } + + @Override protected void closeInternal() { resetRequest(); } diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/AbstractSockJsWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/AbstractSockJsWebSocketHandler.java deleted file mode 100644 index 2f723092fee..00000000000 --- a/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/AbstractSockJsWebSocketHandler.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.sockjs.server.transport; - -import java.io.InputStream; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.springframework.sockjs.SockJsHandler; -import org.springframework.sockjs.SockJsSessionSupport; -import org.springframework.sockjs.server.SockJsConfiguration; -import org.springframework.util.Assert; -import org.springframework.websocket.WebSocketHandler; -import org.springframework.websocket.WebSocketSession; - - -/** - * - * @author Rossen Stoyanchev - * @since 4.0 - */ -public abstract class AbstractSockJsWebSocketHandler implements WebSocketHandler { - - protected final Log logger = LogFactory.getLog(getClass()); - - private final SockJsConfiguration sockJsConfig; - - private final SockJsHandler sockJsHandler; - - private final Map sessions = - new ConcurrentHashMap(); - - - public AbstractSockJsWebSocketHandler(SockJsConfiguration sockJsConfig, SockJsHandler sockJsHandler) { - Assert.notNull(sockJsConfig, "sockJsConfig is required"); - Assert.notNull(sockJsHandler, "sockJsHandler is required"); - this.sockJsConfig = sockJsConfig; - this.sockJsHandler = sockJsHandler; - } - - protected SockJsConfiguration getSockJsConfig() { - return this.sockJsConfig; - } - - protected SockJsHandler getSockJsHandler() { - return this.sockJsHandler; - } - - protected SockJsSessionSupport getSockJsSession(WebSocketSession wsSession) { - return this.sessions.get(wsSession); - } - - @Override - public void newSession(WebSocketSession wsSession) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("New session: " + wsSession); - } - SockJsSessionSupport session = createSockJsSession(wsSession); - this.sessions.put(wsSession, session); - } - - protected abstract SockJsSessionSupport createSockJsSession(WebSocketSession wsSession) throws Exception; - - @Override - public void handleTextMessage(WebSocketSession wsSession, String message) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("Received payload " + message); - } - SockJsSessionSupport session = getSockJsSession(wsSession); - session.delegateMessages(message); - } - - @Override - public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception { - // should not happen - throw new UnsupportedOperationException(); - } - - @Override - public void handleException(WebSocketSession webSocketSession, Throwable exception) { - SockJsSessionSupport session = getSockJsSession(webSocketSession); - session.delegateException(exception); - } - - @Override - public void sessionClosed(WebSocketSession webSocketSession, int statusCode, String reason) throws Exception { - logger.debug("WebSocket connection closed " + webSocketSession); - SockJsSessionSupport session = this.sessions.remove(webSocketSession); - session.close(); - } - -} diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/SockJsWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/SockJsWebSocketHandler.java index f90fbdb13e8..2da57cbf354 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/SockJsWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/SockJsWebSocketHandler.java @@ -17,12 +17,18 @@ package org.springframework.sockjs.server.transport; import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.sockjs.SockJsHandler; import org.springframework.sockjs.SockJsSessionSupport; import org.springframework.sockjs.server.AbstractServerSession; import org.springframework.sockjs.server.SockJsConfiguration; import org.springframework.sockjs.server.SockJsFrame; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; import org.springframework.websocket.WebSocketHandler; import org.springframework.websocket.WebSocketSession; @@ -37,19 +43,47 @@ import com.fasterxml.jackson.databind.ObjectMapper; * @author Rossen Stoyanchev * @since 4.0 */ -public class SockJsWebSocketHandler extends AbstractSockJsWebSocketHandler { +public class SockJsWebSocketHandler implements WebSocketHandler { + + private static final Log logger = LogFactory.getLog(SockJsWebSocketHandler.class); + + private final SockJsConfiguration sockJsConfig; + + private final SockJsHandler sockJsHandler; + + private final Map sessions = + new ConcurrentHashMap(); // TODO: JSON library used must be configurable private final ObjectMapper objectMapper = new ObjectMapper(); public SockJsWebSocketHandler(SockJsConfiguration sockJsConfig, SockJsHandler sockJsHandler) { - super(sockJsConfig, sockJsHandler); + Assert.notNull(sockJsConfig, "sockJsConfig is required"); + Assert.notNull(sockJsHandler, "sockJsHandler is required"); + this.sockJsConfig = sockJsConfig; + this.sockJsHandler = sockJsHandler; + } + + protected SockJsConfiguration getSockJsConfig() { + return this.sockJsConfig; + } + + protected SockJsHandler getSockJsHandler() { + return this.sockJsHandler; + } + + protected SockJsSessionSupport getSockJsSession(WebSocketSession wsSession) { + return this.sessions.get(wsSession); } @Override - protected SockJsSessionSupport createSockJsSession(WebSocketSession wsSession) throws Exception { - return new WebSocketServerSession(wsSession, getSockJsConfig()); + public void newSession(WebSocketSession wsSession) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("New session: " + wsSession); + } + SockJsSessionSupport session = new WebSocketServerSession(wsSession, getSockJsConfig()); + this.sessions.put(wsSession, session); } @Override @@ -72,6 +106,25 @@ public class SockJsWebSocketHandler extends AbstractSockJsWebSocketHandler { } } + @Override + public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception { + // should not happen + throw new UnsupportedOperationException(); + } + + @Override + public void handleException(WebSocketSession webSocketSession, Throwable exception) { + SockJsSessionSupport session = getSockJsSession(webSocketSession); + session.delegateException(exception); + } + + @Override + public void sessionClosed(WebSocketSession webSocketSession, int statusCode, String reason) throws Exception { + logger.debug("WebSocket session closed " + webSocketSession); + SockJsSessionSupport session = this.sessions.remove(webSocketSession); + session.connectionClosed(); + } + private class WebSocketServerSession extends AbstractServerSession { @@ -107,15 +160,23 @@ public class SockJsWebSocketHandler extends AbstractSockJsWebSocketHandler { } @Override - public void closeInternal() { - this.webSocketSession.close(); + public void connectionClosed() { + super.connectionClosed(); this.webSocketSession = null; + } + + @Override + public void closeInternal() { + deactivate(); updateLastActiveTime(); } @Override protected void deactivate() { - this.webSocketSession.close(); + if (this.webSocketSession != null) { + this.webSocketSession.close(); + this.webSocketSession = null; + } } } diff --git a/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/WebSocketSockJsHandlerAdapter.java b/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/WebSocketSockJsHandlerAdapter.java index 9ec2a9e386a..1c266860e07 100644 --- a/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/WebSocketSockJsHandlerAdapter.java +++ b/spring-websocket/src/main/java/org/springframework/sockjs/server/transport/WebSocketSockJsHandlerAdapter.java @@ -17,10 +17,16 @@ package org.springframework.sockjs.server.transport; import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.sockjs.SockJsHandler; import org.springframework.sockjs.SockJsSessionSupport; import org.springframework.sockjs.server.SockJsConfiguration; +import org.springframework.util.Assert; import org.springframework.websocket.WebSocketHandler; import org.springframework.websocket.WebSocketSession; @@ -33,22 +39,78 @@ import org.springframework.websocket.WebSocketSession; * @author Rossen Stoyanchev * @since 4.0 */ -public class WebSocketSockJsHandlerAdapter extends AbstractSockJsWebSocketHandler { +public class WebSocketSockJsHandlerAdapter implements WebSocketHandler { + + private static final Log logger = LogFactory.getLog(WebSocketSockJsHandlerAdapter.class); + + private final SockJsConfiguration sockJsConfig; + + private final SockJsHandler sockJsHandler; + + private final Map sessions = + new ConcurrentHashMap(); public WebSocketSockJsHandlerAdapter(SockJsConfiguration sockJsConfig, SockJsHandler sockJsHandler) { - super(sockJsConfig, sockJsHandler); + Assert.notNull(sockJsConfig, "sockJsConfig is required"); + Assert.notNull(sockJsHandler, "sockJsHandler is required"); + this.sockJsConfig = sockJsConfig; + this.sockJsHandler = sockJsHandler; + } + + protected SockJsConfiguration getSockJsConfig() { + return this.sockJsConfig; + } + + protected SockJsHandler getSockJsHandler() { + return this.sockJsHandler; + } + + protected SockJsSessionSupport getSockJsSession(WebSocketSession wsSession) { + return this.sessions.get(wsSession); + } + + @Override + public void newSession(WebSocketSession wsSession) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("New session: " + wsSession); + } + SockJsSessionSupport session = new WebSocketSessionAdapter(wsSession); + this.sessions.put(wsSession, session); } @Override - protected SockJsSessionSupport createSockJsSession(WebSocketSession wsSession) throws Exception { - return new WebSocketSessionAdapter(wsSession); + public void handleTextMessage(WebSocketSession wsSession, String message) throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("Received payload " + message); + } + SockJsSessionSupport session = getSockJsSession(wsSession); + session.delegateMessages(message); + } + + @Override + public void handleBinaryMessage(WebSocketSession session, InputStream message) throws Exception { + // should not happen + throw new UnsupportedOperationException(); + } + + @Override + public void handleException(WebSocketSession webSocketSession, Throwable exception) { + SockJsSessionSupport session = getSockJsSession(webSocketSession); + session.delegateException(exception); + } + + @Override + public void sessionClosed(WebSocketSession webSocketSession, int statusCode, String reason) throws Exception { + logger.debug("WebSocket session closed " + webSocketSession); + SockJsSessionSupport session = this.sessions.remove(webSocketSession); + session.connectionClosed(); } private class WebSocketSessionAdapter extends SockJsSessionSupport { - private final WebSocketSession wsSession; + private WebSocketSession wsSession; public WebSocketSessionAdapter(WebSocketSession wsSession) throws Exception { @@ -67,11 +129,20 @@ public class WebSocketSockJsHandlerAdapter extends AbstractSockJsWebSocketHandle this.wsSession.sendText(message); } + @Override + public void connectionClosed() { + logger.debug("Session closed"); + super.connectionClosed(); + this.wsSession = null; + } + + @Override public void close() { if (!isClosed()) { logger.debug("Closing session"); super.close(); this.wsSession.close(); + this.wsSession = null; } } } diff --git a/spring-websocket/src/main/java/org/springframework/websocket/WebSocketSession.java b/spring-websocket/src/main/java/org/springframework/websocket/WebSocketSession.java index 2abe644a5a4..a9af7d7d7b9 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/WebSocketSession.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/WebSocketSession.java @@ -27,6 +27,8 @@ import java.io.IOException; */ public interface WebSocketSession { + String getId(); + boolean isOpen(); void sendText(String text) throws IOException; diff --git a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/StandardWebSocketSession.java b/spring-websocket/src/main/java/org/springframework/websocket/endpoint/StandardWebSocketSession.java index dc4126b168f..546f16c96e4 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/StandardWebSocketSession.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/endpoint/StandardWebSocketSession.java @@ -40,6 +40,11 @@ public class StandardWebSocketSession implements WebSocketSession { this.session = session; } + @Override + public String getId() { + return this.session.getId(); + } + @Override public boolean isOpen() { return ((this.session != null) && this.session.isOpen()); diff --git a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/WebSocketHandlerEndpoint.java b/spring-websocket/src/main/java/org/springframework/websocket/endpoint/WebSocketHandlerEndpoint.java index b70076b3d8f..7680f24ef3a 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/endpoint/WebSocketHandlerEndpoint.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/endpoint/WebSocketHandlerEndpoint.java @@ -58,7 +58,7 @@ public class WebSocketHandlerEndpoint extends Endpoint { try { WebSocketSession webSocketSession = new StandardWebSocketSession(session); this.sessions.put(session.getId(), webSocketSession); - session.addMessageHandler(new StandardMessageHandler(session.getId())); + session.addMessageHandler(new StandardMessageHandler(session)); this.webSocketHandler.newSession(webSocketSession); } catch (Throwable ex) { @@ -69,16 +69,19 @@ public class WebSocketHandlerEndpoint extends Endpoint { @Override public void onClose(javax.websocket.Session session, CloseReason closeReason) { - String id = session.getId(); if (logger.isDebugEnabled()) { - logger.debug("Closing session: " + session + ", " + closeReason); + logger.debug("Session closed: " + session + ", " + closeReason); } try { - WebSocketSession webSocketSession = getSession(id); - this.sessions.remove(id); - int code = closeReason.getCloseCode().getCode(); - String reason = closeReason.getReasonPhrase(); - this.webSocketHandler.sessionClosed(webSocketSession, code, reason); + WebSocketSession wsSession = this.sessions.remove(session.getId()); + if (wsSession != null) { + int code = closeReason.getCloseCode().getCode(); + String reason = closeReason.getReasonPhrase(); + this.webSocketHandler.sessionClosed(wsSession, code, reason); + } + else { + Assert.notNull(wsSession, "No WebSocket session"); + } } catch (Throwable ex) { // TODO @@ -90,8 +93,13 @@ public class WebSocketHandlerEndpoint extends Endpoint { public void onError(javax.websocket.Session session, Throwable exception) { logger.error("Error for WebSocket session: " + session.getId(), exception); try { - WebSocketSession webSocketSession = getSession(session.getId()); - this.webSocketHandler.handleException(webSocketSession, exception); + WebSocketSession wsSession = getWebSocketSession(session); + if (wsSession != null) { + this.webSocketHandler.handleException(wsSession, exception); + } + else { + logger.warn("WebSocketSession not found. Perhaps onError was called after onClose?"); + } } catch (Throwable ex) { // TODO @@ -99,29 +107,28 @@ public class WebSocketHandlerEndpoint extends Endpoint { } } - private WebSocketSession getSession(String sourceSessionId) { - WebSocketSession webSocketSession = this.sessions.get(sourceSessionId); - Assert.notNull(webSocketSession, "No session"); - return webSocketSession; + private WebSocketSession getWebSocketSession(javax.websocket.Session session) { + return this.sessions.get(session.getId()); } private class StandardMessageHandler implements MessageHandler.Whole { - private final String sessionId; + private final javax.websocket.Session session; - public StandardMessageHandler(String sessionId) { - this.sessionId = sessionId; + public StandardMessageHandler(javax.websocket.Session session) { + this.session = session; } @Override public void onMessage(String message) { if (logger.isTraceEnabled()) { - logger.trace("Message for session [" + this.sessionId + "]: " + message); + logger.trace("Message for session [" + this.session + "]: " + message); } + WebSocketSession wsSession = getWebSocketSession(this.session); + Assert.notNull(wsSession, "WebSocketSession not found"); try { - WebSocketSession session = getSession(this.sessionId); - WebSocketHandlerEndpoint.this.webSocketHandler.handleTextMessage(session, message); + WebSocketHandlerEndpoint.this.webSocketHandler.handleTextMessage(wsSession, message); } catch (Throwable ex) { // TODO