yellowDog
2024-07-18 ccd354ec9f6372726622a97ac7476d50f8b0cb5a
feat:实现全量粉丝采集,修改数据源,修改每个视频的采集数量为120条
2 files deleted
5 files modified
4 files added
283 ■■■■■ changed files
config/base_config.py 2 ●●● patch | view | raw | blame | history
config/db_config.py 5 ●●●● patch | view | raw | blame | history
config/dy_get_data.py 51 ●●●●● patch | view | raw | blame | history
dao/dy/dy_video_id.py 71 ●●●●● patch | view | raw | blame | history
media_platform/douyin/client.py 23 ●●●● patch | view | raw | blame | history
media_platform/douyin/core.py 18 ●●●● patch | view | raw | blame | history
model/__init__.py patch | view | raw | blame | history
model/model.py 56 ●●●●● patch | view | raw | blame | history
model/sqlalchemy_config.py 19 ●●●●● patch | view | raw | blame | history
store/douyin/__init__.py 27 ●●●● patch | view | raw | blame | history
store/douyin/dy_video_id.py 11 ●●●●● patch | view | raw | blame | history
config/base_config.py
@@ -1,5 +1,5 @@
# 基础配置
from config.dy_get_data import get_dy_video_id
from dao.dy.dy_video_id import get_dy_video_id
PLATFORM = "dy"
KEYWORDS = "短视频代运营,代运营"
config/db_config.py
@@ -1,10 +1,8 @@
import os
import toml
from sqlalchemy import URL
current_dir = os.path.dirname(__file__)
import urllib.parse
# 构建配置文件的完整路径
config_file_path = os.path.join(current_dir, 'db_config.toml')
@@ -13,7 +11,7 @@
mysql_config = toml.load(config_file_path).get('mysql')
matrix_config = toml.load(config_file_path).get('matrix')
# matrix_config['password'] = urllib.parse.quote_plus(matrix_config['password'])
print(matrix_config)
# mysql config
RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", mysql_config.get('password'))
@@ -31,7 +29,6 @@
matrix_DB_NAME = os.getenv("matrix_DB_NAME", matrix_config.get('database'))
matrix_DB_URL = f"mysql://{matrix_DB_USER}:{matrix_DB_PWD}@{matrix_DB_HOST}:{matrix_DB_PORT}/{matrix_DB_NAME}"
print(matrix_DB_URL)
# redis config
REDIS_DB_HOST = "127.0.0.1"  # your redis host
REDIS_DB_PWD = os.getenv("REDIS_DB_PWD", "123456")  # your redis password
config/dy_get_data.py
File was deleted
dao/dy/dy_video_id.py
New file
@@ -0,0 +1,71 @@
import time
from typing import Optional, Dict
from sqlalchemy import desc
from model.sqlalchemy_config import Session
from model.model import Aweme
def get_dy_video_id(*, platform='DY', tenant_id=1):
    """Return the video ids of pending Douyin posts (``grab_status`` false).

    Selects ``Aweme`` rows for ``platform``/``tenant_id`` that have a video id,
    a task id and a task name and whose ``grab_status`` is false, newest rows
    (highest ``id``) first.

    Args:
        platform: value matched against ``Aweme.platform`` (default ``'DY'``).
        tenant_id: value matched against ``Aweme.tenant_id`` (default ``1``).

    Returns:
        A list of ``video_id`` values. On a database error the error is
        printed and an empty list is returned instead of raising.
    """
    with Session() as session:
        try:
            # Only the video_id column is needed, so don't materialize full ORM objects.
            rows = (
                session.query(Aweme.video_id)
                .filter(
                    Aweme.platform == platform,
                    # An equality test already excludes NULL tenant ids.
                    Aweme.tenant_id == tenant_id,
                    Aweme.video_id.isnot(None),
                    # NOTE(review): in MySQL `NULL IS FALSE` is false, so rows whose
                    # grab_status is NULL are not returned — confirm this is intended.
                    Aweme.grab_status.is_(False),
                    Aweme.task_id.isnot(None),
                    Aweme.task_name.isnot(None),
                )
                .order_by(desc(Aweme.id))
                .all()
            )
        except Exception as e:
            # Best-effort: callers get an empty work list rather than an exception.
            print(f"Error occurred while fetching data from the database: {e}")
            return []
    return [video_id for (video_id,) in rows]
