From efdb99f8cecc4afb592afad79c761081d5d5cf22 Mon Sep 17 00:00:00 2001 From: lee <4766465@qq.com> Date: Wed, 18 Dec 2024 13:27:00 +0800 Subject: [PATCH] init --- xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java | 172 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 172 insertions(+), 0 deletions(-) diff --git a/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java new file mode 100644 index 0000000..8bdf709 --- /dev/null +++ b/xxl-job/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -0,0 +1,172 @@ +package com.xxl.job.core.biz.impl; + +import com.xxl.job.core.biz.ExecutorBiz; +import com.xxl.job.core.biz.model.*; +import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; +import com.xxl.job.core.executor.XxlJobExecutor; +import com.xxl.job.core.glue.GlueFactory; +import com.xxl.job.core.glue.GlueTypeEnum; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.handler.impl.GlueJobHandler; +import com.xxl.job.core.handler.impl.ScriptJobHandler; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.thread.JobThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; + +/** + * Created by xuxueli on 17/3/1. + */ +public class ExecutorBizImpl implements ExecutorBiz { + private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class); + + @Override + public ReturnT<String> beat() { + return ReturnT.SUCCESS; + } + + @Override + public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) { + + // isRunningOrHasQueue + boolean isRunningOrHasQueue = false; + JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId()); + if (jobThread != null && jobThread.isRunningOrHasQueue()) { + isRunningOrHasQueue = true; + } + + if (isRunningOrHasQueue) { + return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue."); + } + return ReturnT.SUCCESS; + } + + @Override + public ReturnT<String> run(TriggerParam triggerParam) { + // load old:jobHandler + jobThread + JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); + IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; + String removeOldReason = null; + + // valid:jobHandler + jobThread + GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); + if (GlueTypeEnum.BEAN == glueTypeEnum) { + + // new jobhandler + IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); + + // valid old jobThread + if (jobThread!=null && jobHandler != newJobHandler) { + // change handler, need kill old thread + removeOldReason = "change jobhandler or glue type, and terminate the old job thread."; + + jobThread = null; + jobHandler = null; + } + + // valid handler + if (jobHandler == null) { + jobHandler = newJobHandler; + if (jobHandler == null) { + return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); + } + } + + } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { + + // valid old jobThread + if (jobThread != null && + !(jobThread.getHandler() instanceof GlueJobHandler + && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { + // change handler or gluesource updated, need kill old thread + removeOldReason = "change job source or glue type, and terminate the old job thread."; + + jobThread = null; + jobHandler = null; + } + + // valid handler + if (jobHandler == null) { + try { + IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource()); + jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime()); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); + } + } + } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { + + // valid old jobThread + if (jobThread != null && + !(jobThread.getHandler() instanceof ScriptJobHandler + && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { + // change script or gluesource updated, need kill old thread + removeOldReason = "change job source or glue type, and terminate the old job thread."; + + jobThread = null; + jobHandler = null; + } + + // valid handler + if (jobHandler == null) { + jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType())); + } + } else { + return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid."); + } + + // executor block strategy + if (jobThread != null) { + ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); + if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { + // discard when running + if (jobThread.isRunningOrHasQueue()) { + return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); + } + } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { + // kill running jobThread + if (jobThread.isRunningOrHasQueue()) { + removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); + + jobThread = null; + } + } else { + // just queue trigger + } + } + + // replace thread (new or exists invalid) + if (jobThread == null) { + jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); + } + + // push data to queue + ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); + return pushResult; + } + + @Override + public ReturnT<String> kill(KillParam killParam) { + // kill handlerThread, and create new one + JobThread jobThread = XxlJobExecutor.loadJobThread(killParam.getJobId()); + if (jobThread != null) { + XxlJobExecutor.removeJobThread(killParam.getJobId(), "scheduling center kill job."); + return ReturnT.SUCCESS; + } + + return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed."); + } + + @Override + public ReturnT<LogResult> log(LogParam logParam) { + // log filename: logPath/yyyy-MM-dd/9999.log + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId()); + + LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum()); + return new ReturnT<LogResult>(logResult); + } + +} -- Gitblit v1.9.3