leo.li
2024-09-10 60c3acf95feca995f8f64f6b97f461ff7dadb2b5
update
5 files modified
191 ■■■■■ changed files
src/main/java/com/jw/ai/config/Constant.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/controller/TaskController.java 8 ●●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/listener/RedisKeyExpirationListener.java 3 ●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/service/ITaskService.java 7 ●●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/service/impl/TaskServiceImpl.java 169 ●●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/config/Constant.java
@@ -65,6 +65,10 @@
    // Template/prefix pairs: the constants containing "%s" are format templates, the *_KEY
    // constants are the bare prefix of the same name.
    // NOTE(review): what each "%s" slot holds is not visible here - confirm at the call sites.
    public static final String DEMAND_UPDATE_USER_PROFILE = "DEMAND_UPDATE_USER_PROFILE:%s:%s";
    public static final String DEMAND_UPDATE_USER_PROFILE_KEY = "DEMAND_UPDATE_USER_PROFILE";
    public static final String DEMAND_UPDATE_USER_PROFILE_LISTEN = "DEMAND_UPDATE_USER_PROFILE_LISTEN:%s:%s";
    // Bare prefix that RedisKeyExpirationListener matches against expired keys via contains(...).
    public static final String DEMAND_UPDATE_USER_PROFILE_LISTEN_KEY = "DEMAND_UPDATE_USER_PROFILE_LISTEN";
    public static final String DEMAND_TASK_LISTEN = "DEMAND_TASK_LISTEN:%s:%s:%s:%s";
    public static final String DEMAND_TASK_LISTEN_KEY = "DEMAND_TASK_LISTEN";
    // Redis key under which TaskServiceImpl stores the list of chained job uuids; read back by
    // TaskServiceImpl#callback. Used as-is (no per-demand suffix).
    public static final String DEMAND_TASK_NOTICE_KEY = "DEMAND_TASK_NOTICE";
    // Job name template with three "%s" slots - presumably filled when a task is created; verify.
    public static final String DEMAND_JOB_NAME = "%s_%s_%s";
src/main/java/com/jw/ai/controller/TaskController.java
@@ -132,4 +132,12 @@
        taskService.hotStartJob(tenantId);
        return ServerResponseEntity.success();
    }
    /**
     * Callback endpoint ({@code GET callback}) that reports a finished job and lets the
     * service continue with the next one.
     *
     * <p>Logs both arguments, delegates to {@code taskService.callback(uuid, secId)} and
     * always answers with a success response; the service result is not inspected.
     *
     * <p>NOTE(review): neither parameter is validated here, so a request without
     * {@code uuid} presumably reaches the service with {@code null} - confirm the caller
     * always sends it.
     *
     * @param uuid  identifier of the job execution being reported
     * @param secId batch number passed through to the service
     * @return a success response
     */
    @GetMapping("callback")
    @Operation(description = "回调执行任务",summary = "回调通知执行任务")
    public ServerResponseEntity<?> callback(String uuid, String secId) {
        log.info("回调执行任务 {} {}", uuid, secId);
        taskService.callback(uuid, secId);
        return ServerResponseEntity.success();
    }
}
src/main/java/com/jw/ai/listener/RedisKeyExpirationListener.java
@@ -95,7 +95,7 @@
                }
            }
            // 监控完善执行结果
            if (expireKey.contains(Constant.DEMAND_UPDATE_USER_PROFILE_LISTEN)) {
            if (expireKey.contains(Constant.DEMAND_UPDATE_USER_PROFILE_LISTEN_KEY)) {
                String[] split = expireKey.split(":");
                if (split.length > 2) {
                    Long demandId = Long.valueOf(split[1]);
@@ -117,5 +117,4 @@
            }
        }
    }
}
src/main/java/com/jw/ai/service/ITaskService.java
@@ -90,4 +90,11 @@
     * @return
     */
    void hotStartJob(Long tenantId);
    /**
     * Callback invoked when a job execution has finished, so that follow-up work can run.
     *
     * @param uuid  identifier of the job execution that finished
     * @param secId batch number of the run the job belongs to
     */
    void callback(String uuid, String secId);
}
src/main/java/com/jw/ai/service/impl/TaskServiceImpl.java
@@ -39,6 +39,7 @@
import com.jw.ai.vo.*;
import javafx.scene.Group;
import lombok.extern.slf4j.Slf4j;
import nonapi.io.github.classgraph.json.JSONUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.tomcat.util.bcel.Const;
@@ -146,7 +147,8 @@
                int row = baseMapper.update(null, new LambdaUpdateWrapper<Task>().set(Task::getStatus, 0).set(Task::getGmtModified, new Date()).eq(Task::getTaskId, id));
                log.info("更新任务状态结果 {}", row);
            }
            execPyTask(task, null);
            String uuid = execPyTask(task, null, 0);
        }
    }
