XxlJob执行器源码流程
2025-09-10 09:53:32 0 举报
XxlJob执行器源码流程
作者其他创作
大纲/内容
accessToken != null && accessToken.trim().length() > 0 && !accessToken.equals(accessTokenReq)
embedServer = new EmbedServer()
newJobThread.start()
api/registryRemove
遍历 annotatedMethods并注册到执行其中
adminBiz.callback(callbackParamList)
extends
启动执行器运行日志清理线程
handler.execute()
@XxlJob init 属性执行业务方法前的初始化方法
如果配置了执行器端口则用配置的端口否则默认 9999,默认端口冲突会 +1
while (!toStop) { // 执行器server 没有停止则不断的向调度中心发送心跳}
防止重复注册
start servernetty
/log
查看执行日志
版本 2.4.0
super.start()
是继续遍历
ThreadLocal
@Componentpublic class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class); @XxlJob(\"demoJobHandler\") public void demoJobHandler() throws Exception { XxlJobHelper.log(\
终止任务
destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy())
initMethod = clazz.getDeclaredMethod(xxlJob.init())
遍历 beanNames
触发任务执行完成回调调度中心 taskwhile(!toStop)
executorBiz.kill(killParam)
创建执行器server
process
initJobHandlerMethodRepository(applicationContext)
jobThread.pushTriggerQueue(triggerParam)
执行器 server 启动后就去开启自动注册到调度中心的线程
所有单例 bean 初始化完成后执行
启动执行器 server
GlueFactory.refreshInstance(1)
triggerCallbackThread#run
线程执行 task
忙碌检测
api/registry
/idleBeat
如果执行器 server 停止了则跳出 while 循环执行 @XxlJob 的destroy 方法
IJobHandler
/kill
executorBiz.run(triggerParam)
loadJobHandler(name) != null
@XxlJob 注解的JobHandler Method注册到执行器中
所有bean 的beanNames
入口
启动执行器server
EmbedServer
触发任务添加到触发队列中
MethodJobHandler
初始化执行器运行日志路径和glue souce 路径
/beat
bootstrap.group(...).channel(...).childHandler(...).childOption(...)
adminBiz.registryRemove(registryParam)
XxlJobContext.setXxlJobContext(xxlJobContext)
name:@XxlJob 的 value 属性值@XxlJob 注解的方法封装到 MethodJobHandler 然后再注册到执行器中
添加到回调队列
logRetentionDays < 3 时自动清理关闭
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp()
api/callback
否获取这个 bean
校验通信 token 是否正确
com.xxl.job.core.thread.JobThread#run
init 和 destroy 方法都在 bean中定义的(即示例的SampleXxlJob)
XxlJobSpringExecutor
address = \"http://{ip_port}/\".replace(\"{ip_port}\
/run
triggerQueue.add(triggerParam)
ServerBootstrap bootstrap = new ServerBootstrap()
每个触发任务都会单独创建一个运行日志文件
@XxlJob 定义的初始化方法
adminBiz.registry(registryParam)
executorBiz.log(logParam)
getInstance().callBackQueue.add(callback)
com.xxl.job.core.thread.TriggerCallbackThread#doCallback
是否是懒加载 bean
演示下第一次触发的场景此场景 jobThread 和 jobHandler 应该都是空的BEAN 模式
TriggerCallbackThread.getInstance().start()
port = port>0?port: NetUtil.findAvailablePort(9999)
executorBiz.beat()
com.xxl.job.core.biz.client.AdminBizClient#registry
method.invoke(target)
TriggerCallbackThread.pushCallBack(...)
XxlJobExecutor
XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler())
com.xxl.job.core.biz.client.AdminBizClient#callback
triggerCallbackThread.start()
handler.init()
执行调度中心发过来的请求
初始化与调度中心交互的客户端设置调度中心地址和token
executorBiz.idleBeat(idleBeatParam)
启动执行器执行任务完成回调调度中心的线程
心跳检测
根据不同的请求执行不同的方法
触发任务
如果执行器 server 停止了那么要去调度中心移除注册
XxlJobFileAppender.initLogPath(logPath)
注册 JobThread 到执行器
start
@XxlJob destroy 属性
bean = applicationContext.getBean(beanDefinitionName)
com.xxl.job.core.executor.impl.XxlJobSpringExecutor#afterSingletonsInstantiated
bootstrap.bind(port).sync()
while(!toStop)队列中取出触发任务
com.xxl.job.core.biz.client.AdminBizClient#registryRemove
JobLogFileCleanThread.getInstance().start(logRetentionDays)
执行定时任务的业务方法
doCallback(callbackParamList)
HandleCallbackParam callback = getInstance().callBackQueue.take()
调度中心 restful api
com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#channelRead0
如果配置文件中没有配置执行器 address 则去生成
finally
com.xxl.job.core.thread.ExecutorRegistryThread#start
handler.destroy()
如果配置了执行器 IP 则用配置的否则用默认本机的IP
0 条评论
下一页