diff --git a/cmflib/cmf.py b/cmflib/cmf.py index a0af65f3..be181c4c 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -2137,6 +2137,7 @@ def cmf_init(type: str = "", password: str = "", port: int = 0, osdf_path: str = "", + osdf_cache: str = "", key_id: str = "", key_path: str = "", key_issuer: str = "", @@ -2169,7 +2170,12 @@ def cmf_init(type: str = "", session_token: Session token for AmazonS3. user: SSH remote username. password: SSH remote password. - port: SSH remote port + port: SSH remote port. + osdf_path: OSDF Origin Path. + osdf_cache: OSDF Cache Path (Optional). + key_id: OSDF Key ID. + key_path: OSDF Private Key Path. + key_issuer: OSDF Key Issuer URL. Returns: Output based on the initialized repository type. """ @@ -2196,6 +2202,7 @@ def cmf_init(type: str = "", 'user': user, 'password': password, 'osdf_path': osdf_path, + 'osdf_cache': osdf_cache, 'key_id': key_id, 'key_path': key_path, 'key-issuer': key_issuer, @@ -2265,10 +2272,11 @@ def cmf_init(type: str = "", return output - elif type == "osdfremote" and osdf_path != "" and key_id != "" and key_path != 0 and key_issuer != "" and git_remote_url != "": + elif type == "osdfremote" and osdf_path != "" and key_id != "" and key_path != "" and key_issuer != "" and git_remote_url != "": """Initialize osdfremote repository""" output = _init_osdfremote( osdf_path, + osdf_cache, key_id, key_path, key_issuer, @@ -2293,10 +2301,10 @@ def non_related_args(type : str, args : dict): minioS3=["url", "endpoint_url", "access_key_id", "secret_key", "git_remote_url"] amazonS3=["url", "access_key_id", "secret_key", "session_token", "git_remote_url"] sshremote=["path", "user", "port", "password", "git_remote_url"] - osdfremote=["osdf_path", "key_id", "key_path", "key-issuer", "git_remote_url"] + osdfremote=["osdf_path", "osdf_cache", "key_id", "key_path", "key-issuer", "git_remote_url"] - dict_repository_args={"local" : local, "minioS3" : minioS3, "amazonS3" : amazonS3, "sshremote" : sshremote} + dict_repository_args={"local" : local, "minioS3" : minioS3, "amazonS3" : amazonS3, "sshremote" : sshremote, "osdfremote": osdfremote} for repo,arg in dict_repository_args.items(): if repo ==type: diff --git a/cmflib/cmf_commands_wrapper.py b/cmflib/cmf_commands_wrapper.py index 6a76b562..4e2c5325 100644 --- a/cmflib/cmf_commands_wrapper.py +++ b/cmflib/cmf_commands_wrapper.py @@ -252,13 +252,15 @@ def _init_sshremote(path,user, port, password, git_remote_url, cmf_server_url, n print(msg) return msg -def _init_osdfremote(path, key_id, key_path, key_issuer, git_remote_url, cmf_server_url, neo4j_user, neo4j_password, neo4j_uri): +def _init_osdfremote(path, cache, key_id, key_path, key_issuer, git_remote_url, cmf_server_url, neo4j_user, neo4j_password, neo4j_uri): cli_args = cli.parse_args( [ "init", "osdf", "--path", path, + "--cache", + cache, "--key-id", key_id, "--key-path", diff --git a/cmflib/commands/artifact/pull.py b/cmflib/commands/artifact/pull.py index 77312d95..090809e7 100644 --- a/cmflib/commands/artifact/pull.py +++ b/cmflib/commands/artifact/pull.py @@ -151,10 +151,11 @@ def search_artifact(self, input_dict): continue # Splitting the 'name' using ':' as the delimiter and storing the first argument in the 'name' variable. name = name.split(":")[0] + artifact_hash = name = name.split(":")[1] # Splitting the path on '/' to extract the file name, excluding the directory structure. file_name = name.split('/')[-1] if file_name == self.args.artifact_name: - return name, url + return name, url, artifact_hash else: pass @@ -201,7 +202,8 @@ def run(self): ) # getting all artifacts with id temp_dict = dict(zip(get_artifacts['name'], get_artifacts['url'])) # getting dictionary of name and url pair name_url_dict.update(temp_dict) # updating name_url_dict with temp_dict - # print(name_url_dict) + + #print(name_url_dict) # name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81' # name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81,Second-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81') @@ -216,6 +218,7 @@ def run(self): output = self.search_artifact(name_url_dict) # output[0] = name # output[1] = url + # output[2] = hash if output is None: print(f"{self.args.artifact_name} doesn't exist.") else: @@ -322,22 +325,27 @@ def run(self): #Need to write to cmfconfig with new credentials #CmfConfig.write_config(cmf_config, "osdf", attr_dict, True) #Now Ready to do dvc pull + cache_path=cmf_config["osdf-cache"] osdfremote_class_obj = osdf_artifacts.OSDFremoteArtifacts() if self.args.artifact_name: output = self.search_artifact(name_url_dict) # output[0] = name # output[1] = url + # output[3]=artifact_hash if output is None: print(f"{self.args.artifact_name} doesn't exist.") else: args = self.extract_repo_args("osdf", output[0], output[1], current_directory) + #print(f"Hash for the artifact {self.args.artifact_name} is {output[3]}") stmt = osdfremote_class_obj.download_artifacts( dvc_config_op, args[0], # s_url of the artifact + cache_path, current_directory, args[1], # download_loc of the artifact - args[2] # name of the artifact + args[2], # name of the artifact + output[3] #Artifact Hash ) print(stmt) else: @@ -345,13 +353,17 @@ def run(self): #print(name, url) if not isinstance(url, str): continue + artifact_hash = name.split(':')[1] #Extract Hash of the artifact from name + #print(f"Hash for the artifact {name} is {artifact_hash}") args = self.extract_repo_args("osdf", name, url, current_directory) stmt = osdfremote_class_obj.download_artifacts( dvc_config_op, args[0], # host, + cache_path, current_directory, args[1], # remote_loc of the artifact - args[2] # name + args[2], # name + artifact_hash #Artifact Hash ) print(stmt) return "Done" diff --git a/cmflib/commands/init/osdfremote.py b/cmflib/commands/init/osdfremote.py index c187d894..39325ac7 100644 --- a/cmflib/commands/init/osdfremote.py +++ b/cmflib/commands/init/osdfremote.py @@ -99,6 +99,7 @@ def run(self): attr_dict = {} attr_dict["path"] = self.args.path + attr_dict["cache"] = self.args.cache attr_dict["key_id"] = self.args.key_id attr_dict["key_path"] = self.args.key_path attr_dict["key_issuer"] = self.args.key_issuer @@ -127,6 +128,14 @@ def add_parser(subparsers, parent_parser): default=argparse.SUPPRESS, ) + parser.add_argument( + "--cache", + help="Specify FQDN for OSDF cache path including port and path. For Ex. https://osdf-director.osg-htc.org/nrp/fdp/", + metavar="", + #default="https://osdf-director.osg-htc.org/nrp/fdp/", + default="", + ) + required_arguments.add_argument( "--key-id", required=True, diff --git a/cmflib/storage_backends/osdf_artifacts.py b/cmflib/storage_backends/osdf_artifacts.py index ff14e51c..43a725fc 100644 --- a/cmflib/storage_backends/osdf_artifacts.py +++ b/cmflib/storage_backends/osdf_artifacts.py @@ -18,16 +18,91 @@ import requests #import urllib3 #urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +import hashlib +import time +from urllib.parse import urlparse + +def generate_cached_url(url, cache): + #This takes host URL as supplied from MLMD records and generates cached URL=cache_path + path + #Example Input: https://sdsc-origin.nationalresearchplatform.org:8443/nrp/fdp/23/6d9502e0283d91f689d7038b8508a2 + #Example Output: https://osdf-director.osg-htc.org/nrp/fdp/23/6d9502e0283d91f689d7038b8508a2 + #The assumption is that url obtained from MLMD is more accurate. So we use the path from this URL and append it to cache path + #but we clean up the cache path to only its scheme + netloc: https://osdf-director.osg-htc.org + parsed_url = urlparse(url) + parsed_cache_url= urlparse(cache) + cached_url= parsed_cache_url.scheme + "://" + parsed_cache_url.netloc + parsed_url.path + return cached_url + +def calculate_md5_from_file(file_path, chunk_size=8192): + md5 = hashlib.md5() + try: + with open(file_path, 'rb') as f: + while chunk := f.read(chunk_size): + md5.update(chunk) + except Exception as e: + print(f"An error occurred while reading the file: {e}") + return None + return md5.hexdigest() + +def download_and_verify_file(host, headers, remote_file_path, local_path, artifact_hash, timeout): + print(f"Fetching artifact={local_path}, surl={host} to {remote_file_path}") + data= None + try: + response = requests.get(host, headers=headers, timeout=timeout, verify=True) # This should be made True. otherwise this will produce Insecure SSL Warning + if response.status_code == 200 and response.content: + data = response.content + else: + return False, "No data received from the server." + #pass + except requests.exceptions.Timeout: + return False, "The request timed out." + #pass + except Exception as exception: + return False, str(exception) + + if data is not None: + try: + with open(remote_file_path, 'wb') as file: + file.write(data) + if os.path.exists(remote_file_path) and os.path.getsize(remote_file_path) > 0: + # Calculate MD5 hash of the downloaded file + start_time = time.time() + md5_hash = calculate_md5_from_file(remote_file_path) + end_time = time.time() + time_taken = end_time - start_time + if md5_hash: + #print(f"MD5 hash of the downloaded file is: {md5_hash}") + #print(f"Time taken to calculate MD5 hash: {time_taken:.2f} seconds") + if artifact_hash == md5_hash: + #print("MD5 hash of the downloaded file matches the hash in MLMD records.") + stmt = f"object {local_path} downloaded at {remote_file_path} in {time_taken:.2f} seconds and matches MLMD records." + success=True + else: + #print("Error: MD5 hash of the downloaded file does not match the hash in MLMD records.") + stmt = f"object {local_path} downloaded at {remote_file_path} in {time_taken:.2f} seconds and does NOT match MLMD records." + success=False + return success, stmt + else: + print("Failed to calculate MD5 hash of the downloaded file.") + except Exception as e: + print(f"An error occurred while writing to the file: {e}") + return False, f"An error occurred while writing to the file: {e}" + + return False, "Data is None." + class OSDFremoteArtifacts: def download_artifacts( self, dvc_config_op, host: str, #s_url + cache: str, #cache_path from cmfconfig current_directory: str, #current_directory where cmf artifact pull is executed remote_file_path: str, # download_loc of the artifact local_path: str, #name of the artifact + artifact_hash: str, #hash of the artifact from MLMD records ): + #print(f"Configured Host from MLMD record={host}. User configured cache redirector={cache}") output = "" remote_repo = dvc_config_op["remote.osdf.url"] user = "nobody" @@ -36,33 +111,50 @@ def download_artifacts( #print(f"dynamic password from download_artifacts={dynamic_password}") #print(f"Fetching artifact={local_path}, surl={host} to {remote_file_path} when this has been called at {current_directory}") - try: - headers={dvc_config_op["remote.osdf.custom_auth_header"]: dvc_config_op["remote.osdf.password"]} - temp = local_path.split("/") - temp.pop() - dir_path = "/".join(temp) - dir_to_create = os.path.join(current_directory, dir_path) - os.makedirs( - dir_to_create, mode=0o777, exist_ok=True - ) # creates subfolders needed as per artifacts folder structure - local_file_path = os.path.join(current_directory, local_path) - local_file_path = os.path.abspath(local_file_path) - - response = requests.get(host, headers=headers, verify=True) #This should be made True. otherwise this will produce Insecure SSL Warning - if response.status_code == 200 and response.content: - data = response.content + # Prepare directories and file paths + headers={dvc_config_op["remote.osdf.custom_auth_header"]: dvc_config_op["remote.osdf.password"]} + temp = local_path.split("/") + temp.pop() + dir_path = "/".join(temp) + dir_to_create = os.path.join(current_directory, dir_path) + os.makedirs( + dir_to_create, mode=0o777, exist_ok=True + ) # creates subfolders needed as per artifacts folder structure + local_file_path = os.path.join(current_directory, local_path) + local_file_path = os.path.abspath(local_file_path) + + #Cache can be Blank. If so, fetch from Origin + if cache == "": + #Fetch from Origin + success, result = download_and_verify_file(host, headers, remote_file_path, local_file_path, artifact_hash, timeout=10) + if success: + #print(result) + return result else: - return "No data received from the server." + #print(f"Failed to download and verify file: {result}") + return f"Failed to download and verify file" + else: + #Generate Cached path for artifact + cached_s_url=generate_cached_url(host,cache) + #Try to fetch from cache first + success, cached_result = download_and_verify_file(cached_s_url, headers, remote_file_path, local_path, artifact_hash,timeout=5) + if success: + #print(cached_result) + return cached_result + else: + print(f"Failed to download and verify file from cache: {cached_result}") + print(f"Trying Origin at {host}") + #Fetch from Origin + success, origin_result = download_and_verify_file(host, headers, remote_file_path, local_path, artifact_hash, timeout=10) + if success: + #print(origin_result) + return origin_result + else: + #print(f"Failed to download and verify file: {result}") + return f"Failed to download and verify file" + - except Exception as exception: - return exception + + - try: - with open(remote_file_path, 'wb') as file: - file.write(data) - if os.path.exists(remote_file_path) and os.path.getsize(remote_file_path) > 0: - #print(f"object {local_path} downloaded at {remote_file_path}") - stmt = f"object {local_path} downloaded at {remote_file_path}." - return stmt - except Exception as e: - print(f"An error occurred while writing to the file: {e}") + diff --git a/docs/cmf_client/cmf_client.md b/docs/cmf_client/cmf_client.md index a5a89ea8..ed13bb9a 100644 --- a/docs/cmf_client/cmf_client.md +++ b/docs/cmf_client/cmf_client.md @@ -185,6 +185,7 @@ Optional Arguments ### cmf init osdfremote ``` Usage: cmf init osdfremote [-h] --path [path] + --cache [cache] --key-id [key_id] --key-path [key_path] --key-issuer [key_issuer] @@ -196,11 +197,11 @@ Usage: cmf init osdfremote [-h] --path [path] ``` `cmf init osdfremote` configures a OSDF Origin as a cmf artifact repository. ``` -cmf init osdfremote --path https://[Some Origin]:8443/nrp/fdp/ --key-id c2a5 --key-path ~/.ssh/fdp.pem --key-issuer https://[Token Issuer]] --git-remote-url https://github.com/user/experiment-repo.git --git-remote-url https://github.com/user/experiment-repo.git --cmf-server-url http://127.0.0.1:80 --neo4j-user neo4j --neo4j-password password --neo4j-uri bolt://localhost:7687 +cmf init osdfremote --path https://[Some Origin]:8443/nrp/fdp/ --cache http://[Some Redirector]/nrp/fdp --key-id c2a5 --key-path ~/.ssh/fdp.pem --key-issuer https://[Token Issuer]] --git-remote-url https://github.com/user/experiment-repo.git --git-remote-url https://github.com/user/experiment-repo.git --cmf-server-url http://127.0.0.1:80 --neo4j-user neo4j --neo4j-password password --neo4j-uri bolt://localhost:7687 ``` Required Arguments ``` - --path [path] Specify FQDN for OSDF origin including including port and directory path + --path [path] Specify FQDN for OSDF origin including including port and directory path if any --key-id [key_id] Specify key_id for provided private key. eg. b2d3 --key-path [key_path] Specify path for private key on local filesystem. eg. ~/.ssh/XXX.pem --key-issuer [key_issuer] Specify URL for Key Issuer. eg. https://t.nationalresearchplatform.org/XXX @@ -208,6 +209,7 @@ Required Arguments ``` Optional Arguments ``` + --cache [cache] Specify FQDN for OSDF cache including including port and directory path if any -h, --help show this help message and exit --cmf-server-url [cmf_server_url] Specify cmf-server url. (default: http://127.0.0.1:80) --neo4j-user [neo4j_user] Specify neo4j user. (default: None)