介绍:批量下载相册的所有照片(仅限于相册可以访问且不需要回答问题的情况)
仍然使用 selenium 模拟登录,获取到 cookie 后,再通过 API 接口获取相关的相册数据
# -*- coding: UTF-8 -*-
import json
import os
import random
import time
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor

import requests
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

from io_in_out import *
# Absolute directory that contains this script (symlinks resolved by realpath).
curpath = os.path.dirname(os.path.realpath(__file__))
# io_in_arg is provided by the io_in_out star-import; presumably it normalises
# the path string's encoding -- TODO confirm against io_in_out.
curpath = io_in_arg(curpath)
# Lightweight records used throughout the downloader.
# QzoneAlbum: album id, display name and photo count.
QzoneAlbum = namedtuple('QzoneAlbum', 'uid name count')
# QzonePhoto: download URL, photo name, owning album record and video flag.
QzonePhoto = namedtuple('QzonePhoto', 'url name album is_video')
# Module-wide settings; entry() overrides some of them before downloading.
app_config = {
    "max_workers": 20,  # number of parallel download threads
    "timeout_init": 10,  # initial request timeout in seconds; raise it for large/slow photos
    "max_attempts": 10,  # maximum number of attempts per photo before giving up
    # Whether to print API URLs and responses (debugging aid). This is a
    # boolean flag; it used to hold the truthy integer 10, so True keeps the
    # same default behaviour while matching how entry() assigns it.
    "is_api_debug": True,
    "executionQzoneAlbums": [],  # album names to exclude from downloading
}
def func_save_dir(user):
    """Return the directory in which downloads for ``user`` are stored.

    The location is ``<curpath>/qzone_photo/<user>``, where ``curpath`` is the
    module-level script directory.
    """
    user_folder = f'{user}'
    return os.path.join(curpath, 'qzone_photo', user_folder)
def func_save_photo_net_helper(session, url, timeout):
    """Fetch ``url``, preferring the logged-in session when one is given.

    With a truthy ``session`` the order of attempts is: ``session.get``,
    then ``session.post`` if the GET hit a read timeout, then a plain
    ``requests.get`` if the POST hit a read timeout as well. Without a
    session a plain ``requests.get`` is issued directly. Any other exception
    propagates to the caller.
    """
    # No session available: anonymous download.
    if not session:
        return requests.get(url, timeout=timeout)

    # Use the logged-in account; otherwise encrypted photos come back as the
    # "encrypted photo" placeholder image.
    try:
        return session.get(url, timeout=timeout)
    except requests.ReadTimeout:
        pass

    try:
        return session.post(url, timeout=timeout)
    except requests.ReadTimeout:
        pass

    # Last resort: retry without the session.
    return requests.get(url, timeout=timeout)