def change_status(video_id):
    """Mark the ``Aweme`` row for ``video_id`` as grabbed (``grab_status = True``).

    Args:
        video_id: the video id to update. Non-str values (e.g. ints) are
            converted with ``str()`` before querying.

    Returns:
        The updated ``Aweme`` instance, refreshed so its attributes remain
        readable after the session is closed. Returns ``None`` if
        ``video_id`` is None, no row matches, or the update fails. Errors are
        printed and the transaction is rolled back; nothing is raised.
    """
    if video_id is None:
        # Without this guard `video_id == None` becomes `IS NULL` and would
        # mark an arbitrary row that has no video id.
        print(f"Aweme with id {video_id} not found.")
        return None
    with Session() as session:
        try:
            # video_id is a String column: compare against a str so MySQL does not
            # fall back to a double-precision comparison, which loses precision on
            # 19-digit ids and can match (and update) the wrong row.
            # NOTE(review): if several rows share this video_id only the first
            # one returned is updated.
            existing_aweme = (
                session.query(Aweme)
                .filter(Aweme.video_id == str(video_id))
                .first()
            )
            if existing_aweme is None:
                print(f"Aweme with id {video_id} not found.")
                return None
            existing_aweme.grab_status = True
            session.commit()
            # The sessionmaker uses the default expire_on_commit=True, so commit()
            # expired the instance; reload it before the session closes so the
            # returned object can still be read.
            session.refresh(existing_aweme)
            return existing_aweme
        except Exception as e:
            session.rollback()
            print(f"Error occurred while updating data in the database: {e}")
            return None
def get_task_info_by_video_id(video_id) -> Optional[Dict[str, Aweme]]:
    """Look up the ``Aweme`` task row for ``video_id``.

    Args:
        video_id: the video id to look up. Non-str values (e.g. ints) are
            converted with ``str()`` before querying.

    Returns:
        ``{aweme.video_id: aweme}`` for the first matching row. Returns
        ``None`` if ``video_id`` is None, nothing matches, or the query fails
        (the error is printed, not raised). The returned instance is detached
        from its session; its column attributes were loaded by the query.
    """
    if video_id is None:
        # `video_id == None` would become `IS NULL` and return an unrelated row.
        print(f"Aweme with id {video_id} not found.")
        return None
    with Session() as session:
        try:
            # Compare as str against the String column so MySQL does not compare
            # as double and lose precision on long ids.
            aweme = (
                session.query(Aweme)
                .filter(Aweme.video_id == str(video_id))
                .first()
            )
        except Exception as e:
            print(f"Error occurred while fetching data from the database: {e}")
            return None
    if aweme is None:
        print(f"Aweme with id {video_id} not found.")
        return None
    return {aweme.video_id: aweme}
if __name__ == '__main__':
    # Manual smoke test against the configured database.
    # Beware: change_status() writes to the DB and marks this video as grabbed.
    demo_video_id = 7392489372630699276
    change_status(video_id=demo_video_id)
    print(get_task_info_by_video_id(demo_video_id))
media_platform/douyin/client.py
@@ -9,6 +9,7 @@
from playwright.async_api import BrowserContext, Page
from base.base_crawler import AbstractApiClient
from dao.dy.dy_video_id import change_status
from tools import utils
from var import request_keyword_var
@@ -127,12 +128,12 @@
            "count": 10  # must be set to 10
        }
        if sort_type != SearchSortType.GENERAL or publish_time != PublishTimeType.UNLIMITED:
           params["filter_selected"] = urllib.parse.quote(json.dumps({
               "sort_type": str(sort_type.value),
               "publish_time": str(publish_time.value)
           }))
           params["is_filter_search"] = 1
           params["search_source"] = "tab_search"
            params["filter_selected"] = urllib.parse.quote(json.dumps({
                "sort_type": str(sort_type.value),
                "publish_time": str(publish_time.value)
            }))
            params["is_filter_search"] = 1
            params["search_source"] = "tab_search"
        referer_url = "https://www.douyin.com/search/" + keyword
        referer_url += f"?publish_time={publish_time.value}&sort_type={sort_type.value}&type=general"
        headers = copy.copy(self.headers)
@@ -151,7 +152,10 @@
        headers = copy.copy(self.headers)
        # headers["Cookie"] = "s_v_web_id=verify_lol4a8dv_wpQ1QMyP_xemd_4wON_8Yzr_FJa8DN1vdY2m;"
        del headers["Origin"]
        print(params)
        print(headers)
        res = await self.get("/aweme/v1/web/aweme/detail/", params, headers)
        return res.get("aweme_detail", {})
    async def get_aweme_comments(self, aweme_id: str, cursor: int = 0):
