package com.xxl.job.core.thread;
|
|
import com.xxl.job.core.biz.model.HandleCallbackParam;
|
import com.xxl.job.core.biz.model.ReturnT;
|
import com.xxl.job.core.biz.model.TriggerParam;
|
import com.xxl.job.core.context.XxlJobContext;
|
import com.xxl.job.core.context.XxlJobHelper;
|
import com.xxl.job.core.executor.XxlJobExecutor;
|
import com.xxl.job.core.handler.IJobHandler;
|
import com.xxl.job.core.log.XxlJobFileAppender;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.io.PrintWriter;
|
import java.io.StringWriter;
|
import java.util.Collections;
|
import java.util.Date;
|
import java.util.HashSet;
|
import java.util.Set;
|
import java.util.concurrent.*;
|
|
|
/**
|
* handler thread
|
* @author xuxueli 2016-1-16 19:52:47
|
*/
|
public class JobThread extends Thread{
|
private static Logger logger = LoggerFactory.getLogger(JobThread.class);
|
|
private int jobId;
|
private IJobHandler handler;
|
private LinkedBlockingQueue<TriggerParam> triggerQueue;
|
private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
|
|
private volatile boolean toStop = false;
|
private String stopReason;
|
|
private boolean running = false; // if running job
|
private int idleTimes = 0; // idel times
|
|
|
public JobThread(int jobId, IJobHandler handler) {
|
this.jobId = jobId;
|
this.handler = handler;
|
this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
|
this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
|
|
// assign job thread name
|
this.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());
|
}
|
public IJobHandler getHandler() {
|
return handler;
|
}
|
|
/**
|
* new trigger to queue
|
*
|
* @param triggerParam
|
* @return
|
*/
|
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
|
// avoid repeat
|
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
|
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
|
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
|
}
|
|
triggerLogIdSet.add(triggerParam.getLogId());
|
triggerQueue.add(triggerParam);
|
return ReturnT.SUCCESS;
|
}
|
|
/**
|
* kill job thread
|
*
|
* @param stopReason
|
*/
|
public void toStop(String stopReason) {
|
/**
|
* Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),
|
* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;
|
* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;
|
*/
|
this.toStop = true;
|
this.stopReason = stopReason;
|
}
|
|
/**
|
* is running job
|
* @return
|
*/
|
public boolean isRunningOrHasQueue() {
|
return running || triggerQueue.size()>0;
|
}
|
|
@Override
|
public void run() {
|
|
// init
|
try {
|
handler.init();
|
} catch (Throwable e) {
|
logger.error(e.getMessage(), e);
|
}
|
|
// execute
|
while(!toStop){
|
running = false;
|
idleTimes++;
|
|
TriggerParam triggerParam = null;
|
try {
|
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
|
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
|
if (triggerParam!=null) {
|
running = true;
|
idleTimes = 0;
|
triggerLogIdSet.remove(triggerParam.getLogId());
|
|
// log filename, like "logPath/yyyy-MM-dd/9999.log"
|
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
|
XxlJobContext xxlJobContext = new XxlJobContext(
|
triggerParam.getJobId(),
|
triggerParam.getExecutorParams(),
|
logFileName,
|
triggerParam.getBroadcastIndex(),
|
triggerParam.getBroadcastTotal());
|
|
// init job context
|
XxlJobContext.setXxlJobContext(xxlJobContext);
|
|
// execute
|
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
|
|
if (triggerParam.getExecutorTimeout() > 0) {
|
// limit timeout
|
Thread futureThread = null;
|
try {
|
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
|
@Override
|
public Boolean call() throws Exception {
|
|
// init job context
|
XxlJobContext.setXxlJobContext(xxlJobContext);
|
|
handler.execute();
|
return true;
|
}
|
});
|
futureThread = new Thread(futureTask);
|
futureThread.start();
|
|
Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
|
} catch (TimeoutException e) {
|
|
XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
|
XxlJobHelper.log(e);
|
|
// handle result
|
XxlJobHelper.handleTimeout("job execute timeout ");
|
} finally {
|
futureThread.interrupt();
|
}
|
} else {
|
// just execute
|
handler.execute();
|
}
|
|
// valid execute handle data
|
if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
|
XxlJobHelper.handleFail("job handle result lost.");
|
} else {
|
String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
|
tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
|
?tempHandleMsg.substring(0, 50000).concat("...")
|
:tempHandleMsg;
|
XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
|
}
|
XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
|
+ XxlJobContext.getXxlJobContext().getHandleCode()
|
+ ", handleMsg = "
|
+ XxlJobContext.getXxlJobContext().getHandleMsg()
|
);
|
|
} else {
|
if (idleTimes > 30) {
|
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
|
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
|
}
|
}
|
}
|
} catch (Throwable e) {
|
if (toStop) {
|
XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
|
}
|
|
// handle result
|
StringWriter stringWriter = new StringWriter();
|
e.printStackTrace(new PrintWriter(stringWriter));
|
String errorMsg = stringWriter.toString();
|
|
XxlJobHelper.handleFail(errorMsg);
|
|
XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
|
} finally {
|
if(triggerParam != null) {
|
// callback handler info
|
if (!toStop) {
|
// commonm
|
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
|
triggerParam.getLogId(),
|
triggerParam.getLogDateTime(),
|
XxlJobContext.getXxlJobContext().getHandleCode(),
|
XxlJobContext.getXxlJobContext().getHandleMsg() )
|
);
|
} else {
|
// is killed
|
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
|
triggerParam.getLogId(),
|
triggerParam.getLogDateTime(),
|
XxlJobContext.HANDLE_CODE_FAIL,
|
stopReason + " [job running, killed]" )
|
);
|
}
|
}
|
}
|
}
|
|
// callback trigger request in queue
|
while(triggerQueue !=null && triggerQueue.size()>0){
|
TriggerParam triggerParam = triggerQueue.poll();
|
if (triggerParam!=null) {
|
// is killed
|
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
|
triggerParam.getLogId(),
|
triggerParam.getLogDateTime(),
|
XxlJobContext.HANDLE_CODE_FAIL,
|
stopReason + " [job not executed, in the job queue, killed.]")
|
);
|
}
|
}
|
|
// destroy
|
try {
|
handler.destroy();
|
} catch (Throwable e) {
|
logger.error(e.getMessage(), e);
|
}
|
|
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
|
}
|
}
|