前言
xxl-job源码阅读第一章:定时任务执行链路分析
一时兴起,花一下午看了一遍这部分源码,写的不错。
PS:写完才发现有官方文档讲解…尴尬😅
使用方式
部署xxl-job-admin
官网文档秒了。
地址:https://www.xuxueli.com/xxl-job/#%E4%BA%8C%E3%80%81%E5%BF%AB%E9%80%9F%E5%85%A5%E9%97%A8
装载配置类
1 |
|
实现任务接口
注意两点:
@JobHandler
注解用于唯一识别任务- 实现了
IJobHandler
接口才会被加载到任务列表中
1 |
|
Xxl-job Server端源码分析
版本:xxl-job-core1.9.1
统一入口 — 执行器
执行器即XxlJobExecutor
,被注册为上下文,主要是为了能用ApplicationContext
获取各种信息,例如Bean信息等等。1
2
3
4
5
6
7
8
9
10
11
12
13
14public class XxlJobExecutor implements ApplicationContextAware{
...
private static ApplicationContext applicationContext;
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
...
}
加载任务处理器
把刚才自己定义的定时任务处理器和其他所有加入了@JobHandler
注解的类加载进来。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
return jobHandlerRepository.put(name, jobHandler);
}
private static void initJobHandlerRepository(ApplicationContext applicationContext){
if (applicationContext == null) {
return;
}
// init job handler action
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
if (serviceBean instanceof IJobHandler){
String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
IJobHandler handler = (IJobHandler) serviceBean;
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler naming conflicts.");
}
registJobHandler(name, handler);
}
}
}
}
初始化客户端(调度中心)
xxl-job是C/S架构的,客户端是RPC服务的调用者,即xxl-job-admin
,带有一个前端。这里可以有多个调度中心,是因为支持集群部署。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21private static List<AdminBiz> adminBizList;
private static void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
// http://ip:port/xxl-job-admin-1.8.2 + /api
String addressUrl = address.concat(AdminBiz.MAPPING);
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, addressUrl, accessToken).getObject();
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
初始化RPC服务器工厂
1 | private NetComServerFactory serverFactory = new NetComServerFactory(); |
任务执行线程池
并没有用线程池,而是一个并发哈希表。这里有个疑问,如果并发线程量过大是不是有安全问题?
1 | private static ConcurrentHashMap<Integer, JobThread> JobThreadRepository = new ConcurrentHashMap<Integer, JobThread>(); |
Web服务器
该服务器用于获取请求、执行任务,是核心的执行逻辑。
主体是封装了一个jetty服务器(servlet容器,HTTP协议),利用反射实现服务调用。
相关类如下:
- NetComServerFactory:RPC服务器工厂类,同时负责维护RPC服务。
- JettyServer:核心类,真正运行的服务器。
- JettyServerHandler:jetty的handler,负责执行任务。
- ExecutorBiz:包含RPC方法的接口,被工厂类加载并执行请求调用的RPC方法。
- ExecutorBizImpl:真正提供服务的类,实现了ExecutorBiz接口。
- JobThread:真正执行任务(完成Job)的线程,维护一个阻塞队列用于执行任务。
- ExecutorRegistryThread:用于将服务器注册到客户端上,或取消注册。
- TriggerCallbackThread:维护一个阻塞队列,用于对任务执行结果实施callback。
NetComServerFactory
入口中初始化的就是这个Server工厂,工厂里面内容很简单,简化后如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39// 服务器,核心类
JettyServer server = new JettyServer(); //有start(), destroy()两个方法
// RPC服务存储,通过反射加载进来
private static Map<String, Object> serviceMap = new HashMap<String, Object>();
// 认证token,携带正确token的请求才能被执行
private static String accessToken;
// 初始化中放入ExecutorBiz的方法,Key是接口名
public static void putService(Class<?> iface, Object serviceBean){
serviceMap.put(iface.getName(), serviceBean);
}
// 初始化中设置token的方法
public static void setAccessToken(String accessToken) {
NetComServerFactory.accessToken = accessToken;
}
// 真正实现RPC服务调用的静态方法
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
...
try {
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
} catch (Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return response;
}
JettyServer
JettyServer
包含服务器(非阻塞线程池)、注册线程、回调线程三部分。
简化代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50public class JettyServer {
private Server server;
private Thread thread;
public void start(final int port, final String ip, final String appName) throws Exception {
thread = new Thread(new Runnable() {
public void run() {
// The Server
server = new Server(new ExecutorThreadPool()); // 非阻塞
// HTTP connector
ServerConnector connector = new ServerConnector(server);
connector.setHost(ip);
connector.setPort(port);
server.setConnectors(new Connector[]{connector});
// Set a handler
HandlerCollection handlerc =new HandlerCollection();
handlerc.setHandlers(new Handler[]{new JettyServerHandler()});
server.setHandler(handlerc);
// Start server
server.start();
// Start Registry-Server
ExecutorRegistryThread.getInstance().start(port, ip, appName);
// Start Callback-Server
TriggerCallbackThread.getInstance().start();
server.join(); // block until thread stopped
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
public void destroy() {
// destroy Registry-Server
ExecutorRegistryThread.getInstance().toStop();
// destroy Callback-Server
TriggerCallbackThread.getInstance().toStop();
// 关闭服务器
server.stop();
server.destroy();
// 关闭总线程
if (thread.isAlive()) {
thread.interrupt();
}
}
}
值得注意的是这里的ExecutorThreadPool
是jetty自己封装的ThreadPoolExecutor
,最大线程池大小为256, 线程timeout为1分钟,并使用了Unbounded LinkedBlockingQueue
作为任务队列。
1 | public ExecutorThreadPool() |
JettyServerHandler
这个类就是用来衔接jetty接到的请求和实际处理方法NetComServerFactory#invokeService
的,对入参出参和异常进行一些处理。
简化代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class JettyServerHandler extends AbstractHandler {
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
// invoke
RpcResponse rpcResponse = doInvoke(request);
// serialize response
byte[] responseBytes = HessianSerializer.serialize(rpcResponse);
response.setContentType("text/html;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
OutputStream out = response.getOutputStream();
out.write(responseBytes);
out.flush();
}
private RpcResponse doInvoke(HttpServletRequest request) {
try {
// deserialize request
byte[] requestBytes = HttpClientUtil.readBytes(request);
if (requestBytes == null || requestBytes.length==0) {
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setError("RpcRequest byte[] is null");
return rpcResponse;
}
RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.class);
// invoke
RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
return rpcResponse;
} catch (Exception e) {
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setError("Server-error:" + e.getMessage());
return rpcResponse;
}
}
}
ExecutorBizImpl
真正执行任务的类,实现了ExecutorBiz
接口。
ExecutorBiz
接口定义如下:1
2
3
4
5
6
7
8
9
10
11
12public interface ExecutorBiz {
// 心跳,用于确认服务器状态
public ReturnT<String> beat();
// 判断任务是否在执行
public ReturnT<String> idleBeat(int jobId);
// 终止任务
public ReturnT<String> kill(int jobId);
// 读日志,返回日志内容
public ReturnT<LogResult> log(long logDateTim, int logId, int fromLineNum);
// 根据参数执行任务,TriggerParam里携带jobId
public ReturnT<String> run(TriggerParam triggerParam);
}
ExecutorBizImpl
通过上下文XxlJobExecutor
获取资源(例如JobThread
)实现各个功能,这里我们只关注run
方法的实现。
1 |
|
JobThread
这是最终执行任务的线程,通过前面的分析可以看到,一项任务被最终交付给了TriggerQueue,并等待完成执行回调实现异步执行。
那么什么是TriggerQueue?它是一个存放Trigger参数的LinkedBlockingQueue
阻塞队列,用于存放待执行的任务。
JobThread
会按指定的时间间隔轮询该队列,尝试取出任务执行。任务执行结束后,将结果交给Callback线程执行回调。
还有一些其他细节,例如唯一日志ID的维护(通过Set),终止任务的实现(多种情况),可以参考代码分析。
代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155public class JobThread extends Thread{
private static Logger logger = LoggerFactory.getLogger(JobThread.class);
private int jobId;
private IJobHandler handler;
private LinkedBlockingQueue<TriggerParam> triggerQueue;
private ConcurrentHashSet<Integer> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
private boolean toStop = false;
private String stopReason;
private boolean running = false; // if running job
private int idleTimes = 0; // idel times
public JobThread(int jobId, IJobHandler handler) {
this.jobId = jobId;
this.handler = handler;
this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
this.triggerLogIdSet = new ConcurrentHashSet<Integer>();
}
public IJobHandler getHandler() {
return handler;
}
/**
* new trigger to queue
*
* @param triggerParam
* @return
*/
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// avoid repeat
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
/**
* kill job thread
*
* @param stopReason
*/
public void toStop(String stopReason) {
/**
* Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),
* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;
* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;
*/
this.toStop = true;
this.stopReason = stopReason;
}
/**
* is running job
* @return
*/
public boolean isRunningOrHasQueue() {
return running || triggerQueue.size()>0;
}
public void run() {
// init
try {
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// execute
while(!toStop){
running = false;
idleTimes++;
TriggerParam triggerParam = null;
ReturnT<String> executeResult = null;
try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
// execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
executeResult = handler.execute(triggerParam.getExecutorParams());
if (executeResult == null) {
executeResult = IJobHandler.FAIL;
}
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
} else {
if (idleTimes > 30) {
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
} catch (Throwable e) {
if (toStop) {
XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
}
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
} finally {
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
}
}
}
}
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
}
}
// destroy
try {
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
}
TriggerCallbackThread
回调线程,用于处理执行完的任务结果,通知调度中心执行完毕。
同样的,使用一个阻塞队列存放待回调的执行结果。
代码简化后如下:
1 | public class TriggerCallbackThread { |
后记
总结一下,一个定时任务(run请求)从调度中心出发到执行完毕的链路为:JettyServer
->
JettyServerHandler
->
NetComServerFactory#invokeService
->
ExecutorBizImpl#run
->
JobThread
->
TriggerCallbackThread
->
AdminBiz#callback
首发于 silencezheng.top,转载请注明出处。