diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml
index 8b214f6..c02122f 100644
--- a/.github/workflows/python-app.yml
+++ b/.github/workflows/python-app.yml
@@ -7,7 +7,7 @@ on:
   push:
     branches: [ main, develop ]
   pull_request:
-    branches: [ main ]
+    branches: [ main, develop ]
 
 jobs:
   build:
@@ -33,4 +33,13 @@ jobs:
         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
     - name: Test with pytest
       run: |
-        pytest
+        pytest -m "not regression"
+    - name: netrc-gen
+      uses: extractions/netrc@v1
+      with:
+        machine: urs.earthdata.nasa.gov
+        username: ${{ secrets.EDL_OPS_USERNAME }}
+        password: ${{ secrets.EDL_OPS_PASSWORD }}
+    - name: Regression Test with pytest
+      run: |
+        pytest -m "regression"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f90b329..294b444 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,9 +4,16 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 
-## Unreleased
+
+## [1.9.0]
 
 ### Added
+- Check if a file already exists before downloading it. [17](https://github.com/podaac/data-subscriber/issues/17)
+- Added automated regression testing
 ### Changed
+- Implemented Search After CMR interface to allow granule listings > 2000 [15](https://github.com/podaac/data-subscriber/issues/15)
+- Retry CMR queries on server error using random exponential backoff, max 60 seconds and 10 retries
+- Refresh token if CMR returns 401 error
+- Converted print statements to log statements
 ### Deprecated
 ### Removed
 ### Fixed
diff --git a/Downloader.md b/Downloader.md
index cd46f4f..78b3af6 100644
--- a/Downloader.md
+++ b/Downloader.md
@@ -6,9 +6,7 @@ For installation and dependency information, please see the [top-level README](R
 
 ```
 $> podaac-data-downloader -h
-usage: PO.DAAC bulk-data downloader [-h] -c COLLECTION -d OUTPUTDIRECTORY [--cycle SEARCH_CYCLES] [-sd STARTDATE] [-ed ENDDATE]
-                                    [-b BBOX] [-dc] [-dydoy] [-dymd] [-dy] [--offset OFFSET] [-e EXTENSIONS] [--process PROCESS_CMD]
-                                    [--version] [--verbose] [-p PROVIDER] [--limit LIMIT]
+usage: PO.DAAC bulk-data downloader [-h] -c COLLECTION -d OUTPUTDIRECTORY [--cycle SEARCH_CYCLES] [-sd STARTDATE] [-ed ENDDATE] [-f] [-b BBOX] [-dc] [-dydoy] [-dymd] [-dy] [--offset OFFSET] [-e EXTENSIONS] [--process PROCESS_CMD] [--version] [--verbose] [-p PROVIDER] [--limit LIMIT]
 
 optional arguments:
   -h, --help            show this help message and exit
@@ -22,6 +20,8 @@ optional arguments:
                         The ISO date time before which data should be retrieved. For Example, --start-date 2021-01-14T00:00:00Z
   -ed ENDDATE, --end-date ENDDATE
                         The ISO date time after which data should be retrieved. For Example, --end-date 2021-01-14T00:00:00Z
+  -f, --force
+                        Flag to force downloading files that are listed in CMR query, even if the file exists and checksum matches
   -b BBOX, --bounds BBOX
                         The bounding rectangle to filter result in. Format is W Longitude,S Latitude,E Longitude,N Latitude without spaces.
Due to an issue with parsing arguments, to use this command, please use the -b="-180,-90,180,90" syntax @@ -50,7 +50,7 @@ optional arguments: Usage: ``` -usage: PO.DAAC bulk-data downloader [-h] -c COLLECTION -d OUTPUTDIRECTORY [--cycle SEARCH_CYCLES] [-sd STARTDATE] [-ed ENDDATE] +usage: PO.DAAC bulk-data downloader [-h] -c COLLECTION -d OUTPUTDIRECTORY [--cycle SEARCH_CYCLES] [-sd STARTDATE] [-ed ENDDATE] [-f] [-b BBOX] [-dc] [-dydoy] [-dymd] [-dy] [--offset OFFSET] [-e EXTENSIONS] [--process PROCESS_CMD] [--version] [--verbose] [-p PROVIDER] [--limit LIMIT] ``` @@ -163,6 +163,22 @@ The subscriber allows the placement of downloaded files into one of several dire * -dymd - optional, relative paths use the start time of a granule to layout data in a YEAR/MONTH/DAY path +### Downloader behavior when a file already exists + +By default, when the downloader is about to download a file, it first: +- Checks if the file already exists in the target location +- Creates a checksum for the file and sees if it matches the checksum for that file in CMR + +If the file already exists AND the checksum matches, the downloader will skip downloading that file. + +This can drastically reduce the time for the downloader to complete. Also, since the checksum is verified, files will still be re-downloaded if for some reason the file has changed (or the file already on disk is corrupted). + +You can override this default behavior - forcing the downloader to always download matching files, by using --force/-f. + +``` +podaac-data-downloader -c SENTINEL-1A_SLC -d myData -f +``` + ### Setting a bounding rectangle for filtering results If you're interested in a specific region, you can set the bounds parameter on your request to filter data that passes through a certain area. This is useful in particular for non-global datasets (such as swath datasets) with non-global coverage per file. diff --git a/Subscriber.md b/Subscriber.md index eab2c7a..0cd5e3f 100644 --- a/Subscriber.md +++ b/Subscriber.md @@ -6,7 +6,7 @@ For installation and dependency information, please see the [top-level README](R ``` $> podaac-data-subscriber -h -usage: PO.DAAC data subscriber [-h] -c COLLECTION -d OUTPUTDIRECTORY [-sd STARTDATE] [-ed ENDDATE] [-b BBOX] [-dc] [-dydoy] [-dymd] [-dy] [--offset OFFSET] [-m MINUTES] [-e EXTENSIONS] [--process PROCESS_CMD] [--version] [--verbose] [-p PROVIDER] +usage: PO.DAAC data subscriber [-h] -c COLLECTION -d OUTPUTDIRECTORY [-f] [-sd STARTDATE] [-ed ENDDATE] [-b BBOX] [-dc] [-dydoy] [-dymd] [-dy] [--offset OFFSET] [-m MINUTES] [-e EXTENSIONS] [--process PROCESS_CMD] [--version] [--verbose] [-p PROVIDER] optional arguments: -h, --help show this help message and exit @@ -14,6 +14,7 @@ optional arguments: The collection shortname for which you want to retrieve data. -d OUTPUTDIRECTORY, --data-dir OUTPUTDIRECTORY The directory where data products will be downloaded. + -f, --force Flag to force downloading files that are listed in CMR query, even if the file exists and checksum matches -sd STARTDATE, --start-date STARTDATE The ISO date time before which data should be retrieved. For Example, --start-date 2021-01-14T00:00:00Z -ed ENDDATE, --end-date ENDDATE @@ -37,12 +38,11 @@ optional arguments: Specify a provider for collection search. Default is POCLOUD. 
``` -##Run the Script +## Run the Script Usage: ``` -usage: podaac_data_subscriber.py [-h] -c COLLECTION -d OUTPUTDIRECTORY [-sd STARTDATE] [-ed ENDDATE] [-b BBOX] [-dc] [-dydoy] [-dymd] [-dy] [--offset OFFSET] - [-m MINUTES] [-e EXTENSIONS] [--version] [--verbose] [-p PROVIDER] +usage: podaac_data_subscriber.py [-h] -c COLLECTION -d OUTPUTDIRECTORY [-f] [-sd STARTDATE] [-ed ENDDATE] [-b BBOX] [-dc] [-dydoy] [-dymd] [-dy] [--offset OFFSET] [-m MINUTES] [-e EXTENSIONS] [--version] [--verbose] [-p PROVIDER] ``` To run the script, the following parameters are required: @@ -112,6 +112,7 @@ machine urs.earthdata.nasa.gov **If the script cannot find the netrc file, you will be prompted to enter the username and password and the script wont be able to generate the CMR token** + ## Advanced Usage ### Request data from another DAAC... @@ -141,6 +142,22 @@ The subscriber allows the placement of downloaded files into one of several dire * -dydoy - optional, relative paths use the start time of a granule to layout data in a YEAR/DAY-OF-YEAR path * -dymd - optional, relative paths use the start time of a granule to layout data in a YEAR/MONTH/DAY path +### Subscriber behavior when a file already exists + +By default, when the subscriber is about to download a file, it first: +- Checks if the file already exists in the target location +- Creates a checksum for the file and sees if it matches the checksum for that file in CMR + +If the file already exists AND the checksum matches, the subscriber will skip downloading that file. + +This can drastically reduce the time for the subscriber to complete. Also, since the checksum is verified, files will still be re-downloaded if for some reason the file has changed (or the file already on disk is corrupted). + +You can override this default behavior - forcing the subscriber to always download matching files, by using --force/-f. 
+
+```
+podaac-data-subscriber -c SENTINEL-1A_SLC -d myData -f
+```
+
 ### Running as a Cron job
 
 To automatically run and update a local file system with data files from a collection, one can use a syntax like the following:
diff --git a/dev-requirements.txt b/dev-requirements.txt
new file mode 100644
index 0000000..49435c9
--- /dev/null
+++ b/dev-requirements.txt
@@ -0,0 +1 @@
+pytest==7.1.1
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 374b58c..aed08bf 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,3 +4,7 @@ requires = [
     "wheel"
 ]
 build-backend = "setuptools.build_meta"
+[tool.pytest.ini_options]
+markers = [
+    "regression: marks a test as a regression, requires netrc file (deselect with '-m \"not regression\"')"
+]
diff --git a/requirements.txt b/requirements.txt
index b63590e..9319567 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,4 @@ chardet==4.0.0
 idna==2.10
 requests==2.25.1
 urllib3>=1.26.5
+tenacity>=8.0.1
\ No newline at end of file
diff --git a/setup.py b/setup.py
index a96b7f7..40ae947 100644
--- a/setup.py
+++ b/setup.py
@@ -4,7 +4,7 @@
     long_description = fh.read()
 
 setup(name='podaac-data-subscriber',
-      version='1.8.0',
+      version='1.9.0',
       description='PO.DAAC Data Susbcriber Command Line Tool',
       url='https://github.com/podaac/data-subscriber',
       long_description=long_description,
@@ -15,7 +15,7 @@
       packages=['subscriber'],
       entry_points='''
         [console_scripts]
-        podaac-data-subscriber=subscriber.podaac_data_subscriber:run
-        podaac-data-downloader=subscriber.podaac_data_downloader:run
+        podaac-data-subscriber=subscriber.podaac_data_subscriber:main
+        podaac-data-downloader=subscriber.podaac_data_downloader:main
       ''',
       zip_safe=False)
diff --git a/subscriber/podaac_access.py b/subscriber/podaac_access.py
index 0bdf10b..b4711df 100644
--- a/subscriber/podaac_access.py
+++ b/subscriber/podaac_access.py
@@ -1,16 +1,27 @@
-from urllib import request
-from http.cookiejar import CookieJar
-import netrc
-import requests
 import json
+import logging
+import netrc
+import subprocess
+from datetime import datetime
+from http.cookiejar import CookieJar
 from os import makedirs
 from os.path import isdir, basename, join, splitext
+from urllib import request
+from typing import Dict
+from urllib import request
+from urllib.error import HTTPError
 import subprocess
 from urllib.parse import urlencode
-from urllib.request import urlopen
+from urllib.request import Request, urlopen
+import hashlib
+
+import requests
+
+import requests
+import tenacity
 from datetime import datetime
 
-__version__ = "1.8.0"
+__version__ = "1.9.0"
 extensions = [".nc", ".h5", ".zip", ".tar.gz"]
 edl = "urs.earthdata.nasa.gov"
 cmr = "cmr.earthdata.nasa.gov"
@@ -18,6 +29,7 @@
 IPAddr = "127.0.0.1"  # socket.gethostbyname(hostname)
 
+
 
 # ## Authentication setup
 #
 # The function below will allow Python scripts to log into any Earthdata Login
@@ -60,7 +72,7 @@ def setup_earthdata_login_auth(endpoint):
         # FileNotFound = There's no .netrc file
         # TypeError = The endpoint isn't in the netrc file,
         #             causing the above to try unpacking None
-        print("There's no .netrc file or the The endpoint isn't in the netrc file")  # noqa E501
+        logging.warning("There's no .netrc file or the endpoint isn't in the netrc file")
 
     manager = request.HTTPPasswordMgrWithDefaultRealm()
     manager.add_password(None, endpoint, username, password)
@@ -82,15 +94,15 @@ def get_token(url: str, client_id: str, endpoint: str) -> str:
         username, _, password = netrc.netrc().authenticators(endpoint)
         xml: str = """
         {}{}{}
-
{}""".format(username, password, client_id, IPAddr) # noqa E501 - headers: Dict = {'Content-Type': 'application/xml', 'Accept': 'application/json'} # noqa E501 + {}""".format(username, password, client_id, IPAddr) # noqa E501 + headers: Dict = {'Content-Type': 'application/xml', 'Accept': 'application/json'} # noqa E501 resp = requests.post(url, headers=headers, data=xml) response_content: Dict = json.loads(resp.content) token = response_content['token']['id'] # What error is thrown here? Value Error? Request Errors? except: # noqa E722 - print("Error getting the token - check user name and password") + logging.warning("Error getting the token - check user name and password") return token @@ -99,45 +111,56 @@ def get_token(url: str, client_id: str, endpoint: str) -> str: ############################################################################### def delete_token(url: str, token: str) -> None: try: - headers: Dict = {'Content-Type': 'application/xml','Accept': 'application/json'} # noqa E501 + headers: Dict = {'Content-Type': 'application/xml', 'Accept': 'application/json'} # noqa E501 url = '{}/{}'.format(url, token) resp = requests.request('DELETE', url, headers=headers) if resp.status_code == 204: - print("CMR token successfully deleted") + logging.info("CMR token successfully deleted") else: - print("CMR token deleting failed.") + logging.info("CMR token deleting failed.") except: # noqa E722 - print("Error deleting the token") + logging.warning("Error deleting the token") + + +def refresh_token(old_token: str, client_id: str): + setup_earthdata_login_auth(edl) + delete_token(token_url, old_token) + return get_token(token_url, client_id, edl) def validate(args): bounds = args.bbox.split(',') if len(bounds) != 4: - raise ValueError("Error parsing '--bounds': " + args.bbox + ". Format is W Longitude,S Latitude,E Longitude,N Latitude without spaces ") # noqa E501 + raise ValueError( + "Error parsing '--bounds': " + args.bbox + ". Format is W Longitude,S Latitude,E Longitude,N Latitude without spaces ") # noqa E501 for b in bounds: try: float(b) except ValueError: - raise ValueError("Error parsing '--bounds': " + args.bbox + ". Format is W Longitude,S Latitude,E Longitude,N Latitude without spaces ") # noqa E501 + raise ValueError( + "Error parsing '--bounds': " + args.bbox + ". Format is W Longitude,S Latitude,E Longitude,N Latitude without spaces ") # noqa E501 if args.startDate: try: datetime.strptime(args.startDate, '%Y-%m-%dT%H:%M:%SZ') except ValueError: - raise ValueError("Error parsing '--start-date' date: " + args.startDate + ". Format must be like 2021-01-14T00:00:00Z") # noqa E501 + raise ValueError( + "Error parsing '--start-date' date: " + args.startDate + ". Format must be like 2021-01-14T00:00:00Z") # noqa E501 if args.endDate: try: datetime.strptime(args.endDate, '%Y-%m-%dT%H:%M:%SZ') except ValueError: - raise ValueError("Error parsing '--end-date' date: " + args.endDate + ". Format must be like 2021-01-14T00:00:00Z") # noqa E501 + raise ValueError( + "Error parsing '--end-date' date: " + args.endDate + ". Format must be like 2021-01-14T00:00:00Z") # noqa E501 if 'minutes' in args: if args.minutes: try: int(args.minutes) except ValueError: - raise ValueError("Error parsing '--minutes': " + args.minutes + ". Number must be an integer.") # noqa E501 + raise ValueError( + "Error parsing '--minutes': " + args.minutes + ". 
Number must be an integer.") # noqa E501 # Error catching for output directory specifications # Must specify -d output path or one time-based output directory flag @@ -243,9 +266,9 @@ def process_file(process_cmd, output_path, args): else: for cmd in process_cmd: if args.verbose: - print(f'Running: {cmd} {output_path}') + logging.info(f'Running: {cmd} {output_path}') subprocess.run(cmd.split() + [output_path], - check=True) + check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) def get_temporal_range(start, end, now): @@ -262,24 +285,55 @@ def get_temporal_range(start, end, now): raise ValueError("One of start-date or end-date must be specified.") -def get_search_results(args, params): +# Retry using random exponential backoff if a 500 error is raised. Maximum 10 attempts. +@tenacity.retry(wait=tenacity.wait_random_exponential(multiplier=1, max=60), + stop=tenacity.stop_after_attempt(10), + reraise=True, + retry=(tenacity.retry_if_exception_type(HTTPError) & tenacity.retry_if_exception( + lambda exc: exc.code == 500)) + ) +def get_search_results(params, verbose=False): # Get the query parameters as a string and then the complete search url: query = urlencode(params) url = "https://" + cmr + "/search/granules.umm_json?" + query - if args.verbose: - print(url) + if verbose: + logging.info(url) # Get a new timestamp that represents the UTC time of the search. # Then download the records in `umm_json` format for granules # that match our search parameters: - with urlopen(url) as f: - results = json.loads(f.read().decode()) + results = None + search_after_header = None + while True: + # Build the request, add the search after header to it if it's not None (e.g. after the first iteration) + req = Request(url) + if search_after_header is not None: + req.add_header('CMR-Search-After', search_after_header) + response = urlopen(req) + + # Build the results object, load entire result if it's the first time. + if results is None: + results = json.loads(response.read().decode()) + # if not the first time, add the new items to the existing array + else: + results['items'].extend(json.loads(response.read().decode())['items']) + + # get the new Search After header, if it's not set, we have all the results and we're done. + search_after_header = None + search_after_header = response.info()['CMR-Search-After'] + if search_after_header is not None: + logging.debug("Search After response header defined, paging CMR for more data.") + else: + break + # return all of the paged CMR results. 
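+    # Note: results['hits'] is the total match count CMR reported for the query,
+    # so after paging completes len(results['items']) should equal results['hits'].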
return results def parse_start_times(results): try: - file_start_times = [(r['meta']['native-id'], datetime.strptime((r['umm']['TemporalExtent']['RangeDateTime']['BeginningDateTime']), "%Y-%m-%dT%H:%M:%S.%fZ")) for r in results['items']] # noqa E501 + file_start_times = [(r['meta']['native-id'], + datetime.strptime((r['umm']['TemporalExtent']['RangeDateTime']['BeginningDateTime']), + "%Y-%m-%dT%H:%M:%S.%fZ")) for r in results['items']] # noqa E501 except KeyError: raise ValueError('Could not locate start time for data.') return file_start_times @@ -287,9 +341,92 @@ def parse_start_times(results): def parse_cycles(results): try: - cycles = [(splitext(r['meta']['native-id'])[0],str(r['umm']['SpatialExtent']['HorizontalSpatialDomain']['Track']['Cycle'])) for r in results['items']] # noqa E501 + cycles = [(splitext(r['meta']['native-id'])[0], + str(r['umm']['SpatialExtent']['HorizontalSpatialDomain']['Track']['Cycle'])) for r in + results['items']] # noqa E501 except KeyError: raise ValueError('No cycles found within collection granules. ' 'Specify an output directory or ' 'choose another output directory flag other than -dc.') # noqa E501 return cycles + + + +def extract_checksums(granule_results): + """ + Create a dictionary containing checksum information from files. + + Parameters + ---------- + granule_results : dict + The cmr granule search results (umm_json format) + + Returns + ------- + A dictionary where the keys are filenames and the values are + checksum information (checksum value and checksum algorithm). + + For Example: + { + "some-granule-name.nc": { + "Value": "d96387295ea979fb8f7b9aa5f231c4ab", + "Algorithm": "MD5" + }, + "some-granule-name.nc.md5": { + "Value": '320876f087da0876edc0876ab0876b7a", + "Algorithm": "MD5" + }, + ... + } + """ + checksums = {} + for granule in granule_results["items"]: + try: + items = granule["umm"]["DataGranule"]["ArchiveAndDistributionInformation"] + for item in items: + try: + checksums[item["Name"]] = item["Checksum"] + except: + pass + except: + pass + return checksums + + +def checksum_does_match(file_path, checksums): + """ + Checks if a file's checksum matches a checksum in the checksums dict + + Parameters + ---------- + file_path : string + The relative or absolute path to an existing file + + checksums: dict + A dictionary where keys are filenames (not including the path) + and values are checksum information (checksum value and checksum algorithm) + + Returns + ------- + True - if the file's checksum matches a checksum in the checksum dict + False - if the file doesn't have a checksum, or if the checksum doesn't match + """ + filename = basename(file_path) + checksum = checksums.get(filename) + if not checksum: + return False + return make_checksum(file_path, checksum["Algorithm"]) == checksum["Value"] + + +def make_checksum(file_path, algorithm): + """ + Create checksum of file using the specified algorithm + """ + # Based on https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file#answer-3431838 + # with modification to handle multiple algorithms + hash = getattr(hashlib, algorithm.lower())() + + with open(file_path, 'rb') as f: + for chunk in iter(lambda: f.read(4096), b""): + hash.update(chunk) + return hash.hexdigest() diff --git a/subscriber/podaac_data_downloader.py b/subscriber/podaac_data_downloader.py index 442ac5b..abe69fa 100644 --- a/subscriber/podaac_data_downloader.py +++ b/subscriber/podaac_data_downloader.py @@ -2,26 +2,24 @@ import argparse import logging import os +import sys +from datetime 
import datetime, timedelta from os import makedirs -from os.path import isdir, basename, join +from os.path import isdir, basename, join, exists +from urllib.error import HTTPError from urllib.request import urlretrieve -from datetime import datetime, timedelta from subscriber import podaac_access as pa __version__ = pa.__version__ -LOGLEVEL = os.environ.get('PODAAC_LOGLEVEL', 'WARNING').upper() - -logging.basicConfig(level=LOGLEVEL) -logging.debug("Log level set to " + LOGLEVEL) - page_size = 2000 edl = pa.edl cmr = pa.cmr token_url = pa.token_url + # The lines below are to get the IP address. You can make this static and # assign a fixed value to the IPAddr variable @@ -37,13 +35,17 @@ def parse_cycles(cycle_input): def validate(args): if args.search_cycles is None and args.startDate is None and args.endDate is None: - raise ValueError("Error parsing command line arguments: one of [--start-date and --end-date] or [--cycles] are required") # noqa E501 + raise ValueError( + "Error parsing command line arguments: one of [--start-date and --end-date] or [--cycles] are required") # noqa E501 if args.search_cycles is not None and args.startDate is not None: - raise ValueError("Error parsing command line arguments: only one of -sd/--start-date and --cycles are allowed") # noqa E501 + raise ValueError( + "Error parsing command line arguments: only one of -sd/--start-date and --cycles are allowed") # noqa E501 if args.search_cycles is not None and args.endDate is not None: - raise ValueError("Error parsing command line arguments: only one of -ed/--end-date and --cycles are allowed") # noqa E50 + raise ValueError( + "Error parsing command line arguments: only one of -ed/--end-date and --cycles are allowed") # noqa E50 if None in [args.endDate, args.startDate] and args.search_cycles is None: - raise ValueError("Error parsing command line arguments: Both --start-date and --end-date must be specified") # noqa E50 + raise ValueError( + "Error parsing command line arguments: Both --start-date and --end-date must be specified") # noqa E50 def create_parser(): @@ -51,42 +53,65 @@ def create_parser(): parser = argparse.ArgumentParser(prog='PO.DAAC bulk-data downloader') # Adding Required arguments - parser.add_argument("-c", "--collection-shortname", dest="collection",required=True, help = "The collection shortname for which you want to retrieve data.") # noqa E501 - parser.add_argument("-d", "--data-dir", dest="outputDirectory", required=True, help = "The directory where data products will be downloaded.") # noqa E501 + parser.add_argument("-c", "--collection-shortname", dest="collection", required=True, + help="The collection shortname for which you want to retrieve data.") # noqa E501 + parser.add_argument("-d", "--data-dir", dest="outputDirectory", required=True, + help="The directory where data products will be downloaded.") # noqa E501 # Required through validation - parser.add_argument("--cycle", required=False, dest="search_cycles", help="Cycle number for determining downloads. can be repeated for multiple cycles", action='append', type=int) - parser.add_argument("-sd", "--start-date", required=False, dest="startDate", help="The ISO date time before which data should be retrieved. For Example, --start-date 2021-01-14T00:00:00Z") # noqa E501 - parser.add_argument("-ed", "--end-date", required=False, dest="endDate", help="The ISO date time after which data should be retrieved. 
For Example, --end-date 2021-01-14T00:00:00Z") # noqa E501 + parser.add_argument("--cycle", required=False, dest="search_cycles", + help="Cycle number for determining downloads. can be repeated for multiple cycles", + action='append', type=int) + parser.add_argument("-sd", "--start-date", required=False, dest="startDate", + help="The ISO date time before which data should be retrieved. For Example, --start-date 2021-01-14T00:00:00Z") # noqa E501 + parser.add_argument("-ed", "--end-date", required=False, dest="endDate", + help="The ISO date time after which data should be retrieved. For Example, --end-date 2021-01-14T00:00:00Z") # noqa E501 + # Adding optional arguments + parser.add_argument("-f", "--force", dest="force", action="store_true", help = "Flag to force downloading files that are listed in CMR query, even if the file exists and checksum matches") # noqa E501 # spatiotemporal arguments - parser.add_argument("-b", "--bounds", dest="bbox", help = "The bounding rectangle to filter result in. Format is W Longitude,S Latitude,E Longitude,N Latitude without spaces. Due to an issue with parsing arguments, to use this command, please use the -b=\"-180,-90,180,90\" syntax when calling from the command line. Default: \"-180,-90,180,90\".", default="-180,-90,180,90") # noqa E501 + parser.add_argument("-b", "--bounds", dest="bbox", + help="The bounding rectangle to filter result in. Format is W Longitude,S Latitude,E Longitude,N Latitude without spaces. Due to an issue with parsing arguments, to use this command, please use the -b=\"-180,-90,180,90\" syntax when calling from the command line. Default: \"-180,-90,180,90\".", + default="-180,-90,180,90") # noqa E501 # Arguments for how data are stored locally - much processing is based on # the underlying directory structure (e.g. year/Day-of-year) - parser.add_argument("-dc", dest="cycle", action="store_true", help = "Flag to use cycle number for directory where data products will be downloaded.") # noqa E501 - parser.add_argument("-dydoy", dest="dydoy", action="store_true", help = "Flag to use start time (Year/DOY) of downloaded data for directory where data products will be downloaded.") # noqa E501 - parser.add_argument("-dymd", dest="dymd", action="store_true", help = "Flag to use start time (Year/Month/Day) of downloaded data for directory where data products will be downloaded.") # noqa E501 - parser.add_argument("-dy", dest="dy", action="store_true", help = "Flag to use start time (Year) of downloaded data for directory where data products will be downloaded.") # noqa E501 - parser.add_argument("--offset", dest="offset", help = "Flag used to shift timestamp. Units are in hours, e.g. 10 or -10.") # noqa E501 - - parser.add_argument("-e", "--extensions", dest="extensions", help="The extensions of products to download. Default is [.nc, .h5, .zip, .tar.gz]", default=None, action='append') # noqa E501 - parser.add_argument("--process", dest="process_cmd", help="Processing command to run on each downloaded file (e.g., compression). Can be specified multiple times.", action='append') - - - parser.add_argument("--version", action="version", version='%(prog)s ' + __version__, help="Display script version information and exit.") # noqa E501 - parser.add_argument("--verbose", dest="verbose", action="store_true",help="Verbose mode.") # noqa E501 - parser.add_argument("-p", "--provider", dest="provider", default='POCLOUD', help="Specify a provider for collection search. 
Default is POCLOUD.") # noqa E501 - - parser.add_argument("--limit", dest="limit", default='2000', type=int, help="Integer limit for number of granules to download. Useful in testing. Defaults to " + str(page_size)) # noqa E501 + parser.add_argument("-dc", dest="cycle", action="store_true", + help="Flag to use cycle number for directory where data products will be downloaded.") # noqa E501 + parser.add_argument("-dydoy", dest="dydoy", action="store_true", + help="Flag to use start time (Year/DOY) of downloaded data for directory where data products will be downloaded.") # noqa E501 + parser.add_argument("-dymd", dest="dymd", action="store_true", + help="Flag to use start time (Year/Month/Day) of downloaded data for directory where data products will be downloaded.") # noqa E501 + parser.add_argument("-dy", dest="dy", action="store_true", + help="Flag to use start time (Year) of downloaded data for directory where data products will be downloaded.") # noqa E501 + parser.add_argument("--offset", dest="offset", + help="Flag used to shift timestamp. Units are in hours, e.g. 10 or -10.") # noqa E501 + + parser.add_argument("-e", "--extensions", dest="extensions", + help="The extensions of products to download. Default is [.nc, .h5, .zip, .tar.gz]", + default=None, action='append') # noqa E501 + parser.add_argument("--process", dest="process_cmd", + help="Processing command to run on each downloaded file (e.g., compression). Can be specified multiple times.", + action='append') + + parser.add_argument("--version", action="version", version='%(prog)s ' + __version__, + help="Display script version information and exit.") # noqa E501 + parser.add_argument("--verbose", dest="verbose", action="store_true", help="Verbose mode.") # noqa E501 + parser.add_argument("-p", "--provider", dest="provider", default='POCLOUD', + help="Specify a provider for collection search. Default is POCLOUD.") # noqa E501 + + parser.add_argument("--limit", dest="limit", default='2000', type=int, + help="Integer limit for number of granules to download. Useful in testing. Defaults to " + str( + page_size)) # noqa E501 return parser -def run(): - parser = create_parser() - args = parser.parse_args() +def run(args=None): + if args is None: + parser = create_parser() + args = parser.parse_args() try: pa.validate(args) @@ -98,8 +123,8 @@ def run(): validate(args) except ValueError as v: - print(v) - exit() + logging.error(str(v)) + exit(1) pa.setup_earthdata_login_auth(edl) token = pa.get_token(token_url, 'podaac-subscriber', edl) @@ -130,7 +155,7 @@ def run(): # This cell will replace the timestamp above with the one read from the `.update` file in the data directory, if it exists. if not isdir(data_path): - print("NOTE: Making new data directory at " + data_path + "(This is the first run.)") + logging.info("NOTE: Making new data directory at " + data_path + "(This is the first run.)") makedirs(data_path, exist_ok=True) # Change this to whatever extent you need. 
Format is W Longitude,S Latitude,E Longitude,N Latitude @@ -139,7 +164,6 @@ def run(): if search_cycles is not None: cmr_cycles = search_cycles params = [ - ('scroll', "true"), ('page_size', page_size), ('sort_key', "-start_date"), ('provider', provider), @@ -150,12 +174,12 @@ def run(): for v in cmr_cycles: params.append(("cycle[]", v)) if args.verbose: - print("cycles: " + str(cmr_cycles)) + logging.info("cycles: " + str(cmr_cycles)) else: - temporal_range = pa.get_temporal_range(start_date_time, end_date_time, datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")) # noqa E501 + temporal_range = pa.get_temporal_range(start_date_time, end_date_time, + datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")) # noqa E501 params = { - 'scroll': "true", 'page_size': page_size, 'sort_key': "-start_date", 'provider': provider, @@ -165,15 +189,24 @@ def run(): 'bounding_box': bounding_extent, } if args.verbose: - print("Temporal Range: " + temporal_range) + logging.info("Temporal Range: " + temporal_range) if args.verbose: - print("Provider: " + provider) + logging.info("Provider: " + provider) - results = pa.get_search_results(args, params) + # If 401 is raised, refresh token and try one more time + try: + results = pa.get_search_results(params, args.verbose) + except HTTPError as e: + if e.code == 401: + token = pa.refresh_token(token, 'podaac-subscriber') + params['token'] = token + results = pa.get_search_results(params, args.verbose) + else: + raise e if args.verbose: - print(str(results['hits'])+" granules found for "+short_name) # noqa E501 + logging.info(str(results['hits']) + " granules found for " + short_name) # noqa E501 if any([args.dy, args.dydoy, args.dymd]): file_start_times = pa.parse_start_times(results) @@ -181,8 +214,12 @@ def run(): cycles = pa.parse_cycles(results) downloads_all = [] - downloads_data = [[u['URL'] for u in r['umm']['RelatedUrls'] if u['Type'] == "GET DATA" and ('Subtype' not in u or u['Subtype'] != "OPENDAP DATA")] for r in results['items']] - downloads_metadata = [[u['URL'] for u in r['umm']['RelatedUrls'] if u['Type'] == "EXTENDED METADATA"] for r in results['items']] + downloads_data = [[u['URL'] for u in r['umm']['RelatedUrls'] if + u['Type'] == "GET DATA" and ('Subtype' not in u or u['Subtype'] != "OPENDAP DATA")] for r in + results['items']] + downloads_metadata = [[u['URL'] for u in r['umm']['RelatedUrls'] if u['Type'] == "EXTENDED METADATA"] for r in + results['items']] + checksums = pa.extract_checksums(results) for f in downloads_data: downloads_all.append(f) @@ -192,7 +229,8 @@ def run(): downloads = [item for sublist in downloads_all for item in sublist] if len(downloads) >= page_size: - print("Warning: only the most recent " + str(page_size) + " granules will be downloaded; try adjusting your search criteria (suggestion: reduce time period or spatial region of search) to ensure you retrieve all granules.") + logging.warning("Only the most recent " + str( + page_size) + " granules will be downloaded; try adjusting your search criteria (suggestion: reduce time period or spatial region of search) to ensure you retrieve all granules.") # filter list based on extension if not extensions: @@ -208,14 +246,14 @@ def run(): # https://github.com/podaac/data-subscriber/issues/33 # Make this a non-verbose message # if args.verbose: - print("Found " + str(len(downloads)) + " total files to download") + logging.info("Found " + str(len(downloads)) + " total files to download") if args.verbose: - print("Downloading files with extensions: " + str(extensions)) + 
logging.info("Downloading files with extensions: " + str(extensions)) # NEED TO REFACTOR THIS, A LOT OF STUFF in here # Finish by downloading the files to the data directory in a loop. # Overwrite `.update` with a new timestamp on success. - success_cnt = failure_cnt = 0 + success_cnt = failure_cnt = skip_cnt = 0 for f in downloads: try: # -d flag, args.outputDirectory @@ -228,21 +266,43 @@ def run(): if args.cycle: output_path = pa.prepare_cycles_output( cycles, data_path, f) + + # decide if we should actually download this file (e.g. we may already have the latest version) + if(exists(output_path) and not args.force and pa.checksum_does_match(output_path, checksums)): + logging.info(str(datetime.now()) + " SKIPPED: " + f) + skip_cnt += 1 + continue + urlretrieve(f, output_path) pa.process_file(process_cmd, output_path, args) - print(str(datetime.now()) + " SUCCESS: " + f) + logging.info(str(datetime.now()) + " SUCCESS: " + f) success_cnt = success_cnt + 1 - except Exception as e: - print(str(datetime.now()) + " FAILURE: " + f) + except Exception: + logging.warning(str(datetime.now()) + " FAILURE: " + f, exc_info=True) failure_cnt = failure_cnt + 1 - print(e) - print("Downloaded: " + str(success_cnt) + " files\n") - print("Files Failed to download:" + str(failure_cnt) + "\n") + logging.info("Downloaded Files: " + str(success_cnt)) + logging.info("Failed Files: " + str(failure_cnt)) + logging.info("Skipped Files: " + str(skip_cnt)) pa.delete_token(token_url, token) - print("END \n\n") - exit(0) + logging.info("END\n\n") + + + + +def main(): + log_level = os.environ.get('PODAAC_LOGLEVEL', 'INFO').upper() + logging.basicConfig(stream=sys.stdout, + format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', + level=log_level) + logging.debug("Log level set to " + log_level) + + try: + run() + except Exception as e: + logging.exception("Uncaught exception occurred during execution.") + exit(hash(e)) if __name__ == '__main__': - run() + main() diff --git a/subscriber/podaac_data_subscriber.py b/subscriber/podaac_data_subscriber.py index d749453..66d79cc 100755 --- a/subscriber/podaac_data_subscriber.py +++ b/subscriber/podaac_data_subscriber.py @@ -15,19 +15,17 @@ import argparse import logging import os +import sys +from datetime import datetime, timedelta from os import makedirs -from os.path import isdir, basename, join, isfile +from os.path import isdir, basename, join, isfile, exists +from urllib.error import HTTPError from urllib.request import urlretrieve -from datetime import datetime, timedelta from subscriber import podaac_access as pa __version__ = pa.__version__ -LOGLEVEL = os.environ.get('PODAAC_LOGLEVEL', 'WARNING').upper() -logging.basicConfig(level=LOGLEVEL) -logging.debug("Log level set to " + LOGLEVEL) - page_size = 2000 edl = pa.edl @@ -39,7 +37,9 @@ def get_update_file(data_dir, collection_name): if isfile(data_dir + "/.update__" + collection_name): return data_dir + "/.update__" + collection_name elif isfile(data_dir + "/.update"): - print("WARNING: found a deprecated use of '.update' file at {0}. After this run it will be renamed to {1}".format(data_dir + "/.update", data_dir + "/.update__" + collection_name)) + logging.warning( + "found a deprecated use of '.update' file at {0}. 
After this run it will be renamed to {1}".format( + data_dir + "/.update", data_dir + "/.update__" + collection_name)) return data_dir + "/.update" return None @@ -47,7 +47,8 @@ def get_update_file(data_dir, collection_name): def validate(args): if args.minutes is None and args.startDate is False and args.endDate is False: - raise ValueError("Error parsing command line arguments: one of --start-date, --end-date or --minutes are required") + raise ValueError( + "Error parsing command line arguments: one of --start-date, --end-date or --minutes are required") def create_parser(): @@ -55,45 +56,70 @@ def create_parser(): parser = argparse.ArgumentParser(prog='PO.DAAC data subscriber') # Adding Required arguments - parser.add_argument("-c", "--collection-shortname", dest="collection",required=True, help = "The collection shortname for which you want to retrieve data.") # noqa E501 - parser.add_argument("-d", "--data-dir", dest="outputDirectory", required=True, help = "The directory where data products will be downloaded.") # noqa E501 + parser.add_argument("-c", "--collection-shortname", dest="collection", required=True, + help="The collection shortname for which you want to retrieve data.") # noqa E501 + parser.add_argument("-d", "--data-dir", dest="outputDirectory", required=True, + help="The directory where data products will be downloaded.") # noqa E501 # Adding optional arguments + parser.add_argument("-f", "--force", dest="force", action="store_true", help = "Flag to force downloading files that are listed in CMR query, even if the file exists and checksum matches") # noqa E501 # spatiotemporal arguments - parser.add_argument("-sd", "--start-date", dest="startDate", help = "The ISO date time before which data should be retrieved. For Example, --start-date 2021-01-14T00:00:00Z", default=False) # noqa E501 - parser.add_argument("-ed", "--end-date", dest="endDate", help = "The ISO date time after which data should be retrieved. For Example, --end-date 2021-01-14T00:00:00Z", default=False) # noqa E501 - parser.add_argument("-b", "--bounds", dest="bbox", help = "The bounding rectangle to filter result in. Format is W Longitude,S Latitude,E Longitude,N Latitude without spaces. Due to an issue with parsing arguments, to use this command, please use the -b=\"-180,-90,180,90\" syntax when calling from the command line. Default: \"-180,-90,180,90\".", default="-180,-90,180,90") # noqa E501 + parser.add_argument("-sd", "--start-date", dest="startDate", + help="The ISO date time before which data should be retrieved. For Example, --start-date 2021-01-14T00:00:00Z", + default=False) # noqa E501 + parser.add_argument("-ed", "--end-date", dest="endDate", + help="The ISO date time after which data should be retrieved. For Example, --end-date 2021-01-14T00:00:00Z", + default=False) # noqa E501 + parser.add_argument("-b", "--bounds", dest="bbox", + help="The bounding rectangle to filter result in. Format is W Longitude,S Latitude,E Longitude,N Latitude without spaces. Due to an issue with parsing arguments, to use this command, please use the -b=\"-180,-90,180,90\" syntax when calling from the command line. Default: \"-180,-90,180,90\".", + default="-180,-90,180,90") # noqa E501 # Arguments for how data are stored locally - much processing is based on # the underlying directory structure (e.g. 
year/Day-of-year) - parser.add_argument("-dc", dest="cycle", action="store_true", help = "Flag to use cycle number for directory where data products will be downloaded.") # noqa E501 - parser.add_argument("-dydoy", dest="dydoy", action="store_true", help = "Flag to use start time (Year/DOY) of downloaded data for directory where data products will be downloaded.") # noqa E501 - parser.add_argument("-dymd", dest="dymd", action="store_true", help = "Flag to use start time (Year/Month/Day) of downloaded data for directory where data products will be downloaded.") # noqa E501 - parser.add_argument("-dy", dest="dy", action="store_true", help = "Flag to use start time (Year) of downloaded data for directory where data products will be downloaded.") # noqa E501 - parser.add_argument("--offset", dest="offset", help = "Flag used to shift timestamp. Units are in hours, e.g. 10 or -10.") # noqa E501 - - parser.add_argument("-m", "--minutes", dest="minutes", help = "How far back in time, in minutes, should the script look for data. If running this script as a cron, this value should be equal to or greater than how often your cron runs.", type=int, default=None) # noqa E501 - parser.add_argument("-e", "--extensions", dest="extensions", help = "The extensions of products to download. Default is [.nc, .h5, .zip]", default=None, action='append') # noqa E501 - parser.add_argument("--process", dest="process_cmd", help="Processing command to run on each downloaded file (e.g., compression). Can be specified multiple times.", action='append') + parser.add_argument("-dc", dest="cycle", action="store_true", + help="Flag to use cycle number for directory where data products will be downloaded.") # noqa E501 + parser.add_argument("-dydoy", dest="dydoy", action="store_true", + help="Flag to use start time (Year/DOY) of downloaded data for directory where data products will be downloaded.") # noqa E501 + parser.add_argument("-dymd", dest="dymd", action="store_true", + help="Flag to use start time (Year/Month/Day) of downloaded data for directory where data products will be downloaded.") # noqa E501 + parser.add_argument("-dy", dest="dy", action="store_true", + help="Flag to use start time (Year) of downloaded data for directory where data products will be downloaded.") # noqa E501 + parser.add_argument("--offset", dest="offset", + help="Flag used to shift timestamp. Units are in hours, e.g. 10 or -10.") # noqa E501 + + parser.add_argument("-m", "--minutes", dest="minutes", + help="How far back in time, in minutes, should the script look for data. If running this script as a cron, this value should be equal to or greater than how often your cron runs.", + type=int, default=None) # noqa E501 + parser.add_argument("-e", "--extensions", dest="extensions", + help="The extensions of products to download. Default is [.nc, .h5, .zip]", default=None, + action='append') # noqa E501 + parser.add_argument("--process", dest="process_cmd", + help="Processing command to run on each downloaded file (e.g., compression). Can be specified multiple times.", + action='append') + + parser.add_argument("--version", action="version", version='%(prog)s ' + __version__, + help="Display script version information and exit.") # noqa E501 + parser.add_argument("--verbose", dest="verbose", action="store_true", help="Verbose mode.") # noqa E501 + + parser.add_argument("-p", "--provider", dest="provider", default='POCLOUD', + help="Specify a provider for collection search. 
Default is POCLOUD.") # noqa E501 + return parser - parser.add_argument("--version", action="version", version='%(prog)s ' + __version__, help="Display script version information and exit.") # noqa E501 - parser.add_argument("--verbose", dest="verbose", action="store_true", help="Verbose mode.") # noqa E501 - parser.add_argument("-p", "--provider", dest="provider", default='POCLOUD', help="Specify a provider for collection search. Default is POCLOUD.") # noqa E501 - return parser +def run(args=None): + if args is None: + parser = create_parser() + args = parser.parse_args() -def run(): - parser = create_parser() - args = parser.parse_args() try: pa.validate(args) validate(args) except ValueError as v: - print(v) - exit() + logging.error(str(v)) + exit(1) pa.setup_earthdata_login_auth(edl) token = pa.get_token(token_url, 'podaac-subscriber', edl) @@ -136,7 +162,7 @@ def run(): # This cell will replace the timestamp above with the one read from the `.update` file in the data directory, if it exists. if not isdir(data_path): - print("NOTE: Making new data directory at " + data_path + "(This is the first run.)") + logging.info("NOTE: Making new data directory at " + data_path + "(This is the first run.)") makedirs(data_path, exist_ok=True) else: @@ -145,11 +171,12 @@ def run(): try: with open(update_file, "r") as f: data_within_last_timestamp = f.read().strip() - print("NOTE: Update found in the data directory. (The last run was at " + data_within_last_timestamp + ".)") + logging.info( + "NOTE: Update found in the data directory. (The last run was at " + data_within_last_timestamp + ".)") except FileNotFoundError: - print("WARN: No .update in the data directory. (Is this the first run?)") + logging.warning("No .update in the data directory. (Is this the first run?)") else: - print("WARN: No .update__" + short_name + " in the data directory. (Is this the first run?)") + logging.warning("No .update__" + short_name + " in the data directory. (Is this the first run?)") # Change this to whatever extent you need. 
Format is W Longitude,S Latitude,E Longitude,N Latitude bounding_extent = args.bbox @@ -163,10 +190,10 @@ def run(): if defined_time_range: # if(data_since): - temporal_range = pa.get_temporal_range(start_date_time, end_date_time, datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")) # noqa E501 + temporal_range = pa.get_temporal_range(start_date_time, end_date_time, + datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")) # noqa E501 params = { - 'scroll': "true", 'page_size': page_size, 'sort_key': "-start_date", 'provider': provider, @@ -178,7 +205,6 @@ def run(): if defined_time_range: params = { - 'scroll': "true", 'page_size': page_size, 'sort_key': "-start_date", 'provider': provider, @@ -190,16 +216,26 @@ def run(): } if args.verbose: - print("Temporal Range: " + temporal_range) + logging.info("Temporal Range: " + temporal_range) if args.verbose: - print("Provider: " + provider) - print("Updated Since: " + data_within_last_timestamp) + logging.info("Provider: " + provider) + logging.info("Updated Since: " + data_within_last_timestamp) - results = pa.get_search_results(args, params) + # If 401 is raised, refresh token and try one more time + try: + results = pa.get_search_results(params, args.verbose) + except HTTPError as e: + if e.code == 401: + token = pa.refresh_token(token, 'podaac-subscriber') + params['token'] = token + results = pa.get_search_results(params, args.verbose) + else: + raise e if args.verbose: - print(str(results['hits'])+" new granules found for "+short_name+" since "+data_within_last_timestamp) # noqa E501 + logging.info(str(results[ + 'hits']) + " new granules found for " + short_name + " since " + data_within_last_timestamp) # noqa E501 if any([args.dy, args.dydoy, args.dymd]): file_start_times = pa.parse_start_times(results) @@ -209,8 +245,12 @@ def run(): timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") downloads_all = [] - downloads_data = [[u['URL'] for u in r['umm']['RelatedUrls'] if u['Type'] == "GET DATA" and ('Subtype' not in u or u['Subtype'] != "OPENDAP DATA")] for r in results['items']] - downloads_metadata = [[u['URL'] for u in r['umm']['RelatedUrls'] if u['Type'] == "EXTENDED METADATA"] for r in results['items']] + downloads_data = [[u['URL'] for u in r['umm']['RelatedUrls'] if + u['Type'] == "GET DATA" and ('Subtype' not in u or u['Subtype'] != "OPENDAP DATA")] for r in + results['items']] + downloads_metadata = [[u['URL'] for u in r['umm']['RelatedUrls'] if u['Type'] == "EXTENDED METADATA"] for r in + results['items']] + checksums = pa.extract_checksums(results) for f in downloads_data: downloads_all.append(f) @@ -220,7 +260,8 @@ def run(): downloads = [item for sublist in downloads_all for item in sublist] if len(downloads) >= page_size: - print("Warning: only the most recent " + str(page_size) + " granules will be downloaded; try adjusting your search criteria (suggestion: reduce time period or spatial region of search) to ensure you retrieve all granules.") + logging.warning("Only the most recent " + str( + page_size) + " granules will be downloaded; try adjusting your search criteria (suggestion: reduce time period or spatial region of search) to ensure you retrieve all granules.") # filter list based on extension if not extensions: @@ -236,14 +277,14 @@ def run(): # https://github.com/podaac/data-subscriber/issues/33 # Make this a non-verbose message # if args.verbose: - print("Found " + str(len(downloads)) + " total files to download") + logging.info("Found " + str(len(downloads)) + " total files to download") if args.verbose: - 
print("Downloading files with extensions: " + str(extensions)) + logging.info("Downloading files with extensions: " + str(extensions)) # NEED TO REFACTOR THIS, A LOT OF STUFF in here # Finish by downloading the files to the data directory in a loop. # Overwrite `.update` with a new timestamp on success. - success_cnt = failure_cnt = 0 + success_cnt = failure_cnt = skip_cnt = 0 for f in downloads: try: # -d flag, args.outputDirectory @@ -256,14 +297,20 @@ def run(): if args.cycle: output_path = pa.prepare_cycles_output( cycles, data_path, f) + + # decide if we should actually download this file (e.g. we may already have the latest version) + if(exists(output_path) and not args.force and pa.checksum_does_match(output_path, checksums)): + logging.info(str(datetime.now()) + " SKIPPED: " + f) + skip_cnt += 1 + continue + urlretrieve(f, output_path) pa.process_file(process_cmd, output_path, args) - print(str(datetime.now()) + " SUCCESS: " + f) + logging.info(str(datetime.now()) + " SUCCESS: " + f) success_cnt = success_cnt + 1 - except Exception as e: - print(str(datetime.now()) + " FAILURE: " + f) + except Exception: + logging.warning(str(datetime.now()) + " FAILURE: " + f, exc_info=True) failure_cnt = failure_cnt + 1 - print(e) # If there were updates to the local time series during this run and no # exceptions were raised during the download loop, then overwrite the @@ -274,12 +321,27 @@ def run(): with open(data_path + "/.update__" + short_name, "w") as f: f.write(timestamp) - print("Downloaded: " + str(success_cnt) + " files\n") - print("Files Failed to download:" + str(failure_cnt) + "\n") + logging.info("Downloaded Files: " + str(success_cnt)) + logging.info("Failed Files: " + str(failure_cnt)) + logging.info("Skipped Files: " + str(skip_cnt)) pa.delete_token(token_url, token) - print("END \n\n") - exit(0) + logging.info("END\n\n") + #exit(0) + + +def main(): + log_level = os.environ.get('PODAAC_LOGLEVEL', 'INFO').upper() + logging.basicConfig(stream=sys.stdout, + format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', + level=log_level) + logging.debug("Log level set to " + log_level) + + try: + run() + except Exception as e: + logging.exception("Uncaught exception occurred during execution.") + exit(hash(e)) if __name__ == '__main__': - run() + main() diff --git a/tests/MANUAL.md b/tests/MANUAL.md index 52a1e1e..6fe9608 100644 --- a/tests/MANUAL.md +++ b/tests/MANUAL.md @@ -3,7 +3,7 @@ ## Subscriber -### Test 1 +### Test 1 - added to test_regression.py use to test: * download to `this` directory. 
* download using only 'enddate'
@@ -29,7 +29,7 @@ ls -rth .update__ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4
 .update__ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4
 ```
 
-### Test 2
+### Test 2 - added to regression test
 use to test:
 * cycle based directory layouts
 * Bounding box limiting search results
@@ -54,7 +54,7 @@ JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F/
 
 ```
 
-### Test 3
+### Test 3 -- added to regression, but not the .update file log message portion
 use to test:
 * offset Usage
 * start/end date is working
@@ -137,7 +137,7 @@ MUR25-JPL-L4-GLOB-v04.2/
 4 directories, 2 files
 ```
 
-
+### Test 1 Download by cycle
 
 ```
 rm -r JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F
diff --git a/tests/test_downloader_regression.py b/tests/test_downloader_regression.py
new file mode 100644
index 0000000..852cbf3
--- /dev/null
+++ b/tests/test_downloader_regression.py
@@ -0,0 +1,49 @@
+import pytest
+import os
+from os.path import exists
+from subscriber import podaac_data_downloader as pdd
+import shutil
+from pathlib import Path
+
+# REGRESSION TEST CURRENTLY REQUIRES A .NETRC file for CMR/Data Download
+
+def create_downloader_args(args):
+    parser = pdd.create_parser()
+    args2 = parser.parse_args(args)
+    return args2
+
+# Test the downloader on MUR25 data for start/stop, yyyy/mm/dd dir structure,
+# and offset. Running it a second time to ensure already-downloaded files are
+# skipped when their checksums match, and re-downloaded when --force is used.
+@pytest.mark.regression
+def test_downloader_MUR():
+    shutil.rmtree('./MUR25-JPL-L4-GLOB-v04.2', ignore_errors=True)
+    args2 = create_downloader_args('-c MUR25-JPL-L4-GLOB-v04.2 -d ./MUR25-JPL-L4-GLOB-v04.2 -sd 2020-01-01T00:00:00Z -ed 2020-01-02T00:00:00Z -dymd --offset 4'.split())
+    pdd.run(args2)
+    assert exists('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    assert exists('./MUR25-JPL-L4-GLOB-v04.2/2020/01/02/20200102090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    t1 = os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    t2 = os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/02/20200102090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+
+    # this part of the test should not re-download the files unless the --force
+    # option is used.
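+    # (identical mtimes after the second run below indicate both files were
+    # skipped because they already exist on disk with matching checksums)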
+ pdd.run(args2) + assert t1 == os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc') + assert t2 == os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/02/20200102090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc') + + # Update a file to change the checksum, then re-download + os.remove('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc') + Path('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc').touch() + pdd.run(args2) + assert t1 != os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc') + assert t2 == os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/02/20200102090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc') + + t1 = os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc') + + # Set the args to --force to re-download those data + args2 = create_downloader_args('-c MUR25-JPL-L4-GLOB-v04.2 -d ./MUR25-JPL-L4-GLOB-v04.2 -sd 2020-01-01T00:00:00Z -ed 2020-01-02T00:00:00Z -dymd --offset 4 -f'.split()) + pdd.run(args2) + assert t1 != os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc') + assert t2 != os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/02/20200102090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc') + + shutil.rmtree('./MUR25-JPL-L4-GLOB-v04.2') diff --git a/tests/test_subscriber.py b/tests/test_subscriber.py index 675c2b4..2e9d4cb 100644 --- a/tests/test_subscriber.py +++ b/tests/test_subscriber.py @@ -24,6 +24,22 @@ def cleanup_update_test(): shutil.rmtree(data_dir_with_updates) +def test_search_after(): + # cmr query: https://cmr.earthdata.nasa.gov/search/granules.umm_json?page_size=2000&sort_key=-start_date&provider=POCLOUD&ShortName=JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F&temporal=2000-01-01T10%3A00%3A00Z%2C2022-04-15T00%3A00%3A00Z&bounding_box=-180%2C-90%2C180%2C90 + # requires page-After + # ends up with 3748 granules + params = { + 'page_size': 2000, + 'sort_key': "-start_date", + 'provider': "POCLOUD", + 'ShortName': "JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F", + 'temporal': "2000-01-01T10:00:00Z,2022-04-15T00:00:00Z", + 'bounding_box': "-180,-90,180,90", + } + results = pa.get_search_results(params, True) + assert results['hits'] == 3748 + assert len(results['items']) == 3748 + def test_update_format_change(cleanup_update_test): print("Running Test") data_dir_with_updates = "./test_update_format_change" diff --git a/tests/test_subscriber_extracting_checksums.py b/tests/test_subscriber_extracting_checksums.py new file mode 100644 index 0000000..87a5f00 --- /dev/null +++ b/tests/test_subscriber_extracting_checksums.py @@ -0,0 +1,113 @@ +import json +from subscriber.podaac_access import extract_checksums + +minimal_granule_search_results = """{ + "hits": 13, + "took": 51, + "items": [ + { + "umm": { + "DataGranule": { + "ArchiveAndDistributionInformation": [ + { + "SizeUnit": "MB", + "Size": 4.312029838562012, + "Checksum": { + "Value": "d96387295ea979fb8f7b9aa5f231c4ab", + "Algorithm": "MD5" + }, + "SizeInBytes": 4521491, + "Name": "20211231000000-REMSS-L3U_GHRSST-SSTsubskin-AMSR2-f34_20211231v8-v02.0-fv01.0.nc" + }, + { + "SizeUnit": "MB", + "Size": 1.068115234375e-4, + "Checksum": { + "Value": "8704789dd2cad4554481f6e438acb376", + "Algorithm": "MD5" + }, + "SizeInBytes": 112, + "Name": 
"20211231000000-REMSS-L3U_GHRSST-SSTsubskin-AMSR2-f34_20211231v8-v02.0-fv01.0.nc.md5" + } + ] + } + } + }, + { + "umm": { + "DataGranule": { + "ArchiveAndDistributionInformation": [ + { + "SizeUnit": "MB", + "Size": 4.267633438110352, + "SizeInBytes": 4474938, + "Name": "this-shouldnt-be-counted-because-theres-no-checksum-info.nc" + } + ] + } + } + }, + { + "umm": { + "DataGranule": { + "ArchiveAndDistributionInformation": [ + { + "SizeUnit": "MB", + "Size": 4.267633438110352, + "SizeInBytes": 4474938, + "Name": "this-also-shouldnt-be-counted-because-no-checksum-info.nc" + }, + { + "SizeUnit": "MB", + "Size": 4.267633438110352, + "Checksum": { + "Value": "98d330cad6d1233c258178bcc07102d6", + "Algorithm": "MD5" + }, + "SizeInBytes": 4474938, + "Name": "this-should-be-counted.nc" + } + ] + } + } + }, + { + "umm": { + "DataGranule": { + "ArchiveAndDistributionInformation": [ + { + "SizeUnit": "MB", + "Size": 4.267633438110352, + "Checksum": { + "Value": "98d330cad6d1233c258178bcc07102d6", + "Algorithm": "MD5" + }, + "SizeInBytes": 4474938, + "Name": "20220101000000-REMSS-L3U_GHRSST-SSTsubskin-AMSR2-f34_20220101v8-v02.0-fv01.0.nc" + }, + { + "SizeUnit": "MB", + "Size": 1.068115234375e-4, + "Checksum": { + "Value": "667a931589ec574acbf8791b73aeff1a", + "Algorithm": "MD5" + }, + "SizeInBytes": 112, + "Name": "20220101000000-REMSS-L3U_GHRSST-SSTsubskin-AMSR2-f34_20220101v8-v02.0-fv01.0.nc.md5" + } + ] + } + } + } + ] +} +""" + +def test_extract_checksums(): + checksums = extract_checksums(json.loads(minimal_granule_search_results)) + assert checksums["20211231000000-REMSS-L3U_GHRSST-SSTsubskin-AMSR2-f34_20211231v8-v02.0-fv01.0.nc"] == { + "Value": "d96387295ea979fb8f7b9aa5f231c4ab", + "Algorithm": "MD5" + } + assert len(checksums) == 5 + diff --git a/tests/test_subscriber_matching_checksums.py b/tests/test_subscriber_matching_checksums.py new file mode 100644 index 0000000..cd67a80 --- /dev/null +++ b/tests/test_subscriber_matching_checksums.py @@ -0,0 +1,72 @@ +from subscriber.podaac_access import checksum_does_match + +def test_checksum_does_match__positive_match_md5(tmpdir): + output_path = str(tmpdir) + '/tmp.nc' + checksums = { + "tmp.nc": { + "Value": "f83f9ad1718d9b95220ddd6b18dbcecf", + "Algorithm": "MD5" + } + } + + with open(output_path, 'w') as f: + f.write("This is a temporary test file\n") + + assert checksum_does_match(output_path, checksums) + + +def test_checksum_does_match__negative_match_md5(tmpdir): + output_path = str(tmpdir) + '/tmp.nc' + checksums = { + "tmp.nc": { + "Value": "f83f9ad1718d9b95220ddd6b18dbcecf", + "Algorithm": "MD5" + } + } + + with open(output_path, 'w') as f: + f.write("This is a different temporary test file\n") + + assert not checksum_does_match(output_path, checksums) + + +def test_checksum_does_match__positive_match_sha512(tmpdir): + output_path = str(tmpdir) + '/tmp.nc' + checksums = { + "tmp.nc": { + "Value": "3f5bda96115a5d8fcbcbd71bc28ade2de24bba5f48ce485012f933c877d279d78be3ad028f69af620325a010ce34bd19be78c8b6bf083b0d523165ede8669483", + "Algorithm": "SHA512" + } + } + + with open(output_path, 'w') as f: + f.write("This is a temporary test file\n") + + assert checksum_does_match(output_path, checksums) + + +def test_checksum_does_match__negative_match_sha512(tmpdir): + output_path = str(tmpdir) + '/tmp.nc' + checksums = { + "tmp.nc": { + "Value": "3f5bda96115a5d8fcbcbd71bc28ade2de24bba5f48ce485012f933c877d279d78be3ad028f69af620325a010ce34bd19be78c8b6bf083b0d523165ede8669483", + "Algorithm": "SHA512" + } + } + + with open(output_path, 'w') as f: + 
f.write("This is a different temporary test file\n") + + assert not checksum_does_match(output_path, checksums) + + +def test_checksum_does_match__with_no_checksum(tmpdir): + output_path = str(tmpdir) + '/tmp.nc' + checksums = { + "tmp.nc": None + } + + with open(output_path, 'w') as f: + f.write("This is a temporary test file\n") + + assert not checksum_does_match(output_path, checksums) \ No newline at end of file diff --git a/tests/test_subscriber_regression.py b/tests/test_subscriber_regression.py new file mode 100644 index 0000000..07dc2f8 --- /dev/null +++ b/tests/test_subscriber_regression.py @@ -0,0 +1,71 @@ +import pytest +import os +from os.path import exists +from subscriber import podaac_data_subscriber as pds +from subscriber import podaac_data_downloader as pdd +import shutil + +# REGRESSION TEST CURRENTLY REQUIRES A .NETRC file for CMR/Data Download +# +def create_args(args): + parser = pds.create_parser() + args2 = parser.parse_args(args) + return args2 + +# Test to download ECCO data by start/stop date and put it in the year/doy dir +# structure. +@pytest.mark.regression +def test_subscriber_ecco_only_enddate(): + args2 = create_args('-c ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4 -ed 1992-01-03T00:00:00Z -d ./ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4 -dydoy'.split()) + pds.run(args2) + assert exists('./ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4/1992/001/ATM_SURFACE_TEMP_HUM_WIND_PRES_day_mean_1992-01-01_ECCO_V4r4_latlon_0p50deg.nc') + assert exists('./ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4/1992/002/ATM_SURFACE_TEMP_HUM_WIND_PRES_day_mean_1992-01-02_ECCO_V4r4_latlon_0p50deg.nc') + assert exists('./ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4/1992/003/ATM_SURFACE_TEMP_HUM_WIND_PRES_day_mean_1992-01-03_ECCO_V4r4_latlon_0p50deg.nc') + shutil.rmtree('./ECCO_L4_ATM_STATE_05DEG_DAILY_V4R4') + +# test to download S6 data by start/stop time, and bbox, and put it in the +# cycle based directory structure +@pytest.mark.regression +def test_subscriber_cycle_bbox(): + args2 = create_args('-c JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F -d ./JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F -dc -sd 2022-01-01T00:00:00Z -ed 2022-01-02T00:00:00Z -b=-20,-20,20,20'.split()) + pds.run(args2) + assert exists('./JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F/c0042/S6A_P4_2__LR_STD__NR_042_071_20211231T232728_20220101T012144_F04.nc') + assert exists('./JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F/c0042/S6A_P4_2__LR_STD__NR_042_082_20220101T090557_20220101T104242_F04.nc') + assert exists('./JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F/c0042/S6A_P4_2__LR_STD__NR_042_083_20220101T104242_20220101T123506_F04.nc') + assert exists('./JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F/c0042/S6A_P4_2__LR_STD__NR_042_095_20220101T215702_20220101T234905_F04.nc') + assert exists('./JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F/c0042/S6A_P4_2__LR_STD__NR_042_097_20220101T234905_20220102T014431_F04.nc') + assert exists('./JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F/.update__JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F') + shutil.rmtree('./JASON_CS_S6A_L2_ALT_LR_STD_OST_NRT_F') + +# Test to download MUR25 data by start/stop, put it in yyyy/mm/dd dir structure, +# using the offset so it aligns with the right day in the filename. 
+
+#
+# Test will run it again, to ensure that the files are not re-downloaded, that
+# is, they have the same modified time before/after the second run
+@pytest.mark.regression
+def test_subscriber_MUR_update_file_no_redownload():
+    try:
+        os.remove('MUR25-JPL-L4-GLOB-v04.2/.update')
+    except OSError as e:
+        print("Expecting this...")
+    try:
+        os.remove('MUR25-JPL-L4-GLOB-v04.2/..update__MUR25-JPL-L4-GLOB-v04.2')
+    except OSError as e:
+        print("Expecting this...")
+
+    args2 = create_args('-c MUR25-JPL-L4-GLOB-v04.2 -d ./MUR25-JPL-L4-GLOB-v04.2 -sd 2020-01-01T00:00:00Z -ed 2020-01-02T00:00:00Z -dymd --offset 4'.split())
+    pds.run(args2)
+    assert exists('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    assert exists('./MUR25-JPL-L4-GLOB-v04.2/2020/01/02/20200102090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    assert exists('./MUR25-JPL-L4-GLOB-v04.2/.update__MUR25-JPL-L4-GLOB-v04.2')
+    t1 = os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    t2 = os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/02/20200102090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+
+    # Compare another run to existing times to ensure it didn't re-download the files
+    pds.run(args2)
+    assert exists('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    assert exists('./MUR25-JPL-L4-GLOB-v04.2/2020/01/02/20200102090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    assert exists('./MUR25-JPL-L4-GLOB-v04.2/.update__MUR25-JPL-L4-GLOB-v04.2')
+    assert t1 == os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/01/20200101090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    assert t2 == os.path.getmtime('./MUR25-JPL-L4-GLOB-v04.2/2020/01/02/20200102090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc')
+    shutil.rmtree('./MUR25-JPL-L4-GLOB-v04.2')
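The checksum tests above exercise `extract_checksums` and `checksum_does_match` from `subscriber.podaac_access`: a dictionary keyed by granule file name, each entry holding the `{"Value": ..., "Algorithm": "MD5"|"SHA512"}` block taken from the CMR UMM-G `ArchiveAndDistributionInformation`. As a point of reference only, a minimal sketch of the matching logic those tests expect might look like the following; the helper name `sketch_checksum_does_match` is illustrative and not the package's actual implementation.

```python
import hashlib
from os.path import basename


def sketch_checksum_does_match(file_path, checksums):
    """Illustrative sketch: compare a local file's digest against the
    CMR-provided checksum entry keyed by file name."""
    checksum = checksums.get(basename(file_path))
    if not checksum:
        # No checksum information from CMR (None or missing entry):
        # report "no match" so the caller re-downloads rather than trusts the file.
        return False

    # Algorithm comes from CMR as e.g. "MD5" or "SHA512"; hashlib accepts lowercase names.
    digest = hashlib.new(checksum["Algorithm"].lower())
    with open(file_path, 'rb') as f:
        for block in iter(lambda: f.read(8192), b''):
            digest.update(block)
    return digest.hexdigest() == checksum["Value"].lower()
```

Under this reading, `test_checksum_does_match__with_no_checksum` passes because a `None` entry short-circuits to `False`, and the MD5/SHA512 cases pass or fail purely on whether the written file's digest equals the fixture value; the downloader's `--force`/`-f` flag simply bypasses this comparison.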