博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
如何实现分布式定时任务(xxl的实现)
阅读量:4663 次
发布时间:2019-06-09

本文共 8463 字,大约阅读时间需要 28 分钟。

1、前言

    定时任务在任何系统中都非常重要,如:订单48小时自动完成,每日重新给会员送优惠券,游戏中每隔半小时给玩家添加体力等等。

对于小型系统我们可以用quartz和spring task实现定时任务,这样都任务存在如下几个任务:

1)单点问题,如果任务服务器挂了,定时任务就挂了;

2)如果任务服务和业务代码耦合在一起,业务服务部署多台主机,任务服务在每天机器上都会触发,引起任务重复执行;

3)任务不可预知执行情况,需要开发人员每天去检查日志,查看是否执行成功;

4)当任务失败了之后,没办法手动执行任务 

   这时候分布式任务就该出场了。那么分布式任务是如何解决上面当问题当昵?

2、名词说明

    调度中心:负责任务调度当服务;

    执行器:   执行任务当服务器;

    管理中心:负责任务的创建更新删除,查看任务状态,执行过程的服务器。

3、架构图

     

  说明

   1)服务注册中心可以是zookeeper,eureka,也可以是自己实现的。

   2)leader选择器可以替换为分布式锁(redission),在调度任务的时候控制只有一个调度中心在分配任务,当然也可以使用select * from for update。

         目前xxl-job就是采用select * from for update 加时间轮的方式实现的。

package 
com.xxl.job.admin.core.thread;
 
import 
com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import 
com.xxl.job.admin.core.cron.CronExpression;
import 
com.xxl.job.admin.core.model.XxlJobInfo;
import 
com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import 
org.slf4j.Logger;
import 
org.slf4j.LoggerFactory;
 
import 
java.sql.Connection;
import 
java.sql.PreparedStatement;
import 
java.sql.SQLException;
import 
java.util.*;
import 
java.util.concurrent.ConcurrentHashMap;
import 
java.util.concurrent.TimeUnit;
 
