Browse Source

Use TaskScheduler instead of ThreadPoolTaskScheduler

Closes gh-22943
pull/23031/head
Rossen Stoyanchev 7 years ago
parent
commit
0b2fcbfe8a
  1. 3
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java
  2. 27
      spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java

3
spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java

@ -53,6 +53,7 @@ import org.springframework.messaging.simp.user.UserRegistryMessageHandler; @@ -53,6 +53,7 @@ import org.springframework.messaging.simp.user.UserRegistryMessageHandler;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.ImmutableMessageChannelInterceptor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
@ -360,7 +361,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @@ -360,7 +361,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
// Expose alias for 4.1 compatibility
@Bean(name = {"messageBrokerTaskScheduler", "messageBrokerSockJsTaskScheduler"})
public ThreadPoolTaskScheduler messageBrokerTaskScheduler() {
public TaskScheduler messageBrokerTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("MessageBroker-");
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());

27
spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java

@ -16,9 +16,10 @@ @@ -16,9 +16,10 @@
package org.springframework.web.socket.config;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -27,6 +28,7 @@ import org.apache.commons.logging.LogFactory; @@ -27,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
@ -70,7 +72,7 @@ public class WebSocketMessageBrokerStats { @@ -70,7 +72,7 @@ public class WebSocketMessageBrokerStats {
private ThreadPoolExecutor outboundChannelExecutor;
@Nullable
private ScheduledThreadPoolExecutor sockJsTaskScheduler;
private TaskScheduler sockJsTaskScheduler;
@Nullable
private ScheduledFuture<?> loggingTask;
@ -112,17 +114,17 @@ public class WebSocketMessageBrokerStats { @@ -112,17 +114,17 @@ public class WebSocketMessageBrokerStats {
this.outboundChannelExecutor = outboundChannelExecutor.getThreadPoolExecutor();
}
public void setSockJsTaskScheduler(ThreadPoolTaskScheduler sockJsTaskScheduler) {
this.sockJsTaskScheduler = sockJsTaskScheduler.getScheduledThreadPoolExecutor();
public void setSockJsTaskScheduler(TaskScheduler sockJsTaskScheduler) {
this.sockJsTaskScheduler = sockJsTaskScheduler;
this.loggingTask = initLoggingTask(TimeUnit.MINUTES.toMillis(1));
}
@Nullable
private ScheduledFuture<?> initLoggingTask(long initialDelay) {
if (this.sockJsTaskScheduler != null && this.loggingPeriod > 0 && logger.isInfoEnabled()) {
return this.sockJsTaskScheduler.scheduleAtFixedRate(() ->
logger.info(WebSocketMessageBrokerStats.this.toString()),
initialDelay, this.loggingPeriod, TimeUnit.MILLISECONDS);
return this.sockJsTaskScheduler.scheduleWithFixedDelay(
() -> logger.info(WebSocketMessageBrokerStats.this.toString()),
Instant.now().plusMillis(initialDelay), Duration.ofMillis(this.loggingPeriod));
}
return null;
}
@ -186,7 +188,16 @@ public class WebSocketMessageBrokerStats { @@ -186,7 +188,16 @@ public class WebSocketMessageBrokerStats {
* Get stats about the SockJS task scheduler.
*/
public String getSockJsTaskSchedulerStatsInfo() {
return (this.sockJsTaskScheduler != null ? getExecutorStatsInfo(this.sockJsTaskScheduler) : "null");
if (this.sockJsTaskScheduler == null) {
return "null";
}
if (this.sockJsTaskScheduler instanceof ThreadPoolTaskScheduler) {
return getExecutorStatsInfo(((ThreadPoolTaskScheduler) this.sockJsTaskScheduler)
.getScheduledThreadPoolExecutor());
}
else {
return "unknown";
}
}
private String getExecutorStatsInfo(Executor executor) {

Loading…
Cancel
Save