需求
客户想在系统A的框架体系中,直接调用查看xxlJob的原有功能,以及新增部分功能调用优化,本篇文章即记录此次改造中部分思路以及具体实现方法。
目录
远程调用
想在原系统中远程调用远程方法,常用有http RestFul API接口的方式(openfeign和ribbon)、RPC(dubbo)、消息队列(rocketmq,rabbitmq)
因为这边原系统就是springboot,故使用Feign的方式较为方便。
引入包
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>3.1.2</version>
</dependency>
原xxljob中的接口是标准的RestFul接口,所以远程调用的时候只需要将接口参数封装成interface接口api即可调用。
@FeignClient(name = "jobInfo",
url = "${userConfig.uri}",
path = "/jobinfo")
public interface JobInfoApi {
@RequestMapping(value = "/add", method = RequestMethod.POST,consumes = {"application/x-www-form-urlencoded"})
ReturnT<String> add(@RequestHeader("ServerInfo") String token, @RequestBody XxlJobInfo jobInfo);
@RequestMapping(value = "/update", method = RequestMethod.POST,consumes = {"application/x-www-form-urlencoded"})
ReturnT<String> update(@RequestHeader("ServerInfo") String token, @RequestBody XxlJobInfo jobInfo);
@RequestMapping(value = "/remove", method = RequestMethod.POST)
ReturnT<String> remove(@RequestHeader("ServerInfo") String token, @RequestParam(name = "id") Integer id);
@RequestMapping(value = "/stop", method = RequestMethod.POST)
ReturnT<String> pause(@RequestHeader("ServerInfo") String token, @RequestParam(name = "id") Integer id);
@RequestMapping(value = "/start", method = RequestMethod.POST)
ReturnT<String> start(@RequestHeader("ServerInfo") String token, @RequestParam(name = "id") Integer id);
@RequestMapping(value = "/trigger", method = RequestMethod.POST)
ReturnT<String> triggerJob(@RequestHeader("ServerInfo") String token, @RequestParam(name = "id") Integer id, @RequestParam(name = "executorParam") String executorParam,
@RequestParam(name = "addressList") String addressList,@RequestParam(name = "ignoreSubTask") String ignoreSubTask,
@RequestParam(name = "taskLogId") Integer taskLogId);
@RequestMapping(value = "/nextTriggerTime", method = RequestMethod.POST)
ReturnT<List<String>> nextTriggerTime(@RequestHeader("ServerInfo") String token, @RequestParam(name = "scheduleType") String scheduleType, @RequestParam(name = "scheduleConf") String scheduleConf);
@RequestMapping(value = "/getOneById", method = RequestMethod.POST)
ReturnT<XxlJobInfo> getOneById(@RequestHeader("ServerInfo") String token, @RequestParam(name = "id") Integer id);
}
由于是内网调用,所以远程调用原本需要特别注意的一些加密或者认证此处也可以略微简化,我这边是通过在所有写的调用接口中额外附带了一个参数Header,然后再在xxl-job-admin调度平台中的PermissionInterceptor拦截器中修改,如果Header一致,就直接允许调用。
@Value(value = "${userConfig.auth-token:PORTAL}")
private String authToken;
public class PermissionInterceptor extends HandlerInterceptorAdapter {
@Resource
private LoginService loginService;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (!(handler instanceof HandlerMethod)) {
return super.preHandle(request, response, handler);
}
// if need login
boolean needLogin = true;
boolean needAdminuser = false;
HandlerMethod method = (HandlerMethod) handler;
PermissionLimit permission = method.getMethodAnnotation(PermissionLimit.class);
if (permission != null) {
needLogin = permission.limit();
needAdminuser = permission.adminuser();
}
//来自portal的请求临时跳过登录校验
String serverInfo = request.getHeader("ServerInfo");
if (needLogin && !"PORTAL".equals(serverInfo)) {
XxlJobUser loginUser = loginService.ifLogin(request, response);
if (loginUser == null) {
response.setStatus(302);
response.setHeader("location", request.getContextPath() + "/toLogin");
return false;
}
if (needAdminuser && loginUser.getRole() != 1) {
throw new RuntimeException(I18nUtil.getString("system_permission_limit"));
}
request.setAttribute(LoginService.LOGIN_IDENTITY_KEY, loginUser);
}
return super.preHandle(request, response, handler);
}
}
子任务控制
需求中除了远程调用接口整合以后,还需要额外增加功能控制,或者额外补充相关日志等内容。
其中一个需求场景:原系统中一旦配置了子任务,每次执行任务时默认都会将子任务一并启动,客户想要在不修改配置的前提下每次控制是否启动。
此处我的实现方式就是前端传参中额外附带一个参数[ignoreSubTask],默认不传,如果带上后为1的话就不执行后续子任务,XxlJob中任务执行的执行链如下
- JobInfoController ->
- JobTriggerPoolHelper.trigger(*,*,*,*,*,ignoreSubTask) ->
- JobTriggerPoolHelper.addTrigger(*,*,*,*,*,ignoreSubTask) ->
- XxlJobTrigger.trigger(*,*,*,*,*,ignoreSubTask) ->
- XxlJobTrigger.processTrigger(*,*,*,*,*,ignoreSubTask)
执行到第五步其实单次的执行链就结束了,我前面的流程目标其实就是为了将这个ignoreSubTask参数,存入xxljob中执行前保存的的初始化XxlJobLog中,这个日志会记录相关执行状态,以及用于执行器调用后回调函数的状态更新,而其中实现子任务执行的执行链在执行器executor执行完对应任务后就会发送一个回调请求触发调度平台的后续动作。
- JobApiController.api() -> adminBiz.callback(callbackParamList) ->
- AdminBizImpl.callback() ->
- JobCompleteHelper.getInstance().callback(callbackParamList); ->
- XxlJobCompleter.updateHandleInfoAndFinish(log); ->
- XxlJobCompleter->finishJob(xxlJobLog);
private static void finishJob(XxlJobLog xxlJobLog) {
// 1、handle success, to trigger child job
String triggerChildMsg = null;
if (XxlJobContext.HANDLE_COCE_SUCCESS == xxlJobLog.getHandleCode()) {
XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId());
String ignoreSubTask = xxlJobLog.getIgnoreSubTask();
if (xxlJobInfo != null && xxlJobInfo.getChildJobId() != null && xxlJobInfo.getChildJobId().trim().length() > 0 &&
!TriggerTypeEnum.IGNORE_FLAG.getTitle().equals(ignoreSubTask)) {
......
......
......
}
需求中控制子任务的相关逻辑就是在上面的if判断中进行,是否执行子任务。为啥要把这个参数放到xxljoblog中?就是偷懒吧,不想单独额外一个流程传参,尽量少的改动原来的流程,仅此而已。
日志增强
另外一个场景,因为客户需求额外需要存储记录一套日志,用来更新专门总览表以方便查看,所以每次在调用前,以及调用的回调中,除了xxljoblog自带的日志更新外,额外记录了一套自己的日志,用来实现特定的需求,可以添加日志的地方在上面的执行链列表已经列出来了,这里不重复列出,此处补充一个日志更新场景,即执行器注册丢失的时候且任务状态为进行中,超过十分钟,这里我们就沿用xxljob这个判断逻辑
xxljob作者的的实现方式是在JobCompleteHelper中开启一个线程,轮询检查是否有任务执行时间超过指定时间,并且对应的执行器心跳丢失,就将这些任务执行结果置为失败
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// wait for JobTriggerPoolHelper-init
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
// monitor
while (!toStop) {
try {
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
...............
...............
}
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(60);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
}
});
末
本篇其实没有讲太多xxljob的底层源码,其实只是梳理记录了下自己这次改造中,对于xxljob实现功能的改造思路,肤浅且碎碎念,如果能帮到有同样需求的人就更好了。其他具体的比如xxljob的时间轮或者自动注册等相关设计,可以看看网上其他博主的文章,也可以直接去源码里研究。