QQ空间相册下载

介绍:批量下载相册的所有照片(仅限于相册是可以访问且不需要回到问题的)

还是使用selenum的形式模拟登陆,然后获取到ck后,在通过api接口获取相关的相册数据

# -*- coding: UTF-8 -*-

import os
import random
import time
from collections import namedtuple

import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service

from io_in_out import *

curpath = os.path.dirname(os.path.realpath(__file__))
curpath = io_in_arg(curpath)

QzoneAlbum = namedtuple('QzoneAlbum', ['uid', 'name', 'count'])
QzonePhoto = namedtuple('QzonePhoto', ['url', 'name', 'album', 'is_video'])

app_config = {
    "max_workers": 20,  # 并行下载线程数量
 "timeout_init": 10,  # 初始超时时间,可以调整大一点,防止某些照片过大,响应太慢
 "max_attempts": 10,  # 下载失败后最大重试次数
 "is_api_debug": 10,  # 是否打印 API 的响应结果,调试的时候使用
 "executionQzoneAlbums": []  # 要排除的相册名称
}


def func_save_dir(user):
    '''
    提供下载的文件保存在哪
    保存至 <脚本目录>/qzone_photo/<用户QQ> 目录
    '''
 return os.path.join(curpath, u'qzone_photo', u'{0}'.format(user))


def func_save_photo_net_helper(session, url, timeout):
    '''
    辅助函数,先用带会话的 session 尝试下载,如果不行就去掉会话尝试下载
    '''
 if session:
        # 使用已经登陆过的账户下载,不然加密的照片下载都是写着“加密照片”
        # 使用 post 还不行,要用 get
 try:
            return session.get(url, timeout=timeout)
        except requests.ReadTimeout:
            try:
                return session.post(url, timeout=timeout)
            except requests.ReadTimeout:
                return func_save_photo_net_helper(None, url, timeout)
    else:
        return requests.get(url, timeout=timeout)


def func_save_photo(arg):
    '''
    线程函数,运行在线程池中
    文件保存格式 <相册名字>_<文件在相册的索引数字>_<文件名字>.jpeg

    1、Q.分次下载的文件,能确保同一个文件名字,都是同一个文件吗?
       A. 这个由 Qzone 的 API 保证,API 能保证顺序,那么这里就能保证顺序
    2. Q.文件名字非法,不可创建文件,怎么处理?
       A. 会用文件名字 <相册在所有相册中的索引数字>_<文件在相册的索引数字>.jpg 进行二次试创建,
         解决因为相册名字,照片名字引起的文件名非法问题。
    '''
 session, user, album_index, album_name, index, photo = arg

    dest_path = os.path.join(func_save_dir(user), album_name.strip())
    # if not os.path.exists(dest_path):
    #     os.makedirs(dest_path)
 fn = u'{0}_{1}.jpeg'.format(index, photo.name)
    if photo.is_video:
        fn = u'{0}_{1}_视频缩略图.jpeg'.format(index, photo.name)

    print("[开始下载] 相册 {0} 的第 {1} 张图片".format(album_name, index + 1))

    def _func_replace_os_path_sep(x):
        return x.replace(u'/', u'_').replace(u'\\', u'_')

    fn = _func_replace_os_path_sep(fn)
    c_p = os.path.join(dest_path, fn)
    if not io_is_path_valid(c_p):
        c_p = os.path.join(
            dest_path, u'random_name_{0}_{1}.jpeg'.format(album_index, index))

    # 可能使用其他 api 下载过文件就不再下载
 if os.path.exists(c_p):
        print("[本地已存在] 相册 {0} 的第 {1} 张图片".format(album_name, index + 1))
        return

 url = photo.url.replace('\\', '')
    max_attempts = app_config['max_attempts']
    attempts = 0
 timeout = app_config['timeout_init']
    while attempts < max_attempts:
        try:
            req = func_save_photo_net_helper(session, url, timeout)
            print("[下载成功] 相册 {0} 的第 {1} 张图片;当前重试进度 {2}/10,超时时间 {3}".format(album_name, index + 1, attempts, timeout))
            break
        except (requests.exceptions.ReadTimeout,
 requests.exceptions.ConnectionError):
            attempts += 1
 timeout += 5
 print("[重试下载] 相册 {0} 的第 {1} 张图片;当前重试进度 {2}/10,超时时间 {3}".format(album_name, index + 1, attempts, timeout))
    else:
        io_print(u'down fail user:{0} {1}'.format(user, photo.url))
        return
 c = req.content

    with open(c_p, 'wb') as f:
        f.write(c)


