分布式任务处理XXL-JOB
创始人
2025-05-31 15:51:30

分布式任务处理XXL-JOB

什么是分布式任务调度

对一个视频的转码可以理解为一个任务的执行,如果视频的数量比较多,如何去高效处理一批任务呢?
1、多线程
多线程是充分利用单机的资源。
2、分布式加多线程
充分利用多台计算机,每台计算机使用多线程处理。
方案2可扩展性更强。
方案2是一种分布式任务调度的处理方案。
什么是分布式任务调度?
我们可以先思考一下下面业务场景的解决方案:
每隔24小时执行数据备份任务。
12306网站会根据车次不同,设置几个时间点分批次放票。
某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。
商品成功发货后,需要向客户发送短信提醒。
类似的场景还有很多,我们该如何实现?
多线程方式实现:
学过多线程的同学,可能会想到,我们可以开启一个线程,每sleep一段时间,就去检查是否已到预期执行时间。

任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。
通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:
在这里插入图片描述

分布式调度要实现的目标:
不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:
1、并行任务调度
并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
2、高可用
若某一个实例宕机,不影响其他实例来执行任务。
3、弹性扩容
当集群中增加实例就可以提高并执行任务的处理效率。
4、任务管理与监测
对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
5、避免任务重复执行
当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。

XXL-JOB

XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
官网:https://www.xuxueli.com/xxl-job/
XXL-JOB主要有调度中心、执行器、任务:
在这里插入图片描述

调度中心:
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;
主要职责为执行器管理、任务管理、监控运维、日志管理等
任务执行器:
负责接收调度请求并执行任务逻辑;
只要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
任务:
负责执行具体的业务处理。
调度中心与执行器之间的工作流程如下:
在这里插入图片描述
执行流程:
1.任务执行器根据配置的调度中心的地址,自动注册到调度中心
2.达到任务触发条件,调度中心下发任务
3.执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
4.执行器消费内存队列中的执行结果,主动上报给调度中心
5.当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

搭建XXL-JOB

首先下载XXL-JOB
GitHub:https://github.com/xuxueli/xxl-job
码云:https://gitee.com/xuxueli0323/xxl-job
这里我们使用2.3.1版本:
下载链接:https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
在这里插入图片描述
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
doc :文档资料,包含数据库脚本

运行完数据库脚本之后,
修改配置文件中的数据库链接信息,
在本机idea运行xxl-job调度中心

执行器

下边配置执行器,执行器负责与调度中心通信接收调度中心发起的任务调度请求。
1、下边进入调度中心添加执行器
在这里插入图片描述
点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。
在这里插入图片描述
添加成功:
在这里插入图片描述
在微服务项目中添加依赖,因为我的微服务依赖了父工程,在父工程已经指定了版本,所以这样配置,如果你们的没有我这样的项目结构,就需要指定版本

com.xuxuelixxl-job-core

3、在nacos下的微服务的yaml下配置xxl-job
因为我使用了nacos作为注册配置中心

xxl:job:admin: addresses: localhost:8080/xxl-job-adminexecutor:appname: media-process-serviceaddress: ip: port: 9999logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30accessToken: default_token

注意配置中的appname这是执行器的应用名,port是执行器启动的端口,如果本地启动多个执行器注意端口不能重复。
4、配置xxl-job的执行器
将xxl-job示例工程下配置类拷贝到媒资管理的service工程下
在这里插入图片描述
拷贝至:
在这里插入图片描述
到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器,下边准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动媒资管理模块的接口工程。
启动后观察日志,出现下边的日志表示执行器在调度中心注册成功

同时观察调度中心中的执行器界面

在这里插入图片描述

执行任务

下边编写任务,参考示例工程中任务类的编写方法,如下图:
在这里插入图片描述
在媒资服务service包下新建jobhandler存放任务类,下边参考示例工程编写一个任务类

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** @description 测试执行器* @author Mr.M* @date 2022/9/13 20:32* @version 1.0*/@Component@Slf4j
public class SampleJob {/*** 1、简单任务示例(Bean模式)*/@XxlJob("testJob")public void testJob() throws Exception {log.info("开始执行.....");}}

