Collecting public social media data with Scrapy
xixi
2024-09-09 c2bc097a274d47308edd341158fae82bea5cd009
Complete profile information for Kuaishou influencers (大V)
2 files modified
1 file added
135 lines changed
aijuke_spider/pipelines.py (40 lines)
aijuke_spider/settings.py (1 line)
aijuke_spider/spiders/kuaishou/kuaishou_v_user_profile_spider.py (94 lines)
aijuke_spider/pipelines.py
@@ -862,9 +862,7 @@
class DouyinVUserProfileSpiderPipeline:
    def __init__(self):
        self.session = matrix_session()
        self.buffer = []
        self.logger = logging.getLogger('scrapy.pipelines.DouyinUserProfilePipLine')
        self.profile_num = 0
    def process_item(self, item, spider):
        if spider.name != 'douyin_v_user_profile_spider':
@@ -898,3 +896,41 @@
        return item
class KuaiShouVUserProfileSpiderPipeline:
    def __init__(self):
        self.session = matrix_session()
        self.logger = logging.getLogger('scrapy.pipelines.KuaiShouVUserProfileSpiderPipeline')
    def process_item(self, item, spider):
        if spider.name != 'kuaishou_v_user_profile_spider':
            return item
        user_info = item.get('user_info', {})
        if not user_info:
            return item  # nothing to persist; pass the item through unchanged
        ownerCount = user_info.get('ownerCount') or {}  # counter block; may come back as null in the payload
        v_user = item.get('v_user')
        try:
            v_user = self.session.query(V).filter(
                V.sec_id == v_user.sec_id,
            ).first()
            if v_user is None:
                return item
            profile = user_info.get('profile') or {}
            v_user.post = ownerCount.get('photo_public')  # number of public posts
            v_user.follow = ownerCount.get('follow')  # following count
            fan = ownerCount.get('fan') or '0'
            # Kuaishou renders large counts as strings like "12.3万" (万 = 10,000)
            v_user.fans = int(float(fan.replace('万', '')) * 10000) if '万' in fan else int(fan)  # follower count
            # v_user.praise = ownerCount.get('praise')  # likes received
            v_user.unique_no = profile.get('user_id')
            v_user.introduction = profile.get('user_text')  # bio
            v_user.nickname = profile.get('user_name')  # nickname
            v_user.avatar = profile.get('headurl')  # avatar URL
            v_user.sec_id = profile.get('user_id')
            self.session.commit()
        except SQLAlchemyError as e:
            self.session.rollback()
            traceback.print_exc()
            self.logger.error(f'Failed to update influencer record: {v_user}')
        return item
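The follower-count conversion in process_item above is fragile enough to be worth factoring out. A minimal sketch of a standalone helper, assuming counts arrive either as plain digit strings or with a 万 suffix the way Kuaishou renders them (the name parse_ks_count is hypothetical, not part of this commit):

def parse_ks_count(raw) -> int:
    # Hypothetical helper: '12.3万' -> 123000, '456' -> 456, None/'' -> 0.
    # '万' means 10,000 in Kuaishou's count strings.
    if not raw:
        return 0
    raw = str(raw)
    if '万' in raw:
        return int(float(raw.replace('万', '')) * 10000)
    return int(raw)

With that helper, the assignment in the pipeline reduces to v_user.fans = parse_ks_count(ownerCount.get('fan')).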
aijuke_spider/settings.py
@@ -83,6 +83,7 @@
    "aijuke_spider.pipelines.KuaiShouCommentSpiderPipeline": 300,
    "aijuke_spider.pipelines.KuaiShouUserProfileSpiderPipeline": 300,
    "aijuke_spider.pipelines.KuaiShouVUserSpiderPipeline": 300,
    "aijuke_spider.pipelines.KuaiShouVUserProfileSpiderPipeline": 300,
    "aijuke_spider.pipelines.XHSSearchSpiderPipeline": 300,
aijuke_spider/spiders/kuaishou/kuaishou_v_user_profile_spider.py
New file
@@ -0,0 +1,94 @@
import copy
import json
import traceback
from typing import List
import scrapy
from scrapy.http import Response
from aijuke_spider.config.db_config import matrix_session
from aijuke_spider.items import V
class KuaiShouVUserProfileSpider(scrapy.Spider):
    platform = "KS"
    name = "kuaishou_v_user_profile_spider"
    allowed_domains = ["kuaishou.com"]
    base_url = "https://www.kuaishou.com"
    headers = {
        'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0",
        'Accept-Encoding': "gzip, deflate, br, zstd",
        'Content-Type': "application/json",
        'sec-ch-ua': "\"Not/A)Brand\";v=\"8\", \"Chromium\";v=\"126\", \"Microsoft Edge\";v=\"126\"",
        'sec-ch-ua-mobile': "?0",
        'sec-ch-ua-platform': "\"Windows\"",
        'Origin': "https://www.kuaishou.com",
        'Sec-Fetch-Site': "same-origin",
        'Sec-Fetch-Mode': "cors",
        'Sec-Fetch-Dest': "empty",
        'Referer': "https://www.kuaishou.com/profile/3xwpkd9patqbvic",
        'Accept-Language': "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    }
    data = {
        "operationName": "visionProfile",
        "variables": {
            "userId": ''
        },
        "query": "query visionProfile($userId: String) {\n  visionProfile(userId: $userId) {\n    result\n    hostName\n    userProfile {\n      ownerCount {\n        fan\n        photo\n        follow\n        photo_public\n        __typename\n      }\n      profile {\n        gender\n        user_name\n        user_id\n        headurl\n        user_text\n        user_profile_bg_url\n        __typename\n      }\n      isFollowing\n      __typename\n    }\n    __typename\n  }\n}\n"
    }
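    # Expected response shape for the visionProfile query above. The field
    # names follow the query string; the values shown are purely illustrative:
    # {"data": {"visionProfile": {"result": 1, "hostName": "...",
    #     "userProfile": {
    #         "ownerCount": {"fan": "12.3万", "photo": "...", "follow": "...",
    #                        "photo_public": "..."},
    #         "profile": {"gender": "...", "user_name": "...", "user_id": "...",
    #                     "headurl": "...", "user_text": "...",
    #                     "user_profile_bg_url": "..."},
    #         "isFollowing": false}}}}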
    def __init__(self, *args, **kwargs):
        super(KuaiShouVUserProfileSpider, self).__init__(*args, **kwargs)
        # Read custom arguments (passed with scrapy crawl -a) from kwargs
        self.tenant_id = kwargs.get('tenant_id', None)
    def start_requests(self):
        v_user_list = self.get_v_user()
        for v_user in v_user_list:
            # deep-copy so each request gets its own 'variables' dict;
            # dict.copy() is shallow and would share one dict across requests
            data = copy.deepcopy(self.data)
            data['variables']['userId'] = v_user.unique_no
            headers = self.headers.copy()
            yield scrapy.Request(url=f'{self.base_url}/graphql', method='POST', body=json.dumps(data),
                                 headers=headers, meta={'v_user': v_user, 'data': data}, callback=self.parse)
    def parse(self, response: Response, *args, **kwargs):
        try:
            if response.status == 200 and len(response.text) > 0:
                # guard each level with `or {}` so a null field cannot raise AttributeError
                user_info = ((response.json().get('data') or {}).get('visionProfile') or {}).get('userProfile') or {}
                if user_info:
                    yield {
                        'user_info': user_info,
                        'v_user': response.meta.get('v_user'),
                    }
                    profile = user_info.get('profile') or {}
                    self.logger.debug(f'Scraped user {profile.get("user_name")}, '
                                      f'{profile.get("user_id")}')
            else:
                self.logger.debug(response.body)
        except json.JSONDecodeError:
            self.logger.error("Response body is not valid JSON")
            self.logger.error(response.body)
        except Exception as e:
            self.logger.error(f"Error while processing the response: {e}")
            self.logger.error(response.body)
    def get_v_user(self) -> List[V]:
        with matrix_session() as session:
            try:
                query = session.query(V).filter(
                    V.platform == self.platform,
                    V.url.isnot(None),
                    V.nickname.is_(None)
                ).order_by(V.id.desc())
                if self.tenant_id is not None:
                    query = query.filter(
                        V.tenant_id == self.tenant_id,
                    )
                result = query.all()
                self.logger.info(f'Query returned {len(result)} rows')
                return result
            except Exception as e:
                self.logger.error(e)
                traceback.print_exc()
                return []
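For completeness, a likely way to launch the new spider programmatically; the tenant_id value is illustrative, and scrapy crawl kuaishou_v_user_profile_spider -a tenant_id=42 is the CLI equivalent:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from aijuke_spider.spiders.kuaishou.kuaishou_v_user_profile_spider import KuaiShouVUserProfileSpider

process = CrawlerProcess(get_project_settings())
# tenant_id is forwarded to the spider's __init__ via kwargs; omit it to
# enrich pending influencers across all tenants
process.crawl(KuaiShouVUserProfileSpider, tenant_id='42')
process.start()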