0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看威廉希尔官方网站 视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

xxl-job任务调度中间件解决定时任务的调度问题

jf_ro2CN3Fa 来源:码农参上 2023-01-31 09:53 次阅读

xxl-job是一款非常优秀的任务调度中间件,轻量级、使用简单、支持分布式等优点,让它广泛应用在我们的项目中,解决了不少定时任务的调度问题。

我们都知道,在使用过程中需要先到xxl-job的任务调度中心页面上,配置执行器executor 和具体的任务job ,这一过程如果项目中的定时任务数量不多还好说,如果任务多了的话还是挺费工夫的。

cf3f43f8-9ed1-11ed-bfe3-dac502259ad0.png

假设项目中有上百个这样的定时任务,那么每个任务都需要走一遍绑定jobHander后端接口,填写cron表达式这个流程…

我就想问问,填多了谁能不迷糊?

于是出于功能优化(偷懒 )这一动机,前几天我萌生了一个想法,有没有什么方法能够告别xxl-job的管理页面,能够让我不再需要到页面上去手动注册执行器和任务,实现让它们自动注册到调度中心呢。

分析

分析一下,其实我们要做的很简单,只要在项目启动时主动注册executor和各个jobHandler到调度中心就可以了,流程如下:

cf548d08-9ed1-11ed-bfe3-dac502259ad0.jpg

有的小伙伴们可能要问了,我在页面上创建执行器 的时候,不是有一个选项叫做自动注册 吗,为什么我们这里还要自己添加新执行器?

其实这里有个误区,这里的自动注册指的是会根据项目中配置的xxl.job.executor.appname,将配置的机器地址自动注册到这个执行器的地址列表中。但是如果你之前没有手动创建过执行器,那么是不会给你自动添加一个新执行器到调度中心的。

既然有了想法咱们就直接开干,先到github上拉一份xxl-job的源码下来

整个项目导入idea后,先看一下结构:

cf5d6acc-9ed1-11ed-bfe3-dac502259ad0.png

结合着文档和代码,先梳理一下各个模块都是干什么的:

xxl-job-admin:任务调度中心,启动后就可以访问管理页面,进行执行器和任务的注册、以及任务调用等功能了

xxl-job-core:公共依赖,项目中使用到xxl-job时要引入的依赖包

xxl-job-executor-samples:执行示例,分别包含了springboot版本和不使用框架的版本

为了弄清楚注册和查询executor和jobHandler调用的是哪些接口,我们先从页面上去抓一个请求看看:

cf6a820c-9ed1-11ed-bfe3-dac502259ad0.png

好了,这样就能定位到xxl-job-admin模块中/jobgroup/save这个接口,接下来可以很容易地找到源码位置:

cf752586-9ed1-11ed-bfe3-dac502259ad0.png

按照这个思路,可以找到下面这几个关键接口:

/jobgroup/pageList:执行器列表的条件查询

/jobgroup/save:添加执行器

/jobinfo/pageList:任务列表的条件查询

/jobinfo/add:添加任务

但是如果直接调用这些接口,那么就会发现它会跳转到xxl-job-admin的的登录页面:

cf803e58-9ed1-11ed-bfe3-dac502259ad0.png

其实想想也明白,出于安全性考虑,调度中心的接口也不可能允许裸调的。那么再回头看一下刚才页面上的请求就会发现,它在Headers中添加了一条名为XXL_JOB_LOGIN_IDENTITY的cookie:

cf8a6c7a-9ed1-11ed-bfe3-dac502259ad0.png

至于这条cookie,则是在通过用户名和密码调用调度中心的/login接口时返回的,在返回的response可以直接拿到。只要保存下来,并在之后每次请求时携带,就能够正常访问其他接口了。

到这里,我们需要的5个接口就基本准备齐了,接下来准备开始正式的改造工作。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

改造

我们改造的目的是实现一个starter,以后只要引入这个starter就能实现executor和jobHandler的自动注册,要引入的关键依赖有下面两个:


com.xuxueli
xxl-job-core
2.3.0



org.springframework.boot
spring-boot-autoconfigure

1、接口调用

在调用调度中心的接口前,先把xxl-job-admin模块中的XxlJobInfo和XxlJobGroup这两个类拿到我们的starter项目中,用于接收接口调用的结果。

登录接口

创建一个JobLoginService,在调用业务接口前,需要通过登录接口获取cookie,并在获取到cookie后,缓存到本地的Map中。

privatefinalMaploginCookie=newHashMap<>();

publicvoidlogin(){
Stringurl=adminAddresses+"/login";
HttpResponseresponse=HttpRequest.post(url)
.form("userName",username)
.form("password",password)
.execute();
Listcookies=response.getCookies();
OptionalcookieOpt=cookies.stream()
.filter(cookie->cookie.getName().equals("XXL_JOB_LOGIN_IDENTITY")).findFirst();
if(!cookieOpt.isPresent())
thrownewRuntimeException("getxxl-jobcookieerror!");

Stringvalue=cookieOpt.get().getValue();
loginCookie.put("XXL_JOB_LOGIN_IDENTITY",value);
}

其他接口在调用时,直接从缓存中获取cookie,如果缓存中不存在则调用/login接口,为了避免这一过程失败,允许最多重试3次。

publicStringgetCookie(){
for(inti=0;i< 3; i++) {
        String cookieStr = loginCookie.get("XXL_JOB_LOGIN_IDENTITY");
        if (cookieStr !=null) {
            return "XXL_JOB_LOGIN_IDENTITY="+cookieStr;
        }
        login();
    }
    throw new RuntimeException("get xxl-job cookie error!");
}

执行器接口

创建一个JobGroupService,根据appName和执行器名称title查询执行器列表:

publicListgetJobGroup(){
Stringurl=adminAddresses+"/jobgroup/pageList";
HttpResponseresponse=HttpRequest.post(url)
.form("appname",appName)
.form("title",title)
.cookie(jobLoginService.getCookie())
.execute();

Stringbody=response.body();
JSONArrayarray=JSONUtil.parse(body).getByPath("data",JSONArray.class);
Listlist=array.stream()
.map(o->JSONUtil.toBean((JSONObject)o,XxlJobGroup.class))
.collect(Collectors.toList());
returnlist;
}

我们在后面要根据配置文件中的appName和title判断当前执行器是否已经被注册到调度中心过,如果已经注册过那么则跳过,而/jobgroup/pageList接口是一个模糊查询接口,所以在查询列表的结果列表中,还需要再进行一次精确匹配。

publicbooleanpreciselyCheck(){
ListjobGroup=getJobGroup();
Optionalhas=jobGroup.stream()
.filter(xxlJobGroup->xxlJobGroup.getAppname().equals(appName)
&&xxlJobGroup.getTitle().equals(title))
.findAny();
returnhas.isPresent();
}

注册新executor到调度中心:

publicbooleanautoRegisterGroup(){
Stringurl=adminAddresses+"/jobgroup/save";
HttpResponseresponse=HttpRequest.post(url)
.form("appname",appName)
.form("title",title)
.cookie(jobLoginService.getCookie())
.execute();
Objectcode=JSONUtil.parse(response.body()).getByPath("code");
returncode.equals(200);
}

任务接口

创建一个JobInfoService,根据执行器id,jobHandler名称查询任务列表,和上面一样,也是模糊查询:

publicListgetJobInfo(IntegerjobGroupId,StringexecutorHandler){
Stringurl=adminAddresses+"/jobinfo/pageList";
HttpResponseresponse=HttpRequest.post(url)
.form("jobGroup",jobGroupId)
.form("executorHandler",executorHandler)
.form("triggerStatus",-1)
.cookie(jobLoginService.getCookie())
.execute();

Stringbody=response.body();
JSONArrayarray=JSONUtil.parse(body).getByPath("data",JSONArray.class);
Listlist=array.stream()
.map(o->JSONUtil.toBean((JSONObject)o,XxlJobInfo.class))
.collect(Collectors.toList());

returnlist;
}

