From efdb99f8cecc4afb592afad79c761081d5d5cf22 Mon Sep 17 00:00:00 2001 From: lee <4766465@qq.com> Date: Wed, 18 Dec 2024 13:27:00 +0800 Subject: [PATCH] init --- xxl-job/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 150 insertions(+), 0 deletions(-) diff --git a/xxl-job/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java new file mode 100644 index 0000000..398713d --- /dev/null +++ b/xxl-job/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -0,0 +1,150 @@ +package com.xxl.job.admin.core.thread; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.admin.core.trigger.XxlJobTrigger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * job trigger thread pool helper + * + * @author xuxueli 2018-07-03 21:08:07 + */ +public class JobTriggerPoolHelper { + private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); + + + // ---------------------- trigger pool ---------------------- + + // fast/slow thread pool + private ThreadPoolExecutor fastTriggerPool = null; + private ThreadPoolExecutor slowTriggerPool = null; + + public void start(){ + fastTriggerPool = new ThreadPoolExecutor( + 10, + XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(1000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); + } + }); + + slowTriggerPool = new ThreadPoolExecutor( + 10, + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(2000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); + } + }); + } + + + public void stop() { + //triggerPool.shutdown(); + fastTriggerPool.shutdownNow(); + slowTriggerPool.shutdownNow(); + logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); + } + + + // job timeout count + private volatile long minTim = System.currentTimeMillis()/60000; // ms > min + private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>(); + + + /** + * add trigger + */ + public void addTrigger(final int jobId, + final TriggerTypeEnum triggerType, + final int failRetryCount, + final String executorShardingParam, + final String executorParam, + final String addressList) { + + // choose thread pool + ThreadPoolExecutor triggerPool_ = fastTriggerPool; + AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); + if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min + triggerPool_ = slowTriggerPool; + } + + // trigger + triggerPool_.execute(new Runnable() { + @Override + public void run() { + + long start = System.currentTimeMillis(); + + try { + // do trigger + XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + + // check timeout-count-map + long minTim_now = System.currentTimeMillis()/60000; + if (minTim != minTim_now) { + minTim = minTim_now; + jobTimeoutCountMap.clear(); + } + + // incr timeout-count-map + long cost = System.currentTimeMillis()-start; + if (cost > 500) { // ob-timeout threshold 500ms + AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); + if (timeoutCount != null) { + timeoutCount.incrementAndGet(); + } + } + + } + + } + }); + } + + + + // ---------------------- helper ---------------------- + + private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper(); + + public static void toStart() { + helper.start(); + } + public static void toStop() { + helper.stop(); + } + + /** + * @param jobId + * @param triggerType + * @param failRetryCount + * >=0: use this param + * <0: use param from job info config + * @param executorShardingParam + * @param executorParam + * null: use job param + * not null: cover job param + */ + public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { + helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); + } + +} -- Gitblit v1.9.3