From efdb99f8cecc4afb592afad79c761081d5d5cf22 Mon Sep 17 00:00:00 2001 From: lee <4766465@qq.com> Date: Wed, 18 Dec 2024 13:27:00 +0800 Subject: [PATCH] init --- xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java | 252 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 252 insertions(+), 0 deletions(-) diff --git a/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java new file mode 100644 index 0000000..cf07a55 --- /dev/null +++ b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -0,0 +1,252 @@ +package com.xxl.job.core.thread; + +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.context.XxlJobContext; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.log.XxlJobFileAppender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.*; + + +/** + * handler thread + * @author xuxueli 2016-1-16 19:52:47 + */ +public class JobThread extends Thread{ + private static Logger logger = LoggerFactory.getLogger(JobThread.class); + + private int jobId; + private IJobHandler handler; + private LinkedBlockingQueue<TriggerParam> triggerQueue; + private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID + + private volatile boolean toStop = false; + private String stopReason; + + private boolean running = false; // if running job + private int idleTimes = 0; // idel times + + + public JobThread(int jobId, IJobHandler handler) { + this.jobId = jobId; + this.handler = handler; + this.triggerQueue = new LinkedBlockingQueue<TriggerParam>(); + this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>()); + + // assign job thread name + this.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis()); + } + public IJobHandler getHandler() { + return handler; + } + + /** + * new trigger to queue + * + * @param triggerParam + * @return + */ + public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) { + // avoid repeat + if (triggerLogIdSet.contains(triggerParam.getLogId())) { + logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId()); + return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId()); + } + + triggerLogIdSet.add(triggerParam.getLogId()); + triggerQueue.add(triggerParam); + return ReturnT.SUCCESS; + } + + /** + * kill job thread + * + * @param stopReason + */ + public void toStop(String stopReason) { + /** + * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep), + * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身; + * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式; + */ + this.toStop = true; + this.stopReason = stopReason; + } + + /** + * is running job + * @return + */ + public boolean isRunningOrHasQueue() { + return running || triggerQueue.size()>0; + } + + @Override + public void run() { + + // init + try { + handler.init(); + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + + // execute + while(!toStop){ + running = false; + idleTimes++; + + TriggerParam triggerParam = null; + try { + // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) + triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); + if (triggerParam!=null) { + running = true; + idleTimes = 0; + triggerLogIdSet.remove(triggerParam.getLogId()); + + // log filename, like "logPath/yyyy-MM-dd/9999.log" + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId()); + XxlJobContext xxlJobContext = new XxlJobContext( + triggerParam.getJobId(), + triggerParam.getExecutorParams(), + logFileName, + triggerParam.getBroadcastIndex(), + triggerParam.getBroadcastTotal()); + + // init job context + XxlJobContext.setXxlJobContext(xxlJobContext); + + // execute + XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam()); + + if (triggerParam.getExecutorTimeout() > 0) { + // limit timeout + Thread futureThread = null; + try { + FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + + // init job context + XxlJobContext.setXxlJobContext(xxlJobContext); + + handler.execute(); + return true; + } + }); + futureThread = new Thread(futureTask); + futureThread.start(); + + Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS); + } catch (TimeoutException e) { + + XxlJobHelper.log("<br>----------- xxl-job job execute timeout"); + XxlJobHelper.log(e); + + // handle result + XxlJobHelper.handleTimeout("job execute timeout "); + } finally { + futureThread.interrupt(); + } + } else { + // just execute + handler.execute(); + } + + // valid execute handle data + if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) { + XxlJobHelper.handleFail("job handle result lost."); + } else { + String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg(); + tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000) + ?tempHandleMsg.substring(0, 50000).concat("...") + :tempHandleMsg; + XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg); + } + XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode=" + + XxlJobContext.getXxlJobContext().getHandleCode() + + ", handleMsg = " + + XxlJobContext.getXxlJobContext().getHandleMsg() + ); + + } else { + if (idleTimes > 30) { + if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost + XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); + } + } + } + } catch (Throwable e) { + if (toStop) { + XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason); + } + + // handle result + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + String errorMsg = stringWriter.toString(); + + XxlJobHelper.handleFail(errorMsg); + + XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------"); + } finally { + if(triggerParam != null) { + // callback handler info + if (!toStop) { + // commonm + TriggerCallbackThread.pushCallBack(new HandleCallbackParam( + triggerParam.getLogId(), + triggerParam.getLogDateTime(), + XxlJobContext.getXxlJobContext().getHandleCode(), + XxlJobContext.getXxlJobContext().getHandleMsg() ) + ); + } else { + // is killed + TriggerCallbackThread.pushCallBack(new HandleCallbackParam( + triggerParam.getLogId(), + triggerParam.getLogDateTime(), + XxlJobContext.HANDLE_CODE_FAIL, + stopReason + " [job running, killed]" ) + ); + } + } + } + } + + // callback trigger request in queue + while(triggerQueue !=null && triggerQueue.size()>0){ + TriggerParam triggerParam = triggerQueue.poll(); + if (triggerParam!=null) { + // is killed + TriggerCallbackThread.pushCallBack(new HandleCallbackParam( + triggerParam.getLogId(), + triggerParam.getLogDateTime(), + XxlJobContext.HANDLE_CODE_FAIL, + stopReason + " [job not executed, in the job queue, killed.]") + ); + } + } + + // destroy + try { + handler.destroy(); + } catch (Throwable e) { + logger.error(e.getMessage(), e); + } + + logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); + } +} -- Gitblit v1.9.3