注册一个新任务,最终返回创建的新任务的id:

publicIntegeraddJobInfo(XxlJobInfoxxlJobInfo){
Stringurl=adminAddresses+"/jobinfo/add";
MapparamMap=BeanUtil.beanToMap(xxlJobInfo);
HttpResponseresponse=HttpRequest.post(url)
.form(paramMap)
.cookie(jobLoginService.getCookie())
.execute();

JSONjson=JSONUtil.parse(response.body());
Objectcode=json.getByPath("code");
if(code.equals(200)){
returnConvert.toInt(json.getByPath("content"));
}
thrownewRuntimeException("addjobInfoerror!");
}

2、创建新注解

在创建任务时,必填字段除了执行器和jobHandler之外,还有任务描述负责人Cron表达式调度类型运行模式 。在这里,我们默认调度类型为CRON、运行模式为BEAN,另外的3个字段的信息需要用户指定。

因此我们需要创建一个新注解@XxlRegister,来配合原生的@XxlJob注解进行使用,填写这几个字段的信息:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public@interfaceXxlRegister{
Stringcron();
StringjobDesc()default"defaultjobDesc";
Stringauthor()default"defaultAuthor";
inttriggerStatus()default0;
}

最后,额外添加了一个triggerStatus属性,表示任务的默认调度状态,0为停止状态,1为运行状态。

3、自动注册核心

基本准备工作做完后,下面实现自动注册执行器和jobHandler的核心代码。核心类实现ApplicationListener接口,在接收到ApplicationReadyEvent事件后开始执行自动注册逻辑。

@Component
publicclassXxlJobAutoRegisterimplementsApplicationListener,
ApplicationContextAware{
privatestaticfinalLoglog=LogFactory.get();
privateApplicationContextapplicationContext;
@Autowired
privateJobGroupServicejobGroupService;
@Autowired
privateJobInfoServicejobInfoService;

@Override
publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{
this.applicationContext=applicationContext;
}

@Override
publicvoidonApplicationEvent(ApplicationReadyEventevent){
addJobGroup();//注册执行器
addJobInfo();//注册任务
}
}

自动注册执行器的代码非常简单,根据配置文件中的appName和title精确匹配查看调度中心是否已有执行器被注册过了,如果存在则跳过,不存在则新注册一个:

privatevoidaddJobGroup(){
if(jobGroupService.preciselyCheck())
return;

if(jobGroupService.autoRegisterGroup())
log.info("autoregisterxxl-jobgroupsuccess!");
}

自动注册任务的逻辑则相对复杂一些,需要完成:

通过applicationContext拿到spring容器中的所有bean,再拿到这些bean中所有添加了@XxlJob注解的方法

对上面获取到的方法进行检查,是否添加了我们自定义的@XxlRegister注解,如果没有则跳过,不进行自动注册

对同时添加了@XxlJob和@XxlRegister的方法,通过执行器id和jobHandler的值判断是否已经在调度中心注册过了,如果已存在则跳过

对于满足注解条件且没有注册过的jobHandler,调用接口注册到调度中心

具体代码如下:

privatevoidaddJobInfo(){
ListjobGroups=jobGroupService.getJobGroup();
XxlJobGroupxxlJobGroup=jobGroups.get(0);

String[]beanDefinitionNames=applicationContext.getBeanNamesForType(Object.class,false,true);
for(StringbeanDefinitionName:beanDefinitionNames){
Objectbean=applicationContext.getBean(beanDefinitionName);

MapannotatedMethods=MethodIntrospector.selectMethods(bean.getClass(),
newMethodIntrospector.MetadataLookup(){
@Override
publicXxlJobinspect(Methodmethod){
returnAnnotatedElementUtils.findMergedAnnotation(method,XxlJob.class);
}
});
for(Map.EntrymethodXxlJobEntry:annotatedMethods.entrySet()){
MethodexecuteMethod=methodXxlJobEntry.getKey();
XxlJobxxlJob=methodXxlJobEntry.getValue();

//自动注册
if(executeMethod.isAnnotationPresent(XxlRegister.class)){
XxlRegisterxxlRegister=executeMethod.getAnnotation(XxlRegister.class);
ListjobInfo=jobInfoService.getJobInfo(xxlJobGroup.getId(),xxlJob.value());
if(!jobInfo.isEmpty()){
//因为是模糊查询,需要再判断一次
Optionalfirst=jobInfo.stream()
.filter(xxlJobInfo->xxlJobInfo.getExecutorHandler().equals(xxlJob.value()))
.findFirst();
if(first.isPresent())
continue;
}

XxlJobInfoxxlJobInfo=createXxlJobInfo(xxlJobGroup,xxlJob,xxlRegister);
IntegerjobInfoId=jobInfoService.addJobInfo(xxlJobInfo);
}
}
}
}

4、自动装配

创建一个配置类,用于扫描bean:

@Configuration
@ComponentScan(basePackages="com.xxl.job.plus.executor")
publicclassXxlJobPlusConfig{
}

将它添加到META-INF/spring.factories文件:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.xxl.job.plus.executor.config.XxlJobPlusConfig

到这里starter的编写就完成了,可以通过maven发布jar包到本地或者私服:

mvncleaninstall/deploy

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

项目地址:https://github.com/YunaiV/yudao-cloud

视频教程:https://doc.iocoder.cn/video/

测试

新建一个springboot项目,引入我们在上面打好的包:


com.cn.hydra
xxljob-autoregister-spring-boot-starter
0.0.1

在application.properties中配置xxl-job的信息,首先是原生的配置内容:

xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
xxl.job.accessToken=default_token
xxl.job.executor.appname=xxl-job-executor-test
xxl.job.executor.address=
xxl.job.executor.ip=127.0.0.1
xxl.job.executor.port=9999
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30

此外还要额外添加我们自己的starter要求的新配置内容:

#admin用户名
xxl.job.admin.username=admin
#admin密码
xxl.job.admin.password=123456
#执行器名称
xxl.job.executor.title=test-title

完成后在代码中配置一下XxlJobSpringExecutor,然后在测试接口上添加原生@XxlJob注解和我们自定义的@XxlRegister注解:

@XxlJob(value="testJob")
@XxlRegister(cron="000**?*",
author="hydra",
jobDesc="测试job")
publicvoidtestJob(){
System.out.println("#码农参上");
}


@XxlJob(value="testJob222")
@XxlRegister(cron="591-20**?",
triggerStatus=1)
publicvoidtestJob2(){
System.out.println("#作者:Hydra");
}

@XxlJob(value="testJob444")
@XxlRegister(cron="595923**?")
publicvoidtestJob4(){
System.out.println("helloxxljob");
}

启动项目,可以看到执行器自动注册成功:

cf9c5fa2-9ed1-11ed-bfe3-dac502259ad0.png

再打开调度中心的任务管理页面,可以看到同时添加了两个注解的任务也已经自动完成了注册:

cfa28cd8-9ed1-11ed-bfe3-dac502259ad0.png

从页面上手动执行任务进行测试,可以执行成功:

cfab0962-9ed1-11ed-bfe3-dac502259ad0.png

到这里,starter的编写和测试过程就算基本完成了,项目中引入后,以后也能省出更多的时间来摸鱼学习了~







审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • Micron
    +关注

    关注

    0

    文章

    30

    浏览量

    56140
  • 执行器
    +关注

    关注

    5

    文章

    377

    浏览量

    19348
  • RBAC
    +关注

    关注

    0

    文章

    44

    浏览量

    9962