/**
 
* @author xuxueli 2019-05-21
 
*/
public 
class 
JobScheduleHelper {
    
private 
static 
Logger logger = LoggerFactory.getLogger(JobScheduleHelper.
class
);
 
    
private 
static 
JobScheduleHelper instance = 
new 
JobScheduleHelper();
    
public 
static 
JobScheduleHelper getInstance(){
        
return 
instance;
    
}
 
    
private 
Thread scheduleThread;
    
private 
Thread ringThread;
    
private 
volatile 
boolean 
toStop = 
false
;
    
private 
volatile 
static 
Map<Integer, List<Integer>> ringData = 
new 
ConcurrentHashMap<>();
 
    
public 
void 
start(){
 
        
// schedule thread
        
scheduleThread = 
new 
Thread(
new 
Runnable() {
            
@Override
            
public 
void 
run() {
 
                
try 
{
                    
TimeUnit.MILLISECONDS.sleep(
5000 
- System.currentTimeMillis()%
1000 
);
                
catch 
(InterruptedException e) {
                    
if 
(!toStop) {
                        
logger.error(e.getMessage(), e);
                    
}
                
}
                
logger.info(
">>>>>>>>> init xxl-job admin scheduler success."
);
 
                
while 
(!toStop) {
 
                    
// 扫描任务
                    
long 
start = System.currentTimeMillis();
                    
Connection conn = 
null
;
                    
PreparedStatement preparedStatement = 
null
;
                    
try 
{
                        
if 
(conn==
null 
|| conn.isClosed()) {
                            
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        
}
                        
conn.setAutoCommit(
false
);
 
                        
preparedStatement = conn.prepareStatement(  
"select * from xxl_job_lock where lock_name = 'schedule_lock' for update" 
);
                        
preparedStatement.execute();
 
                        
// tx start
 
                        
// 1、预读10s内调度任务
                        
long 
maxNextTime = System.currentTimeMillis() + 
10000
;
                        
long 
nowTime = System.currentTimeMillis();
                        
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(maxNextTime);
                        
if 
(scheduleList!=
null 
&& scheduleList.size()>
0
) {
                            
// 2、推送时间轮
                            
for 
(XxlJobInfo jobInfo: scheduleList) {
 
                                
// 时间轮刻度计算
                                
int 
ringSecond = -
1
;
                                
if 
(jobInfo.getTriggerNextTime() < nowTime - 
10000
) {   
// 过期超10s:本地忽略,当前时间开始计算下次触发时间
                                    
ringSecond = -
1
;
 
                                    
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                    
jobInfo.setTriggerNextTime(
                                            
new 
CronExpression(jobInfo.getJobCron())
                                                    
.getNextValidTimeAfter(
new 
Date())
                                                    
.getTime()
                                    
);
                                
else 
if 
(jobInfo.getTriggerNextTime() < nowTime) {    
// 过期10s内:立即触发一次,当前时间开始计算下次触发时间
                                    
ringSecond = (
int
)((nowTime/
1000
)%
60
);
 
                                    
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                    
jobInfo.setTriggerNextTime(
                                            
new 
CronExpression(jobInfo.getJobCron())
                                                    
.getNextValidTimeAfter(
new 
Date())
                                                    
.getTime()
                                    
);
                                
else 
{    
// 未过期:正常触发,递增计算下次触发时间
                                    
ringSecond = (
int
)((jobInfo.getTriggerNextTime()/
1000
)%
60
);
 
                                    
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                    
jobInfo.setTriggerNextTime(
                                            
new 
CronExpression(jobInfo.getJobCron())
                                                    
.getNextValidTimeAfter(
new 
Date(jobInfo.getTriggerNextTime()))
                                                    
.getTime()
                                    
);
                                
}
                                
if 
(ringSecond == -
1
) {
                                    
continue
;
                                
}
 
                                
// push async ring
                                
List<Integer> ringItemData = ringData.get(ringSecond);
                                
if 
(ringItemData == 
null
) {
                                    
ringItemData = 
new 
ArrayList<Integer>();
                                    
ringData.put(ringSecond, ringItemData);
                                
}
                                
ringItemData.add(jobInfo.getId());
 
                                
logger.debug(
">>>>>>>>>>> xxl-job, push time-ring : " 
+ ringSecond + 
" = " 
+ Arrays.asList(ringItemData) );
                            
}
 
                            
// 3、更新trigger信息
                            
for 
(XxlJobInfo jobInfo: scheduleList) {
                                
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            
}
 
                        
}
 
                        
// tx stop
 
                        
conn.commit();
                    
catch 
(Exception e) {
                        
if 
(!toStop) {
                            
logger.error(
">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}"
, e);
                        
}
                    
finally 
{
                        
if 
(conn != 
null
) {
                            
try 
{
                                
conn.close();
                            
catch 
(SQLException e) {
                            
}
                        
}
                        
if 
(
null 
!= preparedStatement) {
                            
try 
{
                                
preparedStatement.close();
                            
catch 
(SQLException ignore) {
                            
}
                        
}
                    
}
                    
long 
cost = System.currentTimeMillis()-start;
 
                    
// next second, align second
                    
try 
{
                        
if 
(cost < 
1000
) {
                            
TimeUnit.MILLISECONDS.sleep(
1000 
- System.currentTimeMillis()%
1000
);
                        
}
                    
catch 
(InterruptedException e) {
                        
if 
(!toStop) {
                            
logger.error(e.getMessage(), e);
                        
}
                    
}
 
                
}
                
logger.info(
">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop"
);
            
}
        
});
        
scheduleThread.setDaemon(
true
);
        
scheduleThread.setName(
"xxl-job, admin JobScheduleHelper#scheduleThread"
);
        
scheduleThread.start();
 
 
        
// ring thread
        
ringThread = 
new 
Thread(
new 
Runnable() {
            
@Override
            
public 
void 
run() {
 
                
// align second
                
try 
{
                    
TimeUnit.MILLISECONDS.sleep(
1000 
- System.currentTimeMillis()%
1000 
);
                
catch 
(InterruptedException e) {
                    
if 
(!toStop) {
                        
logger.error(e.getMessage(), e);
                    
}
                
}
 
                
int 
lastSecond = -
1
;
                
while 
(!toStop) {
 
                    
try 
{
                        
// second data
                        
List<Integer> ringItemData = 
new 
ArrayList<>();
                        
int 
nowSecond = (
int
)((System.currentTimeMillis()/
1000
)%
60
);   
// 避免处理耗时太长,跨过刻度;
                        
if 
(lastSecond == -
1
) {
                            
lastSecond = (nowSecond+
59
)%
60
;
                        
}
                        
for 
(
int 
i = 
1
; i <=
60
; i++) {
                            
int 
secondItem = (lastSecond+i)%
60
;
 
                            
List<Integer> tmpData = ringData.remove(secondItem);
                            
if 
(tmpData != 
null
) {
                                
ringItemData.addAll(tmpData);
                            
}
 
                            
if 
(secondItem == nowSecond) {
                                
break
;
                            
}
                        
}
                        
lastSecond = nowSecond;
 
                        
// ring trigger
                        
logger.debug(
">>>>>>>>>>> xxl-job, time-ring beat : " 
+ nowSecond + 
" = " 
+ Arrays.asList(ringItemData) );
                        
if 
(ringItemData!=
null 
&& ringItemData.size()>
0
) {
                            
// do trigger
                            
for 
(
int 
jobId: ringItemData) {
                                
// do trigger
                                
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -
1
null
null
);
                            
}
                            
// clear
                            
ringItemData.clear();
                        
}
                    
catch 
(Exception e) {
                        
if 
(!toStop) {
                            
logger.error(
">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}"
, e);
                        
}
                    
}
 
                    
// next second, align second
                    
try 
{
                        
TimeUnit.MILLISECONDS.sleep(
1000 
- System.currentTimeMillis()%
1000
);
                    
catch 
(InterruptedException e) {
                        
if 
(!toStop) {
                            
logger.error(e.getMessage(), e);
                        
}
                    
}
                
}
                
logger.info(
">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"
);
            
}
        
});
        
ringThread.setDaemon(
true
);
        
ringThread.setName(
"xxl-job, admin JobScheduleHelper#ringThread"
);
        
ringThread.start();
    
}
 
    
public 
void 
toStop(){
        
toStop = 
true
;
 
        
// interrupt and wait
        
scheduleThread.interrupt();
        
try 
{
            
scheduleThread.join();
        
catch 
(InterruptedException e) {
            
logger.error(e.getMessage(), e);
        
}
 
        
// interrupt and wait
        
ringThread.interrupt();
        
try 
{
            
ringThread.join();
        
catch 
(InterruptedException e) {
            
logger.error(e.getMessage(), e);
        
}
    
}
 
}

                       通过代码,我们可以发现调度中心由两个线程完成,第一个线程不停的取最近10s钟待开始的任务,把任务放入时间轮中,第二个线程从时间轮中获取需要开始的任务,开始执行任务。

                      当然任务调度还可以使用DelayQueue()

 

 

        定时任务一直有一个头疼的问题,就是高频调度的执行时间比较长的任务,一般建议指定到单独一台主机上并保证在单机上任务不会并发执行来解决。

 

 

    4、分布式定时任务中依赖任务的解决方案

          1)任务依赖不支持环,只支持DAG;

              如:A->B->(C,D)->E    其中CD并行,其余串行

          2)下游任务只支持上游所有任务都成功并调度时间到了,才执行任务;

                如:

               

                JobA只有在Job1,Job2,Job3都执行完,并且3点时间到了才能执行。

          3)不支持有不同调度周期的任务存在依赖关系

               如:A->B      B的前置任务为A, A的调度周期为每15分钟调度一次, B为每天早上1点调度,该任务不建议分布式调度中心执行。

               不支持原因:

               1)改种情况在具体业务中比较少;

               2)支持改种流程会提升分布式定时任务对负责度同时很难判断前置任务是成功还是失败;

               3)建议把A任务拆分为两个任务,一个为B对前置任务A1,一个为每15分钟执行一次(调度时间过滤掉A1)的任务

       

       实现:

       

 

          在任务回调成功之后,查询任务到依赖任务,开始执行。

         这里面有几个问题需要解决:

         1、任务重复执行:

         

           如上面任务,JobA依赖Job1,Job2,Job3执行,同时JobA3点也会调度执行,在3点左右时,Job3执行完后会执行JobA,同时cron调度也会执行JobA,在这种情况怎么保证JobA只被执行一次。

                 解决办法:在JobA执行前需要把JobA的状态修改为正在执行中,此时,通过update  where jobId = #{jobId} and status=#{未开始执行} 方法执行更新,如果更新记录为1的,任务可以进行执行,如果更新记录为0,抛弃该任务的执行。

           2、怎么判断任务该不该执行

                

                 条件一:1点钟Job1执行完了,开始找后置任务JobA,JobA是否该执行?怎么判断?

                                JobA不该执行,前置任务Job2,Job3 都没开始执行,Job1不能执行;

                 条件二:3点钟Job3执行完了,开始找后置任务JobA,JobA是否该执行?怎么判断?

                                JobA不该执行,前置任务Job1,Job2,Job3 都执行完了,但是Cron时间还没到,Job1不能执行;

                 条件三:3点15分调度器开始调度,JobA是否该执行,怎么判断?

                               JobA该执行,前置任务Job1,Job2,Job3 都执行完了,Cron时间也到了;   

                 判断任务是否执行的逻辑: 如果JobA执行时,需要判断Job1,Job2,Job3是否执行,下面拿Job1为例

                 假设Job1的历史任务都是正常执行成功的。

                 情况1:  2019-06-26 00:30:00(today)时,Job1的上一次执行成功时间为2019-06-25:01:00:00 (lastDay),下一次执行时间为:2019-06-26 01:00:00(nextDay).

   

                情况2:  2019-06-26 01:30:00时,Job1的上一次执行成功时间为2019-06-26:01:00:00,下一次执行时间为:2019-06-27 01:00:00.

       

                 

             

             3、任务失败了,怎么办?

                    任务失败应该同时执行带依赖执行和不带依赖执行,由页面配置控制。

             

              4、任务失败了,页面配置执行任务时,是否可传参数,参数怎么在任务间传递?

                    页面配置传参数时,参数需要传递给依赖任务。

              5、查看任务执行状态时,是否可以查看依赖到表执行情况? 

转载于:https://www.cnblogs.com/smileIce/p/11156412.html

你可能感兴趣的文章
java中的值传递和引用传递2<原文:http://blog.csdn.net/niuniu20008/article/details/2953785>...
查看>>
css实现背景图片模糊
查看>>
什么是runtime?什么是webgl?
查看>>
秋季学习总结
查看>>
categorical_crossentropy VS. sparse_categorical_crossentropy
查看>>
强引用,弱引用,4种Java引用浅解(涉及jvm垃圾回收)
查看>>
多线程如何确定线程数
查看>>
UGUI RectTransform
查看>>
学前班
查看>>
手把手教您扩展虚拟内存
查看>>
android-samples-mvp
查看>>
oracle 11g r2安装
查看>>
关于自关联1
查看>>
存储控制器、MMU、flash控制器介绍
查看>>
hdu-1814(2-sat)
查看>>
自我反省
查看>>
反射,得到Type引用的三种方式
查看>>
Objective-C数据类型之id,SEL,BOOL,nil,NULL和NSNull
查看>>
js获取网页屏幕可见区域高度
查看>>
Vector
查看>>