From 66b96611757d788538f12dd4d630e13321b593ab Mon Sep 17 00:00:00 2001 From: "Justin Ritter@j54j6" Date: Fri, 17 May 2024 22:05:10 +0200 Subject: [PATCH] Code Quality fixes --- database_manager.py | 8 +-- project_functions.py | 122 +++++++++++++++++++++++-------------------- 2 files changed, 71 insertions(+), 59 deletions(-) diff --git a/database_manager.py b/database_manager.py index d1db4ae..85045ae 100644 --- a/database_manager.py +++ b/database_manager.py @@ -175,7 +175,9 @@ def prepare_sql_create_statement(name, scheme): if "primary_key" in options and options["primary_key"] is True and not primary_key_defined: c_query += " PRIMARY KEY" primary_key_defined = True - elif "primary_key" in options and options["primary_key"] is True and primary_key_defined is True: + #PyLint C0301 + elif ("primary_key" in options and options["primary_key"] is True and + primary_key_defined is True): logger.warning("""There are at least 2 primary keys defined! - Please check config. Ignore Primary Key %s""", column_name) @@ -238,7 +240,7 @@ def create_table(name:str, scheme:json): if not table_exist: #PyLint C0301 - logger.error("""Error while creating table %s! - + logger.error("""Error while creating table %s! - After creating table does not exist!""", name) return False return True @@ -296,7 +298,7 @@ def fetch_value(table:str, row_name:str, value:str, data_filter:list = None, is_ return False #Pylint C0301 -def fetch_value_as_bool(table:str, row_name:str, value:str, +def fetch_value_as_bool(table:str, row_name:str, value:str, data_filter:list = None, is_unique=False): """ This function is used to fetch a value from a database. The filter is a list containing all fields that need to be returned. diff --git a/project_functions.py b/project_functions.py index 3a84968..e12d4c7 100644 --- a/project_functions.py +++ b/project_functions.py @@ -29,7 +29,7 @@ import tldextract from prettytable import PrettyTable -from yt_dlp import YoutubeDL +from yt_dlp import YoutubeDL, DownloadError #own modules from database_manager import (check_table_exist, create_table, @@ -263,13 +263,14 @@ def fetch_category_name(url:str, scheme:json): return category def fetch_scheme_file_by_file(url): + """ internal function used to iterate over all schemes in a dictionary and find a + matching scheme by reading the scheme files""" + return_val = {"status": False, "scheme_path": None, "scheme_file": None} script_dir = pathlib.Path(__file__).parent.resolve() if not os.path.isdir(os.path.join(script_dir, "scheme")): logging.error("The scheme folder does not exist in the script folder! - Please add it!") - return False - - parsed_url = tldextract.extract(url) + return return_val #iterate over all files in directory scheme_folder = os.path.join(script_dir, "scheme") @@ -291,16 +292,23 @@ def fetch_scheme_file_by_file(url): if(validate_scheme(url, scheme)): logging.info("Found suitable scheme file - Scheme used: %s", scheme_file) - return [os.path.join(scheme_folder, scheme_file), scheme_file] + return_val["scheme_file"] = scheme_file + return_val["scheme_path"] = os.path.join(scheme_folder, scheme_file) + return_val["status"] = True + return return_val else: continue + return return_val def fetch_scheme_file(url:str): + """ FInd a matching scheme file for a given url by testing the filename. If not working + iterate over all files and try to find a scheme matching the sld and tld""" + return_val = {"status": False, "scheme_path": None, "scheme_file": None} script_dir = pathlib.Path(__file__).parent.resolve() if not os.path.isdir(os.path.join(script_dir, "scheme")): logging.error("The scheme folder does not exist in the script folder! - Please add it!") - return False + return return_val parsed_url = tldextract.extract(url) @@ -310,33 +318,38 @@ def fetch_scheme_file(url:str): logging.info("No suitable file found by filename. Check per file...") scheme_check = fetch_scheme_file_by_file(url) - if not scheme_check: + if not scheme_check["status"]: logging.error("There is no matching scheme file for site %s!", parsed_url.domain) - return False - else: - return scheme_check + return return_val + + return scheme_check logging.info("Scheme file for %s found", parsed_url.domain) logging.info("Check if provided url %s is valid for scheme", parsed_url.domain) - return [expected_scheme_path, str(parsed_url.domain + ".json")] + return_val["scheme_path"] = expected_scheme_path + return_val["scheme_file"] = str(parsed_url.domain + ".json") + return_val["status"] = True + return return_val def load_json_file(path:str): + """ Read a file and return it as json dict """ if not os.path.isfile(path): logging.error("The provided file does not exist! - Can't open file %s", path) return False try: - with open(path, "r") as file: + with open(path, "r", encoding="UTF-8") as file: json_file = json.loads(file.read()) except json.JSONDecodeError as e: logging.error("Error while reading json file! - JSON Error: %s", e) return False - except Exception as e: + except FileNotFoundError as e: logging.error("Error while reading json file! - Error: %s", e) return False return json_file def validate_scheme(url, scheme, silent=False): + """ Check if the given scheme contains all information needed to download and save a file""" #Check if the loaded template is a url template - if not "url_template" in scheme or scheme["url_template"] == False: + if not "url_template" in scheme or scheme["url_template"] is False: #Line Break for Pylint #C0301 logging.error("""The laoded scheme is not marked as a url_template! - Please mark it as url template (Add key 'url_template' with value 'true' @@ -348,28 +361,28 @@ def validate_scheme(url, scheme, silent=False): logging.error("Provided Scheme is not valid! - Check log") return False - parsedUrl = tldextract.extract(url) + parsed_url = tldextract.extract(url) #Check if the provided url matches the filter of the given scheme - if not parsedUrl.suffix in scheme["url_scheme"]["tld"]: + if not parsed_url.suffix in scheme["url_scheme"]["tld"]: if not silent: #Line Break for Pylint #C0301 logging.error("""Provided url does not match the requirements for the %s scheme! - TLD '%s' is not supported! - scheme name: %s""", - parsedUrl.domain, parsedUrl.suffix, scheme["schema_name"]) + parsed_url.domain, parsed_url.suffix, scheme["schema_name"]) return False - if not parsedUrl.domain in scheme["url_scheme"]["sld"]: + if not parsed_url.domain in scheme["url_scheme"]["sld"]: if not silent: #Line Break for Pylint #C0301 logging.error("""Provided url does not match the requirements for the %s scheme! - SLD '%s' is not supported! - scheme name: %s""", - parsedUrl.domain, parsedUrl.domain, scheme["schema_name"]) + parsed_url.domain, parsed_url.domain, scheme["schema_name"]) return False - if not parsedUrl.subdomain in scheme["url_scheme"]["subd"]: + if not parsed_url.subdomain in scheme["url_scheme"]["subd"]: if not silent: #Line Break for Pylint #C0301 logging.error("""Provided url does not match the requirements for the %s scheme! - Subdomain '%s' is not supported! - scheme name: %s""", - parsedUrl.domain, parsedUrl.subdomain, scheme["schema_name"]) + parsed_url.domain, parsed_url.subdomain, scheme["schema_name"]) return False return True @@ -399,7 +412,7 @@ def decide_storage_path(url, scheme): #Line Break for Pylint #C0301 if ("categories" in scheme and "available" in scheme["categories"] and - scheme["categories"]["available"] == True): + scheme["categories"]["available"] is True): #Decide if categories are used #Check if categories are defined. If not categories can not be used categories_defined = False @@ -429,7 +442,7 @@ def inner_decide_path(base_path): return base_path - if scheme["categories"]["needed"] == True: + if scheme["categories"]["needed"] is True: #if url don't have category -> Fail logging.debug("Scheme requires category") if not categories_defined: @@ -438,24 +451,18 @@ def inner_decide_path(base_path): Please define categories or set them as optional!""") #Line Break for Pylint #C0301 if ("category_storage" in scheme["storage"] and - scheme["storage"]["category_storage"] == False): + scheme["storage"]["category_storage"] is False): return base_path - else: - return inner_decide_path(base_path) + return inner_decide_path(base_path) else: #Line Break for Pylint #C0301 if("category_storage" in scheme["storage"] and - scheme["storage"]["category_storage"] == False): + scheme["storage"]["category_storage"] is False): path_ext = inner_decide_path(base_path) if not path_ext: return base_path - else: - return path_ext - else: - return base_path - else: - #No categories avail - return base_path + return path_ext + return base_path def get_file_data(url, ydl_opts): """ This function is used to fetch all needed information about the requested file @@ -465,7 +472,7 @@ def get_file_data(url, ydl_opts): #We only need the metadata. So we don't need to download the whole file. #We will do this later... file_data = ydl.extract_info(url, download=False) - except Exception as e: + except DownloadError as e: logging.error("Error while fetching File information from target server! - Error: %s", e) return False @@ -473,9 +480,8 @@ def get_file_data(url, ydl_opts): try: if len(file_data) > 0: return file_data - else: - return False - except Exception as e: + return False + except ValueError as e: #Line Break for Pylint #C0301 logger.error("Error result seems to have no content! - \n\n Result: %s \n Error: %s", file_data, e) @@ -522,7 +528,7 @@ def download_file(url, path): return return_val logger.error("YDL reported code %s", value) return return_val - except Exception as e: + except DownloadError as e: logger.error("Error while downloading video!- Error: %s", e) return return_val @@ -545,7 +551,7 @@ def create_hash_from_file(file): return_val["hash"] = hash_obj.hexdigest() return_val["status"] = True return return_val - except Exception as e: + except FileNotFoundError as e: logging.error("Error while creating hash of file! - Error: %s", e) return return_val @@ -553,13 +559,15 @@ def load_scheme(url: str): """ This function is used to load the correct scheme based on an url """ return_scheme = {"status": False, "scheme": None, "scheme_path": None} #Search for scheme - scheme_path = fetch_scheme_file(url) - if not scheme_path: + scheme_data = fetch_scheme_file(url) + if not scheme_data["status"]: logging.error("Error while fetching scheme! - Check log") return return_scheme + scheme_path = scheme_data["scheme_path"] + #Load Scheme - scheme = load_json_file(scheme_path[0]) + scheme = load_json_file(scheme_path) if not scheme: logging.error("Error while loading scheme! - Check log") return return_scheme @@ -591,10 +599,11 @@ def prepare_file_download(url): url_already_exist = fetch_value("items", "url", url, ["url"], True) #Try to load a scheme matching the current url scheme = load_scheme(url) - if not url_alive or url_already_exist != None or not scheme["status"]: + + if not url_alive or url_already_exist is not None or not scheme["status"]: if not url_alive: logging.error("Can't download video! - Url can not be reached! - Check log above!") - if url_already_exist != None: + if url_already_exist is not None: logging.error("Video already exist in db!") return_val["status"] = 2 if not scheme["status"]: @@ -615,11 +624,14 @@ def prepare_file_download(url): return return_val -def save_file_to_db(scheme, scheme_path, full_file_path, file_hash, url, metadata): +def save_file_to_db(scheme_data, full_file_path, file_hash, url, metadata): """ This function is used to save a file into the items table. It is basically an SQL Insert wrapper""" #Video hash not exist as saved item add it... logger.info("Add Video to DB") + scheme = scheme_data["scheme"] + scheme_path = scheme_data["scheme_path"] + head, tail = os.path.split(full_file_path) logging.debug("Scheme Data: %s", scheme_path) #Line Break for Pylint #C0301 @@ -627,7 +639,7 @@ def save_file_to_db(scheme, scheme_path, full_file_path, file_hash, url, metadat ["option_value"], True) #Define base data video_data = { - "scheme": scheme, + "scheme": scheme_data["scheme"]["schema_name"], "file_name": tail, "file_path": head, "file_hash": file_hash, @@ -668,6 +680,7 @@ def save_file_to_db(scheme, scheme_path, full_file_path, file_hash, url, metadat return True def error_post_processing(full_file_path): + """ This function is used to remove downloaded files if anything fails during post processing""" #Line Break for Pylint #C0301 remove_file = fetch_value_as_bool("config", "option_name", "remove_file_on_post_process_error", ["option_value"], True) @@ -684,22 +697,20 @@ def error_post_processing(full_file_path): logger.warning("File will not be removed! - Be cautious, the file is not saved in the db!") return False -#This function represents the "manual" video download approach def direct_download(url:str): - global path + """ This function represents the "manual" video download approach """ #Line Break for Pylint #C0301 logger.info("""Directly download content from %s - Check prerequisites and prepare download data""", url) prepared_data = prepare_file_download(url) + if prepared_data["status"] != 1: logging.error("Error while preparing download! - Check log.") return False path = prepared_data["dst_path"] - scheme = prepared_data["scheme"] - scheme_path = prepared_data["scheme_path"] logger.info("File will be saved under: %s", path) @@ -713,13 +724,13 @@ def direct_download(url:str): full_file_path = downloaded["full_file_path"] metadata = downloaded["metadata"] - logger.debug(f"Full File path is: {full_file_path}") + logger.debug("Full File path is: %s", full_file_path) #Compute hash from file file_hash = create_hash_from_file(full_file_path) #Check if hash created successfully - if not file_hash["status"] or file_hash["hash"] == None: + if not file_hash["status"] or file_hash["hash"] is None: logger.error("Error while creating hash from file! - Please check log. - Results: %s, %s", file_hash["status"], file_hash["hash"]) error_post_processing(full_file_path) @@ -732,11 +743,10 @@ def direct_download(url:str): hash_exist = fetch_value("items", "file_hash", file_hash["hash"], None, True) if not hash_exist: - video_registered = save_file_to_db(scheme["schema_name"], - scheme_path, + video_registered = save_file_to_db(prepared_data, full_file_path, file_hash["hash"], - url, + {"url": [url]}, metadata) if video_registered: