leo.li
2024-09-09 0e800d82ae7b8686ea2c70011a17b0762ccecb06
完善更新粉丝数据
3 files modified
85 ■■■■■ changed files
src/main/java/com/jw/ai/config/Constant.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/listener/RedisKeyExpirationListener.java 32 ●●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/service/impl/TaskServiceImpl.java 52 ●●●●● patch | view | raw | blame | history
src/main/java/com/jw/ai/config/Constant.java
@@ -64,6 +64,7 @@
    public static final String DEMAND_ANALYSIS = "DEMAND_ANALYSIS:%s:%s";
    public static final String DEMAND_UPDATE_USER_PROFILE = "DEMAND_UPDATE_USER_PROFILE:%s:%s";
    public static final String DEMAND_UPDATE_USER_PROFILE_KEY = "DEMAND_UPDATE_USER_PROFILE";
    public static final String DEMAND_UPDATE_USER_PROFILE_LISTEN = "DEMAND_UPDATE_USER_PROFILE_LISTEN:%s:%s";
    public static final String DEMAND_JOB_NAME = "%s_%s_%s";
src/main/java/com/jw/ai/listener/RedisKeyExpirationListener.java
@@ -1,12 +1,16 @@
package com.jw.ai.listener;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.jw.ai.config.Constant;
import com.jw.ai.entity.Demand;
import com.jw.ai.entity.DemandOneClickLog;
import com.jw.ai.entity.JobExecLog;
import com.jw.ai.enums.DemandStatusEnum;
import com.jw.ai.mapper.DemandMapper;
import com.jw.ai.mapper.JobExecLogMapper;
import com.jw.ai.service.IDemandOneClickLogService;
import com.jw.ai.service.ITaskService;
import com.jw.ai.util.RedisUtil;
import com.jw.ai.util.TenantContext;
import lombok.extern.slf4j.Slf4j;
@@ -26,6 +30,14 @@
    @Autowired
    private IDemandOneClickLogService demandOneClickLogService;
    @Autowired
    private JobExecLogMapper jobExecLogMapper;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private ITaskService taskService;
    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
@@ -82,6 +94,26 @@
                    }
                }
            }
            // 监控完善执行结果
            if (expireKey.contains(Constant.DEMAND_UPDATE_USER_PROFILE_LISTEN)) {
                String[] split = expireKey.split(":");
                if (split.length > 2) {
                    Long demandId = Long.valueOf(split[1]);
                    String uuid = split[2];
                    log.info(" demand uuid {}, {}", demandId, uuid);
                    Demand demand = demandMapper.selectById(demandId);
                    if (null != demand) {
                        JobExecLog jobExecLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, uuid).last("limit 1"));
                        if (null != jobExecLog && null != jobExecLog.getFinishTime()) {
                            log.info("执行下一次完善信息");
                            taskService.hotStart(demandId);
                        } else {
                            String listen = String.format(Constant.DEMAND_UPDATE_USER_PROFILE_LISTEN, demandId, uuid);
                            redisUtil.set(listen, uuid, 60);
                        }
                    }
                }
            }
        }
    }
src/main/java/com/jw/ai/service/impl/TaskServiceImpl.java
@@ -40,6 +40,8 @@
import javafx.scene.Group;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.tomcat.util.bcel.Const;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -152,14 +154,14 @@
     * 执行py任务
     * @param task
     */
    private void execPyTask(Task task,String secId) {
    private String execPyTask(Task task,String secId) {
        if (Constant.PY.equals(task.getExecPlatform())) {
            Task selectOne = baseMapper.selectOne(new LambdaQueryWrapper<Task>().eq(Task::getRequireId, task.getRequireId()).in(Task::getPyScript, Constant.douyin_search_spider,
                    Constant.kuaishou_search_spider,
                    Constant.xhs_search_spider).last("limit 1"));
            if (null == selectOne) {
                log.info("demand {} none collect data  return", task.getRequireName());
                return;
                return null;
            }
            // python任务需要调用api执行操作,并修改需求的状态
            String uuid = IdUtil.simpleUUID();
@@ -209,7 +211,9 @@
                }
                taskLogService.save(taskLog);
            }
            return uuid;
        }
        return null;
    }
@@ -345,11 +349,51 @@
                for (int i = 0; i < list.size(); i++) {
                    Task task = list.get(i);
                    if (count == 0) {
                        // 如果没有待完善的数据,直接开始请求流程
                        execPyTask(task, secId);
                    } else {
                        // 如果有待完善的粉丝数据,直接进行完善,直到没有可以完善的了
                        if (task.getPyScript().contains(Constant.profile_prefix)) {
                            execPyTask(task, secId);
                            break;
                            String s = execPyTask(task, secId);
                            log.info("uuid ====> {}",s);
                            count = baseMapper.countUpdateProfile(demand.getId());
                            log.info("执行结束后待完善的粉丝数据:{}", count);
                            if (count == 0) {
                                break;
                            } else {
                                // 缓存中存在多少次执行记录了
                                String key = String.format(Constant.DEMAND_UPDATE_USER_PROFILE, demandId, "user_profile_spider");
                                Object o = redisUtil.get(key);
                                int num =0;
                                int cycle=0;
                                if (null != o) {
                                    String str = o.toString();
                                    String[] split = str.split(":");
                                    num = Integer.valueOf(split[0]);
                                    cycle = Integer.valueOf(split[1]);
                                    if (cycle == 10 && count == num) {
                                        // 无可用cookie,直接跳出
                                        log.info("无可用cookie,直接跳出");
                                        return null;
                                    } else {
                                        cycle += 1;
                                        log.info("继续执行,当前执行的次数 {}",cycle);
                                        redisUtil.set(key, count + ":" + cycle, 2 * 60 * 60);
                                    }
                                } else {
                                    cycle += 1;
                                    redisUtil.set(key, count + ":" + cycle, 2 * 60 * 60);
                                }
                                JobExecLog jobExecLog = jobExecLogMapper.selectOne(new LambdaQueryWrapper<JobExecLog>().eq(JobExecLog::getUuid, s).last("limit 1"));
                                // 如果返回结果了
                                if (null != jobExecLog && null != jobExecLog.getFinishTime()) {
                                    hotStart(demandId);
                                } else {
                                    // 将剩余要执行的次数放在redis里面去监听
                                    String listen = String.format(Constant.DEMAND_UPDATE_USER_PROFILE_LISTEN, demandId, s);
                                    redisUtil.set(listen, s, 60);
                                }
                            }
                        }
                    }
                }