From efdb99f8cecc4afb592afad79c761081d5d5cf22 Mon Sep 17 00:00:00 2001
From: lee <4766465@qq.com>
Date: Wed, 18 Dec 2024 13:27:00 +0800
Subject: [PATCH] init

---
 xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java |  252 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 252 insertions(+), 0 deletions(-)

diff --git a/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java
new file mode 100644
index 0000000..cf07a55
--- /dev/null
+++ b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java
@@ -0,0 +1,252 @@
+package com.xxl.job.core.thread;
+
+import com.xxl.job.core.biz.model.HandleCallbackParam;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.biz.model.TriggerParam;
+import com.xxl.job.core.context.XxlJobContext;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.executor.XxlJobExecutor;
+import com.xxl.job.core.handler.IJobHandler;
+import com.xxl.job.core.log.XxlJobFileAppender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.*;
+
+
+/**
+ * handler thread
+ * @author xuxueli 2016-1-16 19:52:47
+ */
+public class JobThread extends Thread{
+	private static Logger logger = LoggerFactory.getLogger(JobThread.class);
+
+	private int jobId;
+	private IJobHandler handler;
+	private LinkedBlockingQueue<TriggerParam> triggerQueue;
+	private Set<Long> triggerLogIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_ID
+
+	private volatile boolean toStop = false;
+	private String stopReason;
+
+    private boolean running = false;    // if running job
+	private int idleTimes = 0;			// idel times
+
+
+	public JobThread(int jobId, IJobHandler handler) {
+		this.jobId = jobId;
+		this.handler = handler;
+		this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
+		this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
+
+		// assign job thread name
+		this.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());
+	}
+	public IJobHandler getHandler() {
+		return handler;
+	}
+
+    /**
+     * new trigger to queue
+     *
+     * @param triggerParam
+     * @return
+     */
+	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
+		// avoid repeat
+		if (triggerLogIdSet.contains(triggerParam.getLogId())) {
+			logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
+			return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
+		}
+
+		triggerLogIdSet.add(triggerParam.getLogId());
+		triggerQueue.add(triggerParam);
+        return ReturnT.SUCCESS;
+	}
+
+    /**
+     * kill job thread
+     *
+     * @param stopReason
+     */
+	public void toStop(String stopReason) {
+		/**
+		 * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),
+		 * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;
+		 * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;
+		 */
+		this.toStop = true;
+		this.stopReason = stopReason;
+	}
+
+    /**
+     * is running job
+     * @return
+     */
+    public boolean isRunningOrHasQueue() {
+        return running || triggerQueue.size()>0;
+    }
+
+    @Override
+	public void run() {
+
+    	// init
+    	try {
+			handler.init();
+		} catch (Throwable e) {
+    		logger.error(e.getMessage(), e);
+		}
+
+		// execute
+		while(!toStop){
+			running = false;
+			idleTimes++;
+
+            TriggerParam triggerParam = null;
+            try {
+				// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
+				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
+				if (triggerParam!=null) {
+					running = true;
+					idleTimes = 0;
+					triggerLogIdSet.remove(triggerParam.getLogId());
+
+					// log filename, like "logPath/yyyy-MM-dd/9999.log"
+					String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
+					XxlJobContext xxlJobContext = new XxlJobContext(
+							triggerParam.getJobId(),
+							triggerParam.getExecutorParams(),
+							logFileName,
+							triggerParam.getBroadcastIndex(),
+							triggerParam.getBroadcastTotal());
+
+					// init job context
+					XxlJobContext.setXxlJobContext(xxlJobContext);
+
+					// execute
+					XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
+
+					if (triggerParam.getExecutorTimeout() > 0) {
+						// limit timeout
+						Thread futureThread = null;
+						try {
+							FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
+								@Override
+								public Boolean call() throws Exception {
+
+									// init job context
+									XxlJobContext.setXxlJobContext(xxlJobContext);
+
+									handler.execute();
+									return true;
+								}
+							});
+							futureThread = new Thread(futureTask);
+							futureThread.start();
+
+							Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
+						} catch (TimeoutException e) {
+
+							XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
+							XxlJobHelper.log(e);
+
+							// handle result
+							XxlJobHelper.handleTimeout("job execute timeout ");
+						} finally {
+							futureThread.interrupt();
+						}
+					} else {
+						// just execute
+						handler.execute();
+					}
+
+					// valid execute handle data
+					if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
+						XxlJobHelper.handleFail("job handle result lost.");
+					} else {
+						String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
+						tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
+								?tempHandleMsg.substring(0, 50000).concat("...")
+								:tempHandleMsg;
+						XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
+					}
+					XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
+							+ XxlJobContext.getXxlJobContext().getHandleCode()
+							+ ", handleMsg = "
+							+ XxlJobContext.getXxlJobContext().getHandleMsg()
+					);
+
+				} else {
+					if (idleTimes > 30) {
+						if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
+							XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
+						}
+					}
+				}
+			} catch (Throwable e) {
+				if (toStop) {
+					XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
+				}
+
+				// handle result
+				StringWriter stringWriter = new StringWriter();
+				e.printStackTrace(new PrintWriter(stringWriter));
+				String errorMsg = stringWriter.toString();
+
+				XxlJobHelper.handleFail(errorMsg);
+
+				XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
+			} finally {
+                if(triggerParam != null) {
+                    // callback handler info
+                    if (!toStop) {
+                        // commonm
+                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
+                        		triggerParam.getLogId(),
+								triggerParam.getLogDateTime(),
+								XxlJobContext.getXxlJobContext().getHandleCode(),
+								XxlJobContext.getXxlJobContext().getHandleMsg() )
+						);
+                    } else {
+                        // is killed
+                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
+                        		triggerParam.getLogId(),
+								triggerParam.getLogDateTime(),
+								XxlJobContext.HANDLE_CODE_FAIL,
+								stopReason + " [job running, killed]" )
+						);
+                    }
+                }
+            }
+        }
+
+		// callback trigger request in queue
+		while(triggerQueue !=null && triggerQueue.size()>0){
+			TriggerParam triggerParam = triggerQueue.poll();
+			if (triggerParam!=null) {
+				// is killed
+				TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
+						triggerParam.getLogId(),
+						triggerParam.getLogDateTime(),
+						XxlJobContext.HANDLE_CODE_FAIL,
+						stopReason + " [job not executed, in the job queue, killed.]")
+				);
+			}
+		}
+
+		// destroy
+		try {
+			handler.destroy();
+		} catch (Throwable e) {
+			logger.error(e.getMessage(), e);
+		}
+
+		logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
+	}
+}

--
Gitblit v1.9.3