下边在调度中心添加任务,进入任务管理
在这里插入图片描述
点击新增,填写任务信息
在这里插入图片描述
注意红色标记处:
调度类型:
固定速度指按固定的间隔定时调度。
Cron,通过Cron表达式实现更丰富的定时调度策略。
Cron表达式是一个字符串,通过它可以定义调度策略,格式如下:
{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
xxl-job提供图形界面去配置:
在这里插入图片描述
一些例子如下:
30 10 1 * * ? 每天1点10分30秒触发
0/30 * * * * ? 每30秒触发一次

  • 0/10 * * * ? 每10分钟触发一次
    运行模式有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。
    JobHandler即任务方法名,填写任务方法上边@XxlJob注解中的名称。
    路由策略:当执行器集群部署时,调度中心向哪个执行器下发任务,这里选择第一个表示只向第一个执行器下发任务,路由策略的其它选项稍后在分片广播章节详细解释。
    添加成功,启动任务
    在这里插入图片描述
    通过调度日志查看任务执行情况
    在这里插入图片描述
    下边启动媒资管理的service工程,启动执行器。
    观察执行器方法的执行。
    在这里插入图片描述
    如果要停止任务需要在调度中心操作
    在这里插入图片描述
    任务跑一段时间注意清理日志
    在这里插入图片描述

分片广播

掌握了xxl-job的基本使用,下边思考如何进行分布式任务处理呢?如下图,我们会启动多个执行器组成一个集群,去执行任务。
在这里插入图片描述
执行器在集群部署下调度中心有哪些路由策略呢?
查看xxl-job官方文档,阅读高级配置相关的内容:

SQL
高级配置:- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;FIRST(第一个):固定选择第一个机器;LAST(最后一个):固定选择最后一个机器;ROUND(轮询):;RANDOM(随机):随机选择在线的机器;CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;- 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。- 调度过期策略:- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;- 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;- 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;

下边要重点说的是分片广播策略,分片是指是调度中心以执行器为维度进行分片,将集群中的执行器标上序号:0,1,2,3…,广播是指每次调度会向集群中的所有执行器发送任务调度,请求中携带分片参数。
如下图:
在这里插入图片描述
每个执行器收到调度请求同时接收分片参数。
xxl-job支持动态扩容执行器集群从而动态增加分片数量,当有任务量增加可以部署更多的执行器到集群中,调度中心会动态修改分片的数量。
作业分片适用哪些场景呢?

  • 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
  • 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
    所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
    使用说明:
    “分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
    Java语言任务获取分片参数方式:
    BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务"ShardingJobHandler":
/*** 2、分片广播任务*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {// 分片序号,从0开始int shardIndex = XxlJobHelper.getShardIndex();// 分片总数int shardTotal = XxlJobHelper.getShardTotal();....

下边测试作业分片:
1、定义作业分片的任务方法

/*** 2、分片广播任务*/@XxlJob("shardingJobHandler")public void shardingJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("开始执行第"+shardIndex+"批任务");}

2、在调度中心添加任务
在这里插入图片描述
添加成功:
在这里插入图片描述
启动任务,观察日志
在这里插入图片描述
下边启动两个执行器实例,观察每个实例的执行情况
首先在nacos中配置media-service的本地优先配置:

#配置本地优先
spring:cloud:config:override-none: true

将微服务启动两个实例
两个实例的在启动时注意端口不能冲突:
实例1 在VM options处添加:-Dserver.port=63051 -Dxxl.job.executor.port=9998
实例2 在VM options处添加:-Dserver.port=63050 -Dxxl.job.executor.port=9999
例如:
在这里插入图片描述
启动两个实例
观察任务调度中心,稍等片刻执行器有两个
在这里插入图片描述
观察两个执行实例的日志:
在这里插入图片描述
另一实例的日志如下:
在这里插入图片描述
从日志可以看每个实例的分片序号不同。
如果其中一个执行器挂掉,只剩下一个执行器在工作,稍等片刻调用中心发现少了一个执行器将动态调整总分片数为1。
到此作业分片任务调试完成,此时我们可以思考:
当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?

作业分片方案

掌握了xxl-job的分片广播调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。
任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会查询到重复的任务呢?
XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号,在向执行器任务调度的同时下发分片总数以及分片序号等参数,执行器收到这些参数根据自己的业务需求去利用这些参数。
下图表示了多个执行器获取视频处理任务的结构:
在这里插入图片描述
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
1 % 2 = 1 执行器2执行
2 % 2 = 0 执行器1执行
3 % 2 = 1 执行器2执行
以此类推.
如何保证任务不重复执行?
通过作业分片方案保证了执行器之间查询到不重复的任务,如果一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
首先配置调度过期策略:
查看文档如下:
- 调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等;
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
这里我们选择忽略,如果立即执行一次就可能重复执行相同的任务。
在这里插入图片描述
其次,再看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束时调度中心进行任务调度,此时该如何处理。
查看文档如下:
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
这里如果选择覆盖之前调度则可能重复执行任务,这里选择 丢弃后续调度或单机串行方式来避免任务重复执行。
只做这些配置可以保证任务不会重复执行吗?
做不到,还需要保证任务处理的幂等性,什么是任务的幂等性?任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。在本项目中要实现的是不论多少次任务调度同一个视频只执行一次成功的转码。
什么是幂等性?
它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用的方案:
1)数据库约束,比如:唯一索引,主键。
2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
基于以上分析,在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性,要有办法去判断该视频是否处理完成,如果正在处理中或处理完则不再处理。这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。

相关内容

热门资讯