class QzonePhotoManager(object):
    """
    查询QQ空间相册并下载的类。
    """

 # 相册列表
 albumbase = (
        'https://user.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/fcg_list_album_v3?'
        'g_tk={gtk}&t={t}&hostUin={dest_user}&uin={user}'
        '&appid=4&inCharset=utf-8&outCharset=utf-8&source=qzone&plat=qzone&format=jsonp'
        '&notice=0&filter=1&handset=4&pageNumModeSort=40&pageNumModeClass=15&needUserInfo=1'
        '&idcNum=4&callbackFun=shine0&callback=shine0_Callback')

    # 这里的g_tk出现问题,目前版本的话,会出现有些照片下载下来后实际是不行的,显示加密文件,只要想办法处理这个的算法即可
 photobase = (
        'https://h5.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/'
        'cgi_list_photo?g_tk={gtk}&t={t}&mode=0&idcNum=4&hostUin={dest_user}'
        '&topicId={album_id}&noTopic=0&uin={user}&pageStart={pageStart}&pageNum={pageNum}'
        '&skipCmtCount=0&singleurl=1&batchId=&notice=0&appid=4&inCharset=utf-8&outCharset=utf-8'
        '&source=qzone&plat=qzone&outstyle=json&format=jsonp&json_esc=1&question=&answer='
        '&callbackFun=shine0&callback=shine0_Callback&_={_}')

    def __init__(self, user, password):
        self.user = user
        self.password = password

        self.headers = {
            "Referer": "https://creator.douyin.com/",
 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
 'Host': 'creator.douyin.com/',
 'sec-ch-ua-mobile': '?0',
 'Sec-Fetch-Mode': 'cors',
 'Sec-Fetch-Dest': 'empty',
 'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Google Chrome";v="110"',
 'Sec-Fetch-Site': 'same-origin',
 'sec-ch-ua-platform': '"macOS"',
 'X-Requested-With': 'XMLHttpRequest',
 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,it;q=0.7',
 'Accept': 'application/json, text/javascript, */*; q=0.01'
 }
        self.options = webdriver.ChromeOptions()
        self.options.add_experimental_option('useAutomationExtension', False)
        self.options.add_experimental_option('excludeSwitches', ['enable-automation'])
        self.options.add_experimental_option("detach", True)
        self.options.add_argument("disable-blink-features=AutomationControlled")
        self.options.add_argument('lang=zh-CN,zh')
        self.options.add_argument(
            'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36')
        BINARY_LOCATION = "/Users/JPlane/Desktop/JPlane/application/Google Chrome.app/Contents/MacOS/Google Chrome"
 CHROME_DRIVER_BINARY = "/Users/JPlane/Desktop/space/py/wxservice/xxxx/chromedriver-mac-x64/chromedriver"
 self.options.binary_location = BINARY_LOCATION
        self.chrome_driver_binary = CHROME_DRIVER_BINARY
        service = Service(executable_path=CHROME_DRIVER_BINARY)
        driver = webdriver.Chrome(service=service, options=self.options)

        # driver = webdriver.Chrome('/Users/JPlane/Desktop/space/py/ddy/qzone-photo-downloader/chromedriver')
        # 使用 get() 方法打开待抓取的 URL
 driver.get('http://user.qzone.qq.com')
        time.sleep(8)
        # 等待 5 秒后,判断页面是否需要登录,通过查找页面是否有相应的 DIV 的 id 来判断
 try:
            driver.find_element_by_id('login_div')
            a = True
        except:
            a = False
        if a == True:
            # 如果页面存在登录的 DIV,则模拟登录
 driver.switch_to.frame('login_frame')
            driver.find_element_by_id('switcher_plogin').click()
            driver.find_element_by_id('u').clear()  # 选择用户名框
 driver.find_element_by_id('u').send_keys(user)
            driver.find_element_by_id('p').clear()
            driver.find_element_by_id('p').send_keys(password)
            driver.find_element_by_id('login_button').click()
            time.sleep(3)
        driver.implicitly_wait(3)

        # 判断好友空间是否设置了权限,通过判断是否存在元素 ID:QM_OwnerInfo_Icon
 try:
            driver.find_element_by_id('QM_OwnerInfo_Icon')
            b = True
        except:
            b = False
 # 如果有权限能够访问到说说页面,那么定位元素和数据,并解析
 if b == True:
            print("登录成功")

        # 尝试一下获取 Cookie,使用 get_cookies()
 cookies = driver.get_cookies()
        # print(cookies)

 cookies_dict = {}
        cookies_dict = {c['name']: c['value'] for c in cookies}

        self.cookie = cookies_dict
        self.session = ""
 self.qzone_g_tk = self.calc_g_tk(cookies_dict['p_skey'])

        driver.close()
        driver.quit()




    # -----------------
    # 计算 g_tk
    # -----------------
 def calc_g_tk(self, p_skey):
        t = 5381
 for c in p_skey:
            t += (t << 5) + ord(c)
        return t & 2147483647

 def access_net(self, url, timeout):
        '''
        使用登录时的 session,cookie 访问网络 ,适用于高版本的 qzone api
        '''
 r = requests.get(url, cookies=self.cookie, timeout=timeout)
        # r = self.session.get(url, timeout=timeout)
 c = r.text
        c = c.replace('shine0_Callback(', '').replace(');', '')
        # print(c)
 return c

    def get_albums(self, dest_user):
        import json
        albums = []
        url = self.albumbase.format(gtk=self.qzone_g_tk,
 t=random.Random().random(),
 dest_user=dest_user,
 user=self.user)
        if app_config['is_api_debug']:
            print(url)
        c = self.access_net(url, timeout=8)
        if c:
            c = json.loads(c)
            if app_config['is_api_debug']:
                print(c)
            if c['code'] != 0:
                return albums
            if ('data' in c) and ('albumListModeSort' in c['data']):
                for i in c['data']['albumListModeSort']:
                    if i.__contains__('albumList'):
                        if (i['albumList']) is None:
                            continue
                        if app_config['is_api_debug']:
                            print(i['albumList'])
                        # for ii in i['albumList']:
 albums.append(QzoneAlbum._make([i['id'], i['name'], i['total']]))
        if app_config['is_api_debug']:
            print(albums)
        return albums

    def get_photos_by_album(self, dest_user, album):
        import json

        photos = []
        pageStart = 0
 pageNum = 500  # 接口最多返回 500 条照片
 totalInAlbum = 0  # 总照片数量
 totalInPage = 0  # 当次分页拿到了多少张照片

 while True:
            url = self.photobase.format(gtk=self.qzone_g_tk,
 t=random.Random().random(),
 dest_user=dest_user,
 user=self.user,
 album_id=album.uid,
 pageStart=pageStart,
 pageNum=pageNum,
 _=int(time.time()*1000),
 )
            if app_config['is_api_debug']:
                print(url)
            c = self.access_net(url, timeout=app_config['timeout_init'])
            if app_config['is_api_debug']:
                print(c)

            if c:
                c = json.loads(c)
                if c['code'] != 0:
                    return photos
                if 'data' in c:
                    totalInAlbum = c['data']['totalInAlbum']
                    totalInPage = c['data']['totalInPage']
                    if totalInAlbum == 0:  # 该相册没有照片
 return photos
                    if totalInPage == 0:  # 当次请求没有获取到照片,也就说明到了最后的页数
 return photos

                    if 'photoList' in c['data']:
                        photolist = c['data']['photoList']
                        if photolist is None:
                            return photos
                        for i in photolist:
                            if i['raw']:
                                pic_url = i['raw']
                            else:
                                pic_url = i['url']
                            photos.append(QzonePhoto._make([pic_url, i['name'], album, i['is_video']]))
                    # 如果第一次总数就已经是获取到的数量,就说明只有第一页,不需要继续下一页
 if totalInAlbum == totalInPage:
                        return photos
                    # 下一页的请求参数
 pageStart = pageStart + totalInPage

        return photos

    def get_photos(self, dest_user):
        '''
        能访问所有相册, 前提是先有权限访问该相册
 :param dest_user:
 :return:
        '''
 from concurrent.futures import ThreadPoolExecutor

        # 先获得所有相册
 albums = self.get_albums(dest_user)
        photos_all = []
        io_print(u'获取到 {0} 个相册'.format(len(albums)))

        for i in range(len(albums)):
            if app_config['is_api_debug']:
                print(f'        {albums[i].name}')

            dest_path = os.path.join(func_save_dir(dest_user), albums[i].name)
            if not os.path.exists(dest_path):
                os.makedirs(dest_path)

        for i, album in enumerate(albums):
            if album.name in app_config["executionQzoneAlbums"]:
                print(f'该相册排除不下载: {album.name}')
                continue
 # 根据相册 id 获取相册内所有照片
 photos = self.get_photos_by_album(dest_user, album)
            photos = [(self.session, dest_user, i, album.name, si, photo)
                      for si, photo in enumerate(photos)]
            print(f'{album.name}-->总共{len(photos)}张')
            p = func_save_dir(dest_user)

            if not os.path.exists(p):
                os.makedirs(p)
            photos_all.extend(photos)

        max_workers = app_config['max_workers']
        print(f'启用多线程下载,并行下载线程数量为 {max_workers}')
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            r = pool.map(func_save_photo, photos_all)
            list(r)

        if not albums:
            io_stderr_print(u'未找到 {0} 可下载的相册'.format(dest_user))


