Commit
Merge pull request #9 from ghrcdaac/bugfix/add_files_under_id
Adding files under the provided id
amarouane-ABDELHAK authored Feb 3, 2021
2 parents b7194a6 + 8087048 commit 32c24c3
Showing 2 changed files with 108 additions and 52 deletions.
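
At a glance, the fix stops process() from rebuilding the granules list out of uploaded file names; instead it appends each generated .dmrpp entry to the files list of the granule it was produced from and returns the original input payload. A minimal sketch of the intended payload shape, using the fixture granule from the tests and a hypothetical bucket name and size:

# Illustrative only: the granule id and .nc name come from the test fixtures,
# the bucket name and the .dmrpp size are hypothetical.
input_payload = {
    "granules": [
        {
            "granuleId": "tpw_v07r01_201910",
            "files": [
                {"name": "tpw_v07r01_201910.nc", "bucket": "example-protected", "size": 18232098},
            ],
        },
    ],
}
# After the fix, process() returns this same payload with one extra entry
# appended under the matching granuleId, e.g.
# {"name": "tpw_v07r01_201910.nc.dmrpp", "bucket": "example-protected", "size": 1024}
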
89 changes: 44 additions & 45 deletions dmrpp_generator/main.py
@@ -1,6 +1,7 @@
from cumulus_process import Process, s3
import os
from re import match, search
import copy


class DMRPPGenerator(Process):
@@ -23,6 +24,7 @@ def input_keys(self):
'input_files': f"{self.processing_regex}(\\.cmr\\.xml|\\.json)?$"
}


def get_bucket(self, filename, files, buckets):
"""
Extract the bucket from the files
@@ -49,45 +51,42 @@ def upload_file(self, filename):
except Exception as e:
self.logger.error("Error uploading file %s: %s" % (os.path.basename(filename), str(e)))



def process(self):
"""
Override the processing wrapper
:return:
"""
input_files = self.fetch('input_files')
self.output = self.dmrpp_generate(input_files)
uploaded_files = self.upload_output_files()
collection = self.config.get('collection')
buckets = self.config.get('buckets')
files_sizes = {}
for output_file_path in self.output:
files_sizes[os.path.basename(output_file_path)] = os.path.getsize(output_file_path)
# Cleanup the space
os.remove(output_file_path)

granule_data = {}
for uploaded_file in uploaded_files:
if uploaded_file is None or not uploaded_file.startswith('s3'):
continue
filename = uploaded_file.split('/')[-1]
potential_extensions = f"({self.processing_regex})(\\.cmr.xml|\\.json.xml|\\.dmrpp)?"
granule_id = match(potential_extensions, filename).group(1) if match(potential_extensions, filename) else filename
if granule_id not in granule_data.keys():
granule_data[granule_id] = {'granuleId': granule_id, 'files': []}
granule_data[granule_id]['files'].append(
{
"path": self.config.get('fileStagingDir'),
"url_path": self.config.get('fileStagingDir'),
"bucket": self.get_bucket(filename, collection.get('files', []),
buckets)['name'],
"filename": uploaded_file,
"name": filename,
"size": files_sizes.get(filename, 0)
}
)

final_output = list(granule_data.values())
return {"granules": final_output, "input": uploaded_files}
granules = self.input['granules']
append_output = {}
for granule in granules:
granule_id = granule['granuleId']
for file_ in granule['files']:
output_file_path = self.dmrpp_generate(file_['filename'])
if output_file_path:
s3_path = output_file_path.get('s3_path')
file_local_path = output_file_path.get('file_local_path')
append_output[granule_id] = append_output.get(granule_id, {'files': []})
append_output[granule_id]['files'].append(
{
"bucket": self.get_bucket(file_['filename'], collection.get('files', []),buckets)['name'],
"filename": s3_path,
"name": os.path.basename(file_local_path),
"size": os.path.getsize(file_local_path),
"path": self.config.get('fileStagingDir'),
"url_path": self.config.get('fileStagingDir')
}
)
for granule in granules:
granule_id = granule['granuleId']
if append_output.get(granule_id, False):
granule['files'] += append_output[granule_id]['files']

return self.input
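
The append-then-merge pattern used above can be read in isolation as follows (granule ids and file names here are hypothetical):

granules = [
    {"granuleId": "g1", "files": [{"name": "g1.nc"}]},
    {"granuleId": "g2", "files": [{"name": "g2.nc"}]},
]
append_output = {}
# Newly generated files are collected under the granule id they belong to.
append_output["g1"] = append_output.get("g1", {"files": []})
append_output["g1"]["files"].append({"name": "g1.nc.dmrpp"})
# A second pass merges the collected files back into the original granules in place.
for granule in granules:
    granule_id = granule["granuleId"]
    if append_output.get(granule_id, False):
        granule["files"] += append_output[granule_id]["files"]
assert [f["name"] for f in granules[0]["files"]] == ["g1.nc", "g1.nc.dmrpp"]
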


def get_data_access(self, key, bucket_destination):
"""
@@ -96,23 +95,23 @@ def get_data_access(self, key, bucket_destination):
return: access URL
"""
key = key.split('/')[-1]
half_url = ("%s/%s/%s" % (bucket_destination, self.config['fileStagingDir'], key)).replace('//',
'/')
half_url = ("%s/%s/%s" % (bucket_destination, self.config['fileStagingDir'], key)).replace('//','/')
return "%s/%s"% (self.config.get('distribution_endpoint').rstrip('/'), half_url)
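
For reference, get_data_access() (unchanged in substance here) joins the destination bucket, the staging directory, and the file name under the distribution endpoint. A small sketch with hypothetical config values:

# All values below are hypothetical; they only illustrate the string handling.
bucket_destination = "example-public"
file_staging_dir = "fakepath/2020/001"
distribution_endpoint = "https://data.example.com/"
key = "some/prefix/tpw_v07r01_201910.nc.dmrpp"

key = key.split('/')[-1]
half_url = ("%s/%s/%s" % (bucket_destination, file_staging_dir, key)).replace('//', '/')
print("%s/%s" % (distribution_endpoint.rstrip('/'), half_url))
# https://data.example.com/example-public/fakepath/2020/001/tpw_v07r01_201910.nc.dmrpp
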

def dmrpp_generate(self, input_files):

def dmrpp_generate(self, input_file):
"""
Download the input file from S3, build the .dmrpp with get_dmrpp, upload it, and return the local and S3 paths.
"""
outputs = []
for input_file in input_files:
if not match(f"{self.processing_regex}$", input_file):
outputs += [input_file]
continue
cmd = f"get_dmrpp -b {self.path} -o {input_file}.dmrpp {os.path.basename(input_file)}"
self.run_command(cmd)
outputs += [input_file, f"{input_file}.dmrpp"]
return outputs

if not match(f"{self.processing_regex}$", input_file):
return {}
try:
file_name = s3.download(input_file, path=self.path)
cmd = f"get_dmrpp -b {self.path} -o {file_name}.dmrpp {os.path.basename(file_name)}"
self.run_command(cmd)
return {'file_local_path': f"{file_name}.dmrpp", 's3_path': self.upload_file(f"{file_name}.dmrpp")}
except Exception as ex:
self.logger.error(f"DMRPP error {ex}")
return {}
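
The reworked dmrpp_generate shells out to the get_dmrpp tool after downloading the file from S3; with a hypothetical working directory, the command string it builds looks like this:

import os

# Hypothetical paths; only the command construction is shown here.
path = "/tmp/workdir"
file_name = "/tmp/workdir/tpw_v07r01_201910.nc"
cmd = f"get_dmrpp -b {path} -o {file_name}.dmrpp {os.path.basename(file_name)}"
print(cmd)
# get_dmrpp -b /tmp/workdir -o /tmp/workdir/tpw_v07r01_201910.nc.dmrpp tpw_v07r01_201910.nc
# On success the method returns the generated file's local path and the S3 path
# reported by upload_file(); on any failure it logs the error and returns {}.
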

if __name__ == "__main__":
DMRPPGenerator.cli()
71 changes: 64 additions & 7 deletions tests/test_generate_dmrpp.py
@@ -12,22 +12,69 @@ class TestDMRPPFileGeneration(TestCase):
"""
Test generating dmrpp files.
"""
granule_id = "tpw_v07r01_201910"
granule_name = "tpw_v07r01_201910.nc"
fixture_path = os.path.join(os.path.dirname(__file__), "fixtures")
input_file = [f"{fixture_path}/{granule_name}"]
input_file = {
"granules": [
{
"granuleId": granule_id,
"dataType": "MODIS_A-JPL-L2P-v2019.0",
"sync_granule_duration": 3759,
"files": [
{
"bucket": "fake-cumulus-protected",
"path": "fakepath/2020/001",
"filename": f"{fixture_path}/{granule_name}",
"size": 18232098,
"name": granule_name,
"checksumType": "md5",
"checksum": "aa5204f125ae83847b3b80fa2e571b00",
"type": "data",
"url_path": "",
"filepath": granule_name,
"duplicate_found": True
},
{
"bucket": "fake-cumulus-public",
"path": "fakepath/2020/001",
"filename": f"s3://fake-cumulus-public/{granule_name}.md5",
"size": 98,
"name": f"{granule_name}.md5",
"type": "metadata",
"url_path": "",
"filepath": f"{granule_name}.md5",
"duplicate_found": True
},
{
"bucket": "fake-cumulus-public",
"filename": f"s3://fake-cumulus-public/{granule_name}.cmr.json",
"fileSize": 1381,
"name": f"{granule_name}.cmr.json",
"type": "metadata",
"url_path": "",
"filepath": f"{granule_name}.cmr.json"
}
],
"version": "2019.0"
}
]
}

payload_file = f"{fixture_path}/payload.json"
with open(payload_file) as f:
payload = json.load(f)

process_instance = DMRPPGenerator(input = input_file, config=payload['config'], path=fixture_path)
process_instance.path = fixture_path

@patch('cumulus_process.Process.upload_output_files',
return_value=[f's3://{granule_name}',
f's3://{granule_name}.dmrpp'])
@patch('dmrpp_generator.main.DMRPPGenerator.upload_file',
return_value={granule_id:f's3://{granule_name}.dmrpp'})
@patch('cumulus_process.Process.fetch_all',
return_value={'input_key': [os.path.join(os.path.dirname(__file__), f"fixtures/{granule_name}")]})
@patch('os.remove', return_value=granule_name)
def test_1_check_generate_dmrpp(self, mock_upload, mock_fetch, mock_remove):
@patch('cumulus_process.s3.download', return_value=f"{process_instance.path}/{granule_name}")
def test_1_check_generate_dmrpp(self, mock_upload, mock_fetch, mock_remove, mock_download):
"""
Testing dmrpp file generation for the fixture granule
:return:
@@ -38,8 +85,18 @@ def test_1_check_generate_dmrpp(self, mock_upload, mock_fetch, mock_remove):

def test_2_check_output(self):
"""
Test the output schema of the processing
:return:
"""
print(StorageValues.processing_output)
self.assertListEqual(['granules', 'input'], list(StorageValues.processing_output.keys()))
self.assertListEqual(['granules'], list(StorageValues.processing_output.keys()))

def test_3_checkout_dmrpp_output(self):

dmrpp_file = f"{self.granule_name}.dmrpp"
dmrpp_exists = False
for granules in StorageValues.processing_output.get('granules'):
for file in granules.get('files'):
if file["name"] == dmrpp_file:
dmrpp_exists = True
self.assertEqual(True, dmrpp_exists)
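
test_3 walks every file of every granule by hand; an equivalent any()-based check (a sketch against an illustrative payload, not part of this commit) reads:

# Illustrative payload; in the real test it would be StorageValues.processing_output.
output = {"granules": [{"granuleId": "tpw_v07r01_201910",
                        "files": [{"name": "tpw_v07r01_201910.nc"},
                                  {"name": "tpw_v07r01_201910.nc.dmrpp"}]}]}
dmrpp_exists = any(
    file_["name"] == "tpw_v07r01_201910.nc.dmrpp"
    for granule in output.get("granules", [])
    for file_ in granule.get("files", [])
)
assert dmrpp_exists
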
