src/main/java/com/jw/ai/config/Constant.java
@@ -64,6 +64,7 @@ public static final String DEMAND_ANALYSIS = "DEMAND_ANALYSIS:%s:%s"; public static final String DEMAND_UPDATE_USER_PROFILE = "DEMAND_UPDATE_USER_PROFILE:%s:%s"; public static final String DEMAND_UPDATE_USER_PROFILE_KEY = "DEMAND_UPDATE_USER_PROFILE"; public static final String DEMAND_UPDATE_USER_PROFILE_LISTEN = "DEMAND_UPDATE_USER_PROFILE_LISTEN:%s:%s"; public static final String DEMAND_JOB_NAME = "%s_%s_%s"; src/main/java/com/jw/ai/listener/RedisKeyExpirationListener.java
@@ -1,12 +1,16 @@ package com.jw.ai.listener; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.jw.ai.config.Constant; import com.jw.ai.entity.Demand; import com.jw.ai.entity.DemandOneClickLog; import com.jw.ai.entity.JobExecLog; import com.jw.ai.enums.DemandStatusEnum; import com.jw.ai.mapper.DemandMapper; import com.jw.ai.mapper.JobExecLogMapper; import com.jw.ai.service.IDemandOneClickLogService; import com.jw.ai.service.ITaskService; import com.jw.ai.util.RedisUtil; import com.jw.ai.util.TenantContext; import lombok.extern.slf4j.Slf4j; @@ -26,6 +30,14 @@ @Autowired private IDemandOneClickLogService demandOneClickLogService; @Autowired private JobExecLogMapper jobExecLogMapper; @Autowired private RedisUtil redisUtil; @Autowired private ITaskService taskService; public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); @@ -82,6 +94,26 @@ } } } // 监控完善执行结果 if (expireKey.contains(Constant.DEMAND_UPDATE_USER_PROFILE_LISTEN)) { String[] split = expireKey.split(":"); if (split.length > 2) { Long demandId = Long.valueOf(split[1]); String uuid = split[2]; log.info(" demand uuid {}, {}", demandId, uuid); Demand demand = demandMapper.selectById(demandId); if (null != demand) { JobExecLog jobExecLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, uuid).last("limit 1")); if (null != jobExecLog && null != jobExecLog.getFinishTime()) { log.info("执行下一次完善信息"); taskService.hotStart(demandId); } else { String listen = String.format(Constant.DEMAND_UPDATE_USER_PROFILE_LISTEN, demandId, uuid); redisUtil.set(listen, uuid, 60); } } } } } } src/main/java/com/jw/ai/service/impl/TaskServiceImpl.java
@@ -40,6 +40,8 @@ import javafx.scene.Group; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.protocol.types.Field; import org.apache.tomcat.util.bcel.Const; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -152,14 +154,14 @@ * 执行py任务 * @param task */ private void execPyTask(Task task,String secId) { private String execPyTask(Task task,String secId) { if (Constant.PY.equals(task.getExecPlatform())) { Task selectOne = baseMapper.selectOne(new LambdaQueryWrapper<Task>().eq(Task::getRequireId, task.getRequireId()).in(Task::getPyScript, Constant.douyin_search_spider, Constant.kuaishou_search_spider, Constant.xhs_search_spider).last("limit 1")); if (null == selectOne) { log.info("demand {} none collect data return", task.getRequireName()); return; return null; } // python任务需要调用api执行操作,并修改需求的状态 String uuid = IdUtil.simpleUUID(); @@ -209,7 +211,9 @@ } taskLogService.save(taskLog); } return uuid; } return null; } @@ -345,11 +349,51 @@ for (int i = 0; i < list.size(); i++) { Task task = list.get(i); if (count == 0) { // 如果没有待完善的数据,直接开始请求流程 execPyTask(task, secId); } else { // 如果有待完善的粉丝数据,直接进行完善,直到没有可以完善的了 if (task.getPyScript().contains(Constant.profile_prefix)) { execPyTask(task, secId); break; String s = execPyTask(task, secId); log.info("uuid ====> {}",s); count = baseMapper.countUpdateProfile(demand.getId()); log.info("执行结束后待完善的粉丝数据:{}", count); if (count == 0) { break; } else { // 缓存中存在多少次执行记录了 String key = String.format(Constant.DEMAND_UPDATE_USER_PROFILE, demandId, "user_profile_spider"); Object o = redisUtil.get(key); int num =0; int cycle=0; if (null != o) { String str = o.toString(); String[] split = str.split(":"); num = Integer.valueOf(split[0]); cycle = Integer.valueOf(split[1]); if (cycle == 10 && count == num) { // 无可用cookie,直接跳出 log.info("无可用cookie,直接跳出"); return null; } else { cycle += 1; log.info("继续执行,当前执行的次数 {}",cycle); redisUtil.set(key, count + ":" + cycle, 2 * 60 * 60); } } else { cycle += 1; redisUtil.set(key, count + ":" + cycle, 2 * 60 * 60); } JobExecLog jobExecLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, s).last("limit 1")); // 如果返回结果了 if (null != jobExecLog && null != jobExecLog.getFinishTime()) { hotStart(demandId); } else { // 将剩余要执行的次数放在redis里面去监听 String listen = String.format(Constant.DEMAND_UPDATE_USER_PROFILE_LISTEN, demandId, s); redisUtil.set(listen, s, 60); } } } } }