feat:实现全量粉丝采集,修改数据源,修改每个视频的采集数量为120条
2 files deleted
5 files modified
4 files added
| | |
| | | # 基础配置 |
| | | from config.dy_get_data import get_dy_video_id |
| | | from dao.dy.dy_video_id import get_dy_video_id |
| | | |
| | | PLATFORM = "dy" |
| | | KEYWORDS = "短视频代运营,代运营" |
| | |
| | | import os |
| | | |
| | | import toml |
| | | from sqlalchemy import URL |
| | | |
| | | current_dir = os.path.dirname(__file__) |
| | | import urllib.parse |
| | | |
| | | # 构建配置文件的完整路径 |
| | | config_file_path = os.path.join(current_dir, 'db_config.toml') |
| | |
# Parse the TOML file once; both sections live in the same document.
_db_config = toml.load(config_file_path)
mysql_config = _db_config.get('mysql')
matrix_config = _db_config.get('matrix')
# Debug output of the [matrix] section. The password is masked so that
# credentials never end up in stdout or in captured logs.
print({
    key: ('***' if key == 'password' else value)
    for key, value in (matrix_config or {}).items()
})


# mysql config
# The environment variable wins; the TOML value is only the fallback.
RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", mysql_config.get('password'))
| | |
matrix_DB_NAME = os.getenv("matrix_DB_NAME", matrix_config.get('database'))
matrix_DB_URL = f"mysql://{matrix_DB_USER}:{matrix_DB_PWD}@{matrix_DB_HOST}:{matrix_DB_PORT}/{matrix_DB_NAME}"

# Debug output of the connection target. The password component is masked
# so the real credential is not written to stdout.
print(f"mysql://{matrix_DB_USER}:***@{matrix_DB_HOST}:{matrix_DB_PORT}/{matrix_DB_NAME}")
# redis config
REDIS_DB_HOST = "127.0.0.1"  # your redis host
REDIS_DB_PWD = os.getenv("REDIS_DB_PWD", "123456")  # your redis password
New file |
| | |
| | | import time |
| | | from typing import Optional, Dict |
| | | |
| | | from sqlalchemy import desc |
| | | |
| | | from model.sqlalchemy_config import Session |
| | | from model.model import Aweme |
| | | |
| | | |
def get_dy_video_id():
    """Return the video ids of Douyin posts that have not been grabbed yet.

    A row qualifies when its platform is 'DY', its tenant_id equals 1, its
    grab_status is false, and video_id, task_id and task_name are all
    non-null. Results are ordered by descending row id.

    Returns:
        list: the matching video_id values, or an empty list when the query
        fails (the error is printed, not raised).
    """
    session = Session()
    try:
        # Only the video_id column is needed, so select just that column
        # instead of loading full Aweme entities.
        rows = (
            session.query(Aweme.video_id)
            .filter(
                Aweme.platform == 'DY',
                # Equality already excludes NULL tenant ids, so no separate
                # NOT NULL filter is required for this column.
                Aweme.tenant_id == 1,
                Aweme.video_id.isnot(None),
                Aweme.grab_status.is_(False),
                Aweme.task_id.isnot(None),
                Aweme.task_name.isnot(None),
            )
            .order_by(desc(Aweme.id))
            .all()
        )
        return [row.video_id for row in rows]
    except Exception as e:
        print(f"Error occurred while fetching data from the database: {e}")
        return []
    finally:
        session.close()
| | | |
| | | |
def change_status(video_id):
    """Mark the post with the given video id as grabbed.

    Sets ``grab_status`` to True on the first row whose ``video_id`` matches
    and commits the change.

    Args:
        video_id: id of the video; ints are accepted and compared as text.

    Returns:
        The updated Aweme, or None when no row matches or the update fails
        (the error is printed and the transaction rolled back).
    """
    # video_id is a String column. Comparing it with an int makes MySQL do a
    # floating-point comparison, which is lossy for 19-digit ids.
    if video_id is not None:
        video_id = str(video_id)
    # expire_on_commit=False keeps the loaded attributes readable on the
    # returned object after commit() and close(); with the default setting
    # the instance comes back expired and detached.
    session = Session(expire_on_commit=False)
    try:
        # NOTE(review): only the first matching row is updated; confirm that
        # video_id is unique in this table.
        existing_aweme = session.query(Aweme).filter(Aweme.video_id == video_id).first()
        if existing_aweme is None:
            print(f"Aweme with id {video_id} not found.")
            return None
        existing_aweme.grab_status = True
        session.commit()
        return existing_aweme
    except Exception as e:
        session.rollback()
        print(f"Error occurred while updating data in the database: {e}")
        return None
    finally:
        session.close()