def func_save_photo(arg):
    """Download one photo to disk; runs inside the thread pool.

    ``arg`` is a tuple ``(session, user, album_index, album_name, index,
    photo)`` where ``photo`` exposes ``name``, ``url`` and ``is_video``.

    The file is written to ``<save dir>/<album name>/<index>_<photo name>.jpeg``
    (video entries get a ``_视频缩略图`` suffix). Path separators in the file
    name are replaced by ``_``. If ``io_is_path_valid`` rejects the resulting
    path, ``random_name_<album_index>_<index>.jpeg`` is used instead. A file
    that already exists is not downloaded again, so interrupted runs can
    simply be restarted.

    Read timeouts and connection errors are retried up to
    ``app_config['max_attempts']`` times, adding 5 seconds to the timeout
    after each failure. Nothing is written when every attempt fails or when
    the server answers with an HTTP error status.
    """
    session, user, album_index, album_name, index, photo = arg

    dest_path = os.path.join(func_save_dir(user), album_name.strip())
    # Create the directory here so the download does not depend on the caller
    # having created exactly this (stripped) path; exist_ok keeps this safe
    # when several worker threads reach it at the same time.
    os.makedirs(dest_path, exist_ok=True)

    fn = u'{0}_{1}.jpeg'.format(index, photo.name)
    if photo.is_video:
        fn = u'{0}_{1}_视频缩略图.jpeg'.format(index, photo.name)
    print("[开始下载] 相册 {0} 的第 {1} 张图片".format(album_name, index + 1))

    # A photo name containing a path separator would otherwise point into a
    # sub-directory that does not exist.
    fn = fn.replace(u'/', u'_').replace(u'\\', u'_')
    c_p = os.path.join(dest_path, fn)
    if not io_is_path_valid(c_p):
        c_p = os.path.join(
            dest_path, u'random_name_{0}_{1}.jpeg'.format(album_index, index))

    # The file may already have been fetched by an earlier run.
    if os.path.exists(c_p):
        print("[本地已存在] 相册 {0} 的第 {1} 张图片".format(album_name, index + 1))
        return

    # The API returns URLs with JSON-escaped slashes ("\/").
    url = photo.url.replace('\\', '')
    max_attempts = app_config['max_attempts']
    timeout = app_config['timeout_init']
    attempts = 0
    req = None
    while attempts < max_attempts:
        try:
            req = func_save_photo_net_helper(session, url, timeout)
            break
        except (requests.exceptions.ReadTimeout,
                requests.exceptions.ConnectionError):
            attempts += 1
            timeout += 5
            print("[重试下载] 相册 {0} 的第 {1} 张图片;当前重试进度 {2}/{3},超时时间 {4}".format(
                album_name, index + 1, attempts, max_attempts, timeout))

    if req is None:
        # Every attempt failed.
        io_print(u'down fail user:{0} {1}'.format(user, photo.url))
        return

    if not req.ok:
        # Do not store an error page as a .jpeg: it would be treated as
        # "already downloaded" on the next run and never retried.
        io_print(u'down fail user:{0} {1} (HTTP {2})'.format(
            user, photo.url, req.status_code))
        return

    print("[下载成功] 相册 {0} 的第 {1} 张图片;当前重试进度 {2}/{3},超时时间 {4}".format(
        album_name, index + 1, attempts, max_attempts, timeout))
    with open(c_p, 'wb') as f:
        f.write(req.content)