@@ -206,11 +210,16 @@
        result = []
        comments_has_more = 1
        comments_cursor = 0
        max_fetch_comment_count = 5
        # 最大请求5次,每次20条
        fetch_comment_count = 0
        while comments_has_more:
            comments_res = await self.get_aweme_comments(aweme_id, comments_cursor)
            comments_has_more = comments_res.get("has_more", 0)
            comments_cursor = comments_res.get("cursor", 0)
            comments = comments_res.get("comments", [])
            if fetch_comment_count >= max_fetch_comment_count:
                comments_has_more = 0
            if not comments:
                continue
            result.extend(comments)
@@ -218,6 +227,8 @@
                await callback(aweme_id, comments)
            await asyncio.sleep(crawl_interval)
            fetch_comment_count += 1
            change_status(aweme_id)
            if not is_fetch_sub_comments:
                continue
            # 获取二级评论
media_platform/douyin/core.py
@@ -66,7 +66,7 @@
                await self.search()
            elif config.CRAWLER_TYPE == "detail":
                # Get the information and comments of the specified post
                await self.get_specified_awemes()
                await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST)
            elif config.CRAWLER_TYPE == "creator":
                # Get the information and comments of the specified creator
                await self.get_creators_and_videos()
@@ -117,14 +117,14 @@
    async def get_specified_awemes(self):
        """Fetch and store details, then comments, for ``config.DY_SPECIFIED_ID_LIST``.

        One ``get_aweme_detail`` call per id is scheduled and awaited together
        with ``asyncio.gather``. All calls share a semaphore sized by
        ``config.MAX_CONCURRENCY_NUM``, presumably to cap concurrent requests;
        confirm in ``get_aweme_detail``. Each non-None detail is persisted via
        ``douyin_store.update_douyin_aweme``. Finally the comments for all ids
        are fetched with ``batch_get_note_comments``.
        """
        semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
        task_list = [
            self.get_aweme_detail(aweme_id=aweme_id, semaphore=semaphore) for aweme_id in config.DY_SPECIFIED_ID_LIST
        ]
        # Results come back in the same order as task_list; None results are skipped.
        aweme_details = await asyncio.gather(*task_list)
        for aweme_detail in aweme_details:
            if aweme_detail is not None:
                await douyin_store.update_douyin_aweme(aweme_detail)
        # NOTE(review): the commented-out block below is an exact copy of the live
        # code above (looks like diff leftover) — keep one version and delete the other.
        # semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
        # task_list = [
        #     self.get_aweme_detail(aweme_id=aweme_id, semaphore=semaphore) for aweme_id in config.DY_SPECIFIED_ID_LIST
        # ]
        # aweme_details = await asyncio.gather(*task_list)
        # for aweme_detail in aweme_details:
        #     if aweme_detail is not None:
        #         await douyin_store.update_douyin_aweme(aweme_detail)
        await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST)
    async def get_aweme_detail(self, aweme_id: str, semaphore: asyncio.Semaphore) -> Any:
model/__init__.py
model/model.py
New file
@@ -0,0 +1,56 @@
from sqlalchemy import Column, String, BigInteger
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class Aweme(Base):
    """ORM mapping for rows of the ``aijuke_grab_post`` table."""

    __tablename__ = 'aijuke_grab_post'

    id = Column(BigInteger, primary_key=True, autoincrement=True)
    tenant_id = Column(String(500))
    video_id = Column(String(500))
    task_id = Column(String(500))
    task_name = Column(String(500))
    device_no = Column(String(500))
    platform = Column(String(500))
    title = Column(String(500))
    # Integer column that the dao layer uses as a boolean flag
    # (filtered with IS FALSE, set to True once grabbed).
    grab_status = Column(BigInteger)

    # Attribute names emitted by to_dict(), in output order.
    _DICT_FIELDS = (
        'id',
        'tenant_id',
        'video_id',
        'task_id',
        'task_name',
        'device_no',
        'platform',
        'grab_status',
        'title',
    )

    def to_dict(self):
        """Return the mapped columns as a plain dict, keyed in ``_DICT_FIELDS`` order."""
        return {field: getattr(self, field) for field in self._DICT_FIELDS}
