什么是动态线程池?
在线程池日常实践中我们常常会遇到以下问题:
代码中创建了一个线程池却不知道核心参数设置多少比较合适。
参数设置好后,上线发现需要调整,改代码重启服务非常麻烦。
线程池相对于开发人员来说是个黑箱,运行情况在出现问题 前很难被感知。
因此,动态可监控线程池一种针对以上痛点开发的线程池管理工具。主要可实现功能有:提供对 Spring 应用内线程池实例的全局管控、应用运行时动态变更线程池参数以及线程池数据采集和监控阈值报警。
已经实现的优秀开源动态线程池
hippo4j、dynamic-tp.....
实现思路
核心管理类
需要能实现对线程池的
服务注册
获取已经注册好的线程池
以及对注册号线程池参数的刷新。
对于每一个线程池,我们使用一个线程池名字作为标识每个线程池的唯一ID。
伪代码实现
publicclassDtpRegistry{ /** *储存线程池 */ privatestaticfinalMapEXECUTOR_MAP=newConcurrentHashMap<>(); /** *获取线程池 *@paramexecutorName线程池名字 */ publicstaticExecutorgetExecutor(StringexecutorName){ returnEXECUTOR_MAP.get(executorName); } /** *线程池注册 *@paramexecutorName线程池名字 */ publicstaticvoidregistry(StringexecutorName,Executorexecutor){ //注册 EXECUTOR_MAP.put(executorName,executorWrapper); } /** *刷新线程池参数 *@paramexecutorName线程池名字 *@paramproperties线程池参数 */ publicstaticvoidrefresh(StringexecutorName,ThreadPoolPropertiesproperties){ Executorexecutor=EXECUTOR_MAP.get(executorName) //刷新参数 //....... } }
如何创建线程池?
STEP 1. 我们可以使用yml配置文件的方式配置一个线程池,将线程池实例的创建交由Spring容器。
相关配置
publicclassDtpProperties{ privateListexecutors; } publicclassThreadPoolProperties{ /** *标识每个线程池的唯一名字 */ privateStringpoolName; privateStringpoolType="common"; /** *是否为守护线程 */ privatebooleanisDaemon=false; /** *以下都是核心参数 */ privateintcorePoolSize=1; privateintmaximumPoolSize=1; privatelongkeepAliveTime; privateTimeUnittimeUnit=TimeUnit.SECONDS; privateStringqueueType="arrayBlockingQueue"; privateintqueueSize=5; privateStringthreadFactoryPrefix="-td-"; privateStringRejectedExecutionHandler; }
yml example:
spring: dtp: executors: #线程池1 -poolName:dtpExecutor1 corePoolSize:5 maximumPoolSize:10 #线程池2 -poolName:dtpExecutor2 corePoolSize:2 maximumPoolSize:15
STEP 2 根据配置信息添加线程池的BeanDefinition
关键类
@Slf4j publicclassDtpImportBeanDefinitionRegistrarimplementsImportBeanDefinitionRegistrar,EnvironmentAware{ privateEnvironmentenvironment; @Override publicvoidregisterBeanDefinitions(AnnotationMetadataimportingClassMetadata,BeanDefinitionRegistryregistry){ log.info("注册"); //绑定资源 DtpPropertiesdtpProperties=newDtpProperties(); ResourceBundlerUtil.bind(environment,dtpProperties); Listexecutors=dtpProperties.getExecutors(); if(Objects.isNull(executors)){ log.info("未检测本地到配置文件线程池"); return; } //注册beanDefinition executors.forEach((executorProp)->{ BeanUtil.registerIfAbsent(registry,executorProp); }); } @Override publicvoidsetEnvironment(Environmentenvironment){ this.environment=environment; } } /** * *工具类 * */ publicclassBeanUtil{ publicstaticvoidregisterIfAbsent(BeanDefinitionRegistryregistry,ThreadPoolPropertiesexecutorProp){ register(registry,executorProp.getPoolName(),executorProp); } publicstaticvoidregister(BeanDefinitionRegistryregistry,StringbeanName,ThreadPoolPropertiesexecutorProp){ Class extends Executor>executorType=ExecutorType.getClazz(executorProp.getPoolType()); Object[]args=assembleArgs(executorProp); register(registry,beanName,executorType,args); } publicstaticvoidregister(BeanDefinitionRegistryregistry,StringbeanName,Class>clazz,Object[]args){ BeanDefinitionBuilderbuilder=BeanDefinitionBuilder.genericBeanDefinition(clazz); for(Objectarg:args){ builder.addConstructorArgValue(arg); } registry.registerBeanDefinition(beanName,builder.getBeanDefinition()); } privatestaticObject[]assembleArgs(ThreadPoolPropertiesexecutorProp){ returnnewObject[]{ executorProp.getCorePoolSize(), executorProp.getMaximumPoolSize(), executorProp.getKeepAliveTime(), executorProp.getTimeUnit(), QueueType.getInstance(executorProp.getQueueType(),executorProp.getQueueSize()), newNamedThreadFactory( executorProp.getPoolName()+executorProp.getThreadFactoryPrefix(), executorProp.isDaemon() ), //先默认不做设置 RejectPolicy.ABORT.getValue() }; } }
下面解释一下这个类的作用,environment实例中储存着spring启动时解析的yml配置,所以我们spring提供的Binder将配置绑定到我们前面定义的DtpProperties类中,方便后续使用。接下来的比较简单,就是将线程池的BeanDefinition注册到IOC容器中,让spring去帮我们实例化这个bean。
STEP 3. 将已经实例化的线程池注册到核心类 DtpRegistry 中
我们注册了 beanDefinition 后,spring会帮我们实例化出来, 在这之后我们可以根据需要将这个bean进行进一步的处理,spring也提供了很多机制让我们对bean的生命周期管理进行更多的扩展。对应到这里我们就是将实例化出来的线程池注册到核心类 DtpRegistry 中进行管理。
这里我们使用 BeanPostProcessor 进行处理。
@Slf4j publicclassDtpBeanPostProcessorimplementsBeanPostProcessor{ privateDefaultListableBeanFactorybeanFactory; @Override publicObjectpostProcessAfterInitialization(Objectbean,StringbeanName)throwsBeansException{ if(beaninstanceofDtpExecutor){ //直接纳入管理 DtpRegistry.registry(beanName,(DtpExecutor)bean); } returnbean; } }
这里的逻辑很简单, 就是判断一下这个bean是不是线程池,是就统一管理起来。
STEP 4. 启用 BeanDefinitionRegistrar 和 BeanPostProcessor
在springboot程序中,只要加一个@MapperScan注解就能启用mybatis的功能,我们可以学习其在spring中的启用方式,自定义一个注解:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(DtpImportSelector.class) public@interfaceEnableDynamicThreadPool{ }
其中,比较关键的是@Import注解,spring会导入注解中的类DtpImportSelector
而DtpImportSelector这个类实现了:
publicclassDtpImportSelectorimplementsDeferredImportSelector{ @Override publicString[]selectImports(AnnotationMetadataimportingClassMetadata){ returnnewString[]{ DtpImportBeanDefinitionRegistrar.class.getName(), DtpBeanPostProcessor.class.getName() }; } }
这样,只要我们再启动类或者配置类上加上@EnableDynamicThreadPool这个注解,spring就会将DtpImportBeanDefinitionRegistrar和DtpBeanPostProcessor这两个类加入spring容器管理,从而实现我们的线程池的注册。
@SpringBootApplication @EnableDynamicThreadPool publicclassApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(Application.class,args); } }
如何实现线程池配置的动态刷新
首先明确一点,对于线程池的实现类,例如:ThreadPoolExecutor等,都有提供核心参数对应的 set 方法,让我们实现参数修改。因此,在核心类DtpRegistry中的refresh方法,我们可以这样写:
publicclassDtpRegistry{ /** *储存线程池 */ privatestaticfinalMapEXECUTOR_MAP=newConcurrentHashMap<>(); /** *刷新线程池参数 *@paramexecutorName线程池名字 *@paramproperties线程池参数 */ publicstaticvoidrefresh(StringexecutorName,ThreadPoolPropertiesproperties){ ThreadPoolExecutorexecutor=EXECUTOR_MAP.get(executorName) //设置参数 executor.setCorePoolSize(...); executor.setMaximumPoolSize(...); ...... } }
而这些新参数怎么来呢?我们可以引入Nacos、Apollo等配置中心,实现他们的监听器方法,在监听器方法里调用DtpRegistry的refresh方法刷新即可。
审核编辑:刘清
-
线程池
+关注
关注
0文章
57浏览量
6844
原文标题:动态线程池的简单实现思路
文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
相关推荐
评论