@ -18,34 +18,53 @@ package org.springframework.messaging.simp.config;
import org.springframework.lang.Nullable ;
import org.springframework.lang.Nullable ;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor ;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor ;
import org.springframework.util.Assert ;
/ * *
/ * *
* A registration class for customizing the properties of { @link ThreadPoolTaskExecutor } .
* A registration class for customizing the properties of { @link ThreadPoolTaskExecutor } .
*
*
* @author Rossen Stoyanchev
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 4 . 0
* @since 4 . 0
* /
* /
public class TaskExecutorRegistration {
public class TaskExecutorRegistration {
@Nullable
private final ThreadPoolTaskExecutor taskExecutor ;
private ThreadPoolTaskExecutor taskExecutor ;
private int corePoolSize = Runtime . getRuntime ( ) . availableProcessors ( ) * 2 ;
@Nullable
private Integer corePoolSize ;
private int maxPoolSize = Integer . MAX_VALUE ;
@Nullable
private Integer maxPoolSize ;
private int queueCapacity = Integer . MAX_VALUE ;
@Nullable
private Integer keepAliveSeconds ;
private int keepAliveSeconds = 60 ;
@Nullable
private Integer queueCapacity ;
/ * *
* Create a new { @code TaskExecutorRegistration } for a default
* { @link ThreadPoolTaskExecutor } .
* /
public TaskExecutorRegistration ( ) {
public TaskExecutorRegistration ( ) {
this . taskExecutor = new ThreadPoolTaskExecutor ( ) ;
this . taskExecutor . setCorePoolSize ( Runtime . getRuntime ( ) . availableProcessors ( ) * 2 ) ;
this . taskExecutor . setAllowCoreThreadTimeOut ( true ) ;
}
}
/ * *
* Create a new { @code TaskExecutorRegistration } for a given
* { @link ThreadPoolTaskExecutor } .
* @param taskExecutor the executor to use
* /
public TaskExecutorRegistration ( ThreadPoolTaskExecutor taskExecutor ) {
public TaskExecutorRegistration ( ThreadPoolTaskExecutor taskExecutor ) {
Assert . notNull ( taskExecutor , "ThreadPoolTaskExecutor must not be null" ) ;
this . taskExecutor = taskExecutor ;
this . taskExecutor = taskExecutor ;
}
}
/ * *
/ * *
* Set the core pool size of the ThreadPoolExecutor .
* Set the core pool size of the ThreadPoolExecutor .
* < p > < strong > NOTE : < / strong > The core pool size is effectively the max pool size
* < p > < strong > NOTE : < / strong > The core pool size is effectively the max pool size
@ -77,6 +96,18 @@ public class TaskExecutorRegistration {
return this ;
return this ;
}
}
/ * *
* Set the time limit for which threads may remain idle before being terminated .
* If there are more than the core number of threads currently in the pool ,
* after waiting this amount of time without processing a task , excess threads
* will be terminated . This overrides any value set in the constructor .
* < p > By default this is set to 60 .
* /
public TaskExecutorRegistration keepAliveSeconds ( int keepAliveSeconds ) {
this . keepAliveSeconds = keepAliveSeconds ;
return this ;
}
/ * *
/ * *
* Set the queue capacity for the ThreadPoolExecutor .
* Set the queue capacity for the ThreadPoolExecutor .
* < p > < strong > NOTE : < / strong > when an unbounded { @code queueCapacity } is configured
* < p > < strong > NOTE : < / strong > when an unbounded { @code queueCapacity } is configured
@ -91,26 +122,21 @@ public class TaskExecutorRegistration {
return this ;
return this ;
}
}
/ * *
* Set the time limit for which threads may remain idle before being terminated .
* If there are more than the core number of threads currently in the pool ,
* after waiting this amount of time without processing a task , excess threads
* will be terminated . This overrides any value set in the constructor .
* < p > By default this is set to 60 .
* /
public TaskExecutorRegistration keepAliveSeconds ( int keepAliveSeconds ) {
this . keepAliveSeconds = keepAliveSeconds ;
return this ;
}
protected ThreadPoolTaskExecutor getTaskExecutor ( ) {
protected ThreadPoolTaskExecutor getTaskExecutor ( ) {
ThreadPoolTaskExecutor executor = ( this . taskExecutor ! = null ? this . taskExecutor : new ThreadPoolTaskExecutor ( ) ) ;
if ( this . corePoolSize ! = null ) {
executor . setCorePoolSize ( this . corePoolSize ) ;
this . taskExecutor . setCorePoolSize ( this . corePoolSize ) ;
executor . setMaxPoolSize ( this . maxPoolSize ) ;
}
executor . setKeepAliveSeconds ( this . keepAliveSeconds ) ;
if ( this . maxPoolSize ! = null ) {
executor . setQueueCapacity ( this . queueCapacity ) ;
this . taskExecutor . setMaxPoolSize ( this . maxPoolSize ) ;
executor . setAllowCoreThreadTimeOut ( true ) ;
}
return executor ;
if ( this . keepAliveSeconds ! = null ) {
this . taskExecutor . setKeepAliveSeconds ( this . keepAliveSeconds ) ;
}
if ( this . queueCapacity ! = null ) {
this . taskExecutor . setQueueCapacity ( this . queueCapacity ) ;
}
return this . taskExecutor ;
}
}
}
}