原文标题:魔改xxl-job,彻底告别手动配置任务!

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    简单的任务调度代码

    通过定时器节拍控制任务执行周期,此代码的中断函数时AVR的简单的任务调度.rar (2.4 KB )
    发表于 06-12 04:35

    Linux系统定时任务Crond

    会定期(默认每分钟检查一次)检查系统中是否有要执行的任务工作,如果有,便会根据其预先设定的定时任务规则自动执行该定时任务工作,这个crond定时任务服务就相当于我们平时早起使用的闹钟一
    发表于 07-05 06:22

    运行调度中心后访问出现500错误怎么解决

    XXL-Job 访问调度中心出现 500 应用程序异常
    发表于 11-08 09:39

    关于XXL-JOB定时调度器的使用总结

    XXL-JOB 定时调度器 使用小结
    发表于 04-23 14:53

    UCOSIII的任务管理与任务调度和切换简述

    就绪表7、任务调度和切换1、任务调度时间片轮转调度2、任务切换8、UCOSIII的
    发表于 02-18 06:14

    DVS系统硬实时周期任务动态调度算法

    与实时任务的可调度分析不同,实时DVS调度在保证任务截止时间限制同时,还要关注任务执行的处理器功耗。功耗研究一段时间的累积效果,传统基于最坏
    发表于 12-16 23:55 12次下载

    OPC 实时任务系统动态调度算法的研究与设计The Stud

    本文基于已有的OPC Server 实时任务模型,设计了处理混合任务集的动态调度算法(基于截止期优先)和实现方式。该算法实现了对混合任集可调度性的判断,可以完成有硬实时性要
    发表于 05-31 15:36 13次下载

    时任务双容错调度算法

    云环境中的处理机故障已成为云计算不可忽视的问题,容错成为设计和发展云计算系统的关键需求。针对一些容错调度算法在任务调度过程中调度效率低下以及任务
    发表于 01-14 11:26 0次下载

    移动终端最优节能任务调度

    讨论在移动终端设备下硬实时任务调度的原理、机制、策略。在硬实时任务对时效性的要求与现时任务对能耗管理的要求这2个约束条件下对任务进行
    发表于 02-07 16:30 1次下载

    Python定时任务的实现方式

    调度模块schedule实现定时任务 利用任务框架APScheduler实现定时任务 Job 作业 Trigger 触发器 Executor
    的头像 发表于 10-08 15:20 5725次阅读

    什么是定时任务 xxl-job架构设计方案

    同一个执行器集群内AppName(xxl.job.executor.appname)需要保持一致;调度中心根据该配置动态发现不同集群的在线执行器列表。
    发表于 11-14 12:44 1499次阅读

    xxl-job惊艳的设计,怎能叫人不爱

    xxl-job 定义了两个接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封装了向心跳,暂停,触发执行等操作,AdminBiz 封装了回调,注册,取消注册操作,接口的实现类中,并没有通信相关的处理。
    的头像 发表于 12-22 14:43 763次阅读

    SpringBoot-动态定时任务调度

    先说业务场景,根据用户输入的cron表达式进行定时调度,举个例子:如图
    的头像 发表于 04-07 14:56 830次阅读
    SpringBoot-动态<b class='flag-5'>定时任务</b><b class='flag-5'>调度</b>

    分布式定时调度xxl-job最佳实践方法

    定时任务是按照指定时间周期运行任务。使用场景为在某个固定时间点执行,或者周期性的去执行某个任务,比如:每天晚上24点做数据汇总,
    的头像 发表于 11-30 11:06 1554次阅读
    分布式<b class='flag-5'>定时调度</b>:<b class='flag-5'>xxl-job</b>最佳实践方法

    华玉通软宣布“海鸥”确定性调度中间件(SEAGULL DS)正式商用

    今天,华玉通软(下称“华玉”)宣布“海鸥”确定性调度中间件(SEAGULL DS)正式商用。
    的头像 发表于 03-17 11:01 673次阅读
    华玉通软宣布“海鸥”确定性<b class='flag-5'>调度</b><b class='flag-5'>中间件</b>(SEAGULL DS)正式商用