-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate.py
executable file
·131 lines (115 loc) · 4.62 KB
/
generate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import os, datetime, requests, re, logging, asyncio, concurrent.futures, json
# Timestamped log lines for progress/diagnostics while generating lists.
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s',
datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# Directory of blocklist "source" definition files (one output list per file).
SOURCE_DIR = "src"
# Directory where generated .txt blocklists are written.
DEST_DIR = "dist"
# Prefix marking a statement line in a source file (e.g. "!!src <url>").
STMT_OPERATOR = "!!"
def download_webpage(url):
    """Fetch `url` and return the response body, or None on a non-2xx status.

    Failures are logged as warnings rather than raised, so one bad source
    does not abort the whole generation run.
    """
    # Log BEFORE the request (the original logged "downloading" only after
    # the request had already completed, so a hung URL was unattributable).
    logging.info("downloading {}".format(url))
    res = requests.get(url)
    if res.ok:
        return res.text
    logging.warning("failed to download {}, {}, {}".format(url, res.status_code, res.text))
    return None
def download_webpages(urls):
    """Download all `urls` concurrently (20 worker threads).

    Returns the response bodies of the successful downloads only; failed
    downloads (where download_webpage returned None) are dropped.
    """
    with concurrent.futures.ThreadPoolExecutor(20) as executor:
        # `is not None` rather than `!= None` (PEP 8); comprehension instead
        # of a manual append loop.
        pages = [page for page in executor.map(download_webpage, urls)
                 if page is not None]
    logging.info("downloaded {}/{} webpages".format(len(pages), len(urls)))
    return pages
# Extract hostnames from blocklists.
# Compiled once at module level; raw string fixes the invalid escape
# sequences (\w, \d, \.) the original non-raw pattern relied on.
_HOSTNAME_RE = re.compile(r"[\w\d-]+(\.[\w\d]+)+")

def extract_hostnames(lines):
    """Extract one hostname-looking token from each meaningful line.

    Empty lines and comments (lines starting with '!' or '#', the Adblock
    and hosts-file comment markers) are skipped. Lines containing no
    hostname-shaped token are logged as warnings and dropped.
    """
    hostnames = []
    for line in lines:
        line = line.strip()
        if line == "" or line.startswith(("!", "#")):
            continue
        matched = _HOSTNAME_RE.search(line)
        if matched:
            hostnames.append(matched.group())
        else:
            logging.warning("failed to extract: '{}'".format(line))
    return hostnames
def apply_templates(inputs, templates):
    """Render every template against every input, one "\n"-terminated line each.

    `templates` are str.format strings with a single '{}' slot (e.g.
    "||{}^"). Templates iterate in the outer loop, inputs in the inner, so
    output order matches the original. Returns "" when either is empty.
    """
    # "".join over a generator replaces the original's quadratic `+=`
    # string accumulation; `item` avoids shadowing the builtin `input`.
    return "".join(
        template.format(item) + "\n"
        for template in templates
        for item in inputs
    )
# Main loop: compile every source file in SOURCE_DIR into a blocklist
# written to DEST_DIR/<filename>.txt.
for filename in os.listdir(SOURCE_DIR):
    path = os.path.join(SOURCE_DIR, filename)
    if not os.path.isfile(path):
        continue
    meta = {}        # "! key: value" header metadata from !!meta statements
    sources = []     # remote blocklist URLs from !!src statements
    templates = []   # str.format templates from !!tmpl statements
    hostnames = []   # hostnames collected from !!add and downloaded sources
    result = ""
    with open(path, "r") as f:
        logging.info("generating source {}".format(path))
        contents = f.readlines()
    for line in contents:
        line = line.strip()
        if line == "":
            continue
        elif line.startswith(STMT_OPERATOR):
            # Statement form: "!!<stmt> <argument>"
            stmt, arg = line.split(" ", 1)
            stmt = stmt[len(STMT_OPERATOR):]
            if stmt == "meta":
                key, val = arg.split(" ", 1)
                meta[key] = val
            elif stmt == "src":
                sources.append(arg.strip())
            elif stmt == "tmpl":
                templates.append(arg.strip())
            elif stmt == "add":
                hostname = arg.strip()
                if hostname.startswith("@"):
                    # "@invidious" expands to the public Invidious instance list.
                    if hostname[1:] == "invidious":
                        logging.info("downloading list of invidious instances..")
                        api_url = "https://api.invidious.io/instances.json?sort_by=health"
                        res = requests.get(api_url)
                        if res.ok:
                            instances = json.loads(res.text)
                            for item in instances:
                                # Each item is [hostname, {details}]; keep only
                                # regular HTTPS instances (skip onion/i2p).
                                if item[1]['type'] == "https":
                                    hostnames.append(item[0])
                        else:
                            # Bug fix: the original referenced an undefined
                            # name `url` here, raising NameError on failure.
                            logging.warning("failed to download invidious instances {}, {}, {}".format(
                                api_url, res.status_code, res.text))
                else:
                    hostnames.append(hostname)
            else:
                # Typo fix: "unkown" -> "unknown".
                logging.error("unknown statement {}".format(stmt))
        elif not line.startswith("!"):
            # Non-comment content lines are copied through verbatim.
            result += line + "\n"
    # Merge blocklists
    if len(sources) > 0 or len(hostnames) > 0:
        if len(sources) > 0:
            for text in download_webpages(sources):
                hostnames.extend(extract_hostnames(text.split("\n")))
        # De-duplicate and sort; sorted(set(...)) yields the same result as
        # the original O(n^2) membership-test loop followed by .sort().
        items = sorted(set(hostnames))
        logging.info("reduced duplicates from {} to {} hosts".format(len(hostnames), len(items)))
        # Templates
        result += apply_templates(items, templates)
    # Add generated & expires metadata. Timezone-aware now() replaces the
    # deprecated datetime.utcnow(); the formatted output is identical.
    meta["Generated"] = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    meta["Expires"] = "7 day"
    # Prepend the "! key: value" metadata header.
    header = ""
    for key, val in meta.items():
        header += "! {}: {}\n".format(key, val)
    result = header + result
    # Write output
    os.makedirs(DEST_DIR, exist_ok=True)
    dest_path = os.path.join(DEST_DIR, filename + ".txt")
    with open(dest_path, "w") as f:
        logging.info("saving output to {}".format(dest_path))
        f.write(result)