From efdb99f8cecc4afb592afad79c761081d5d5cf22 Mon Sep 17 00:00:00 2001 From: lee <4766465@qq.com> Date: Wed, 18 Dec 2024 13:27:00 +0800 Subject: [PATCH] init --- xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java | 260 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 260 insertions(+), 0 deletions(-) diff --git a/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java new file mode 100644 index 0000000..40acac0 --- /dev/null +++ b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -0,0 +1,260 @@ +package com.xxl.job.core.thread; + +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.context.XxlJobContext; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.util.FileUtil; +import com.xxl.job.core.util.JdkSerializeTool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Created by xuxueli on 16/7/22. + */ +public class TriggerCallbackThread { + private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); + + private static TriggerCallbackThread instance = new TriggerCallbackThread(); + public static TriggerCallbackThread getInstance(){ + return instance; + } + + /** + * job results callback queue + */ + private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>(); + public static void pushCallBack(HandleCallbackParam callback){ + getInstance().callBackQueue.add(callback); + logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); + } + + /** + * callback thread + */ + private Thread triggerCallbackThread; + private Thread triggerRetryCallbackThread; + private volatile boolean toStop = false; + public void start() { + + // valid + if (XxlJobExecutor.getAdminBizList() == null) { + logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); + return; + } + + // callback + triggerCallbackThread = new Thread(new Runnable() { + + @Override + public void run() { + + // normal callback + while(!toStop){ + try { + HandleCallbackParam callback = getInstance().callBackQueue.take(); + if (callback != null) { + + // callback list param + List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); + int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); + callbackParamList.add(callback); + + // callback, will retry if error + if (callbackParamList!=null && callbackParamList.size()>0) { + doCallback(callbackParamList); + } + } + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + } + + // last callback + try { + List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); + int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); + if (callbackParamList!=null && callbackParamList.size()>0) { + doCallback(callbackParamList); + } + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy."); + + } + }); + triggerCallbackThread.setDaemon(true); + triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread"); + triggerCallbackThread.start(); + + + // retry + triggerRetryCallbackThread = new Thread(new Runnable() { + @Override + public void run() { + while(!toStop){ + try { + retryFailCallbackFile(); + } catch (Exception e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + + } + try { + TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); + } catch (InterruptedException e) { + if (!toStop) { + logger.error(e.getMessage(), e); + } + } + } + logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy."); + } + }); + triggerRetryCallbackThread.setDaemon(true); + triggerRetryCallbackThread.start(); + + } + public void toStop(){ + toStop = true; + // stop callback, interrupt and wait + if (triggerCallbackThread != null) { // support empty admin address + triggerCallbackThread.interrupt(); + try { + triggerCallbackThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + // stop retry, interrupt and wait + if (triggerRetryCallbackThread != null) { + triggerRetryCallbackThread.interrupt(); + try { + triggerRetryCallbackThread.join(); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + + } + + /** + * do callback, will retry if error + * @param callbackParamList + */ + private void doCallback(List<HandleCallbackParam> callbackParamList){ + boolean callbackRet = false; + // callback, will retry if error + for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { + try { + ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); + if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { + callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish."); + callbackRet = true; + break; + } else { + callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult); + } + } catch (Exception e) { + callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage()); + } + } + if (!callbackRet) { + appendFailCallbackFile(callbackParamList); + } + } + + /** + * callback log + */ + private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){ + for (HandleCallbackParam callbackParam: callbackParamList) { + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId()); + XxlJobContext.setXxlJobContext(new XxlJobContext( + -1, + null, + logFileName, + -1, + -1)); + XxlJobHelper.log(logContent); + } + } + + + // ---------------------- fail-callback file ---------------------- + + private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator); + private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log"); + + private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){ + // valid + if (callbackParamList==null || callbackParamList.size()==0) { + return; + } + + // append file + byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList); + + File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()))); + if (callbackLogFile.exists()) { + for (int i = 0; i < 100; i++) { + callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) )); + if (!callbackLogFile.exists()) { + break; + } + } + } + FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes); + } + + private void retryFailCallbackFile(){ + + // valid + File callbackLogPath = new File(failCallbackFilePath); + if (!callbackLogPath.exists()) { + return; + } + if (callbackLogPath.isFile()) { + callbackLogPath.delete(); + } + if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) { + return; + } + + // load and clear file, retry + for (File callbaclLogFile: callbackLogPath.listFiles()) { + byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile); + + // avoid empty file + if(callbackParamList_bytes == null || callbackParamList_bytes.length < 1){ + callbaclLogFile.delete(); + continue; + } + + List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class); + + callbaclLogFile.delete(); + doCallback(callbackParamList); + } + + } + +} -- Gitblit v1.9.3