-
Notifications
You must be signed in to change notification settings - Fork 71
/
NodeScraper.py
168 lines (145 loc) · 6.51 KB
/
NodeScraper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import re
import time
from datetime import datetime
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from pyyoutube import Api
from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from Config import Decryption
from PwdFinder import find_pwd
from RequestHandler import make_request
# def gen_elem(markup: str, element="", attrs=None) -> Generator[Tag, None, None]:
# """获取网页元素"""
# if attrs is None: attrs = {}
# soup = BeautifulSoup(markup, "html.parser")
# yield from soup.find_all(element, attrs)
class NodeScraper:
    """Scrapes free-node subscription pages.

    On construction it fetches a main/listing page, finds the newest link
    whose text carries a "M月D" (month/day) date, and remembers that link as
    the detail page. Other methods then fetch the detail page, detect
    password protection, unlock it via a headless browser, and pull node
    URLs / passwords out of the page or a linked YouTube description.
    """

    name: str                   # label used in log messages / saved file name
    up_date: str                # last-known update date, "YYYY-MM-DD"
    web_date: datetime          # date parsed from the newest article link (current year assumed)
    detail_url: str             # absolute URL of the detail page
    detail_text: str            # raw HTML of the detail page (set by get_detail)
    detail_soup: BeautifulSoup  # parsed detail page (set by get_detail)
    pattern: str                # regex that matches node-file URLs in the detail page
    nodes_index: int            # which regex match to pick from the detail page
    decryption: Decryption      # parameters controlling how a locked page is decrypted
    driver: webdriver.Chrome    # headless browser; only set by init_webdriver()

    def __init__(self, name: str, up_date: str, main_url: str, attrs: dict,
                 pattern: str, nodes_index=0, decryption=None):
        """
        :param name: label for log messages / saved file name
        :param up_date: last-known update date, "YYYY-MM-DD"
        :param main_url: listing page to scan for the newest dated article
        :param attrs: bs4 attribute filter used to select candidate <a> tags
        :param pattern: regex matching node-file links on the detail page
        :param nodes_index: index of the regex match to use
        :param decryption: decryption parameters (defaults to an empty Decryption)
        """
        self.name = name
        self.up_date = up_date
        self.pattern = pattern
        self.nodes_index = nodes_index
        # Decryption(**{}) in the original was just a roundabout Decryption()
        self.decryption = decryption if decryption else Decryption()
        if main_url.startswith("https://www.youtube.com"):
            # YouTube listings need JavaScript rendering, so use headless Chrome
            self.init_webdriver()
            self.driver.get(main_url)
            main_text = self.driver.page_source
        else:
            main_text = make_request("GET", main_url).text
        main_soup = BeautifulSoup(main_text, "html.parser")
        # Pick the first matching link that carries a "M月D" date (newest first);
        # NOTE(review): if no link matches, web_date/detail_url stay unset and
        # later calls (is_latest, get_detail) will raise AttributeError — same
        # as the original behavior.
        for tag in main_soup.find_all("a", attrs):
            match = re.search(r"\d+月\d+", tag.prettify())
            if not match:
                continue
            # strptime without a year yields 1900; stamp in the current year
            self.web_date = datetime.strptime(match.group(), "%m月%d")
            self.web_date = self.web_date.replace(year=datetime.today().year)
            self.detail_url = urljoin(main_url, tag.get("href", ""))  # absolute URL
            break

    def init_webdriver(self):
        """Create the headless Chrome instance used for JS-rendered pages."""
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")  # no visible window
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        # BUG FIX: pageLoadStrategy is a WebDriver capability, not a Chrome
        # CLI switch — "--pageLoadStrategy=eager" was silently ignored.
        options.page_load_strategy = "eager"
        self.driver = webdriver.Chrome(options=options)

    def get_detail(self):
        """Fetch the detail page and parse it into detail_text/detail_soup.

        :raises RuntimeError: if the page cannot be fetched (empty response)
        """
        if detail_text := make_request("GET", self.detail_url).text:
            print(f"{self.name}: 访问 {self.detail_url}")
            self.detail_text = detail_text
            self.detail_soup = BeautifulSoup(self.detail_text, "html.parser")
        else:
            raise RuntimeError(f"{self.name}: 无法访问 {self.detail_url}")

    def is_locked(self) -> bool:
        """Return True if the detail page contains a known password-input element."""
        locked_elems = [{"name": "input", "attrs": {"id": "EPassword"}},
                        {"name": "input", "attrs": {"id": "pwbox-426"}},
                        {"name": "input", "attrs": {"name": "secret-key"}}]
        # generator instead of materialized list — any() can short-circuit
        return any(e for le in locked_elems for e in self.detail_soup.find_all(**le))

    def is_latest(self) -> bool:
        """Return True if our saved data is at least as new as the web page."""
        up_date = datetime.strptime(self.up_date, "%Y-%m-%d")
        return self.web_date.date() <= up_date.date()

    def get_nodes_url(self, text="") -> str:
        """Return the nodes_index-th link matching self.pattern, or "" if none.

        :param text: text to search; defaults to the fetched detail page
        """
        text = text if text else self.detail_text
        texts = re.findall(self.pattern, text)
        return texts[self.nodes_index] if texts else ""

    def get_yt_url(self) -> str:
        """Return the yt_index-th https://youtu.be link on the detail page, or ""."""
        yt_urls = [str(tag.get("href")) for tag in self.detail_soup.find_all("a")
                   if str(tag.get("href")).startswith("https://youtu.be")]
        # yt_index selects among multiple video links (defaults to the first)
        return yt_urls[self.decryption.get("yt_index", 0)] if yt_urls else ""

    def decrypt_for_text(self, pwd: str, url="") -> tuple[bool, str]:
        """Submit a password on a locked page and return its hidden text.

        :param pwd: password to submit
        :param url: page to unlock; defaults to the detail page
        :return: (True, body text) on success, (False, alert message) on failure
        """
        # NOTE(review): assumes init_webdriver() has run (it is only called for
        # YouTube main pages in __init__) — confirm callers guarantee this.
        url = url if url else self.detail_url
        print(f"{self.name}: 访问 {url}")
        self.driver.get(url)
        wait = WebDriverWait(self.driver, 10)
        wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
        time.sleep(5)  # hard wait for late-loading page scripts
        decrypt_by = self.decryption.get("decrypt_by", "click")
        if decrypt_by == "js":
            # pass the password straight to the page's JS decrypt function
            self.driver.execute_script(self.decryption["script"], pwd)
        else:
            # simulate typing the password and submitting the form
            by, value = self.decryption["textbox"]
            textbox = self.driver.find_element(by, value)
            textbox.send_keys(pwd)
            by, value = self.decryption["button"]
            button = self.driver.find_element(by, value)
            button.submit()
        try:
            # a JS alert within 2s means the password was rejected
            alert = WebDriverWait(self.driver, 2).until(EC.alert_is_present())
            msg = alert.text
            alert.accept()  # dismiss so the driver stays usable
            return False, msg
        except TimeoutException:
            # no alert: decryption succeeded, return the revealed body text
            return True, self.driver.find_element(By.TAG_NAME, "body").text

    def get_description(self, yt_url="", yt_key="") -> tuple[str, str]:
        """Extract (password, download link) from a YouTube video description.

        :param yt_url: video URL; defaults to the detail page URL
        :param yt_key: YouTube Data API key
        :return: (password, link) — either may be "" when not found
        """
        yt_url = yt_url if yt_url else self.detail_url
        # The video id is the last /…- or =…-delimited token; matching on the
        # reversed string keeps the regex simple.
        vid_match = re.search(r"[\w-]+(?=[/=])", yt_url[::-1])
        if not vid_match:
            # BUG FIX: original crashed with AttributeError on an id-less URL
            return "", ""
        vid = vid_match.group()[::-1]
        api = Api(api_key=yt_key)
        response = api.get_video_by_id(video_id=vid, parts="snippet")
        snippet = response.items[0].to_dict().get("snippet", {})
        description = snippet.get("description", "")
        ls = [s for s in description.splitlines() if s.strip()]
        pwd = ""
        link = ""
        for i, s in enumerate(ls):
            if p := find_pwd(s):
                pwd = p
            if "下载" not in s:
                continue
            # BUG FIX: guard ls[i + 1] — original raised IndexError when the
            # "下载" marker sat on the last non-blank line.
            window = s if i + 1 >= len(ls) else f"{s}\n{ls[i + 1]}"
            if match := re.search(r"https://[^\r\n\s]+", window):
                link = match.group()
                break
        return pwd, link