| | | |
| | | |
def get_task_info_by_video_id(video_id) -> Optional[Dict[str, Aweme]]:
    """Look up the post row for a video id.

    Args:
        video_id: id of the video; ints are accepted and compared as text.

    Returns:
        A one-entry dict mapping the row's ``video_id`` to its Aweme, or
        None when no row matches or the query fails (the error is printed).
    """
    # video_id is a String column. Comparing it with an int makes MySQL do a
    # floating-point comparison, which is lossy for 19-digit ids.
    if video_id is not None:
        video_id = str(video_id)
    session = Session()
    try:
        aweme = session.query(Aweme).filter(Aweme.video_id == video_id).first()
        if aweme is None:
            print(f"Aweme with id {video_id} not found.")
            return None
        return {aweme.video_id: aweme}
    except Exception as e:
        print(f"Error occurred while fetching data from the database: {e}")
        return None
    finally:
        session.close()
| | | |
| | | |
if __name__ == '__main__':
    # Manual smoke run: flag one known video as grabbed, then print the
    # task info stored for the same id.
    sample_video_id = 7392489372630699276
    change_status(video_id=sample_video_id)
    print(get_task_info_by_video_id(sample_video_id))
| | |
| | | from playwright.async_api import BrowserContext, Page |
| | | |
| | | from base.base_crawler import AbstractApiClient |
| | | from dao.dy.dy_video_id import change_status |
| | | from tools import utils |
| | | from var import request_keyword_var |
| | | |
| | |
| | | "count": 10 # must be set to 10 |
| | | } |
| | | if sort_type != SearchSortType.GENERAL or publish_time != PublishTimeType.UNLIMITED: |
| | | params["filter_selected"] = urllib.parse.quote(json.dumps({ |
| | | "sort_type": str(sort_type.value), |
| | | "publish_time": str(publish_time.value) |
| | | })) |
| | | params["is_filter_search"] = 1 |
| | | params["search_source"] = "tab_search" |
| | | params["filter_selected"] = urllib.parse.quote(json.dumps({ |
| | | "sort_type": str(sort_type.value), |
| | | "publish_time": str(publish_time.value) |
| | | })) |
| | | params["is_filter_search"] = 1 |
| | | params["search_source"] = "tab_search" |
| | | referer_url = "https://www.douyin.com/search/" + keyword |
| | | referer_url += f"?publish_time={publish_time.value}&sort_type={sort_type.value}&type=general" |
| | | headers = copy.copy(self.headers) |
| | |
| | | headers = copy.copy(self.headers) |
| | | # headers["Cookie"] = "s_v_web_id=verify_lol4a8dv_wpQ1QMyP_xemd_4wON_8Yzr_FJa8DN1vdY2m;" |
| | | del headers["Origin"] |
| | | print(params) |
| | | print(headers) |
| | | res = await self.get("/aweme/v1/web/aweme/detail/", params, headers) |
| | | |
| | | return res.get("aweme_detail", {}) |
| | | |
| | | async def get_aweme_comments(self, aweme_id: str, cursor: int = 0): |
| | |
| | | result = [] |
| | | comments_has_more = 1 |
| | | comments_cursor = 0 |
| | | max_fetch_comment_count = 5 |
| | | # 最大请求5次,每次20条 |
| | | fetch_comment_count = 0 |
| | | while comments_has_more: |
| | | comments_res = await self.get_aweme_comments(aweme_id, comments_cursor) |
| | | comments_has_more = comments_res.get("has_more", 0) |
| | | comments_cursor = comments_res.get("cursor", 0) |
| | | comments = comments_res.get("comments", []) |
| | | if fetch_comment_count >= max_fetch_comment_count: |
| | | comments_has_more = 0 |
| | | if not comments: |
| | | continue |
| | | result.extend(comments) |
| | |
| | | await callback(aweme_id, comments) |
| | | |
| | | await asyncio.sleep(crawl_interval) |
| | | fetch_comment_count += 1 |
| | | change_status(aweme_id) |
| | | if not is_fetch_sub_comments: |
| | | continue |
| | | # 获取二级评论 |
| | |
| | | await self.search() |
| | | elif config.CRAWLER_TYPE == "detail": |
| | | # Get the information and comments of the specified post |
| | | await self.get_specified_awemes() |
| | | await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST) |
| | | elif config.CRAWLER_TYPE == "creator": |
| | | # Get the information and comments of the specified creator |
| | | await self.get_creators_and_videos() |
| | |
| | | |
async def get_specified_awemes(self):
    """Fetch and store the configured posts, then crawl their comments.

    Requests the detail of every id in ``config.DY_SPECIFIED_ID_LIST``,
    stores each detail that is not None, and finally hands the same id list
    to ``batch_get_note_comments``.
    """
    limiter = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
    # One detail coroutine per configured id; all receive the same semaphore.
    pending = []
    for specified_id in config.DY_SPECIFIED_ID_LIST:
        pending.append(self.get_aweme_detail(aweme_id=specified_id, semaphore=limiter))
    details = await asyncio.gather(*pending)
    for detail in details:
        # Ids whose detail came back as None are skipped.
        if detail is None:
            continue
        await douyin_store.update_douyin_aweme(detail)
    await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST)
