前言
我想给我写的播放器换一套演示视频和弹幕,而弹幕正好在 AcFun
(以下简称A站)有,于是想着利用 Python
写一个脚本爬取完整的弹幕列表。本文只讲解爬取A站弹幕,至于弹幕格式转换不在本文范畴。
注:爬取番剧弹幕和爬取个人视频略有不同,本文只适用于爬取番剧的弹幕,但思路都是一样的,在两者有差异的地方我会有所标注。
分析网页
找到弹幕接口的请求与响应内容
首先打开想要爬取弹幕的视频,这里我用番剧《摇曳露营△》的播放页作示例:链接,然后打开开发者工具
(F12或者Ctrl+Shift+i),将选项卡切换到”网络“
,接着再刷新一次页面。
面对一大堆数据,我们可以使用 Ctrl+F
进行搜索,挑一条弹幕作为搜索内容即可。
(图中右侧的载荷里是请求参数,不同视频的resourceId也不同)
很容易就找到了弹幕接口的信息,不过这里能找到两个弹幕的接口,分别是 pollByPosition
和 list
。
经过研究发现,pollByPosition
接口是视频播放时分段进行获取弹幕的接口,这种隔一段时间获取一次弹幕的方式能确保弹幕的实时性;而 list
接口则是网页右侧展示的弹幕列表的接口,该接口可以获取到按时间正序与倒序排序的弹幕。两个接口都可以进行爬取数据,本文就用 list
接口作为示例。
通过手动翻页再分析请求头发现请求参数的 pcursor 对应上了页码,而响应内容也有一个 pcursor 刚好对应上了下一页的页码;当我选择最后一页时,发现响应内容的 pcursor 变成了 “no_more”,我想这应该代表已经是最后一页没有下一页了。
列出请求表
请求地址 | https://www.acfun.cn/rest/pc-direct/new-danmaku/list | |
---|---|---|
请求方式 | POST | |
请求内容类型(Content-Type) | application/x-www-form-urlencoded | |
请求参数 | resourceId | 视频的唯一id,等同于videoId。 |
resourceType | 似乎固定填9。 | |
enableAdvanced | 填true或false。是否开启高级弹幕(可能)。 | |
pcursor | 填数字。分页的页码,从1开始。 | |
count | 填数字。获取的弹幕数量,限制在[1~200]之间,再多也等同200。 | |
sortType | 填1或2。1为不排序,2为排序(挺奇葩的)。 | |
asc | 填true或false。true为正序,false为倒序。 |
列出响应表
响应内容类型(content-type) | application/json | |
---|---|---|
响应参数 | result | 似乎固定为0 |
danmakus | 弹幕列表 | |
styleDanmakuCount | 可能是花里胡哨弹幕的数量? | |
requestId | ||
pcursor | 下一页的页码,当值为“no_more”则表示没有下一页 | |
host-name | ||
totalCount | 该视频的总弹幕数量 |
经过分析可以得到以上的请求与响应表格(基于效果进行的猜测,不一定100%准确)。
在代码的实现上我们可以爬取每一页的弹幕再组合在一起,就能得到所有弹幕的集合。
但在这之前也有个问题,resourceId
没有(通过代码的)获取方式,不可能每次都通过开发者选项获取吧。
不过好在也有解决方法:
首先想到通过查看网页源代码的方式,在页面空白处右键鼠标即可找到:
利用Ctrl+F查找resourceId,但寻了一会没找到有用的信息(而且都在js代码里不太好反推),索性直接搜寻我当前视频已知的resourceId:11137579
(上文中的响应内容有),于是找到了这两个有效的信息(下图左侧):
经过搜索发现 11137579
经常在 window.pageInfo
与 window.bangumiList
这两行中出现,于是我在网页的控制台直接输入了个这两个变量名,这样可以非常方便地查看那两行后面赋值的json数据。
经过查阅,可以确定pageInfo变量中的 videoId
即为该视频的 resourceId
,而 bangumiList 中的 items 有整个番剧的所有话/集(第1话、第2话...这种),每一话中同样也有videoId。这样一来就可以一次性获取到当前视频的 resourceId 和整部番中所有话的 resourceId,一举两得。
注:非番剧的视频的 pageInfo 这一行是 window.pageInfo = window.videoInfo
,并且视频的 resourceId 是 currentVideoId
,与番剧视频略有不同。
代码实践
实现获取当前的 resourceId 与整部番的数据
通过链接地址来获取 resourceId(videoId),由于该 id 是存在于网页的 js 代码中,固没办法用 parsel 库,于是转而使用正则表达式匹配。实现步骤如下:
使用 request 库获取接口的数据
这就很简单了,不过 user-agent 需要自定义,否则请求会被A站拒绝。
### 只适合于番剧 import json import re import requests web_link = "https://www.acfun.cn/bangumi/aa6000991" # 地址 response = requests.get( web_link, headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"} ) # print(response.text) # 响应内容
使用正则匹配
# 正则获取pageInfo(仅适用于番剧视频) page_info_str = re.findall(r'window\.pageInfo = window\.bangumiData = \{(.{0,})\};\n', response.text)[0] info = json.loads('{' + page_info_str + '}') # Map类型的pageInfo数据 print("当前页面的(番剧)视频videoId:{}".format(page_info["videoId"])) # 正则获取bangumiList(仅适用于番剧视频) bangumi_list_str = re.findall(r'window\.bangumiList = \{(.{0,})\};\n', response.text)[0] bangumi_list = json.loads('{' + bangumi_list_str + '}') # Map类型的bangumiList数据 for item in bangumi_list["items"]: print("{}:{},videoId:{}".format(item["episodeName"], item["title"], item["videoId"]))
输出结果如下
注:如果是非番剧视频,即个人用户上传的视频,则这样获取 pageInfo:
page_info_str = re.findall(r'window\.pageInfo = window\.videoInfo = \{(.{0,})\};\n', response.text)[0] page_info = json.loads('{' + page_info_str + '}') print("当前页面的视频videoId:{}".format(page_info["currentVideoId"]))
最后将代码封装成函数
def get_bangumi_video_info(web_link): """ 通过网站链接获取当前视频与整部番剧的信息(仅限番剧页)\n Args: web_link (String): 网页链接 return for example: { "currentVideoId" : 11137579, "bangumiList": [ {"episodeName": "第1话", "title": "富士山与咖喱面", "videoId": 11137579}, ... ], } """ result = { "currentVideoId" : None, # 当前视频的videoId "bangumiList": [], # 整部番剧的剧集 } # 返回的结果 response = requests.get( web_link, headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"} ) page_info_str = re.findall(r'window\.pageInfo = window\.bangumiData = \{(.{0,})\};\n', response.text)[0] page_info = json.loads('{' + page_info_str + '}') result["currentVideoId"] = page_info["videoId"] bangumi_list_str = re.findall(r'window\.bangumiList = \{(.{0,})\};\n', response.text)[0] bangumi_list = json.loads('{' + bangumi_list_str + '}') for item in bangumi_list["items"]: bangumi = { "episodeName": item["episodeName"], "title": item["title"], "videoId": item["videoId"], } result["bangumiList"].append(bangumi) return result print(get_bangumi_video_info(web_link))
输出结果如下图左侧,而右侧是格式化json后的数据
有了这个函数,我们就可以很方便的提取出单前视频和整部番剧每一集的信息,为下文爬取整部番的弹幕提供基础。
实现通过 videoId 获取弹幕数据
一步一步实现,先从爬取单个视频的一页弹幕开始,再单个视频的完整弹幕列表,最后再实现爬取整个番剧所有视频的弹幕。
通过 videoId 获取第一页的 200 个弹幕,并写入到 output.json文 件中
import requests video_id= 11137579 # 等同于resourceId pcursor = 1 # 等同于页码 response = requests.post( "https://www.acfun.cn/rest/pc-direct/new-danmaku/list", data={ "resourceId": video_id, "resourceType": 9, "enableAdvanced": True, "pcursor": pcursor, "count": 200, "sortType": 2, "asc": True, }, headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"} ) # 写入到output.json文件中 with open('output.json', 'w', encoding='utf_8', newline='') as file: file.write(response.text)
将 output.json 文件用编辑器格式化后如下:
成功爬取到一页弹幕,接下来就该实现爬取全部弹幕了。
爬取完整的弹幕列表
由于响应数据是 json 格式,没法直接当成字典访问,所以就需要 python 自带 json 库进行转换。
可以使用json.loads(str)
函数将 json 字符串转换成 Map 字典类型,而json.dumps(map)
则是将Map字典类型转换成 json 字符串。于是代码实现如下:
import json from time import sleep import requests video_id= 11137579 # 等同于resourceId pcursor = 1 # 等同于页码 danmakus = [] # 弹幕列表 output_json = {} # 最终输出的结果,包含了danmakus和其他相关信息 while True: response = requests.post( "https://www.acfun.cn/rest/pc-direct/new-danmaku/list", data={ "resourceId": video_id, "resourceType": 9, "enableAdvanced": True, "pcursor": pcursor, "count": 200, "sortType": 2, "asc": True, }, headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"} ) # 将json字符串格式化为Map字典 response_json = json.loads(response.text) # 往弹幕列表里追加本次获取到的弹幕 danmakus.extend(response_json["danmakus"]) if response_json["pcursor"] == "no_more": # 没有下一页了,就跳出循环 break pcursor += 1 # 下一页 sleep(0.1) # 休息0.1秒,防止访问频率过快 output_json = response_json # 最后一页的数据直接拿来用 output_json["danmakus"] = danmakus # 将完整的弹幕集合替换进去 # 写入output.json文件 with open('output.json', 'w', encoding='utf_8', newline='') as file: file.write(json.dumps(output_json))
这里使用了 while 死循环,每次爬取一页数据并且页码依次累加,直到响应数据里的 pcursor 为
"no_more"
时停止循环。使用sleep(seconds)
函数进行睡眠,避免访问过于频繁被A站给封了 IP。打开
output.json
查看:这样就爬取到了完整的弹幕列表,足足6千多条。但是我发现里面本来应有的中文弹幕都变成了
\u****
这种 Unicode 编码字符,显然编码不对,可我在 file 输出指定了 encoding='utf_8' 不应该编码出错啊。调试了代码结合搜索引擎,发现是json库的锅,
json.dumps()
函数里有一条参数:ensure_ascii=True
,该参数默认开启导致转换后的字符串编码统统变成 ASCII,只需要将其赋值为 False 即可输出中文。于是最后一行代码(37行)改成:
file.write(json.dumps(output_json, ensure_ascii=False))
之后,output.json的中文就正常显示了。
将以上代码封装为函数
在上文中已经定义了一个函数
get_bangumi_video_info(web_link)
,我们可以利用该函数返回的信息进行结合,实现一个request_danmakus(video_id, dir_path)
函数:传入视频 id 和存放的文件夹路径,自动爬取该视频的弹幕保存为文件。代码实现如下:
def request_danmakus(video_id, dir_path): """爬取番剧每集的弹幕存储到文件夹中,自动命名。\n Args: video_id (String): resourceId dir_path (String): 存放的文件夹路径 """ pcursor = 1 # 页码 danmakus = [] # 用于输出结果的弹幕列表 output_json = {} # 最终输出的结果,包含了danmakus和其他相关信息 while True: response = requests.post( "https://www.acfun.cn/rest/pc-direct/new-danmaku/list", data={ "resourceId": video_id, "resourceType": 9, "enableAdvanced": True, "pcursor": pcursor, "count": 200, "sortType": 2, "asc": True, }, headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"} ) # 将json字符串格式化为Map字典 response_json = json.loads(response.text) # 往弹幕列表里追加本次获取到的弹幕 danmakus.extend(response_json["danmakus"]) if response_json["pcursor"] == "no_more": # 没有下一页了,就跳出循环 break pcursor += 1 # 下一页 sleep(0.1) # 休息0.1秒,防止访问频率过快 output_json = response_json # 最后一页的数据直接拿来用 output_json["danmakus"] = danmakus # 将完整的弹幕集合替换进去 # 单个弹幕文件的路径 file_path = "{}/{}-{}.json".format(dir_path, bangumi["episodeName"], bangumi["title"]) with open(file_path, 'w', encoding='utf_8', newline='') as file: file.write(json.dumps(output_json, ensure_ascii=False)) print("成功获取:{}".format(file_path))
调用示例:
request_danmakus(11137579, "./output_danmakus")
,执行完毕后在output_danmakus
文件夹内就可以看到第1话-富士山与咖喱面.json
的弹幕文件,不过多赘述。
实现自动化爬取整部番剧的弹幕
将
get_bangumi_video_info
与request_danmakus
函数结合使用,可以编写出自动化脚本:import json import os import re from time import sleep import requests output_dir_path = "./output_danmakus" # 输出弹幕文件的目录路径 print("该脚本功能:爬取A站某部番剧的每一集所有弹幕,导出到'{}'目录中".format(output_dir_path)) url = input("请输入任意一集的链接(例如https://www.acfun.cn/bangumi/aa6000991):") # 番剧网站的链接(任意一集) def get_bangumi_video_info(web_link): # 省略...上文中有... def request_danmakus(video_id, dir_path): # 省略...上文中有... if not os.path.exists(dir): os.makedirs(dir) # 文件夹不存在则创建 bangumi_list = get_bangumi_video_info(url)["bangumiList"] # 获取到番剧的视频列表 print("开始获取弹幕...") for bangumi in bangumi_list: video_id = bangumi["videoId"] # resourceId request_danmakus(video_id, output_dir_path)
只需输入某部番剧任意一集的网页链接,就可以自动爬取每一集的弹幕存储在
./output_danmakus
目录中,并且弹幕文件的命名也人性化处理了。效果如下图:
最终完整脚本代码
import json
import os
import re
from time import sleep
import requests
output_dir_path = "./output_danmakus" # 输出弹幕文件的目录路径
print("该脚本功能:爬取A站某部番剧的每一集所有弹幕,导出到'{}'目录中".format(output_dir_path))
url = input("请输入任意一集的链接(例如https://www.acfun.cn/bangumi/aa6000991):") # 番剧网站的链接(任意一集)
def get_bangumi_video_info(web_link):
""" 通过网站链接获取当前视频与整部番剧的信息(仅限番剧页)\n
Args:
web_link (String): 网页链接
return for example:
{
"currentVideoId" : 11137579,
"bangumiList": [
{"episodeName": "第1话", "title": "富士山与咖喱面", "videoId": 11137579},
...
],
}
"""
result = {
"currentVideoId" : None,
"bangumiList": [],
} # 返回的结果
response = requests.get(
web_link,
headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"}
)
page_info_str = re.findall(r'window\.pageInfo = window\.bangumiData = \{(.{0,})\};\n', response.text)[0]
page_info = json.loads('{' + page_info_str + '}')
result["currentVideoId"] = page_info["videoId"]
bangumi_list_str = re.findall(r'window\.bangumiList = \{(.{0,})\};\n', response.text)[0]
bangumi_list = json.loads('{' + bangumi_list_str + '}')
for item in bangumi_list["items"]:
bangumi = {
"episodeName": item["episodeName"],
"title": item["title"],
"videoId": item["videoId"],
}
result["bangumiList"].append(bangumi)
return result
def request_danmakus(video_id, dir_path):
"""爬取番剧每集的弹幕存储到文件夹中,自动命名。\n
Args:
video_id (String): resourceId
dir_path (String): 存放的文件夹路径
"""
pcursor = 1 # 页码
danmakus = [] # 用于输出结果的弹幕列表
output_json = {} # 最终输出的结果,包含了danmakus和其他相关信息
while True:
response = requests.post(
"https://www.acfun.cn/rest/pc-direct/new-danmaku/list",
data={
"resourceId": video_id,
"resourceType": 9,
"enableAdvanced": True,
"pcursor": pcursor,
"count": 200,
"sortType": 2,
"asc": True,
},
headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"}
)
# 将json字符串格式化为Map字典
response_json = json.loads(response.text)
# 往弹幕列表里追加本次获取到的弹幕
danmakus.extend(response_json["danmakus"])
if response_json["pcursor"] == "no_more":
# 没有下一页了,就跳出循环
break
pcursor += 1 # 下一页
sleep(0.1) # 休息0.1秒,防止访问频率过快
output_json = response_json # 最后一页的数据直接拿来用
output_json["danmakus"] = danmakus # 将完整的弹幕集合替换进去
# 单个弹幕文件的路径
file_path = "{}/{}-{}.json".format(dir_path, bangumi["episodeName"], bangumi["title"])
with open(file_path, 'w', encoding='utf_8', newline='') as file:
file.write(json.dumps(output_json, ensure_ascii=False))
print("成功获取:{}".format(file_path))
if not os.path.exists(output_dir_path):
os.makedirs(output_dir_path) # 文件夹不存在则创建
bangumi_list = get_bangumi_video_info(url)["bangumiList"] # 获取到番剧的视频列表
print("开始获取弹幕...")
for bangumi in bangumi_list:
video_id = bangumi["videoId"] # resourceId
request_danmakus(video_id, output_dir_path)
其他
如果需要将弹幕 json 格式转换成 xml 格式,需要注意可能存在
<>&
这些html特殊字符,需要使用html.escape(response.text, quote=False)
函数代码对响应内容进行额外处理。将上文中出现的 response_json 代码替换成以下代码即可:# 原代码:response_json = json.loads(response.text) response_json = json.loads(html.escape(response.text, quote=False))
- 上文中出现的
sleep(0.1)
可能比较保守,可以自行调整甚至直接去掉该行代码也没问题,反正都是大局域网,封IP也无所谓。