class QzonePhotoManager(object):
    """Log in to Qzone through Selenium and query albums/photos over HTTP.

    Constructing an instance starts Chrome, performs the login flow, copies
    the browser cookies and derives the ``g_tk`` token from the ``p_skey``
    cookie. Afterwards ``get_photos`` lists every album of a target user and
    downloads the photos in a thread pool.

    Attributes set by ``__init__``:
        user, password: the credentials passed in.
        cookie: ``{name: value}`` dict of the browser cookies.
        session: ``requests.Session`` carrying those cookies, handed to the
            download workers.
        qzone_g_tk: token sent as ``g_tk`` in every API URL.
    """

    # Default locations of the Chrome binary and chromedriver; both can be
    # overridden through the constructor.
    DEFAULT_CHROME_BINARY = "/Users/JPlane/Desktop/JPlane/application/Google Chrome.app/Contents/MacOS/Google Chrome"
    DEFAULT_CHROMEDRIVER = "/Users/JPlane/Desktop/space/py/wxservice/xxxx/chromedriver-mac-x64/chromedriver"

    # Name of the JSONP callback requested in the URL templates below.
    JSONP_CALLBACK = 'shine0_Callback'

    # Album list endpoint.
    albumbase = (
        'https://user.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/fcg_list_album_v3?'
        'g_tk={gtk}&t={t}&hostUin={dest_user}&uin={user}'
        '&appid=4&inCharset=utf-8&outCharset=utf-8&source=qzone&plat=qzone&format=jsonp'
        '&notice=0&filter=1&handset=4&pageNumModeSort=40&pageNumModeClass=15&needUserInfo=1'
        '&idcNum=4&callbackFun=shine0&callback=shine0_Callback')

    # Photo list endpoint (one page of an album per request).
    photobase = (
        'https://h5.qzone.qq.com/proxy/domain/photo.qzone.qq.com/fcgi-bin/'
        'cgi_list_photo?g_tk={gtk}&t={t}&mode=0&idcNum=4&hostUin={dest_user}'
        '&topicId={album_id}&noTopic=0&uin={user}&pageStart={pageStart}&pageNum={pageNum}'
        '&skipCmtCount=0&singleurl=1&batchId=&notice=0&appid=4&inCharset=utf-8&outCharset=utf-8'
        '&source=qzone&plat=qzone&outstyle=json&format=jsonp&json_esc=1&question=&answer='
        '&callbackFun=shine0&callback=shine0_Callback&_={_}')

    def __init__(self, user, password, chrome_binary=None, chromedriver_path=None):
        """Start Chrome, log in and capture the cookies needed for the API.

        Args:
            user: QQ number used to log in.
            password: password typed into the login form when it is shown.
            chrome_binary: path of the Chrome executable; defaults to
                ``DEFAULT_CHROME_BINARY``.
            chromedriver_path: path of the chromedriver executable; defaults
                to ``DEFAULT_CHROMEDRIVER``.

        Raises:
            KeyError: if the browser holds no ``p_skey`` cookie afterwards.
        """
        self.user = user
        self.password = password
        # NOTE(review): these headers are not used by any method of this class
        # and reference creator.douyin.com; kept unchanged for compatibility.
        self.headers = {
            "Referer": "https://creator.douyin.com/",
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
            'Host': 'creator.douyin.com/',
            'sec-ch-ua-mobile': '?0',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Dest': 'empty',
            'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Google Chrome";v="110"',
            'Sec-Fetch-Site': 'same-origin',
            'sec-ch-ua-platform': '"macOS"',
            'X-Requested-With': 'XMLHttpRequest',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,it;q=0.7',
            'Accept': 'application/json, text/javascript, */*; q=0.01'
        }

        self.options = webdriver.ChromeOptions()
        self.options.add_experimental_option('useAutomationExtension', False)
        self.options.add_experimental_option('excludeSwitches', ['enable-automation'])
        self.options.add_experimental_option("detach", True)
        self.options.add_argument("disable-blink-features=AutomationControlled")
        self.options.add_argument('lang=zh-CN,zh')
        self.options.add_argument(
            'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36')
        self.options.binary_location = chrome_binary or self.DEFAULT_CHROME_BINARY
        self.chrome_driver_binary = chromedriver_path or self.DEFAULT_CHROMEDRIVER

        service = Service(executable_path=self.chrome_driver_binary)
        driver = webdriver.Chrome(service=service, options=self.options)
        try:
            cookies_dict = self._login_and_collect_cookies(driver, user, password)
        finally:
            # Always shut the browser down, even when the login flow raised.
            driver.quit()

        if 'p_skey' not in cookies_dict:
            raise KeyError(
                'p_skey cookie not found; the Qzone login probably did not complete')

        self.cookie = cookies_dict
        # Downloads need the login cookies, otherwise protected photos come
        # back as the "encrypted photo" placeholder. The session is shared by
        # the download threads and only used for simple requests there.
        self.session = requests.Session()
        self.session.cookies.update(cookies_dict)
        self.qzone_g_tk = self.calc_g_tk(cookies_dict['p_skey'])

    @staticmethod
    def _element_exists(driver, element_id):
        """Return True when an element with the given id can be located."""
        try:
            driver.find_element(By.ID, element_id)
        except NoSuchElementException:
            return False
        return True

    def _login_and_collect_cookies(self, driver, user, password):
        """Drive the login page and return the cookies as ``{name: value}``."""
        driver.get('http://user.qzone.qq.com')
        time.sleep(8)

        # The presence of the login container tells whether a login is needed.
        if self._element_exists(driver, 'login_div'):
            driver.switch_to.frame('login_frame')
            driver.find_element(By.ID, 'switcher_plogin').click()
            user_box = driver.find_element(By.ID, 'u')
            user_box.clear()
            user_box.send_keys(str(user))
            password_box = driver.find_element(By.ID, 'p')
            password_box.clear()
            password_box.send_keys(password)
            driver.find_element(By.ID, 'login_button').click()
            time.sleep(3)
            # Leave the login iframe so later lookups target the main page.
            driver.switch_to.default_content()

        driver.implicitly_wait(3)
        # QM_OwnerInfo_Icon is only present once the space page is accessible.
        if self._element_exists(driver, 'QM_OwnerInfo_Icon'):
            print("登录成功")

        return {c['name']: c['value'] for c in driver.get_cookies()}

    def calc_g_tk(self, p_skey):
        """Compute the ``g_tk`` token from the ``p_skey`` cookie value.

        Starts from 5381, folds every character in with
        ``t += (t << 5) + ord(c)`` and keeps the low 31 bits of the result.
        """
        t = 5381
        for c in p_skey:
            t += (t << 5) + ord(c)
        return t & 0x7FFFFFFF

    @classmethod
    def _strip_jsonp(cls, text):
        """Return the payload inside ``shine0_Callback( ... )``.

        Only the outer wrapper is removed, so a ``);`` inside a JSON string
        (for example in a photo name) is left untouched. Text without the
        wrapper is returned unchanged.
        """
        prefix = cls.JSONP_CALLBACK + '('
        start = text.find(prefix)
        if start == -1:
            return text
        start += len(prefix)
        end = text.rfind(')')
        if end < start:
            return text[start:]
        return text[start:end]

    def access_net(self, url, timeout):
        """GET ``url`` with the login cookies and return the unwrapped JSONP body."""
        r = requests.get(url, cookies=self.cookie, timeout=timeout)
        return self._strip_jsonp(r.text)

    def get_albums(self, dest_user):
        """Return the albums of ``dest_user`` as a list of ``QzoneAlbum``.

        An empty list is returned when the response is empty, reports a
        non-zero ``code`` or carries no ``albumListModeSort`` entries.
        """
        albums = []
        debug = app_config['is_api_debug']
        url = self.albumbase.format(gtk=self.qzone_g_tk,
                                    t=random.random(),
                                    dest_user=dest_user,
                                    user=self.user)
        if debug:
            print(url)

        c = self.access_net(url, timeout=8)
        if not c:
            return albums
        c = json.loads(c)
        if debug:
            print(c)
        if c.get('code') != 0:
            return albums

        data = c.get('data') or {}
        for i in data.get('albumListModeSort') or []:
            if 'albumList' in i:
                if i['albumList'] is None:
                    continue
                if debug:
                    print(i['albumList'])
            albums.append(QzoneAlbum._make([i['id'], i['name'], i['total']]))

        if debug:
            print(albums)
        return albums

    def get_photos_by_album(self, dest_user, album):
        """Return every photo of ``album`` as a list of ``QzonePhoto``.

        The endpoint is paged. Paging stops when a page reports zero photos,
        when the first page already holds the whole album, or when a response
        is empty, reports a non-zero ``code`` or has no usable ``data`` --
        the photos collected so far are returned in each case.
        """
        photos = []
        pageStart = 0
        pageNum = 500  # the endpoint returns at most 500 photos per page
        debug = app_config['is_api_debug']

        while True:
            url = self.photobase.format(gtk=self.qzone_g_tk,
                                        t=random.random(),
                                        dest_user=dest_user,
                                        user=self.user,
                                        album_id=album.uid,
                                        pageStart=pageStart,
                                        pageNum=pageNum,
                                        _=int(time.time() * 1000),
                                        )
            if debug:
                print(url)
            c = self.access_net(url, timeout=app_config['timeout_init'])
            if debug:
                print(c)

            # Without a usable payload the paging state cannot advance, so
            # stop instead of requesting the same page forever.
            if not c:
                return photos
            c = json.loads(c)
            if c.get('code') != 0:
                return photos
            data = c.get('data')
            if not data:
                return photos

            totalInAlbum = data.get('totalInAlbum', 0)  # photos in the album
            totalInPage = data.get('totalInPage', 0)    # photos in this page
            if totalInAlbum == 0 or totalInPage == 0:
                # Empty album, or paged past the last photo.
                return photos

            photolist = data.get('photoList')
            if not photolist:
                return photos
            for i in photolist:
                # Prefer the original-size image when the API provides one.
                pic_url = i.get('raw') or i['url']
                photos.append(QzonePhoto._make(
                    [pic_url, i['name'], album, i.get('is_video', False)]))

            # The page already covers the whole album: no further page needed.
            if totalInAlbum == totalInPage:
                return photos
            pageStart += totalInPage

    def get_photos(self, dest_user):
        """Download every photo of every accessible album of ``dest_user``.

        Albums named in ``app_config['executionQzoneAlbums']`` are skipped.
        Files are stored below ``func_save_dir(dest_user)``, one directory
        per album, and fetched by ``func_save_photo`` in a thread pool of
        ``app_config['max_workers']`` threads.

        :param dest_user: QQ number whose albums are downloaded
        :return: None
        """
        albums = self.get_albums(dest_user)
        io_print(u'获取到 {0} 个相册'.format(len(albums)))
        if not albums:
            io_stderr_print(u'未找到 {0} 可下载的相册'.format(dest_user))
            return

        if app_config['is_api_debug']:
            for album in albums:
                print(f' {album.name}')

        base_dir = func_save_dir(dest_user)
        os.makedirs(base_dir, exist_ok=True)

        photos_all = []
        for album_index, album in enumerate(albums):
            if album.name in app_config["executionQzoneAlbums"]:
                print(f'该相册排除不下载: {album.name}')
                continue
            # The download worker strips the album name when it builds the
            # target path, so the directory is created with the same name.
            os.makedirs(os.path.join(base_dir, album.name.strip()), exist_ok=True)

            photos = self.get_photos_by_album(dest_user, album)
            jobs = [(self.session, dest_user, album_index, album.name, si, photo)
                    for si, photo in enumerate(photos)]
            print(f'{album.name}-->总共{len(jobs)}张')
            photos_all.extend(jobs)

        max_workers = app_config['max_workers']
        print(f'启用多线程下载,并行下载线程数量为 {max_workers}')
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # Consume the iterator so an exception raised in a worker surfaces here.
            list(pool.map(func_save_photo, photos_all))
def entry():
    """Script entry point: log in, apply settings and download each target user."""
    # Your QQ number and password. The QQ number is required; the password
    # may be omitted when the web page's quick-login feature is used instead.
    main_user = xxxxxxx
    main_pass = 'xxxxxxxx'

    # QQ numbers to process; several may be listed, separated by commas.
    dest_users = [
        xxxxxxx
    ]

    manager = QzonePhotoManager(main_user, main_pass)
    io_print(u'登录成功')

    # Optional tuning; remove this call to keep the module defaults.
    app_config.update({
        "max_workers": 2,       # number of parallel download threads
        "timeout_init": 50,     # initial timeout; raise it for large/slow photos
        "max_attempts": 3,      # maximum attempts per photo
        "is_api_debug": True,   # print API responses while debugging
        # Album names to skip, comma separated, e.g. 'a', 'b'.
        "executionQzoneAlbums": [
        ],
    })

    # If a run is aborted by a timeout, simply run it again: files that were
    # already downloaded are not fetched a second time.
    for target in dest_users:
        io_print(u'正在处理用户 {0}'.format(target))
        manager.get_photos(target)
        io_print(u'处理完成')


if __name__ == "__main__":
    entry()