| | |
| | | import com.jw.ai.vo.*; |
| | | import javafx.scene.Group; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import nonapi.io.github.classgraph.json.JSONUtils; |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.apache.kafka.common.protocol.types.Field; |
| | | import org.apache.tomcat.util.bcel.Const; |
| | |
| | | int row = baseMapper.update(null, new LambdaUpdateWrapper<Task>().set(Task::getStatus, 0).set(Task::getGmtModified, new Date()).eq(Task::getTaskId, id)); |
| | | log.info("更新任务状态结果 {}", row); |
| | | } |
| | | execPyTask(task, null); |
| | | String uuid = execPyTask(task, null, 0); |
| | | |
| | | } |
| | | } |
| | | |
| | |
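| | | * Executes a Python task: when the task's exec platform is PY, looks up the demand's search-spider task, records a job execution log and a task log under a freshly generated UUID, and returns that UUID (null when the demand has no search-spider task). |
| | | * @param task the task to execute |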
| | | */ |
| | | private String execPyTask(Task task,String secId) { | // Runs a Python task for a demand and returns the UUID recorded on its execution logs
| | | private String execPyTask(Task task, String secId, int num) { | // NOTE(review): a 2-arg and a 3-arg signature appear back to back; this span looks like interleaved before/after diff lines - confirm which lines are live
| | | if (Constant.PY.equals(task.getExecPlatform())) { |
| | | Task selectOne = baseMapper.selectOne(new LambdaQueryWrapper<Task>().eq(Task::getRequireId, task.getRequireId()).in(Task::getPyScript, Constant.douyin_search_spider, |
| | | Constant.kuaishou_search_spider, |
| | | Constant.xhs_search_spider).last("limit 1")); |
| | | Task selectOne = baseMapper.selectOne(new LambdaQueryWrapper<Task>() |
| | | .eq(Task::getRequireId, task.getRequireId()) |
| | | .in(Task::getPyScript, Constant.douyin_search_spider, Constant.kuaishou_search_spider, Constant.xhs_search_spider) |
| | | .last("limit 1")); |
| | | if (null == selectOne) { |
| | | log.info("demand {} none collect data return", task.getRequireName()); |
| | | return null; | // no search-spider task was found for this demand, so nothing is started
| | | } |
| | | // Python tasks are run by calling the API, and the demand's status is updated accordingly |
| | | String uuid = IdUtil.simpleUUID(); |
| | | String s = execPyJob(task, uuid, selectOne, secId); |
| | | CommonRes bean = JSONUtil.toBean(s, CommonRes.class); |
| | | if (null != bean && Constant.OK.equals(bean.getStatus())) { |
| | | // Set the task execution step |
| | | TaskLog taskLog = new TaskLog(); |
| | | int step = stepSet(task, taskLog); |
| | | boolean row = demandService.update(null, new LambdaUpdateWrapper<Demand>().set(Demand::getStatus, step).set(Demand::getUpdateTime, new Date()).eq(Demand::getId, task.getRequireId())); |
| | | log.info("更新状态:{}", row); |
| | | // Save the job execution record |
| | | JobExecLog jobExecLog = new JobExecLog(); |
| | | jobExecLog.setTaskId(task.getTaskId()); |
| | | jobExecLog.setProject("aijuke_spider"); |
| | | jobExecLog.setSpiderScript(task.getPyScript()); |
| | | jobExecLog.setName(task.getJobName()); |
| | | jobExecLog.setJobid(task.getJobUniqueNo()); |
| | | jobExecLog.setStatus(Constant.YES); |
| | | jobExecLog.setTenantId(task.getTenantId()); |
| | | jobExecLog.setUuid(uuid); |
| | | if (StringUtils.isNotBlank(secId)) { |
| | | // For one-click lead acquisition, also record the batch number on the log |
| | | jobExecLog.setSecId(secId); |
| | | } |
| | | jobExecLog.setStartTime(new Date()); |
| | | jobExecLogMapper.insert(jobExecLog); |
| | | log.info("---------->{}", jobExecLog.getId()); |
| | | |
| | | // Task execution log entry (task type is derived from the script name prefix) |
| | | taskLog.setTaskId(task.getTaskId()); |
| | | taskLog.setDemandId(task.getRequireId()); |
| | | taskLog.setCreateTime(new Date()); |
| | | taskLog.setUpdateTime(new Date()); |
| | | taskLog.setJobid(task.getJobUniqueNo()); |
| | | taskLog.setTenantId(task.getTenantId()); |
| | | if (task.getPyScript().contains(Constant.search_prefix)) { |
| | | taskLog.setTaskType(1); |
| | | } else if (task.getPyScript().contains(Constant.comment_prefix)) { |
| | | taskLog.setTaskType(2); |
| | | } else if (task.getPyScript().contains(Constant.profile_prefix)) { |
| | | taskLog.setTaskType(3); |
| | | } |
| | | taskLog.setTaskStatus(1); |
| | | taskLog.setUuid(uuid); |
| | | if (StringUtils.isNotBlank(secId)) { |
| | | taskLog.setSecId(secId); |
| | | } |
| | | taskLogService.save(taskLog); |
| | | // Start the job here only for the first task (num == 0); later ones are started via the callback |
| | | if (num == 0) { |
| | | String s = execPyJob(task, uuid, selectOne, secId); |
| | | CommonRes bean = JSONUtil.toBean(s, CommonRes.class); |
| | | log.info("任务启动成功: {}", bean.getJobid()); | // NOTE(review): bean is dereferenced without the null/status check used on the other execPyJob call in this span - confirm
| | | // Set the task execution step |
| | | TaskLog taskLog = new TaskLog(); |
| | | int step = stepSet(task,taskLog); |
| | | demandService.update(null, new LambdaUpdateWrapper<Demand>().set(Demand::getStatus, step).set(Demand::getUpdateTime, new Date()).eq(Demand::getId, task.getRequireId())); |
| | | // Save the job execution record |
| | | JobExecLog jobExecLog = new JobExecLog(); |
| | | jobExecLog.setTaskId(task.getTaskId()); |
| | | jobExecLog.setProject("aijuke_spider"); |
| | | jobExecLog.setSpiderScript(task.getPyScript()); |
| | | jobExecLog.setName(task.getJobName()); |
| | | jobExecLog.setJobid(task.getJobUniqueNo()); |
| | | jobExecLog.setStatus(Constant.YES); |
| | | jobExecLog.setTenantId(task.getTenantId()); |
| | | jobExecLog.setUuid(uuid); |
| | | if (StringUtils.isNotBlank(secId)) { |
| | | // For one-click lead acquisition, also record the batch number on the log |
| | | jobExecLog.setSecId(secId); |
| | | } |
| | | jobExecLog.setStartTime(new Date()); |
| | | jobExecLogMapper.insert(jobExecLog); |
| | | |
| | | // Task execution log entry (task type is derived from the script name prefix) |
| | | taskLog.setTaskId(task.getTaskId()); |
| | | taskLog.setDemandId(task.getRequireId()); |
| | | taskLog.setCreateTime(new Date()); |
| | | taskLog.setUpdateTime(new Date()); |
| | | taskLog.setJobid(task.getJobUniqueNo()); |
| | | taskLog.setTenantId(task.getTenantId()); |
| | | if (task.getPyScript().contains(Constant.search_prefix)) { |
| | | taskLog.setTaskType(1); |
| | | } else if (task.getPyScript().contains(Constant.comment_prefix)) { |
| | | taskLog.setTaskType(2); |
| | | } else if (task.getPyScript().contains(Constant.profile_prefix)) { |
| | | taskLog.setTaskType(3); |
| | | } |
| | | taskLog.setTaskStatus(1); |
| | | taskLog.setUuid(uuid); |
| | | if (StringUtils.isNotBlank(secId)) { |
| | | taskLog.setSecId(secId); |
| | | } |
| | | taskLogService.save(taskLog); |
| | | JobExecLog execLog = null; |
| | | do { |
| | | log.info("等待执行结果完成。。。。。。。"); |
| | | execLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, uuid).last("limit 1")); |
| | | } while (null == execLog || null == execLog.getFinishTime()); | // NOTE(review): re-queries until the log row has a finish time, with no sleep or timeout visible in this loop - confirm this is intended
| | | log.info("执行完成,继续执行下一个操作。。。。"); |
| | | |
| | | int status = updateTaskDoneStatus(task.getPyScript(), task.getRequireId()); |
| | | boolean update = demandService.update(null, new LambdaUpdateWrapper<Demand>().set(Demand::getStatus, status).eq(Demand::getId, task.getRequireId())); |
| | | log.info("更新 ==> {} {}", update, status); |
| | | } |
| | | return uuid; |
| | | } |
| | |
| | | String secId = DateUtil.format(new Date(), "yyyyMMddHHmmssSSS"); |
| | | // 如果存在待完善的粉丝数据,就先完善,直到没有可完善的粉丝数据,反之才进行数据采集 |
| | | int count = baseMapper.countUpdateProfile(demand.getId()); |
| | | List<String> uuidList = new ArrayList<>(); |
| | | for (int i = 0; i < list.size(); i++) { |
| | | Task task = list.get(i); |
| | | if (count == 0) { |
| | | // 如果没有待完善的数据,直接开始请求流程 |
| | | execPyTask(task, secId); |
| | | // 将多个脚本放到一个执行链条里面 |
| | | String uuid = execPyTask(task, secId, i); |
| | | uuidList.add(uuid); |
| | | } else { |
| | | // 如果有待完善的粉丝数据,直接进行完善,直到没有可以完善的了 |
| | | if (task.getPyScript().contains(Constant.profile_prefix)) { |
| | | String s = execPyTask(task, secId); |
| | | String s = execPyTask(task, secId, 0); |
| | | uuidList.add(s); |
| | | log.info("uuid ====> {}",s); |
| | | count = baseMapper.countUpdateProfile(demand.getId()); |
| | | log.info("执行结束后待完善的粉丝数据:{}", count); |
| | |
| | | } |
| | | } |
| | | } |
| | | redisUtil.set(Constant.DEMAND_TASK_NOTICE_KEY, uuidList.toString()); |
| | | log.info("一键启动后返回的批次号: {}",secId); |
| | | return secId; |
| | | } |
| | |
| | | for (int i = 0; i < taskList.size(); i++) { |
| | | Task task = taskList.get(i); |
| | | if (task.getPyScript().contains(Constant.comment_prefix) || task.getPyScript().contains(Constant.search_prefix)) { |
| | | execPyTask(task, null); |
| | | String uuid = execPyTask(task, null, 0); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } finally { |
| | | TenantContext.clear(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void callback(String uuid, String secId) { |
| | | // Callback for a finished job run: look up the run's JobExecLog by uuid and, if found, update the owning demand's status; then read the queued uuid list from Redis and start the job that follows this uuid, if any |
| | | JobExecLog execLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, uuid)); |
| | | if (null != execLog) { |
| | | int status = updateTaskDoneStatus(execLog.getSpiderScript(), null); | // NOTE(review): the second argument is null here, while the call in execPyTask passes task.getRequireId() - confirm intended
| | | Task task = this.getOne(new LambdaQueryWrapper<Task>().eq(Task::getTaskId, execLog.getTaskId())); |
| | | boolean update = demandService.update(null, new LambdaUpdateWrapper<Demand>().set(Demand::getStatus, status).eq(Demand::getId, task.getRequireId())); | // NOTE(review): task is dereferenced without a null check, unlike the lookup further down - confirm getOne cannot return null here
| | | log.info("callback ===》{}", update); |
| | | } |
| | | Object o = redisUtil.get(Constant.DEMAND_TASK_NOTICE_KEY); |
| | | if (null != o) { |
| | | String string = o.toString(); |
| | | JSONArray uuids = JSONUtil.parseArray(string); | // NOTE(review): the value under this key is written elsewhere in this file via List.toString() - verify parseArray accepts that format
| | | String nextUuid = null; |
| | | if (uuids.size() > 1) { |
| | | for (int i = 0; i < uuids.size(); i++) { |
| | | String id = uuids.getStr(i); |
| | | if (uuid.equals(id)) { |
| | | if (i == uuids.size() - 1) { |
| | | log.info("最后一个任务,没有下一个要执行的了"); |
| | | } else { |
| | | nextUuid = uuids.getStr(i + 1); |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | if (StringUtils.isNotBlank(nextUuid)) { |
| | | JobExecLog jobExecLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, nextUuid).last("limit 1")); |
| | | if (null != jobExecLog) { |
| | | Task task = this.getOne(new LambdaQueryWrapper<Task>().eq(Task::getTaskId, jobExecLog.getTaskId())); |
| | | if (null != task) { |
| | | Task selectOne = baseMapper.selectOne(new LambdaQueryWrapper<Task>().eq(Task::getRequireId, task.getRequireId()).in(Task::getPyScript, Constant.douyin_search_spider, Constant.kuaishou_search_spider, Constant.xhs_search_spider).last("limit 1")); |
| | | String s = execPyJob(task, uuid, selectOne, secId); | // NOTE(review): passes the finished run's uuid although the task was resolved from nextUuid - this looks like it should be nextUuid; confirm. selectOne is also not null-checked here
| | | CommonRes bean = JSONUtil.toBean(s, CommonRes.class); |
| | | log.info("回调接口:任务启动成功: {}", bean.getJobid()); | // NOTE(review): bean is dereferenced without a null check - confirm toBean cannot return null for this response
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | body.put("project", "aijuke_spider"); |
| | | body.put("spider", task.getPyScript()); |
| | | body.put("jobid", task.getJobName()); |
| | | body.put("job_unique_no", task.getJobUniqueNo()); |
| | | body.put("task_id", selectOne.getTaskId()); |
| | | body.put("task_name", selectOne.getName()); |
| | | body.put("tenant_id", selectOne.getTenantId()); |