| | |
| | | |
| | | /** |
| | | * 执行py任务, 返回执行uuid |
| | | * |
| | | * @param task |
| | | */ |
| | | private String execPyTask(Task task, String secId, int num) { |
| | |
| | | |
| | | /** |
| | | * 一键获客任务 |
| | | * |
| | | * @param demandId |
| | | * @return |
| | | */ |
| | | @Override |
| | | public String hotStart(Long demandId) { |
| | | // 缓存当前demand上一次执行时间 |
| | | String keyInterval = String.format(Constant.DEMAND_JOB_INTERVAL_KEY, demandId.toString()); |
| | | log.info("==========>hotStart, 当前需求ID:{} ", demandId); |
| | | try { |
| | | Object interval = redisUtil.get(keyInterval); |
| | | if (interval != null) { |
| | | Long lastJobTime = Long.valueOf(interval.toString()); |
| | | log.info("==========>上一次执行时间redis缓存key:{}, 缓存时间(long)Time:{}", keyInterval, lastJobTime); |
| | | if (DateTime.now().getTime() < lastJobTime + 3600000) { |
| | | log.info("==========>hotStart, 1小时内只执行一次同一需求ID: {} 下的任务", demandId); |
| | | return "系统当前限制同一需求1小时内只执行一次,请稍后再试!"; // 1小时内执行一次 |
| | | } |
| | | }else{ |
| | | log.info("==========>redis缓存key:{} 为null", keyInterval); |
| | | } |
| | | } catch (Exception e) { |
| | | log.info("==========>hotStart, Exception {}", e.toString()); |
| | | } |
| | | |
| | | Long currentTenantId = TenantContext.getCurrentTenantId(); |
| | | DemandOneClickLog saveOrUpdate = new DemandOneClickLog(); |
| | | saveOrUpdate.setDemandId(demandId); |
| | |
| | | String secId = DateUtil.format(new Date(), "yyyyMMddHHmmssSSS"); |
| | | // 如果存在待完善的粉丝数据,就先完善,直到没有可完善的粉丝数据,反之才进行数据采集 |
| | | int count = baseMapper.countUpdateProfile(demand.getId()); |
| | | log.info("==========>hotStart, 需求:{} ,待完善的粉丝数: {} ",demand.getId() + ", "+ demand.getName(), count); |
| | | log.info("==========>需求Id:{} 的待完善粉丝数: {} ", demand.getId() + ", " + demand.getName(), count); |
| | | List<String> uuidList = new ArrayList<>(); |
| | | // 遍历当前demand下的所有任务 |
| | | for (int i = 0; i < list.size(); i++) { |
| | | Task task = list.get(i); |
| | | if (count == 0) { |
| | |
| | | // 将多个脚本放到一个执行链条里面 |
| | | String uuid = execPyTask(task, secId, i); |
| | | uuidList.add(uuid); |
| | | log.info("==========>hotStart, execPyTask ====> {}", task.getTaskId() + " == "+ task.getPyScript()); |
| | | String nowStr = Long.toString(DateTime.now().getTime()); |
| | | redisUtil.set(keyInterval, nowStr); // 缓存执行时间,防止连续调用py接口 |
| | | log.info("==========>hotStart, execPyTask 执行当前需求Id:{} 下任务TaskId: {}", demandId, task.getTaskId() + " == " + task.getPyScript() + "== 执行时间now time:" + nowStr); |
| | | } else { |
| | | // 如果有待完善的粉丝数据,直接进行完善,直到没有可以完善的了 |
| | | if (task.getPyScript().contains(Constant.profile_prefix)) { |
| | |
| | | // 缓存中存在多少次执行记录了 |
| | | String key = String.format(Constant.DEMAND_UPDATE_USER_PROFILE, demandId, "user_profile_spider"); |
| | | Object o = redisUtil.get(key); |
| | | int num =0; |
| | | int cycle=0; |
| | | int num = 0; |
| | | int cycle = 0; |
| | | if (null != o) { |
| | | String str = o.toString(); |
| | | String[] split = str.split(":"); |
| | |
| | | return null; |
| | | } else { |
| | | cycle += 1; |
| | | log.info("==========>hotStart,继续执行,当前执行的次数 {}",cycle); |
| | | log.info("==========>hotStart,继续执行,当前执行的次数 {}", cycle); |
| | | redisUtil.set(key, count + ":" + cycle, 2 * 60 * 60); |
| | | } |
| | | } else { |
| | |
| | | } |
| | | String key = String.format(Constant.DEMAND_TASK_NOTICE_KEY, demandId); |
| | | redisUtil.set(key, uuidList.toString()); |
| | | log.info("一键启动后返回的批次号: {}",secId); |
| | | log.info("一键启动后返回的批次号: {}", secId); |
| | | return secId; |
| | | } |
| | | } |
| | |
| | | |
| | | /** |
| | | * 一键获客定时任务 |
| | | * |
| | | * @param tenantId |
| | | */ |
| | | @Override |
| | | public void hotStartJob(Long tenantId) { |
| | | log.info("==========> hotStartJob"); |
| | | List<DemandOneClickLog> list = demandOneClickLogService.list(new LambdaQueryWrapper<DemandOneClickLog>().eq(DemandOneClickLog::getIsDeleted,0)); |
| | | List<DemandOneClickLog> list = demandOneClickLogService.list(new LambdaQueryWrapper<DemandOneClickLog>().eq(DemandOneClickLog::getIsDeleted, 0)); |
| | | if (CollectionUtil.isNotEmpty(list)) { |
| | | // 检查平台待私信粉丝数 是否大于 账号数*平台私信上限的2倍 |
| | | // 如果没有达到,就执行 |
| | |
| | | log.info("==========>租户ID:{}, 需求ID:{},", tenantId, demandId); |
| | | // 平台粉丝数 |
| | | List<PlatformFansData> platformFansData = dyCommentsGotService.platformData(); |
| | | if(CollectionUtil.isEmpty(platformFansData)) continue; |
| | | if (CollectionUtil.isEmpty(platformFansData)) continue; |
| | | // 策略判断是否执行数据采集和完善任务 |
| | | boolean b = checkExecStrategy(obj, platformFansData); |
| | | // 1、如果可私信的粉丝数达标了,不在执行定时任务 |
| | |
| | | } |
| | | } else { |
| | | // 2.2 没有可完善的粉丝数据时,执行数据采集和评论采集 |
| | | log.info("==========>没有可完善的粉丝数据,开始执行数据采集和评论采集, 租户ID:{}, 需求ID:{} ",tenantId, demandId); |
| | | log.info("==========>没有可完善的粉丝数据,开始执行数据采集和评论采集, 租户ID:{}, 需求ID:{} ", tenantId, demandId); |
| | | List<Task> taskList = this.list(new LambdaQueryWrapper<Task>().eq(Task::getRequireId, demandId)); |
| | | if (!CollectionUtil.isEmpty(taskList)) { |
| | | for (int i = 0; i < taskList.size(); i++) { |
| | |
| | | } |
| | | } |
| | | } |
| | | }catch (Exception e){ |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | log.error("==========>DemandOneClickLog error:{}", e.toString()); |
| | | } |
| | |
| | | |
| | | /** |
| | | * 执行抖音大V账号信息同步run spider |
| | | * |
| | | * @return |
| | | */ |
| | | @Override |
| | |
| | | // 4.如果任务执行完成后待完善粉丝数量 > 0 需要重复执行完善脚本 |
| | | if (!CollectionUtil.isEmpty(demandUserProfile)) { |
| | | Set<Map.Entry<Long, String>> entrySet = demandUserProfile.entrySet(); |
| | | log.info("当前执行的需求数 {}",entrySet.size()); |
| | | log.info("当前执行的需求数 {}", entrySet.size()); |
| | | for (Map.Entry<Long, String> obj : entrySet) { |
| | | Long demandId = obj.getKey(); |
| | | log.info("当前执行的需求 {}",demandId); |
| | | log.info("当前执行的需求 {}", demandId); |
| | | String script = obj.getValue(); |
| | | int count = baseMapper.countUpdateProfile(demandId); |
| | | String key = String.format(Constant.DEMAND_UPDATE_USER_PROFILE, demandId, script); |
| | |
| | | |
| | | /** |
| | | * 调用py接口 |
| | | * |
| | | * @param task |
| | | * @return |
| | | */ |
| | |
| | | |
| | | /** |
| | | * 获取任务执行策略 |
| | | * |
| | | * @param obj |
| | | * @param platformFansData |
| | | * @return |
| | |
| | | |
| | | /** |
| | | * 任务执行步骤 |
| | | * |
| | | * @param task |
| | | * @param taskLog |
| | | * @return |
| | |
| | | |
| | | /** |
| | | * 任务执行完成状态 |
| | | * |
| | | * @param first |
| | | * @param demandUserProfile |
| | | * @param demandId |
| | |
| | | |
| | | /** |
| | | * 任务执行日志状态变更 |
| | | * |
| | | * @param jobExecLog |
| | | * @param list |
| | | */ |