panchengyong
2024-11-24 63da19a9986cfe9632ddd855230142aacc9b3f21
优化1小时内只执行一次同一需求ID下的任务
2 files modified
56 ■■■■ changed files
src/main/java/com/jw/ai/config/Constant.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/service/impl/TaskServiceImpl.java 55 ●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/config/Constant.java
@@ -71,6 +71,7 @@
    public static final String DEMAND_TASK_NOTICE_KEY = "DEMAND_TASK_NOTICE:%s";
    public static final String DEMAND_JOB_NAME = "%s_%s_%s";
    public static final String DEMAND_JOB_INTERVAL_KEY = "DEMAND_JOB_LASTEAST_EXEC:%s";
    // =====================exec_platform============================
    public static final Integer EC = 1;
src/main/java/com/jw/ai/service/impl/TaskServiceImpl.java
@@ -158,6 +158,7 @@
    /**
     * 执行py任务, 返回执行uuid
     *
     * @param task
     */
    private String execPyTask(Task task, String secId, int num) {
@@ -317,11 +318,31 @@
    /**
     * 一键获客任务
     *
     * @param demandId
     * @return
     */
    @Override
    public String hotStart(Long demandId) {
        // 缓存当前demand上一次执行时间
        String keyInterval = String.format(Constant.DEMAND_JOB_INTERVAL_KEY, demandId.toString());
        log.info("==========>hotStart, 当前需求ID:{} ", demandId);
        try {
            Object interval = redisUtil.get(keyInterval);
            if (interval != null) {
                Long lastJobTime = Long.valueOf(interval.toString());
                log.info("==========>上一次执行时间redis缓存key:{}, 缓存时间(long)Time:{}", keyInterval, lastJobTime);
                if (DateTime.now().getTime() < lastJobTime + 3600000) {
                    log.info("==========>hotStart, 1小时内只执行一次同一需求ID: {} 下的任务", demandId);
                    return "系统当前限制同一需求1小时内只执行一次,请稍后再试!";    // 1小时内执行一次
                }
            }else{
                log.info("==========>redis缓存key:{} 为null", keyInterval);
            }
        } catch (Exception e) {
            log.info("==========>hotStart, Exception {}", e.toString());
        }
        Long currentTenantId = TenantContext.getCurrentTenantId();
        DemandOneClickLog saveOrUpdate = new DemandOneClickLog();
        saveOrUpdate.setDemandId(demandId);
@@ -358,8 +379,9 @@
                String secId = DateUtil.format(new Date(), "yyyyMMddHHmmssSSS");
                // 如果存在待完善的粉丝数据,就先完善,直到没有可完善的粉丝数据,反之才进行数据采集
                int count = baseMapper.countUpdateProfile(demand.getId());
                log.info("==========>hotStart, 需求:{} ,待完善的粉丝数: {} ",demand.getId() + ", "+ demand.getName(), count);
                log.info("==========>需求Id:{} 的待完善粉丝数: {} ", demand.getId() + ", " + demand.getName(), count);
                List<String> uuidList = new ArrayList<>();
                // 遍历当前demand下的所有任务
                for (int i = 0; i < list.size(); i++) {
                    Task task = list.get(i);
                    if (count == 0) {
@@ -367,7 +389,9 @@
                        // 将多个脚本放到一个执行链条里面
                        String uuid = execPyTask(task, secId, i);
                        uuidList.add(uuid);
                        log.info("==========>hotStart, execPyTask ====> {}", task.getTaskId() + " == "+ task.getPyScript());
                        String nowStr = Long.toString(DateTime.now().getTime());
                        redisUtil.set(keyInterval, nowStr); // 缓存执行时间,防止连续调用py接口
                        log.info("==========>hotStart, execPyTask 执行当前需求Id:{} 下任务TaskId: {}", demandId, task.getTaskId() + " == " + task.getPyScript() + "== 执行时间now time:" + nowStr);
                    } else {
                        // 如果有待完善的粉丝数据,直接进行完善,直到没有可以完善的了
                        if (task.getPyScript().contains(Constant.profile_prefix)) {
@@ -382,8 +406,8 @@
                                // 缓存中存在多少次执行记录了
                                String key = String.format(Constant.DEMAND_UPDATE_USER_PROFILE, demandId, "user_profile_spider");
                                Object o = redisUtil.get(key);
                                int num =0;
                                int cycle=0;
                                int num = 0;
                                int cycle = 0;
                                if (null != o) {
                                    String str = o.toString();
                                    String[] split = str.split(":");
@@ -396,7 +420,7 @@
                                        return null;
                                    } else {
                                        cycle += 1;
                                        log.info("==========>hotStart,继续执行,当前执行的次数 {}",cycle);
                                        log.info("==========>hotStart,继续执行,当前执行的次数 {}", cycle);
                                        redisUtil.set(key, count + ":" + cycle, 2 * 60 * 60);
                                    }
                                } else {
@@ -420,7 +444,7 @@
                }
                String key = String.format(Constant.DEMAND_TASK_NOTICE_KEY, demandId);
                redisUtil.set(key, uuidList.toString());
                log.info("一键启动后返回的批次号: {}",secId);
                log.info("一键启动后返回的批次号: {}", secId);
                return secId;
            }
        }
@@ -455,12 +479,13 @@
    /**
     * 一键获客定时任务
     *
     * @param tenantId
     */
    @Override
    public void hotStartJob(Long tenantId) {
        log.info("==========> hotStartJob");
        List<DemandOneClickLog> list = demandOneClickLogService.list(new LambdaQueryWrapper<DemandOneClickLog>().eq(DemandOneClickLog::getIsDeleted,0));
        List<DemandOneClickLog> list = demandOneClickLogService.list(new LambdaQueryWrapper<DemandOneClickLog>().eq(DemandOneClickLog::getIsDeleted, 0));
        if (CollectionUtil.isNotEmpty(list)) {
            // 检查平台待私信粉丝数 是否大于 账号数*平台私信上限的2倍
            // 如果没有达到,就执行
@@ -471,7 +496,7 @@
                    log.info("==========>租户ID:{}, 需求ID:{},", tenantId, demandId);
                    // 平台粉丝数
                    List<PlatformFansData> platformFansData = dyCommentsGotService.platformData();
                    if(CollectionUtil.isEmpty(platformFansData)) continue;
                    if (CollectionUtil.isEmpty(platformFansData)) continue;
                    // 策略判断是否执行数据采集和完善任务
                    boolean b = checkExecStrategy(obj, platformFansData);
                    // 1、如果可私信的粉丝数达标了,不在执行定时任务
@@ -521,7 +546,7 @@
                        }
                    } else {
                        // 2.2 没有可完善的粉丝数据时,执行数据采集和评论采集
                        log.info("==========>没有可完善的粉丝数据,开始执行数据采集和评论采集, 租户ID:{}, 需求ID:{} ",tenantId, demandId);
                        log.info("==========>没有可完善的粉丝数据,开始执行数据采集和评论采集, 租户ID:{}, 需求ID:{} ", tenantId, demandId);
                        List<Task> taskList = this.list(new LambdaQueryWrapper<Task>().eq(Task::getRequireId, demandId));
                        if (!CollectionUtil.isEmpty(taskList)) {
                            for (int i = 0; i < taskList.size(); i++) {
@@ -533,7 +558,7 @@
                            }
                        }
                    }
                }catch (Exception e){
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("==========>DemandOneClickLog error:{}", e.toString());
                }
@@ -597,6 +622,7 @@
    /**
     * 执行抖音大V账号信息同步run spider
     *
     * @return
     */
    @Override
@@ -708,10 +734,10 @@
            // 4.如果任务执行完成后待完善粉丝数量 > 0 需要重复执行完善脚本
            if (!CollectionUtil.isEmpty(demandUserProfile)) {
                Set<Map.Entry<Long, String>> entrySet = demandUserProfile.entrySet();
                log.info("当前执行的需求数 {}",entrySet.size());
                log.info("当前执行的需求数 {}", entrySet.size());
                for (Map.Entry<Long, String> obj : entrySet) {
                    Long demandId = obj.getKey();
                    log.info("当前执行的需求 {}",demandId);
                    log.info("当前执行的需求 {}", demandId);
                    String script = obj.getValue();
                    int count = baseMapper.countUpdateProfile(demandId);
                    String key = String.format(Constant.DEMAND_UPDATE_USER_PROFILE, demandId, script);
@@ -768,6 +794,7 @@
    /**
     * 调用py接口
     *
     * @param task
     * @return
     */
@@ -791,6 +818,7 @@
    /**
     * 获取任务执行策略
     *
     * @param obj
     * @param platformFansData
     * @return
@@ -850,6 +878,7 @@
    /**
     * 任务执行步骤
     *
     * @param task
     * @param taskLog
     * @return
@@ -915,6 +944,7 @@
    /**
     * 任务执行完成状态
     *
     * @param first
     * @param demandUserProfile
     * @param demandId
@@ -992,6 +1022,7 @@
    /**
     * 任务执行日志状态变更
     *
     * @param jobExecLog
     * @param list
     */