| | | |
| | | async def get_aweme_detail(self, aweme_id: str, semaphore: asyncio.Semaphore) -> Any: |
New file |
| | |
| | | from sqlalchemy import Column, String, BigInteger |
| | | from sqlalchemy.orm import declarative_base |
| | | |
| | | Base = declarative_base() |
| | | |
| | | |
class Aweme(Base):
    """ORM mapping for one row of the ``aijuke_grab_post`` table."""

    __tablename__ = 'aijuke_grab_post'

    # Auto-incrementing primary key.
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    tenant_id = Column(String(500))
    video_id = Column(String(500))
    task_id = Column(String(500))
    task_name = Column(String(500))
    device_no = Column(String(500))
    platform = Column(String(500))
    title = Column(String(500))
    # Integer column; the data-access code in this project reads and writes
    # it as a true/false "already grabbed" flag.
    grab_status = Column(BigInteger)

    def to_dict(self):
        """Return the mapped column values as a plain dict keyed by name."""
        exported = (
            'id',
            'tenant_id',
            'video_id',
            'task_id',
            'task_name',
            'device_no',
            'platform',
            'grab_status',
            'title',
        )
        return {field: getattr(self, field) for field in exported}
| | | |
| | | |
| | | # class CommentGot(Base): |
| | | # __tablename__ = 'aijuke_comment_got' |
| | | # id = Column(BigInteger, primary_key=True, autoincrement=True) |
| | | # tenant_id = Column(String(500)) |
| | | # video_id = Column(String(500)) |
| | | # task_id = Column(String(500)) |
| | | # task_name = Column(String(500)) |
| | | # device_no = Column(String(500)) |
| | | # title = Column(String(500)) |
| | | # platform = Column(String(500)) |
| | | # nickname= Column(String(500)) |
| | | |
| | | |
| | | # class GrabPost(Base): |
| | | # __tablename__ = 'aijuke_grab_post' |
| | | # id = Column(BigInteger, primary_key=True, autoincrement=True) |
| | | # tenant_id = Column(String(500)) |
| | | # video_id = Column(String(500)) |
| | | # task_id = Column(String(500)) |
| | | # task_name = Column(String(500)) |
| | | # device_no = Column(String(500)) |
| | | # platform = Column(String(500)) |
| | | # title = Column(String(500)) |
| | | |
New file |
| | |
import os

import toml
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from sqlalchemy.orm import sessionmaker