湾财周报|大事记 比亚迪驳斥“... 一周大事记(5月26日-6月1日) 头条 比亚迪驳斥! 长城“车圈恒大论”是行业警示还是危言耸听?...
通源石油跌1.96%,成交额1... 5月30日,通源石油跌1.96%,成交额1.03亿元,换手率4.40%,总市值23.54亿元。 异动...
中国邮储银行浙江分行2025校... 点这里 ↑ 老满说高考 作者 l 老满 生涯规划师l 升学顾问l 拆书家 这是 老满说高考公众号 的...
公募基金规模首次突破33万亿元... 每经记者:肖芮冬 每经编辑:叶峰 天赐良基日报第654期 一、今日基金新闻速览 1、华润元大基金贾...
湾财周报 大事记 比亚迪驳斥“... 一周大事记(5月26日-6月1日)头条比亚迪驳斥!长城“车圈恒大论”是行业警示还是危言耸听?近日,关...
EL表达式JSTL标签库 EL表达式     EL:Expression Language 表达式语言     ...
关于测试,我发现了哪些新大陆 关于测试 平常也只是听说过一些关于测试的术语,但并没有使用过测试工具。偶然看到编程老师...
工信部、中汽协紧急发声!汽车“... 文/刘育英新一轮汽车价格战再起。近日,工信部、中汽协纷纷发声表示反对。工业和信息化部表示,将加大对汽...
3 ROS1通讯编程提高(1) 3 ROS1通讯编程提高3.1 使用VS Code编译ROS13.1.1 VS Code的安装和配置...
募资39亿,全亏光了,账上不到... 关于天然气,用户的感觉是价格一直在上涨,但很奇怪,不管怎么涨,天然气企业仍然亏,还亏得一塌糊涂。这是...
资阳房产评估公司 这是(tel-15828298733)整理的信息,希望能帮助到大家 在当今社会,随着经济的发展和城...
华桥汇利(中国)投资基金管理有... 今年第一季度,美国企业利润出现大幅下降,且面临着来自关税上升的持续压力,这一局面可能会在今年进一步加...
ESG 报告合规与鉴证:全球政... 在当下全球经济格局里,ESG(环境、社会和公司治理)已然成为衡量企业可持续发展能力的关键指标。随着全...
【Unity 手写PBR】Bu... 写在前面 前期积累: GAMES101作业7提高-实现微表面模型你需要了解的知识 【技...
与锤巨子生物的大嘴博士持股同一... 医美龙头巨子生物“成分争议”风波持续发酵。日前,美妆博主大嘴博士(香港大学化学博士郝宇)发文,质疑巨...
Linux之进程间通信 目录 进程间通信介绍 一、为什么要进行进程间通信? 二、进程间通信目的 三、进程间通信...
从“造城”到“留客”,文旅局长... 你有没有刷到最近各地文旅局局长全体“尬舞”的视频?领导们放下架子开始跳魔性舞蹈,这场舞的背后啊,可不...
Hazel引擎学习(十一) 我自己维护引擎的github地址在这里,里面加了不少注释,有需要的可以看...
孩子的教育金,分享3个「有效」... 点击 “简七读财” ,发送消息“ 理财小工具 ”免费领取“40个赚钱工具资源包”晚上好,我是简七编...
iZotope RX 10(专... iZotope RX 10是一款专业的音频修复和增强软件,具有音频修复工具、音频增强工...
我的docker随笔40:cl... 本文介绍 clickhouse 数据库的容器化部署。 起因 某项目需生产环境数据库,因...
透视一周牛熊股:最牛股路桥信息... 过去一周(5月26日—5月30日)A股三大指数集体下跌。截至5月30日收盘,上证指数报3347.49...
基于matlab创建地面固定雷... 一、前言此示例演示如何创建和显示包含地面固定雷达、转弯飞机、等速飞机和移动地面车辆的多平台方案。二、...
暗夜发光,独自闪耀,盘点网页暗... 众所周知,网页的暗黑模式可以减少屏幕反射和蓝光辐射,减少眼睛的疲劳感&#...
C语言-程序环境和预处理(2) 文章目录预处理详解1.预定义符号2.#define2.1#define定义的标识符2.2#defin...
MySQL数据库知识整理 MySQL数据库知识整理 MySQL事务详解 事务四大特性ACID 原子性(Atomi...
Docker基础篇——最全讲解 文章目录一、CentOS安装docker二、启动帮助类命令三、镜像命令1.名词概念2.常用命令2.1...
五问“恒大论”,比亚迪回应车圈... “车圈恒大”引发的舆论风暴还在进一步发酵。近日,比亚迪集团品牌及公关处总经理李云飞在微博发文,引用多...
javafx实现聚光灯效果,圆... 系列文章专栏:javafx图形绘制、桌面录屏录音源码合集 目录 一、实现的效果 二、实现思路
300左右哪款蓝牙耳机适合学生... 近年来,随着蓝牙耳机的发展,不管是音质、外观、佩戴还是降噪都有了很大的提...