From ea3b1ae7650dcc7f28d49c6473209047961d37cd Mon Sep 17 00:00:00 2001 From: "Justin Ritter@j54j6" Date: Fri, 17 May 2024 17:34:38 +0200 Subject: [PATCH] Update project_functions.py Code Quality fixes --- project_functions.py | 380 ++++++++++++++++++++++--------------------- 1 file changed, 195 insertions(+), 185 deletions(-) diff --git a/project_functions.py b/project_functions.py index c0fb267..73bbcb4 100644 --- a/project_functions.py +++ b/project_functions.py @@ -3,13 +3,13 @@ # # Project by j54j6 # This program is used to make a private copy of youtube videos and potentially other websites supported by youtubedl -# Furthermore it supports automatic periodic checking of channels and auto downloading. +# Furthermore it supports automatic periodic checking of channels and auto downloading. # It also checks if the downlaoded video already exists in the specified storage space and checks integrity of videos and redownload them if needed -# +# # -# This file contains the "Project specific funtions" this means that all functions I cannot reuse in other projects -# like controls or checks are located inside this file. +# This file contains the "Project specific funtions" this means that all functions I cannot reuse in other projects +# like controls or checks are located inside this file. # """ @@ -35,7 +35,7 @@ logger = logging.getLogger(__name__) #Define buffer per Thread in Bytes for filehashing - Default 2GB = 2147483648 -BUF_SIZE = 2147483648 +BUF_SIZE = 2147483648 def scheme_setup(): script_dir = pathlib.Path(__file__).parent.resolve() @@ -43,7 +43,7 @@ def scheme_setup(): if not os.path.isdir(os.path.join(script_dir, "scheme")): logging.error("The scheme folder does not exist in the script folder! - Please add it!") return False - + error_occured = False #Iterate over all existing scheme files and create tables if needed for scheme in os.listdir(os.path.join(script_dir, "scheme")): @@ -66,7 +66,7 @@ def scheme_setup(): if not table_exists: result = create_table(scheme_data["db"]["table_name"], scheme_data["db"]["columns"]) - + if not result: logging.error("Error while creating table %s for scheme %s! - Check log", scheme_data["db"]["table_name"], scheme) error_occured = True @@ -108,7 +108,7 @@ def check_dependencies(): #Create config table from scheme logger.info("Config table does not exist - Create Table...") try: - with open("./scheme/project.json") as config_scheme: + with open("./scheme/project.json") as config_scheme: config_data = config_scheme.read() except FileNotFoundError: logger.error("Error while reading Config Scheme! - Error: %s", e) @@ -131,7 +131,7 @@ def check_dependencies(): if not result: logging.error("Error while creating table! - Check log") exit() - + #Check for default settings if "rows" in config_data_as_json["db"]: for option in config_data_as_json["db"]["rows"]: @@ -142,7 +142,7 @@ def check_dependencies(): #Create config table from scheme logger.info("Items table does not exist - Create Table...") try: - with open("./scheme/saved_items.json") as table_scheme: + with open("./scheme/saved_items.json") as table_scheme: column_data = table_scheme.read() except FileNotFoundError as e: logger.error("Error while reading items table Scheme! - FILE Error: %s", e) @@ -172,7 +172,7 @@ def check_dependencies(): #Create config table from scheme logger.info("Items table does not exist - Create Table...") try: - with open("./scheme/subscriptions.json") as table_scheme: + with open("./scheme/subscriptions.json") as table_scheme: column_data = table_scheme.read() except Exception as e: logger.error(f"Error while reading subscription table Scheme! - Error: {e}") @@ -258,7 +258,7 @@ def validate_url_scheme(scheme:json): if not all_keys_exist: logging.error("Some required url_scheme keys are missing in the given scheme file! - Please check your scheme.") return False - + #Check minimum keys for categorizing needed_category_scheme_keys = ["available"] all_keys_exist = True @@ -281,7 +281,7 @@ def fetch_category_name(url:str, scheme:json): category_path = 1 if "category_path" in scheme["categories"]: category_path = scheme["categories"]["category_path"] - + #if category path = "" -> First path descriptor is used (e.g. sld.tld/<>) is used parsed_url = urlparse.urlparse(url) category = parsed_url.path.split('/')[1] @@ -293,7 +293,7 @@ def fetch_scheme_file_by_file(url): if not os.path.isdir(os.path.join(script_dir, "scheme")): logging.error("The scheme folder does not exist in the script folder! - Please add it!") return False - + parsed_url = tldextract.extract(url) #iterate over all files in directory @@ -306,7 +306,7 @@ def fetch_scheme_file_by_file(url): if not scheme: logging.error("Error while reading scheme file %s", scheme_file) continue - + #Check if the scheme file is a url template (used for websites) or a system template (for local use) if "url_template" in scheme and scheme["url_template"] == True: if not validate_url_scheme(scheme): @@ -325,9 +325,9 @@ def fetch_scheme_file(url:str): if not os.path.isdir(os.path.join(script_dir, "scheme")): logging.error("The scheme folder does not exist in the script folder! - Please add it!") return False - + parsed_url = tldextract.extract(url) - + expected_scheme_path = os.path.join(script_dir, "scheme") expected_scheme_path = os.path.join(expected_scheme_path, str(parsed_url.domain + ".json")) if not os.path.isfile(expected_scheme_path): @@ -386,7 +386,7 @@ def validate_scheme(url, scheme, silent=False): return True def decide_storage_path(url, scheme): - #General configuration db table (config) provides a base location where all stuff from this script needs to be saved... + #General configuration db table (config) provides a base location where all stuff from this script needs to be saved... #First fetch this... data = fetch_value("config", "option_name", "base_location", ["option_value"], True) if not data: @@ -406,12 +406,12 @@ def decide_storage_path(url, scheme): if "categories" in scheme and "available" in scheme["categories"] and scheme["categories"]["available"] == True: - #Decide if categories are used + #Decide if categories are used #Check if categories are defined. If not categories can not be used categories_defined = False if "categories" in scheme["categories"]: categories_defined = True - + def inner_decide_path(base_path): category_name = fetch_category_name(url, scheme) @@ -419,7 +419,7 @@ def inner_decide_path(base_path): if not category_name in scheme["categories"]["categories"]: logging.error("Category %s is not defined!", category_name) return False - + logging.debug("Provided category %s is known... Check for custom storage path", category_name) if "storage_path" in scheme["categories"]["categories"][category_name]: @@ -447,48 +447,14 @@ def inner_decide_path(base_path): else: return path_ext else: - return base_path + return base_path else: - #No categories avail + #No categories avail return base_path -#This function downloads a file (url) and saves it to a defined path (path) -def download_file(url): - logging.info("Downloading file from server") - try: - ydl_opts = { - 'format': 'best', - 'outtmpl': path + '/%(title)s.%(ext)s', - 'nooverwrites': True, - 'no_warnings': False, - 'ignoreerrors': True, - 'replace-in-metadata': True, - 'restrict-filenames': True -} - with YoutubeDL(ydl_opts) as ydl: - value = ydl.download([url]) - - if value == 0: - return True - else: - logger.error(f"YDL reported code {value}") - return False - except Exception as e: - logger.error(f"Error while downloading video!- Error: {e}") - return False - -#This function is used to fetch all needed information about the requested file to save it later into db. -def get_file_data(url, path): +def get_file_data(url, ydl_opts): + """ This function is used to fetch all needed information about the requested file to save it later into db.""" try: - ydl_opts = { - 'format': 'best', - 'outtmpl': path + '/%(title)s.%(ext)s', - 'nooverwrites': True, - 'no_warnings': False, - 'ignoreerrors': True, - 'replace-in-metadata': True, - 'restrict-filenames': True -} with YoutubeDL(ydl_opts) as ydl: #We only need the metadata. So we don't need to download the whole file. We will do this later... file_data = ydl.extract_info(url, download=False) @@ -506,8 +472,48 @@ def get_file_data(url, path): logger.error("Error result seems to have no content! - \n\n Result: %s \n Error: %s", file_data, e) return False -#Create a hash from a given file +#This function downloads a file (url) and saves it to a defined path (path) +def download_file(url, path): + """This function downloads the file specified in url and also provides the prepared file path from ydl""" + logging.info("Downloading file from server") + return_val = {"status": False, "full_file_path": None, "metadata": None} + + try: + ydl_opts = { + 'format': 'best', + 'outtmpl': path + '/%(title)s.%(ext)s', + 'nooverwrites': True, + 'no_warnings': False, + 'ignoreerrors': True, + 'replace-in-metadata': True, + 'restrict-filenames': True + } + + metadata = get_file_data(url, path, ydl_opts) + if not metadata or not "title" in metadata or not "ext" in metadata: + logger.error("Error while fetching metadata from target server! - Metadata could not be fetched or key \"title\" / \"ext\" is missing") + return return_val + + with YoutubeDL(ydl_opts) as ydl: + value = ydl.download([url]) + + if value == 0: + full_file_path = YoutubeDL(ydl_opts).prepare_filename(metadata, outtmpl=path + '/%(title)s.%(ext)s') + full_file_path = os.path.abspath(full_file_path) + return_val["status"] = True + return_val["full_file_path"] = full_file_path + return_val["metadata"] = metadata + + return return_val + logger.error("YDL reported code %s", value) + return return_val + except Exception as e: + logger.error("Error while downloading video!- Error: %s", e) + return return_val + def create_hash_from_file(file): + """ Create a hash from a given file and returns a JSON Dict""" + return_val = {"status": False, "file": None, "hash": None} #create hash and return the hex value hash_obj = hashlib.sha256() try: @@ -516,20 +522,21 @@ def create_hash_from_file(file): while len(fb) > 0: # While there is still data being read from the file hash_obj.update(fb) # Update the hash fb = f.read(BUF_SIZE) # Read the next block from the file - return [file, hash_obj.hexdigest()] + return_val["status"] = True + return return_val except Exception as e: logging.error("Error while creating hash of file! - Error: %s", e) - return False + return return_val -#This function is used to load the correct scheme based on an url def load_scheme(url: str): + """ This function is used to load the correct scheme based on an url """ return_scheme = {"status": False, "scheme": None, "scheme_path": None} #Search for scheme scheme_path = fetch_scheme_file(url) if not scheme_path: logging.error("Error while fetching scheme! - Check log") return return_scheme - + #Load Scheme scheme = load_json_file(scheme_path[0]) if not scheme: @@ -540,156 +547,159 @@ def load_scheme(url: str): if not validate_scheme(url, scheme): logging.error("Error while validating scheme! - Check log") return return_scheme - + return_scheme["status"] = True return_scheme["scheme"] = scheme return_scheme["scheme_path"] = scheme_path return return_scheme -#This function represents the "manual" video download approach -def direct_download(url:str): - global path +def prepare_file_download(url): + """This function checks if the given url is alive and can be used to download a file using the defined templates + Possible return Values: + 0 => Failed + 1 => Success + 2 => File already exists + """ - logger.info(f"Directly download content from {url}") + return_val = {"status": 0, "scheme": None, "scheme_path": None, "dst_path": None} + #Check if the url is reachable url_alive = alive_check(url) - - if not url_alive: - logging.error("Can't download video! - Url can not be reached! - Check log above!") - return False - - #Check if url is already in db (this can only detect if a video was already downloaded from the same source). A reuploaded video will be downlaoded again (url is different) - later when the hash is checked it will be dropped again - logger.info("Check if file already exist...") - + #Check if the given url already exists in the items db url_already_exist = fetch_value("items", "url", url, ["url"], True) - if url_already_exist != None: - logging.info("File already exist! - Skip download") - return True - logger.info("File dont exist. Start download...") - #Any videoplatform could need some special handling in order to use youtube_dl. Things like age verification. This can be done with templates to add headers for example - logger.info("Check for suitable template to download video") - + #Try to load a scheme matching the current url scheme = load_scheme(url) - - if not scheme["status"]: - logging.error("Error while loading scheme data! - Check log") - return False - - scheme_path = scheme["scheme_path"] - scheme = scheme["scheme"] + if not url_alive or url_already_exist != None or not scheme["status"]: + if not url_alive: + logging.error("Can't download video! - Url can not be reached! - Check log above!") + if url_already_exist != None: + logging.error("Video already exist in db!") + return_val["status"] = 2 + if not scheme["status"]: + logging.error("Error while loading scheme data! - Check log") + return return_val + + return_val["scheme_path"] = scheme["scheme_path"] + return_val["scheme"] = scheme["scheme"] #Scheme is valid. Decicde where to save the file (Check for categories). Read general config and do the stuff... - path = decide_storage_path(url, scheme) - if not path: + dst_path = decide_storage_path(url, scheme) + if not dst_path: logging.error("Error while defining storage path! - Check log") return False - - logger.info(F"File will be saved under: {path}") - metadata = get_file_data(url, path) - if not metadata: - logger.error("Error while fetching metadata from target server! - Please check log") + return_val["status"] = 1 + return_val["dst_path"] = dst_path + + return return_val + +def save_file_to_db(scheme, scheme_path, full_file_path, file_hash, url, metadata): + """ This function is used to save a file into the items table. It is basically an SQL Insert wrapper""" + #Video hash not exist as saved item add it... + logger.info("Add Video to DB") + head, tail = os.path.split(full_file_path) + logging.debug("Scheme Data: %s", scheme_path) + use_tags_ydl = fetch_value_as_bool("config", "option_name", "use_tags_from_ydl", ["option_value"], True) + #Define base data + video_data = { + "scheme": scheme, + "file_name": tail, + "file_path": head, + "file_hash": file_hash, + "url": url, + "data": metadata + } + if use_tags_ydl: + logging.info("Also insert tags from ydl metadata") + if "tags" in metadata: + logging.debug("Found key 'tags'") + if len(metadata["tags"]) > 0: + logging.debug("Tags found...") + video_data["tags"] = data["tags"] + else: + logging.debug("Tags array is empty!") + else: + logger.debug("No tags key found in metadata") + else: + logging.info("Tags are not inserted from ydl") + video_registered = insert_value("items", video_data) + if not video_registered: + logger.error("Error while saving file to db!! - Please check log.") + remove_file = fetch_value_as_bool("config", "option_name", "remove_file_on_post_process_error", ["option_value"], True) + if remove_file: + logger.info("Remove file due to config setting.") + os.remove(full_file_path) + if os.path.exists(full_file_path): + logging.error("Error while removing video after post processing error! - Check permissions") + return False + logger.warning("File will not be removed! - Be cautious, the file is not saved in the db!") + return False + logger.info("Video successfully saved. - Finished") + return True + +def error_post_processing(full_file_path): + remove_file = fetch_value_as_bool("config", "option_name", "remove_file_on_post_process_error", ["option_value"], True) + if remove_file: + logger.info("Remove file due to config setting.") + os.remove(full_file_path) + if os.path.exists(full_file_path): + logging.error("Error while removing video after post processing error! - Check permissions") + return False + logger.info("File removed") return False + logger.warning("File will not be removed! - Be cautious, the file is not saved in the db!") + return False + +#This function represents the "manual" video download approach +def direct_download(url:str): + global path + logger.info(f"Directly download content from {url} - Check prerequisites and prepare download data") - logger.info(F"Fetched all metadata for file") + prepared_data = prepare_file_download(url) - #Check if title and extension are availiable for further processing - if not "title" in metadata or not "ext" in metadata: - logger.error("Metadata does not contain necessary \"title\" and \"ext\" key!") + if prepared_data["status"] != 1: + logging.error("Error while preparing download! - Check log.") return False - downloaded = download_file(url) + path = prepared_data["dst_path"] + scheme = prepared_data["scheme"] + scheme_path = prepared_data["scheme_path"] - if not downloaded: + logger.info(F"File will be saved under: {path}") + + downloaded = download_file(url, path) + + if not downloaded["status"]: logger.error(f"Error while downloading file from {url} - Please check log!") return False - ydl_opts = { - 'format': 'best', - 'outtmpl': path + '/%(title)s.%(ext)s', - 'nooverwrites': True, - 'no_warnings': False, - 'ignoreerrors': True, - 'replace-in-metadata': True, - 'restrict-filenames': True - } - - full_file_path = YoutubeDL(ydl_opts).prepare_filename(metadata, outtmpl=path + '/%(title)s.%(ext)s') - full_file_path = os.path.abspath(full_file_path) + + + full_file_path = downloaded["full_file_path"] + metadata = downloaded["metadata"] + logger.debug(f"Full File path is: {full_file_path}") #Compute hash from file - hash = create_hash_from_file(full_file_path) - + file_hash = create_hash_from_file(full_file_path) + #Check if hash created successfully - if not hash or len(hash) != 2: + if not file_hash["status"] or file_hash["hash"] == None: logger.error("Error while creating hash from file! - Please check log.") - remove_file = fetch_value_as_bool("config", "option_name", "remove_file_on_post_process_error", ["option_value"], True) - if remove_file: - logger.info("Remove file due to config setting.") - os.remove(full_file_path) - if os.path.exists(full_file_path): - logging.error("Error while removing video after post processing error! - Check permissions") - return False - else: - logger.info("File removed") - return False - else: - logger.warning("File will not be removed! - Be cautious, the file is not saved in the db!") - return False - - + error_post_processing(full_file_path) + return False + + #Check if hash is already in database #If hash is not in db -> Video is new - If hash is in db video already exist. Check if the url is the same - hash_exist = fetch_value("items", "file_hash", hash[1], None, True) + hash_exist = fetch_value("items", "file_hash", file_hash["hash"], None, True) if not hash_exist: - #Video hash not exist as saved item add it... - logger.info("Add Video to DB") - head, tail = os.path.split(full_file_path) - logging.debug("Scheme Data: %s", scheme_path) - - use_tags_ydl = fetch_value_as_bool("config", "option_name", "use_tags_from_ydl", ["option_value"], True) - - #Define base data - video_data = { - "scheme": scheme_path[1], - "file_name": tail, - "file_path": head, - "file_hash": hash[1], - "url": url, - "data": metadata - } + video_registered = save_file_to_db(scheme, scheme_path, full_file_path, file_hash["hash"]) - if use_tags_ydl: - logging.info("Also insert tags from ydl metadata") - if "tags" in metadata: - logging.debug("Found key 'tags'") - if len(metadata["tags"]) > 0: - logging.debug("Tags found...") - video_data["tags"] = metadata["tags"] - else: - logging.debug("Tags array is empty!") - else: - logger.debug("No tags key found in metadata") - else: - logging.info("Tags are not inserted from ydl") - - video_registered = insert_value("items", video_data) - - if not video_registered: - logger.error("Error while saving file to db!! - Please check log.") - remove_file = fetch_value_as_bool("config", "option_name", "remove_file_on_post_process_error", ["option_value"], True) - if remove_file: - logger.info("Remove file due to config setting.") - os.remove(full_file_path) - if os.path.exists(full_file_path): - logging.error("Error while removing video after post processing error! - Check permissions") - return False - logger.warning("File will not be removed! - Be cautious, the file is not saved in the db!") - return False - logger.info("Video successfully saved. - Finished") - return True - else: - #hash already exist - check if url is the same. If not add it to url + if video_registered: + logging.info("File successfully downlaoded.") + return True + logging.error("Error while register Video to db!") + error_post_processing(full_file_path) return False - - - \ No newline at end of file + #TODO + #hash already exist - check if url is the same. If not add it to url + return False