current_dir = os.path.dirname(__file__)

# Full path of the shared database config file.
config_file_path = os.path.join(current_dir, '../config/db_config.toml')

# Load the config file. Only the [matrix] section is used here; the
# variable name is kept because other modules may import it.
mysql_config = toml.load(config_file_path).get('matrix')


def _opt_str(value):
    """Return ``value`` as text, passing None through unchanged."""
    return None if value is None else str(value)


_port = mysql_config.get("port")

# Create the database connection. The URL is assembled from its components
# so reserved characters in the user name or password (such as '@', '/' or
# ':') are escaped instead of corrupting a hand-formatted DSN string.
engine = create_engine(
    URL.create(
        drivername='mysql+pymysql',
        username=_opt_str(mysql_config.get("user")),
        password=_opt_str(mysql_config.get("password")),
        host=_opt_str(mysql_config.get("host")),
        port=int(_port) if _port is not None else None,
        database=_opt_str(mysql_config.get("database")),
    )
)
Session = sessionmaker(bind=engine)
| | |
| | | # @Desc : |
| | | from typing import List |
| | | |
| | | import config |
| | | from store.douyin import dy_video_id |
| | | from dao.dy.dy_video_id import get_dy_video_id, change_status, get_task_info_by_video_id |
| | | from model.model import Aweme |
| | | |
| | | from .douyin_store_impl import * |
| | | |
| | |
| | | "update_time": utils.get_current_timestamp(), |
| | | "url": f"https://www.douyin.com/video/{aweme_id}" |
| | | } |
| | | dy_video_id.change_status_by_aweme_id(save_content_item.get('aweme_id')) |
| | | get_dy_video_id().change_status_by_aweme_id(save_content_item.get('aweme_id')) |
| | | utils.logger.info( |
| | | f"[store.douyin.update_douyin_aweme] douyin aweme id:{aweme_id}, title:{save_content_item.get('title')}") |
| | | await DouyinStoreFactory.create_store().store_content(content_item=save_content_item) |
| | |
| | | return |
| | | for comment_item in comments: |
| | | await update_dy_aweme_comment(aweme_id, comment_item) |
| | | |
| | | |
| | | cache_data= {} |
| | | |
| | | |
| | | async def update_dy_aweme_comment(aweme_id: str, comment_item: Dict): |
| | |
| | | # "parent_comment_id": parent_comment_id |
| | | } |
| | | # print(save_comment_item) |
| | | utils.logger.info( |
| | | f"[store.douyin.update_dy_aweme_comment] douyin aweme comment: , content: {save_comment_item.get('content')}") |
def get_task_info(video_id) -> Aweme:
    """Return the task row for ``video_id``, using ``cache_data`` as a cache.

    On a cache miss the row is loaded with ``get_task_info_by_video_id`` and
    merged into ``cache_data`` before the cached entry is returned.

    Raises:
        LookupError: no row could be loaded for ``video_id``.
    """
    cached = cache_data.get(video_id)
    if cached:
        return cached
    task_info = get_task_info_by_video_id(video_id)
    if not task_info:
        # The lookup returns None for a missing row or a failed query.
        # Passing that to dict.update() would raise an opaque TypeError, so
        # fail with a message that names the video instead.
        raise LookupError(f"no task info found for video id {video_id}")
    cache_data.update(task_info)
    return cache_data.get(video_id)
| | | |
| | | save_comment_item.update({ |
| | | 'task_id': get_task_info(save_comment_item.get("video_id")).task_id, |
| | | 'task_name': get_task_info(save_comment_item.get("video_id")).task_name, |
| | | 'tenant_id': get_task_info(save_comment_item.get("video_id")).tenant_id, |
| | | 'device_no': get_task_info(save_comment_item.get("video_id")).device_no, |
| | | 'title': get_task_info(save_comment_item.get("video_id")).title, |
| | | }) |
| | | utils.logger.info( |
| | | f"[store.douyin.update_dy_aweme_comment] douyin aweme comment: , content: {save_comment_item}") |
| | | await DouyinStoreFactory.create_store().store_comment(comment_item=save_comment_item) |
| | | |
| | | |