@@ -154,72 +156,66 @@
     * 执行py任务
     * @param task
     */
    private String execPyTask(Task task,String secId) {
    private String execPyTask(Task task, String secId, int num) {
        if (Constant.PY.equals(task.getExecPlatform())) {
            Task selectOne = baseMapper.selectOne(new LambdaQueryWrapper<Task>().eq(Task::getRequireId, task.getRequireId()).in(Task::getPyScript, Constant.douyin_search_spider,
                    Constant.kuaishou_search_spider,
                    Constant.xhs_search_spider).last("limit 1"));
            Task selectOne = baseMapper.selectOne(new LambdaQueryWrapper<Task>()
                    .eq(Task::getRequireId, task.getRequireId())
                    .in(Task::getPyScript, Constant.douyin_search_spider, Constant.kuaishou_search_spider, Constant.xhs_search_spider)
                    .last("limit 1"));
            if (null == selectOne) {
                log.info("demand {} none collect data  return", task.getRequireName());
                return null;
            }
            // python任务需要调用api执行操作,并修改需求的状态
            String uuid = IdUtil.simpleUUID();
            String s = execPyJob(task, uuid, selectOne, secId);
            CommonRes bean = JSONUtil.toBean(s, CommonRes.class);
            if (null != bean && Constant.OK.equals(bean.getStatus())) {
            // 设置任务执行步骤
            TaskLog taskLog = new TaskLog();
            int step = stepSet(task, taskLog);
            boolean row = demandService.update(null, new LambdaUpdateWrapper<Demand>().set(Demand::getStatus, step).set(Demand::getUpdateTime, new Date()).eq(Demand::getId, task.getRequireId()));
            log.info("更新状态:{}", row);
            // 保存任务执行记录
            JobExecLog jobExecLog = new JobExecLog();
            jobExecLog.setTaskId(task.getTaskId());
            jobExecLog.setProject("aijuke_spider");
            jobExecLog.setSpiderScript(task.getPyScript());
            jobExecLog.setName(task.getJobName());
            jobExecLog.setJobid(task.getJobUniqueNo());
            jobExecLog.setStatus(Constant.YES);
            jobExecLog.setTenantId(task.getTenantId());
            jobExecLog.setUuid(uuid);
            if (StringUtils.isNotBlank(secId)) {
                // 一键获客的日志增加批次号
                jobExecLog.setSecId(secId);
            }
            jobExecLog.setStartTime(new Date());
            jobExecLogMapper.insert(jobExecLog);
            log.info("---------->{}", jobExecLog.getId());
            // 任务执行流水
            taskLog.setTaskId(task.getTaskId());
            taskLog.setDemandId(task.getRequireId());
            taskLog.setCreateTime(new Date());
            taskLog.setUpdateTime(new Date());
            taskLog.setJobid(task.getJobUniqueNo());
            taskLog.setTenantId(task.getTenantId());
            if (task.getPyScript().contains(Constant.search_prefix)) {
                taskLog.setTaskType(1);
            } else if (task.getPyScript().contains(Constant.comment_prefix)) {
                taskLog.setTaskType(2);
            } else if (task.getPyScript().contains(Constant.profile_prefix)) {
                taskLog.setTaskType(3);
            }
            taskLog.setTaskStatus(1);
            taskLog.setUuid(uuid);
            if (StringUtils.isNotBlank(secId)) {
                taskLog.setSecId(secId);
            }
            taskLogService.save(taskLog);
            //开始执行任务,执行首次,后面的通过回调执行
            if (num == 0) {
                String s = execPyJob(task, uuid, selectOne, secId);
                CommonRes bean = JSONUtil.toBean(s, CommonRes.class);
                log.info("任务启动成功: {}", bean.getJobid());
                // 设置任务执行步骤
                TaskLog taskLog = new TaskLog();
                int step = stepSet(task,taskLog);
                demandService.update(null, new LambdaUpdateWrapper<Demand>().set(Demand::getStatus, step).set(Demand::getUpdateTime, new Date()).eq(Demand::getId, task.getRequireId()));
                // 保存任务执行记录
                JobExecLog jobExecLog = new JobExecLog();
                jobExecLog.setTaskId(task.getTaskId());
                jobExecLog.setProject("aijuke_spider");
                jobExecLog.setSpiderScript(task.getPyScript());
                jobExecLog.setName(task.getJobName());
                jobExecLog.setJobid(task.getJobUniqueNo());
                jobExecLog.setStatus(Constant.YES);
                jobExecLog.setTenantId(task.getTenantId());
                jobExecLog.setUuid(uuid);
                if (StringUtils.isNotBlank(secId)) {
                    // 一键获客的日志增加批次号
                    jobExecLog.setSecId(secId);
                }
                jobExecLog.setStartTime(new Date());
                jobExecLogMapper.insert(jobExecLog);
                // 任务执行流水
                taskLog.setTaskId(task.getTaskId());
                taskLog.setDemandId(task.getRequireId());
                taskLog.setCreateTime(new Date());
                taskLog.setUpdateTime(new Date());
                taskLog.setJobid(task.getJobUniqueNo());
                taskLog.setTenantId(task.getTenantId());
                if (task.getPyScript().contains(Constant.search_prefix)) {
                    taskLog.setTaskType(1);
                } else if (task.getPyScript().contains(Constant.comment_prefix)) {
                    taskLog.setTaskType(2);
                } else if (task.getPyScript().contains(Constant.profile_prefix)) {
                    taskLog.setTaskType(3);
                }
                taskLog.setTaskStatus(1);
                taskLog.setUuid(uuid);
                if (StringUtils.isNotBlank(secId)) {
                    taskLog.setSecId(secId);
                }
                taskLogService.save(taskLog);
                JobExecLog execLog = null;
                do {
                    log.info("等待执行结果完成。。。。。。。");
                    execLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, uuid).last("limit 1"));
                } while (null == execLog || null == execLog.getFinishTime());
                log.info("执行完成,继续执行下一个操作。。。。");
                int status = updateTaskDoneStatus(task.getPyScript(), task.getRequireId());
                boolean update = demandService.update(null, new LambdaUpdateWrapper<Demand>().set(Demand::getStatus, status).eq(Demand::getId, task.getRequireId()));
                log.info("更新 ==> {} {}", update, status);
            }
            return uuid;
        }
@@ -356,15 +352,19 @@
                String secId = DateUtil.format(new Date(), "yyyyMMddHHmmssSSS");
                // 如果存在待完善的粉丝数据,就先完善,直到没有可完善的粉丝数据,反之才进行数据采集
                int count = baseMapper.countUpdateProfile(demand.getId());
                List<String> uuidList = new ArrayList<>();
                for (int i = 0; i < list.size(); i++) {
                    Task task = list.get(i);
                    if (count == 0) {
                        // 如果没有待完善的数据,直接开始请求流程
                        execPyTask(task, secId);
                        // 将多个脚本放到一个执行链条里面
                        String uuid = execPyTask(task, secId, i);
                        uuidList.add(uuid);
                    } else {
                        // 如果有待完善的粉丝数据,直接进行完善,直到没有可以完善的了
                        if (task.getPyScript().contains(Constant.profile_prefix)) {
                            String s = execPyTask(task, secId);
                            String s = execPyTask(task, secId, 0);
                            uuidList.add(s);
                            log.info("uuid ====> {}",s);
                            count = baseMapper.countUpdateProfile(demand.getId());
                            log.info("执行结束后待完善的粉丝数据:{}", count);
@@ -408,6 +408,7 @@
                        }
                    }
                }
                redisUtil.set(Constant.DEMAND_TASK_NOTICE_KEY, uuidList.toString());
                log.info("一键启动后返回的批次号: {}",secId);
                return secId;
            }
