From efdb99f8cecc4afb592afad79c761081d5d5cf22 Mon Sep 17 00:00:00 2001
From: lee <4766465@qq.com>
Date: Wed, 18 Dec 2024 13:27:00 +0800
Subject: [PATCH] init

---
 xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java |  260 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 260 insertions(+), 0 deletions(-)

diff --git a/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java
new file mode 100644
index 0000000..40acac0
--- /dev/null
+++ b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java
@@ -0,0 +1,260 @@
+package com.xxl.job.core.thread;
+
+import com.xxl.job.core.biz.AdminBiz;
+import com.xxl.job.core.biz.model.HandleCallbackParam;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.context.XxlJobContext;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.enums.RegistryConfig;
+import com.xxl.job.core.executor.XxlJobExecutor;
+import com.xxl.job.core.log.XxlJobFileAppender;
+import com.xxl.job.core.util.FileUtil;
+import com.xxl.job.core.util.JdkSerializeTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by xuxueli on 16/7/22.
+ */
+public class TriggerCallbackThread {
+    private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
+
+    private static TriggerCallbackThread instance = new TriggerCallbackThread();
+    public static TriggerCallbackThread getInstance(){
+        return instance;
+    }
+
+    /**
+     * job results callback queue
+     */
+    private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
+    public static void pushCallBack(HandleCallbackParam callback){
+        getInstance().callBackQueue.add(callback);
+        logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
+    }
+
+    /**
+     * callback thread
+     */
+    private Thread triggerCallbackThread;
+    private Thread triggerRetryCallbackThread;
+    private volatile boolean toStop = false;
+    public void start() {
+
+        // valid
+        if (XxlJobExecutor.getAdminBizList() == null) {
+            logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
+            return;
+        }
+
+        // callback
+        triggerCallbackThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+
+                // normal callback
+                while(!toStop){
+                    try {
+                        HandleCallbackParam callback = getInstance().callBackQueue.take();
+                        if (callback != null) {
+
+                            // callback list param
+                            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
+                            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
+                            callbackParamList.add(callback);
+
+                            // callback, will retry if error
+                            if (callbackParamList!=null && callbackParamList.size()>0) {
+                                doCallback(callbackParamList);
+                            }
+                        }
+                    } catch (Exception e) {
+                        if (!toStop) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }
+                }
+
+                // last callback
+                try {
+                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
+                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
+                    if (callbackParamList!=null && callbackParamList.size()>0) {
+                        doCallback(callbackParamList);
+                    }
+                } catch (Exception e) {
+                    if (!toStop) {
+                        logger.error(e.getMessage(), e);
+                    }
+                }
+                logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");
+
+            }
+        });
+        triggerCallbackThread.setDaemon(true);
+        triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
+        triggerCallbackThread.start();
+
+
+        // retry
+        triggerRetryCallbackThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while(!toStop){
+                    try {
+                        retryFailCallbackFile();
+                    } catch (Exception e) {
+                        if (!toStop) {
+                            logger.error(e.getMessage(), e);
+                        }
+
+                    }
+                    try {
+                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
+                    } catch (InterruptedException e) {
+                        if (!toStop) {
+                            logger.error(e.getMessage(), e);
+                        }
+                    }
+                }
+                logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
+            }
+        });
+        triggerRetryCallbackThread.setDaemon(true);
+        triggerRetryCallbackThread.start();
+
+    }
+    public void toStop(){
+        toStop = true;
+        // stop callback, interrupt and wait
+        if (triggerCallbackThread != null) {    // support empty admin address
+            triggerCallbackThread.interrupt();
+            try {
+                triggerCallbackThread.join();
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        // stop retry, interrupt and wait
+        if (triggerRetryCallbackThread != null) {
+            triggerRetryCallbackThread.interrupt();
+            try {
+                triggerRetryCallbackThread.join();
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+    }
+
+    /**
+     * do callback, will retry if error
+     * @param callbackParamList
+     */
+    private void doCallback(List<HandleCallbackParam> callbackParamList){
+        boolean callbackRet = false;
+        // callback, will retry if error
+        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
+            try {
+                ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
+                if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
+                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
+                    callbackRet = true;
+                    break;
+                } else {
+                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
+                }
+            } catch (Exception e) {
+                callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
+            }
+        }
+        if (!callbackRet) {
+            appendFailCallbackFile(callbackParamList);
+        }
+    }
+
+    /**
+     * callback log
+     */
+    private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){
+        for (HandleCallbackParam callbackParam: callbackParamList) {
+            String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());
+            XxlJobContext.setXxlJobContext(new XxlJobContext(
+                    -1,
+                    null,
+                    logFileName,
+                    -1,
+                    -1));
+            XxlJobHelper.log(logContent);
+        }
+    }
+
+
+    // ---------------------- fail-callback file ----------------------
+
+    private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator);
+    private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log");
+
+    private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
+        // valid
+        if (callbackParamList==null || callbackParamList.size()==0) {
+            return;
+        }
+
+        // append file
+        byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);
+
+        File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
+        if (callbackLogFile.exists()) {
+            for (int i = 0; i < 100; i++) {
+                callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));
+                if (!callbackLogFile.exists()) {
+                    break;
+                }
+            }
+        }
+        FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
+    }
+
+    private void retryFailCallbackFile(){
+
+        // valid
+        File callbackLogPath = new File(failCallbackFilePath);
+        if (!callbackLogPath.exists()) {
+            return;
+        }
+        if (callbackLogPath.isFile()) {
+            callbackLogPath.delete();
+        }
+        if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {
+            return;
+        }
+
+        // load and clear file, retry
+        for (File callbaclLogFile: callbackLogPath.listFiles()) {
+            byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);
+
+            // avoid empty file
+            if(callbackParamList_bytes == null || callbackParamList_bytes.length < 1){
+                callbaclLogFile.delete();
+                continue;
+            }
+
+            List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);
+
+            callbaclLogFile.delete();
+            doCallback(callbackParamList);
+        }
+
+    }
+
+}

--
Gitblit v1.9.3