From 6cf17449fa1014c378fe101fc1cb4c3670984817 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 4 Apr 2013 15:48:59 -0400 Subject: [PATCH] Add endpoint connection manager --- .../AbstractEndpointConnectionManager.java | 227 ++++++++++++++++++ .../AnnotatedEndpointConnectionManager.java | 46 ++++ .../client/EndpointConnectionManager.java | 78 ++++++ .../server/endpoint/EndpointExporter.java | 15 +- .../server/endpoint/EndpointRegistration.java | 28 +-- .../endpoint/ServletEndpointExporter.java | 26 +- .../server/endpoint/SpringConfigurator.java | 21 +- 7 files changed, 402 insertions(+), 39 deletions(-) create mode 100644 spring-websocket/src/main/java/org/springframework/websocket/client/AbstractEndpointConnectionManager.java create mode 100644 spring-websocket/src/main/java/org/springframework/websocket/client/AnnotatedEndpointConnectionManager.java create mode 100644 spring-websocket/src/main/java/org/springframework/websocket/client/EndpointConnectionManager.java diff --git a/spring-websocket/src/main/java/org/springframework/websocket/client/AbstractEndpointConnectionManager.java b/spring-websocket/src/main/java/org/springframework/websocket/client/AbstractEndpointConnectionManager.java new file mode 100644 index 00000000000..1cb0d35102e --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/websocket/client/AbstractEndpointConnectionManager.java @@ -0,0 +1,227 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.websocket.client; + +import java.io.IOException; +import java.net.URI; + +import javax.websocket.ContainerProvider; +import javax.websocket.DeploymentException; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.SmartLifecycle; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; +import org.springframework.util.Assert; +import org.springframework.web.util.UriComponentsBuilder; + + +/** + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public abstract class AbstractEndpointConnectionManager implements ApplicationContextAware, SmartLifecycle { + + protected final Log logger = LogFactory.getLog(getClass()); + + private final Class endpointClass; + + private final Object endpointBean; + + private final URI uri; + + private boolean autoStartup = false; + + private int phase = Integer.MAX_VALUE; + + private final WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer(); + + private Session session; + + private ApplicationContext applicationContext; + + private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("EndpointConnectionManager-"); + + private final Object lifecycleMonitor = new Object(); + + + public AbstractEndpointConnectionManager(Class endpointClass, String uriTemplate, Object... uriVariables) { + Assert.notNull(endpointClass, "endpointClass is required"); + this.endpointClass = endpointClass; + this.endpointBean = null; + this.uri = initUri(uriTemplate, uriVariables); + } + + public AbstractEndpointConnectionManager(Object endpointBean, String uriTemplate, Object... uriVariables) { + Assert.notNull(endpointBean, "endpointBean is required"); + this.endpointClass = null; + this.endpointBean = endpointBean; + this.uri = initUri(uriTemplate, uriVariables); + } + + private static URI initUri(String uri, Object... uriVariables) { + return UriComponentsBuilder.fromUriString(uri).buildAndExpand(uriVariables).encode().toUri(); + } + + public void setAsyncSendTimeout(long timeoutInMillis) { + this.webSocketContainer.setAsyncSendTimeout(timeoutInMillis); + } + + public void setMaxSessionIdleTimeout(long timeoutInMillis) { + this.webSocketContainer.setDefaultMaxSessionIdleTimeout(timeoutInMillis); + } + + public void setMaxTextMessageBufferSize(int bufferSize) { + this.webSocketContainer.setDefaultMaxTextMessageBufferSize(bufferSize); + } + + public void setMaxBinaryMessageBufferSize(Integer bufferSize) { + this.webSocketContainer.setDefaultMaxBinaryMessageBufferSize(bufferSize); + } + + /** + * Set whether to auto-connect to the {@link #setDefaultUri(URI) default URI} after + * this endpoint connection factory has been initialized and the Spring context has + * been refreshed. + *

Default is "false". + */ + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + + /** + * Return the value for the 'autoStartup' property. If "true", this endpoint + * connection factory will connect to the {@link #setDefaultUri(URI) default URI} upon + * a ContextRefreshedEvent. + */ + public boolean isAutoStartup() { + return this.autoStartup; + } + + /** + * Specify the phase in which this endpoint connection factory should be + * auto-connected and closed. The startup order proceeds from lowest to highest, and + * the shutdown order is the reverse of that. By default this value is + * Integer.MAX_VALUE meaning that this endpoint connection factory connects as late as + * possible and is closed as soon as possible. + */ + public void setPhase(int phase) { + this.phase = phase; + } + + /** + * Return the phase in which this endpoint connection factory will be auto-connected + * and stopped. + */ + public int getPhase() { + return this.phase; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + protected URI getUri() { + return this.uri; + } + + protected WebSocketContainer getWebSocketContainer() { + return this.webSocketContainer; + } + + protected Object getEndpoint() { + if (this.endpointClass != null) { + return this.applicationContext.getAutowireCapableBeanFactory().createBean(this.endpointClass); + } + else { + return this.endpointBean; + } + } + + /** + * Auto-connects to the configured {@link #setDefaultUri(URI) default URI}. + */ + public void start() { + synchronized (this.lifecycleMonitor) { + if (!isRunning()) { + this.taskExecutor.execute(new Runnable() { + @Override + public void run() { + synchronized (lifecycleMonitor) { + try { + logger.info("Connecting to endpoint at URI " + uri); + session = connect(getEndpoint()); + logger.info("Successfully connected"); + } + catch (Exception ex) { + logger.error("Failed to connect to endpoint at " + uri, ex); + } + } + } + }); + } + } + } + + protected abstract Session connect(Object endpoint) throws DeploymentException, IOException; + + /** + * Deactivates the configured message endpoint. + */ + public void stop() { + synchronized (this.lifecycleMonitor) { + if (isRunning()) { + try { + this.session.close(); + } + catch (IOException e) { + // ignore + } + } + this.session = null; + } + } + + public void stop(Runnable callback) { + synchronized (this.lifecycleMonitor) { + this.stop(); + callback.run(); + } + } + + /** + * Return whether the configured message endpoint is currently active. + */ + public boolean isRunning() { + synchronized (this.lifecycleMonitor) { + if ((this.session != null) && this.session.isOpen()) { + return true; + } + this.session = null; + return false; + } + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/websocket/client/AnnotatedEndpointConnectionManager.java b/spring-websocket/src/main/java/org/springframework/websocket/client/AnnotatedEndpointConnectionManager.java new file mode 100644 index 00000000000..ccc4c3f4a0a --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/websocket/client/AnnotatedEndpointConnectionManager.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.websocket.client; + +import java.io.IOException; + +import javax.websocket.DeploymentException; +import javax.websocket.Session; + + +/** + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class AnnotatedEndpointConnectionManager extends AbstractEndpointConnectionManager { + + + public AnnotatedEndpointConnectionManager(Class endpointClass, String uriTemplate, Object... uriVariables) { + super(endpointClass, uriTemplate, uriVariables); + } + + public AnnotatedEndpointConnectionManager(Object endpointBean, String uriTemplate, Object... uriVariables) { + super(endpointBean, uriTemplate, uriVariables); + } + + @Override + protected Session connect(Object endpoint) throws DeploymentException, IOException { + return getWebSocketContainer().connectToServer(endpoint, getUri()); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/websocket/client/EndpointConnectionManager.java b/spring-websocket/src/main/java/org/springframework/websocket/client/EndpointConnectionManager.java new file mode 100644 index 00000000000..472a9df942e --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/websocket/client/EndpointConnectionManager.java @@ -0,0 +1,78 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.websocket.client; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import javax.websocket.ClientEndpointConfig; +import javax.websocket.ClientEndpointConfig.Configurator; +import javax.websocket.Decoder; +import javax.websocket.DeploymentException; +import javax.websocket.Encoder; +import javax.websocket.Endpoint; +import javax.websocket.Extension; +import javax.websocket.Session; + + +/** + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class EndpointConnectionManager extends AbstractEndpointConnectionManager { + + private final ClientEndpointConfig.Builder configBuilder = ClientEndpointConfig.Builder.create(); + + + public EndpointConnectionManager(Class endpointClass, String uriTemplate, Object... uriVariables) { + super(endpointClass, uriTemplate, uriVariables); + } + + public EndpointConnectionManager(Endpoint endpointBean, String uriTemplate, Object... uriVariables) { + super(endpointBean, uriTemplate, uriVariables); + } + + public void setSubProtocols(String... subprotocols) { + this.configBuilder.preferredSubprotocols(Arrays.asList(subprotocols)); + } + + public void setExtensions(Extension... extensions) { + this.configBuilder.extensions(Arrays.asList(extensions)); + } + + public void setEncoders(List> encoders) { + this.configBuilder.encoders(encoders); + } + + public void setDecoders(List> decoders) { + this.configBuilder.decoders(decoders); + } + + public void setConfigurator(Configurator configurator) { + this.configBuilder.configurator(configurator); + } + + @Override + protected Session connect(Object endpoint) throws DeploymentException, IOException { + Endpoint typedEndpoint = (Endpoint) endpoint; + ClientEndpointConfig endpointConfig = this.configBuilder.build(); + return getWebSocketContainer().connectToServer(typedEndpoint, endpointConfig, getUri()); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointExporter.java b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointExporter.java index 9c287e7071f..0596a27a81b 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointExporter.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointExporter.java @@ -19,7 +19,6 @@ import java.util.Map; import javax.websocket.DeploymentException; import javax.websocket.server.ServerContainer; -import javax.websocket.server.ServerContainerProvider; import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpointConfig; @@ -44,7 +43,7 @@ import org.springframework.util.ObjectUtils; * @author Rossen Stoyanchev * @since 4.0 */ -public class EndpointExporter implements InitializingBean, BeanPostProcessor, BeanFactoryAware { +public abstract class EndpointExporter implements InitializingBean, BeanPostProcessor, BeanFactoryAware { private static Log logger = LogFactory.getLog(EndpointExporter.class); @@ -112,7 +111,7 @@ public class EndpointExporter implements InitializingBean, BeanPostProcessor, Be if (logger.isInfoEnabled()) { logger.info("Detected @ServerEndpoint bean '" + beanName + "', registering it as an endpoint by type"); } - ServerContainerProvider.getServerContainer().addEndpoint(beanType); + getServerContainer().addEndpoint(beanType); } catch (DeploymentException e) { throw new IllegalStateException("Failed to register @ServerEndpoint bean type " + beanName, e); @@ -121,10 +120,16 @@ public class EndpointExporter implements InitializingBean, BeanPostProcessor, Be } } + /** + * Return the {@link ServerContainer} instance, a process which is undefined outside + * of standalone containers (section 6.4 of the spec). + */ + protected abstract ServerContainer getServerContainer(); + @Override public void afterPropertiesSet() throws Exception { - ServerContainer serverContainer = ServerContainerProvider.getServerContainer(); + ServerContainer serverContainer = getServerContainer(); Assert.notNull(serverContainer, "javax.websocket.server.ServerContainer not available"); if (this.maxSessionIdleTimeout != null) { @@ -159,7 +164,7 @@ public class EndpointExporter implements InitializingBean, BeanPostProcessor, Be logger.info("Registering bean '" + beanName + "' as javax.websocket.Endpoint under path " + sec.getPath()); } - ServerContainerProvider.getServerContainer().addEndpoint(sec); + getServerContainer().addEndpoint(sec); } catch (DeploymentException e) { throw new IllegalStateException("Failed to deploy Endpoint bean " + bean, e); diff --git a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointRegistration.java b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointRegistration.java index 385f8ece2b9..12367774fb6 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointRegistration.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/EndpointRegistration.java @@ -37,8 +37,6 @@ import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import org.springframework.web.context.ContextLoader; import org.springframework.web.context.WebApplicationContext; -import org.springframework.websocket.WebSocketHandler; -import org.springframework.websocket.endpoint.StandardWebSocketHandlerAdapter; /** @@ -60,7 +58,7 @@ public class EndpointRegistration implements ServerEndpointConfig, BeanFactoryAw private final Class endpointClass; - private final Object bean; + private final Object endpointBean; private List subprotocols = new ArrayList(); @@ -97,9 +95,7 @@ public class EndpointRegistration implements ServerEndpointConfig, BeanFactoryAw Assert.isTrue((endpointClass != null || bean != null), "Neither endpoint class nor endpoint bean provided"); this.path = path; this.endpointClass = endpointClass; - this.bean = bean; - // this will fail if the bean is not a valid Endpoint type - getEndpointClass(); + this.endpointBean = bean; } @Override @@ -113,17 +109,14 @@ public class EndpointRegistration implements ServerEndpointConfig, BeanFactoryAw if (this.endpointClass != null) { return this.endpointClass; } - Class beanClass = this.bean.getClass(); + Class beanClass = this.endpointBean.getClass(); if (beanClass.equals(String.class)) { - beanClass = this.beanFactory.getType((String) this.bean); + beanClass = this.beanFactory.getType((String) this.endpointBean); } beanClass = ClassUtils.getUserClass(beanClass); if (Endpoint.class.isAssignableFrom(beanClass)) { return (Class) beanClass; } - else if (WebSocketHandler.class.isAssignableFrom(beanClass)) { - return StandardWebSocketHandlerAdapter.class; - } else { throw new IllegalStateException("Invalid endpoint bean: must be of type ... TODO "); } @@ -138,16 +131,11 @@ public class EndpointRegistration implements ServerEndpointConfig, BeanFactoryAw } return wac.getAutowireCapableBeanFactory().createBean(this.endpointClass); } - Object bean = this.bean; - if (this.bean instanceof String) { - bean = this.beanFactory.getBean((String) this.bean); - } - if (bean instanceof WebSocketHandler) { - return new StandardWebSocketHandlerAdapter((WebSocketHandler) bean); - } - else { - return (Endpoint) bean; + Object bean = this.endpointBean; + if (this.endpointBean instanceof String) { + bean = this.beanFactory.getBean((String) this.endpointBean); } + return (Endpoint) bean; } @Override diff --git a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/ServletEndpointExporter.java b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/ServletEndpointExporter.java index 2e36774f796..d62b2f44c6a 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/ServletEndpointExporter.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/ServletEndpointExporter.java @@ -17,18 +17,25 @@ package org.springframework.websocket.server.endpoint; import javax.servlet.ServletContext; +import javax.websocket.server.ServerContainer; +import javax.websocket.server.ServerContainerProvider; -import org.apache.tomcat.websocket.server.WsServerContainer; +import org.springframework.util.Assert; import org.springframework.web.context.ServletContextAware; /** + * A sub-class of {@link EndpointExporter} for use with a Servlet container runtime. * * @author Rossen Stoyanchev * @since 4.0 */ public class ServletEndpointExporter extends EndpointExporter implements ServletContextAware { + /** + * + */ + private static final String SERVER_CONTAINER_ATTR_NAME = "javax.websocket.server.ServerContainer"; private ServletContext servletContext; @@ -38,17 +45,18 @@ public class ServletEndpointExporter extends EndpointExporter implements Servlet } public ServletContext getServletContext() { - return servletContext; + return this.servletContext; } @Override - public void afterPropertiesSet() throws Exception { - - // TODO: this is needed (see WsListener) but remove hard dependency - WsServerContainer sc = WsServerContainer.getServerContainer(); - sc.setServletContext(this.servletContext); - - super.afterPropertiesSet(); + protected ServerContainer getServerContainer() { + Assert.notNull(this.servletContext, "A ServletContext is needed to access the WebSocket ServerContainer"); + ServerContainer container = (ServerContainer) this.servletContext.getAttribute(SERVER_CONTAINER_ATTR_NAME); + if (container == null) { + // Remove when Tomcat has caught up to http://java.net/jira/browse/WEBSOCKET_SPEC-165 + return ServerContainerProvider.getServerContainer(); + } + return container; } } diff --git a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/SpringConfigurator.java b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/SpringConfigurator.java index d412fd9e299..6ed65012e67 100644 --- a/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/SpringConfigurator.java +++ b/spring-websocket/src/main/java/org/springframework/websocket/server/endpoint/SpringConfigurator.java @@ -21,6 +21,8 @@ import java.util.Map; import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpointConfig.Configurator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.web.context.ContextLoader; import org.springframework.web.context.WebApplicationContext; @@ -35,27 +37,36 @@ import org.springframework.web.context.WebApplicationContext; */ public class SpringConfigurator extends Configurator { + private static Log logger = LogFactory.getLog(SpringConfigurator.class); + @Override public T getEndpointInstance(Class endpointClass) throws InstantiationException { WebApplicationContext wac = ContextLoader.getCurrentWebApplicationContext(); if (wac == null) { - throw new IllegalStateException("Failed to find WebApplicationContext. " - + "Was org.springframework.web.context.ContextLoader used to load the WebApplicationContext?"); + String message = "Failed to find the root WebApplicationContext. Was ContextLoaderListener not used?"; + logger.error(message); + throw new IllegalStateException(message); } Map beans = wac.getBeansOfType(endpointClass); if (beans.isEmpty()) { - // Initialize a new bean instance + if (logger.isTraceEnabled()) { + logger.trace("Creating new @ServerEndpoint instance of type " + endpointClass); + } return wac.getAutowireCapableBeanFactory().createBean(endpointClass); } if (beans.size() == 1) { - // Return the matching bean instance + if (logger.isTraceEnabled()) { + logger.trace("Using @ServerEndpoint singleton " + beans.keySet().iterator().next()); + } return beans.values().iterator().next(); } else { - // This should never happen (@ServerEndpoint has a single path mapping) .. + // This should never happen .. + String message = "Found more than one matching @ServerEndpoint beans of type " + endpointClass; + logger.error(message); throw new IllegalStateException("Found more than one matching beans of type " + endpointClass); } }