@@ -507,13 +508,56 @@
                            for (int i = 0; i < taskList.size(); i++) {
                                Task task = taskList.get(i);
                                if (task.getPyScript().contains(Constant.comment_prefix) || task.getPyScript().contains(Constant.search_prefix)) {
                                    execPyTask(task, null);
                                    String uuid = execPyTask(task, null, 0);
                                }
                            }
                        }
                    }
                } finally {
                    TenantContext.clear();
                }
            }
        }
    }
    @Override
    public void callback(String uuid, String secId) {
        // 回调,当次任务执行完成,更新状态
        JobExecLog execLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, uuid));
        if (null != execLog) {
            int status = updateTaskDoneStatus(execLog.getSpiderScript(), null);
            Task task = this.getOne(new LambdaQueryWrapper<Task>().eq(Task::getTaskId, execLog.getTaskId()));
            boolean update = demandService.update(null, new LambdaUpdateWrapper<Demand>().set(Demand::getStatus, status).eq(Demand::getId, task.getRequireId()));
            log.info("callback ===》{}", update);
        }
        Object o = redisUtil.get(Constant.DEMAND_TASK_NOTICE_KEY);
        if (null != o) {
            String string = o.toString();
            JSONArray uuids = JSONUtil.parseArray(string);
            String nextUuid = null;
            if (uuids.size() > 1) {
                for (int i = 0; i < uuids.size(); i++) {
                    String id = uuids.getStr(i);
                    if (uuid.equals(id)) {
                        if (i == uuids.size() - 1) {
                            log.info("最后一个任务,没有下一个要执行的了");
                        } else {
                            nextUuid = uuids.getStr(i + 1);
                            break;
                        }
                    }
                }
            }
            if (StringUtils.isNotBlank(nextUuid)) {
                JobExecLog jobExecLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, nextUuid).last("limit 1"));
                if (null != jobExecLog) {
                    Task task = this.getOne(new LambdaQueryWrapper<Task>().eq(Task::getTaskId, jobExecLog.getTaskId()));
                    if (null != task) {
                        Task selectOne = baseMapper.selectOne(new LambdaQueryWrapper<Task>().eq(Task::getRequireId, task.getRequireId()).in(Task::getPyScript, Constant.douyin_search_spider, Constant.kuaishou_search_spider, Constant.xhs_search_spider).last("limit 1"));
                        String s = execPyJob(task, uuid, selectOne, secId);
                        CommonRes bean = JSONUtil.toBean(s, CommonRes.class);
                        log.info("回调接口:任务启动成功: {}", bean.getJobid());
                    }
                }
            }
        }
@@ -684,6 +728,7 @@
        body.put("project", "aijuke_spider");
        body.put("spider", task.getPyScript());
        body.put("jobid", task.getJobName());
        body.put("job_unique_no", task.getJobUniqueNo());
        body.put("task_id", selectOne.getTaskId());
        body.put("task_name", selectOne.getName());
        body.put("tenant_id", selectOne.getTenantId());