前言
在此之前对于xxl-job仅仅限于使用级别,对其底层实现一无所知,闲来无事,翻翻源码研究一下
准备
工程目录结构分析
1
2
3
4
5
6
| doc:官方文档
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
|
xxl-job-admin模块是调度中心,统一管理任务调度平台上调度任务,负责触发调度执行,并且提供任务管理平台。
xxl-job-core模块是公共的依赖,供调度中心以及调度任务依赖。
xxl-job-executor-samples是执行器案例,有两个执行器案例,xxl-job-executor-sample-springboot是spring版本。xxl-job-executor-sample-frameless是普通的版本,直接通过main方法启动。
调度中心启动流程分析
在启动xxl-job,首先应该启动xxl-job-admin,然后启动执行案例的其中一个。xxl-job-admin是springboot项目,可以直接启动,具体启动方法直接参考官方的文档,非常的详细。今天要将的内容就是xxl-job-admin的启动流程分析。
在启动xxl-job-admin的过程中,XxlJobAdminConfig是配置类,在afterPropertiesSet方法中会创建调度器以及对调度器进行初始化。
1
2
3
4
5
6
7
8
9
| private XxlJobScheduler xxlJobScheduler;
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
|
xxlJobScheduler.init()方法主要做了如下几件事
- 国际化初始化 initI18n()
- 触发器线程池创建 JobTriggerPoolHelper.toStart();
- 注册监控器启动 JobRegistryHelper.getInstance().start();
- 失败监控器启动 JobFailMonitorHelper.getInstance().start();
- 丢失监控器启动 JobCompleteHelper.getInstance().start();
- 日志任务启动 JobLogReportHelper.getInstance().start();
- 调度启动 JobScheduleHelper.getInstance().start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| // xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java
public void init() throws Exception {
// init i18n 初始化国际化
initI18n();
// admin trigger pool start 触发器线程池创建
JobTriggerPoolHelper.toStart();
// admin registry monitor run 注册监控器启动
JobRegistryHelper.getInstance().start();
// admin fail-monitor run 失败监控器启动
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
JobCompleteHelper.getInstance().start();
// admin log report start 调度启动
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
|
触发器线程池创建
JobTriggerPoolHelper.toStart()最终调用JobTriggerPoolHelper.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
| public class JobTriggerPoolHelper {
private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
// ....
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
public static void toStart() {
helper.start();
}
public static void toStop() {
helper.stop();
}
// ...
// ---------------------- trigger pool ----------------------
// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;
public void start(){
fastTriggerPool = new ThreadPoolExecutor(
10,
// 最大线程数量从配置中取
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
public void stop() {
//triggerPool.shutdown();
fastTriggerPool.shutdownNow();
slowTriggerPool.shutdownNow();
logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}
// ...
}
|
fastTriggerPool为快速线程池、slowTriggerPool为慢速线程池,都是采用阻塞队列LinkedBlockingQueue,快速线程池的阻塞队列大小为1000,慢速线程池的阻塞队列大小为2000。快速线程池、慢速线程池在什么时候被用来调度任务呢?默认是用快速调度器调度任务的,当缓存中等待被调度的同一个任务的数量大于10的时候,就用慢速调度器调度任务。
注册监控器启动
JobRegistryHelper.getInstance().start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
| /**
* job registry instance
* @author xuxueli 2016-10-02 19:10:24
*/
public class JobRegistryHelper {
private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
private static JobRegistryHelper instance = new JobRegistryHelper();
public static JobRegistryHelper getInstance(){
return instance;
}
private ThreadPoolExecutor registryOrRemoveThreadPool = null;
private Thread registryMonitorThread;
private volatile boolean toStop = false;
public void start(){
// for registry or remove
registryOrRemoveThreadPool = new ThreadPoolExecutor(
2,
10,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
}
});
// for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// fresh group address
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
// 将registryMonitorThread设置为守护线程
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
}
// ...
}
|
- JobRegistryHelper的start创建了一个调度任务注册线程池registryOrRemoveThreadPool以及注册监控器线程registryMonitorThread,调度任务注册线程池用来执行调度任务的注册,注册监控器线程用来监控执行器的机器是否下线。然后将registryMonitorThread设置为守护线程,最后启动registryMonitorThread线程,开始监控执行器的机器。
- registryMonitorThread线程的run方法的代码逻辑参考上述代码片段的registryMonitorThread=new 中的run方法:run()方法一直执行,直到服务停止.
- 第一将已经下线的执行器的记录从数据库中删除,
- 第二将还在线的执行器机器记录重新设置执行器地址以及更新执行器的时间,然后更新数据库的记录。
- 怎么判定执行器已经下线了:如果数据库中的update_time字段小于当前时间减去死亡期限,那么说明已经执行器在死亡期限没有进行更新时间,就判定已经下线了。执行器在启动的时候,会启动一个执行器线程不断的执行注册任务,执行器任务会更新update_time字段。
失败监控器启动
JobFailMonitorHelper.getInstance().start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| // ...
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor
while (!toStop) {
try {
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// lock log
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、fail retry monitor
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、fail alarm monitor
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
}
});
// ...
|
- 1 创建了一个名字为monitorThread的线程,并设为守护线程,然后启动这个线程。run方法一直运行,直到线程停止。
- 2 run方法的首先从数据库中获取失败的调度任务日志列表(每次最多一千条)
- 3 遍历失败的调度任务日志列表:
- 3.1 将失败的调度任务日志进行锁定。
- 3.2 如果调度任务的失败重试次数大于0,可重试,触发任务执行(JobTriggerPoolHelper.trigger()),更新任务日志信息(重试次数减一)。
- 3.3 当邮件不为空时,触发告警信息,最后将锁定的日志状态更新为告警状态。
丢失(掉线)监控器启动
JobCompleteHelper.getInstance().start();代码组织同上略去
- 启动回调线程池callbackThreadPool和丢失(掉线)监控线程monitorThread
- 丢失(掉线)监控线程monitorThread任务结果丢失处理:调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
日志任务启动
JobLogReportHelper.getInstance().start() 代码组织同上略去
1、统计当前时间前三天的触发任务的数量、运行中的任务的数量、成功的任务数量、任务失败的数量,然后保存在数据库中。
2、根据配置的保存日志的过期时间,将已经过期的日志从数据库中查出来,然后清理过期的日志。日志任务启动是创意了一个线程,然后一直在后台运行。
调度启动 JobScheduleHelper.getInstance().start();
参考资料: https://www.zhihu.com/column/c_1442233921593761792