# class CommentGot(Base):
#     __tablename__ = 'aijuke_comment_got'
#     id = Column(BigInteger, primary_key=True, autoincrement=True)
#     tenant_id = Column(String(500))
#     video_id = Column(String(500))
#     task_id = Column(String(500))
#     task_name = Column(String(500))
#     device_no = Column(String(500))
#     title = Column(String(500))
#     platform = Column(String(500))
#     nickname= Column(String(500))
# class GrabPost(Base):
#     __tablename__ = 'aijuke_grab_post'
#     id = Column(BigInteger, primary_key=True, autoincrement=True)
#     tenant_id = Column(String(500))
#     video_id = Column(String(500))
#     task_id = Column(String(500))
#     task_name = Column(String(500))
#     device_no = Column(String(500))
#     platform = Column(String(500))
#     title = Column(String(500))
model/sqlalchemy_config.py
New file
@@ -0,0 +1,19 @@
import os
import toml
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from sqlalchemy.orm import sessionmaker

# Directory of this module; the TOML file lives in the sibling ``config`` package.
current_dir = os.path.dirname(__file__)
# Full path of the database config file.
config_file_path = os.path.normpath(
    os.path.join(current_dir, os.pardir, 'config', 'db_config.toml')
)
# Connection settings are read from the [matrix] table (module-level name kept as-is).
mysql_config = toml.load(config_file_path).get('matrix')
if mysql_config is None:
    raise KeyError(f"missing [matrix] section in {config_file_path}")
# URL.create() escapes special characters in the credentials (e.g. '@', ':', '/'
# in the password), which an f-string DSN silently breaks on.
engine = create_engine(
    URL.create(
        drivername='mysql+pymysql',
        username=mysql_config.get("user"),
        password=mysql_config.get("password"),
        host=mysql_config.get("host"),
        port=mysql_config.get("port"),
        database=mysql_config.get("database"),
    )
)
# Session factory shared by the dao layer.
Session = sessionmaker(bind=engine)
store/douyin/__init__.py
@@ -4,8 +4,8 @@
# @Desc    :
from typing import List
import config
from store.douyin import dy_video_id
from dao.dy.dy_video_id import get_dy_video_id, change_status, get_task_info_by_video_id
from model.model import Aweme
from .douyin_store_impl import *
@@ -59,7 +59,7 @@
        "update_time": utils.get_current_timestamp(),
        "url": f"https://www.douyin.com/video/{aweme_id}"
    }
    dy_video_id.change_status_by_aweme_id(save_content_item.get('aweme_id'))
    get_dy_video_id().change_status_by_aweme_id(save_content_item.get('aweme_id'))
    utils.logger.info(
        f"[store.douyin.update_douyin_aweme] douyin aweme id:{aweme_id}, title:{save_content_item.get('title')}")
    await DouyinStoreFactory.create_store().store_content(content_item=save_content_item)
@@ -70,6 +70,9 @@
        return
    for comment_item in comments:
        await update_dy_aweme_comment(aweme_id, comment_item)
cache_data= {}
async def update_dy_aweme_comment(aweme_id: str, comment_item: Dict):
@@ -112,9 +115,23 @@
        # "parent_comment_id": parent_comment_id
    }
    # print(save_comment_item)
    utils.logger.info(
        f"[store.douyin.update_dy_aweme_comment] douyin aweme comment: , content: {save_comment_item.get('content')}")
    def get_task_info(video_id) -> Aweme:
        if task_info := cache_data.get(video_id):
            return task_info
        else:
            task_info = get_task_info_by_video_id(video_id)
            cache_data.update(task_info)
            return cache_data.get(video_id)
    save_comment_item.update({
        'task_id': get_task_info(save_comment_item.get("video_id")).task_id,
        'task_name': get_task_info(save_comment_item.get("video_id")).task_name,
        'tenant_id': get_task_info(save_comment_item.get("video_id")).tenant_id,
        'device_no': get_task_info(save_comment_item.get("video_id")).device_no,
        'title': get_task_info(save_comment_item.get("video_id")).title,
    })
    utils.logger.info(
        f"[store.douyin.update_dy_aweme_comment] douyin aweme comment: , content: {save_comment_item}")
    await DouyinStoreFactory.create_store().store_comment(comment_item=save_comment_item)
store/douyin/dy_video_id.py
File was deleted