def entry():
    # 你的 QQ和密码,QQ号必须写,密码可以省略,然后使用网页快速登录功能
 main_user = xxxxxxx
 main_pass = 'xxxxxxxx'

 # 要处理的目标 QQ 号,此处可填入多个QQ号,中间用逗号隔开
 dest_users = [
        xxxxxxx
 ]

    a = QzonePhotoManager(main_user, main_pass)
    io_print(u'登录成功')

    # 一些优化配置,根据自己的需要调整
    # 如果不需要调整的话,可以将这个配置在这里注释掉使用默认配置
 global app_config
    app_config["max_workers"] = 2  # 并行下载线程数量
 app_config["timeout_init"] = 50  # 初始超时时间,可以调整大一点,防止某些照片过大,响应太慢
 app_config["max_attempts"] = 3  # 下载失败后最大重试次数
 app_config["is_api_debug"] = True  # 是否打印 API 的响应结果,在调试的时候使用
 app_config["executionQzoneAlbums"] = [
    ]  # 排除不下载的相册名称,多个用逗号分隔,比如 'a','b'
    # 如果遇到下载失败的,产生超时异常终止程序运行的,可以再重新运行,已经下载过的文件不会重新下载
 for e in dest_users:
        io_print(u'正在处理用户 {0}'.format(e))
        a.get_photos(e)
        io_print(u'处理完成')


if __name__ == '__main__':
    entry()

Powered By Z-BlogPHP 1.7.3

Copyright 粤ICP备2024347557号 Rights Reserved.