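"""FastAPI endpoints for the plugin registry.

Serves a cached plugin index (built from a GitHub-hosted JSON list of
repositories) with tag/author/search filtering, zip downloads, and
download analytics.
"""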
import os
import shutil
import zipfile
from datetime import datetime
from typing import List, Dict, Optional
from urllib.parse import urlparse

import git
from fastapi import HTTPException, APIRouter, Body
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.openapi.utils import get_openapi
from httpx import AsyncClient, RequestError

from analytics import update_analytics, generate_plot
from logger import error_log
from plugins_html_table import generate_plugins_html_table, PLUGIN_COLUMNS
from utils import *  # provides fetch_plugin_json, is_cache_valid, read_analytics_data, check_version_zip, update_version_zip


class Endpoints:
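    """FastAPI route handlers for the plugin registry.

    Keeps a time-limited in-memory cache of plugin metadata fetched from
    plugin_json (the URL of the registry index) and registers all routes
    on the given app. cache_duration is the cache lifetime in seconds;
    page_size is the default page size for list endpoints.
    """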
def __init__(self, app, plugin_json, cache_duration, page_size):
self.cache_duration = cache_duration
self.json = plugin_json
self.page_size = page_size
self.app = app
self.cache = {}
self.cache_timestamp = {}
# Define FastAPI endpoints
self.router = APIRouter()
self.router.add_api_route("/plugins", self.get_all_plugins, methods=["GET"])
self.router.add_api_route("/plugins-table", self.get_plugins_html_table, methods=["GET"])
self.router.add_api_route("/tags", self.get_all_tags, methods=["GET"])
self.router.add_api_route("/tag/{tag_name}", self.get_plugins_by_tag, methods=["GET"])
self.router.add_api_route("/exclude", self.exclude_plugins, methods=["POST"])
self.router.add_api_route("/author", self.get_plugins_by_author, methods=["POST"])
self.router.add_api_route("/download", self.download_plugin_zip, methods=["POST"])
self.router.add_api_route("/search", self.search_plugins, methods=["POST"])
self.router.add_api_route("/analytics", self.get_analytics, methods=["GET"])
self.router.add_api_route("/analytics/graph", self.get_analytics_plot, methods=["GET"])
self.router.add_api_route("/", self.home, methods=["GET"])
app.include_router(self.router)
async def cache_plugins(self):
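        """Rebuild the in-memory plugin cache.

        Downloads the registry index, then each repository's plugin.json
        (expected on the 'main' branch), skipping entries without 'name'
        or 'author_name', and merges in download counts from analytics.
        """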
try:
async with AsyncClient() as client:
response = await client.get(self.json)
data = response.json()
analytics_data = read_analytics_data()
cached_plugins = []
for entry in data:
url = entry["url"]
plugin_json_url = url.replace("github.com", "raw.githubusercontent.com") + "/main/plugin.json"
try:
print(f"Fetching plugin {plugin_json_url}")
plugin_data = await fetch_plugin_json(plugin_json_url)
# Validate plugin.json required fields
name = plugin_data.get("name")
author_name = plugin_data.get("author_name")
if name and author_name:
plugin_data['url'] = url
cached_plugins.append(plugin_data)
else:
message = f"Error: Skipping plugin {url}"
error_log(message, "WARNING")
except Exception as e:
error_msg = f"Error fetching plugin: {plugin_json_url}, Error: {str(e)}"
print(error_msg)
error_log(error_msg, "ERROR")
for plugin in cached_plugins:
                    # Merge in the download count recorded by analytics (0 if none)
                    plugin['downloads'] = analytics_data.get(plugin['url'], 0)
# Update the cache with the new data and timestamp
self.cache["plugins"] = cached_plugins
self.cache_timestamp["plugins"] = datetime.utcnow()
except RequestError as e:
message = f"Error fetching data from GitHub: {str(e)}"
error_log(f"Can't cache plugins. {message}", "ERROR")
raise HTTPException(status_code=500, detail=message)
async def get_all_plugins(self, page: int = 1, page_size: int = 0, order: Optional[str] = None):
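        """Return a page of plugins.

        page_size=0 means "use the configured default". order may be
        'oldest' (default), 'newest', 'popular' (by downloads), 'a2z' or 'z2a'.
        """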
if page_size == 0:
page_size = self.page_size
# Check if cache is still valid
if not is_cache_valid(self.cache_duration, self.cache_timestamp):
await self.cache_plugins()
# Retrieve plugins from cache
cached_plugins = self.cache["plugins"]
# Create a copy for preserving the original order
sorted_plugins = cached_plugins[:]
if order == 'oldest':
pass # Default
elif order == 'newest':
sorted_plugins = list(reversed(sorted_plugins))
elif order == 'popular':
sorted_plugins.sort(key=lambda x: x.get('downloads', 0), reverse=True)
elif order == 'a2z':
sorted_plugins.sort(key=lambda x: x.get('name', '').lower())
elif order == 'z2a':
sorted_plugins.sort(key=lambda x: x.get('name', '').lower(), reverse=True)
start_index = (page - 1) * page_size
end_index = start_index + page_size
paginated_plugins = sorted_plugins[start_index:end_index]
return {
"total_plugins": len(sorted_plugins),
"page": page,
"page_size": page_size,
"plugins": paginated_plugins,
}
async def get_plugins_html_table(self, columns: Optional[str] = ",".join(PLUGIN_COLUMNS), render_link: Optional[bool] = False, classes: Optional[str] = "plugins-table"):
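        """Render the cached plugins as an HTML table with the requested columns and CSS classes."""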
if not is_cache_valid(self.cache_duration, self.cache_timestamp):
await self.cache_plugins()
html_img = generate_plugins_html_table(self.cache["plugins"], columns, render_link, classes)
return HTMLResponse(content=html_img)
async def get_all_tags(self):
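        """Return every distinct tag across cached plugins, keyed by an arbitrary index."""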
# Check if cache is still valid, otherwise update the cache
if not is_cache_valid(self.cache_duration, self.cache_timestamp):
await self.cache_plugins()
# Get all tags from plugin data
all_tags = set()
for plugin_data in self.cache["plugins"]:
if "tags" in plugin_data:
tags = plugin_data["tags"]
if isinstance(tags, str):
all_tags.update(tags.split(","))
elif isinstance(tags, list):
for tag in tags:
all_tags.update(tag.split(","))
tags_dict = {i: tag.strip() for i, tag in enumerate(all_tags)}
return tags_dict
async def get_plugins_by_tag(self, tag_name: str, page: int = 1, page_size: int = 0):
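        """Return a page of plugins whose 'tags' contain tag_name.

        Note: returns an empty list (not the usual dict) when the page is out of range.
        """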
if page_size == 0:
page_size = self.page_size
# Check if cache is still valid, otherwise update the cache
if not is_cache_valid(self.cache_duration, self.cache_timestamp):
await self.cache_plugins()
# Find plugins containing the given tag
matching_plugins = []
for plugin_data in self.cache["plugins"]:
if "tags" in plugin_data and tag_name in plugin_data["tags"]:
matching_plugins.append(plugin_data)
total_plugins = len(matching_plugins)
start_index = (page - 1) * page_size
end_index = start_index + page_size
if start_index >= total_plugins:
return []
return {
"total_plugins": total_plugins,
"page": page,
"page_size": page_size,
"plugins": matching_plugins[start_index:end_index],
}
@staticmethod
def filter_plugins_by_names(plugins, excluded):
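        """Return the plugins whose 'name' is not in excluded."""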
return [plugin_data for plugin_data in plugins if plugin_data.get('name') not in excluded]
async def exclude_plugins(self, page: int = 1, page_size: int = 10, excluded: List[str] = Body(..., embed=True)):
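        """Return a page of plugins, leaving out the given names (empty list if the page is out of range)."""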
# Check if cache is still valid, otherwise update the cache
if not is_cache_valid(self.cache_duration, self.cache_timestamp):
await self.cache_plugins()
plugins_to_exclude = set(excluded)
filtered_plugins = self.filter_plugins_by_names(self.cache["plugins"], plugins_to_exclude)
total_plugins = len(filtered_plugins)
start_index = (page - 1) * page_size
end_index = start_index + page_size
if start_index >= total_plugins:
return []
return {
"total_plugins": total_plugins,
"page": page,
"page_size": page_size,
"plugins": filtered_plugins[start_index:end_index],
}
async def get_plugins_by_author(self, author_name: str = Body(..., embed=True), page: int = 1, page_size: int = 0):
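        """Return a page of plugins with a matching 'author_name' (empty list if the page is out of range)."""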
if page_size == 0:
page_size = self.page_size
# Check if cache is still valid, otherwise update the cache
if not is_cache_valid(self.cache_duration, self.cache_timestamp):
await self.cache_plugins()
# Find plugins by the specified author name
matching_plugins = []
for plugin_data in self.cache["plugins"]:
if plugin_data.get("author_name") == author_name:
matching_plugins.append(plugin_data)
total_plugins = len(matching_plugins)
start_index = (page - 1) * page_size
end_index = start_index + page_size
if start_index >= total_plugins:
return []
return {
"total_plugins": total_plugins,
"page": page,
"page_size": page_size,
"plugins": matching_plugins[start_index:end_index],
}
@staticmethod
async def get_analytics() -> Dict[str, int]:
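        """Return the per-plugin download counts from the analytics store."""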
analytics_data = read_analytics_data()
return analytics_data
async def get_analytics_plot(self) -> HTMLResponse:
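        """Return an HTML page plotting download analytics for the cached plugins."""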
# Check if cache is still valid, otherwise update the cache
if not is_cache_valid(self.cache_duration, self.cache_timestamp):
await self.cache_plugins()
html_img = generate_plot(self.cache["plugins"])
return HTMLResponse(content=html_img)
async def download_plugin_zip(self, plugin_data: dict = Body({"url": ""})):
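        """Package and return a plugin as a zip download.

        Prefers the newest GitHub release with an uploaded zip asset; when
        no release has assets, the repository is cloned and zipped instead.
        Also increments the plugin's download counter.
        """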
# Check if cache is still valid, otherwise update the cache
if not is_cache_valid(self.cache_duration, self.cache_timestamp):
await self.cache_plugins()
plugin_url = plugin_data.get("url")
if not plugin_url:
raise HTTPException(status_code=400, detail="Missing 'url' in request body.")
matching_plugins = [plugin for plugin in self.cache["plugins"] if plugin.get("url") == plugin_url]
if not matching_plugins:
raise HTTPException(status_code=404, detail=f"Plugin url '{plugin_url}' not found.")
plugin_data = matching_plugins[0]
plugin_name = str(plugin_data.get("name"))
# Check if there is a release zip file
path_url = str(urlparse(plugin_url).path)
url = "https://api.github.com/repos" + path_url + "/releases"
async with AsyncClient() as client:
response = await client.get(url)
if response.status_code != 200:
raise HTTPException(
status_code=503,
detail={"error": "Github API not available"}
)
response = response.json()
i = 0
try:
            # Find the first release that has uploaded assets (GitHub's
            # auto-generated "Source code" zips are not listed as assets)
assets = response[i]["assets"]
while len(assets) == 0:
i += 1
assets = response[i]["assets"]
url_zip = assets[0]["browser_download_url"]
version = response[i]["tag_name"]
if i != 0:
error_log(f"The plugin {plugin_name} has no release zip file or was pushed by hand, the version pulled is {version}", "WARNING")
zip_filename = await self.download_releses_plugin_zip(plugin_name, url_zip, version)
except (IndexError, KeyError):
            # No release assets were found at all, so clone the repo and zip it ourselves
repo_path = await self.clone_repository(plugin_url, plugin_name)
zip_filename = await self.create_plugin_zip(repo_path, plugin_name)
# Set the appropriate headers to trigger a download
headers = {
"Content-Disposition": f"attachment; filename={plugin_name}.zip"
}
# Update analytics count
update_analytics(plugin_url)
return FileResponse(zip_filename, headers=headers, media_type="application/zip")
@staticmethod
async def clone_repository(plugin_url: str, plugin_name: str):
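        """Clone plugin_url into the local repository cache.

        An existing clone is reused when its HEAD matches origin's
        master/main; otherwise it is removed and cloned again.
        """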
# Define a cache directory
cache_dir = "repository_cache"
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
repo_path = os.path.join(cache_dir, plugin_name)
# Check if the repository is already cloned and updated
if os.path.exists(repo_path):
try:
repo = git.Repo(repo_path)
origin = repo.remotes.origin
origin.fetch()
if "master" in origin.refs:
origin_master = origin.refs["master"]
else:
origin_master = origin.refs["main"]
diff = repo.git.diff(origin_master.commit, repo.head.commit)
if diff == "":
return repo_path
else:
shutil.rmtree(repo_path)
except Exception as e:
message = f"Error while checking repository status: {str(e)}"
error_log(f"{repo_path} - {message}", "WARNING")
shutil.rmtree(repo_path)
# Clone the repository
try:
git.Repo.clone_from(plugin_url, repo_path)
except git.GitCommandError as e:
message = f"Failed to clone repository: {str(e)}"
error_log(f"{repo_path} - {message}")
raise HTTPException(status_code=500, detail=message)
return repo_path
@staticmethod
async def create_plugin_zip(repo_path: str, plugin_name: str):
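        """Zip the cloned repository, excluding .git and __pycache__ folders."""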
zip_cache_dir = "zip_cache"
if not os.path.exists(zip_cache_dir):
os.makedirs(zip_cache_dir)
zip_filename = os.path.join(zip_cache_dir, f"{plugin_name}.zip")
# Create a .zip file excluding .git and __pycache__ folders
with zipfile.ZipFile(zip_filename, "w") as zip_file:
for root, _, files in os.walk(repo_path):
                # Skip .git and __pycache__ folders (match whole path
                # components so names that merely contain ".git" survive)
                parts = set(os.path.normpath(root).split(os.sep))
                if ".git" in parts or "__pycache__" in parts:
                    continue
for file in files:
file_path = os.path.join(root, file)
zip_file.write(file_path, os.path.relpath(file_path, repo_path))
return zip_filename
@staticmethod
    async def download_releases_plugin_zip(plugin_name: str, url_zip: str, version_origin: str):
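        """Download a release zip asset into the zip cache.

        A cached zip is reused when its recorded version matches
        version_origin; otherwise the asset is fetched again, following
        redirects manually.
        """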
# Define a cache directory
cache_dir = "zip_cache"
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
name_plugin = plugin_name + ".zip"
os_path_plugin = os.path.join(cache_dir, name_plugin)
check = check_version_zip(plugin_name, version_origin)
if os.path.exists(os_path_plugin) and check:
return os_path_plugin
else:
            async with AsyncClient() as client:
response = await client.get(url_zip)
"""
Check if there is a redirect, by looping the response
"""
while response.is_redirect:
try:
url_location = response.headers["location"]
response = await client.get(url_location)
                    except RequestError as e:
error = {"error": str(e)}
raise HTTPException(
status_code=400,
detail=error
)
if response.status_code != 200:
error_message = f"GitHub API error: {response.text}"
raise HTTPException(
status_code=response.status_code,
detail=error_message
)
with open(os_path_plugin, "wb") as zip_ref:
for chunk in response.iter_bytes(chunk_size=8192):
zip_ref.write(chunk)
update_version_zip(plugin_name, version_origin)
return os_path_plugin
async def search_plugins(self, search_data: dict):
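        """Return plugins where every word of the query appears (case-insensitively) in at least one field."""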
# Check if cache is still valid, otherwise update the cache
if not is_cache_valid(self.cache_duration, self.cache_timestamp):
await self.cache_plugins()
query = search_data.get("query")
if not query:
raise HTTPException(status_code=400, detail="Missing 'query' in request body.")
query_words = query.split()
matching_plugins = []
for plugin_data in self.cache["plugins"]:
plugin_matches_all_words = all(
                any(str(word).lower() in str(field).lower() for field in plugin_data.values())
for word in query_words
)
if plugin_matches_all_words:
matching_plugins.append(plugin_data)
return matching_plugins
async def home(self):
"""
Returns the registry status.
"""
out = {
"status": "✅ Running: Fuck the American Dream! 🖕",
"version": self.app.openapi_schema["info"]["version"]
}
return out
def customize_openapi(self, title: str, logo_url: str, version: str, description: str):
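        """Build, cache and return the OpenAPI schema with a custom title, version, description and logo."""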
if self.app.openapi_schema:
return self.app.openapi_schema
openapi_schema = get_openapi(
title=title,
version=version,
description=description,
routes=self.app.routes,
)
openapi_schema["info"]["x-logo"] = {
"url": logo_url
}
self.app.openapi_schema = openapi_schema
return self.app.openapi_schema
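
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of this module): how Endpoints is
# typically wired into an application. The URL and values below are
# placeholders, not the registry's real configuration.
#
#   from fastapi import FastAPI
#   from endpoints import Endpoints
#
#   app = FastAPI()
#   endpoints = Endpoints(
#       app,
#       plugin_json="https://raw.githubusercontent.com/<org>/<repo>/main/plugins.json",
#       cache_duration=3600,  # cache plugin metadata for one hour
#       page_size=10,
#   )
#   endpoints.customize_openapi(
#       title="Plugin Registry",
#       logo_url="https://example.com/logo.png",
#       version="1.0.0",
#       description="Registry of available plugins",
#   )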