Integrate swodlr-common-py and fix issues with available tiles insertion (#13)

* Integrate swodlr-common to fix logging, refactor redundant code + fix available tiles insertion

* Update terraform to correct misspelling

* Update tests to new utilities base class

* Linting

* Fix bootstrap test
joshgarde committed Dec 4, 2023
1 parent 6e73993 commit 6aa890b
Showing 13 changed files with 929 additions and 934 deletions.
8 changes: 4 additions & 4 deletions podaac/swodlr_ingest_to_sds/bootstrap.py
@@ -1,11 +1,11 @@
'''Lambda to bootstrap step function execution'''
import json
import logging
import boto3
from podaac.swodlr_ingest_to_sds.utils import get_param
from podaac.swodlr_ingest_to_sds.utilities import utils

stepfunctions = boto3.client('stepfunctions')
ingest_sf_arn = get_param('stepfunction_arn')
ingest_sf_arn = utils.get_param('stepfunction_arn')
logger = utils.get_logger(__name__)


def lambda_handler(event, _context):
@@ -16,4 +16,4 @@ def lambda_handler(event, _context):
stateMachineArn=ingest_sf_arn,
input=sf_input
)
logging.info('Started step function execution: %s', result['executionArn'])
logger.info('Started step function execution: %s', result['executionArn'])
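
For orientation, the refactored bootstrap lambda is a thin pass-through that hands the incoming event to Step Functions and logs the resulting execution ARN. The sketch below illustrates that pattern only; the exact construction of sf_input is hidden by the collapsed hunk above, so forwarding the raw event with json.dumps is an assumption, and start_ingest_execution is a hypothetical helper name.

# Minimal sketch of the bootstrap pattern (not the exact handler body)
import json
import boto3

stepfunctions = boto3.client('stepfunctions')

def start_ingest_execution(state_machine_arn, event):
    # Forward the event as the execution input; bootstrap.py builds sf_input
    # from its event in a step not shown in the hunk above
    result = stepfunctions.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(event)
    )
    return result['executionArn']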
50 changes: 27 additions & 23 deletions podaac/swodlr_ingest_to_sds/poll_status.py
@@ -1,19 +1,17 @@
'''Lambda to poll SDS for job status and update DynamoDB'''
from datetime import datetime
from copy import deepcopy
import logging
import re
from podaac.swodlr_ingest_to_sds.utils import (
mozart_client, ingest_table, available_tiles_table
)
from podaac.swodlr_common import sds_statuses
from podaac.swodlr_ingest_to_sds.utilities import utils


SUCCESS_STATUSES = {'job-completed'}
FAIL_STATUSES = {'job-failed', 'job-offline', 'job-deduped'}
PRODUCT_REGEX = re.compile(
r'_(?P<product>PIXC(Vec)?)_(?P<cycle>\d{3})_(?P<pass>\d{3})_(?P<tile>\d{3})(?P<direction>(R|L))_' # pylint: disable=line-too-long # noqa: E501
)

logger = utils.get_logger(__name__)


def lambda_handler(event, _context):
'''
@@ -27,12 +25,12 @@ def lambda_handler(event, _context):
job_id = item['job_id']

try:
job = mozart_client.get_job_by_id(job_id)
job = utils.mozart_client.get_job_by_id(job_id)
info = job.get_info()
status = info['status']
timestamp = datetime.now().isoformat()
logging.debug('granule id: %s; job id: %s; status: %s',
granule_id, job_id, status)
logger.debug('granule id: %s; job id: %s; status: %s',
granule_id, job_id, status)

update_expression = (
'SET #status = :status'
@@ -52,48 +50,54 @@ def lambda_handler(event, _context):
expression_attribute_names['#traceback'] = 'traceback'
expression_attribute_values[':traceback'] = info['traceback']

ingest_table.update_item(
utils.ingest_table.update_item(
Key={'granule_id': granule_id},
UpdateExpression=update_expression,
ExpressionAttributeNames=expression_attribute_names,
ExpressionAttributeValues=expression_attribute_values
)

if status in FAIL_STATUSES:
logging.error('Job id: %s; status: %s', job_id, status)
if status in sds_statuses.FAIL:
logger.error('Job id: %s; status: %s', job_id, status)
if 'traceback' in info:
logging.error(
logger.error(
'Job id: %s; traceback: %s', job_id, info['traceback']
)

new_event['jobs'].remove(item)
elif status in SUCCESS_STATUSES:
logging.info('Job id: %s; status: %s', job_id, status)
elif status in sds_statuses.SUCCESS:
logger.info('Job id: %s; status: %s', job_id, status)

# Insert into available tiles table
cpt = _extract_cpt(granule_id)
if cpt is not None:
if cpt is None:
logger.error(
'CPT not found: granule_id=%s, job_id=%s',
granule_id, job_id
)
else:
tile_id = f'{cpt["product"]},{cpt["cycle"]},{cpt["pass"]},{cpt["tile"]}' # pylint: disable=line-too-long # noqa: E501
available_tiles_table.put_item(
Item={'tile_id': {'S': tile_id}}
utils.available_tiles_table.put_item(
Item={'tile_id': tile_id}
)

new_event['jobs'].remove(item) # Remove from queue
# Otello raises very generic exceptions
except Exception: # pylint: disable=broad-except
logging.exception('Failed to get status: %s', job_id)
logger.exception('Failed to get status: %s', job_id)

return new_event


def _extract_cpt(granule_id):
parsed_id = PRODUCT_REGEX.match(granule_id)
parsed_id = PRODUCT_REGEX.search(granule_id)
if parsed_id is None:
return None

return {
'product': parsed_id.group('product'),
'cycle': int(parsed_id.group('cycle')),
'pass': int(parsed_id.group('pass')),
'tile': int(parsed_id.group('tile')) + parsed_id.group('direction')
'cycle': str(int(parsed_id.group('cycle'))),
'pass': str(int(parsed_id.group('pass'))),
'tile': str(int(parsed_id.group('tile')))
+ parsed_id.group('direction')
}
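
The switch from PRODUCT_REGEX.match to PRODUCT_REGEX.search is part of the available-tiles fix: match anchors at the start of the string, so any granule ID carrying a mission/collection prefix never matched, while search finds the embedded cycle/pass/tile segment anywhere in the ID. A self-contained sketch, using a hypothetical granule ID chosen purely for illustration:

import re

# Same regex as poll_status.py above
PRODUCT_REGEX = re.compile(
    r'_(?P<product>PIXC(Vec)?)_(?P<cycle>\d{3})_(?P<pass>\d{3})_(?P<tile>\d{3})(?P<direction>(R|L))_'
)

# Illustrative ID only; real granule names may differ
granule_id = 'SWOT_L2_HR_PIXC_007_394_067L_20240101T000000_20240101T000030_PGC0_01'

assert PRODUCT_REGEX.match(granule_id) is None   # old behavior: leading prefix defeats match()
parsed = PRODUCT_REGEX.search(granule_id)        # new behavior: finds the embedded segment
cpt = {
    'product': parsed.group('product'),                                  # 'PIXC'
    'cycle': str(int(parsed.group('cycle'))),                            # '007' -> '7'
    'pass': str(int(parsed.group('pass'))),                              # '394'
    'tile': str(int(parsed.group('tile'))) + parsed.group('direction')   # '067' + 'L' -> '67L'
}
tile_id = f'{cpt["product"]},{cpt["cycle"]},{cpt["pass"]},{cpt["tile"]}'  # 'PIXC,7,394,67L'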
40 changes: 20 additions & 20 deletions podaac/swodlr_ingest_to_sds/submit_to_sds.py
@@ -1,27 +1,25 @@
'''Lambda to submit granules to the SDS for ingestion'''
from datetime import datetime
import json
import logging
from pathlib import PurePath
from urllib.parse import urlsplit, urlunsplit
import boto3
from podaac.swodlr_common import sds_statuses
from podaac.swodlr_ingest_to_sds.errors import DataNotFoundError
from podaac.swodlr_ingest_to_sds.utils import (
mozart_client, get_param, ingest_table
)
from podaac.swodlr_ingest_to_sds.utilities import utils

ACCEPTED_EXTS = ['nc']
INGEST_QUEUE_URL = get_param('ingest_queue_url')
INGEST_TABLE_NAME = get_param('ingest_table_name')
PCM_RELEASE_TAG = get_param('sds_pcm_release_tag')
INGEST_QUEUE_URL = utils.get_param('ingest_queue_url')
INGEST_TABLE_NAME = utils.get_param('ingest_table_name')
PCM_RELEASE_TAG = utils.get_param('sds_pcm_release_tag')

dynamodb = boto3.client('dynamodb')
sqs = boto3.client('sqs')

ingest_job_type = mozart_client.get_job_type(
logger = utils.get_logger(__name__)
ingest_job_type = utils.mozart_client.get_job_type(
f'job-INGEST_STAGED:{PCM_RELEASE_TAG}'
)

ingest_job_type.initialize()


@@ -31,35 +29,37 @@ def lambda_handler(event, _context):
not already ingested and inserts the granule and job info into DynamoDB
'''

logging.debug('Records received: %d', len(event['Records']))
logger.debug('Records received: %d', len(event['Records']))

granules = {}
for record in event['Records']:
try:
granule = _parse_record(record)
granules[granule['id']] = granule
except (DataNotFoundError, json.JSONDecodeError):
logging.exception('Failed to parse record')
logger.exception('Failed to parse record')

lookup_results = dynamodb.batch_get_item(
RequestItems={
INGEST_TABLE_NAME: {
'Keys': [{'granule_id': {'S': granule['id']}}
for granule in granules.values()],
'ProjectionExpression': 'granule_id'
'ProjectionExpression': 'granule_id, #status',
'ExpressionAttributeNames': {'#status': 'status'}
}
},
ReturnConsumedCapacity='NONE'
)

for item in lookup_results['Responses'][INGEST_TABLE_NAME]:
granule_id = item['granule_id']['S']
if granule_id in granules:
logging.info('Granule already ingested: %s', granule_id)
status = item['status']['S']
if granule_id in granules and status in sds_statuses.SUCCESS:
logger.info('Granule already ingested: %s', granule_id)
del granules[granule_id]

jobs = []
with ingest_table.batch_writer() as batch:
with utils.ingest_table.batch_writer() as batch:
for granule in granules.values():
try:
job = _ingest_granule(granule)
@@ -79,7 +79,7 @@ def lambda_handler(event, _context):
)
# Otello throws generic Exceptions
except Exception: # pylint: disable=broad-exception-caught
logging.exception('Failed to ingest granule')
logger.exception('Failed to ingest granule')

return {'jobs': jobs}

@@ -100,15 +100,15 @@ def _ingest_granule(granule):
filename = granule['filename']
s3_url = granule['s3_url']

logging.debug('Ingesting granule id: %s', granule['id'])
logger.debug('Ingesting granule id: %s', granule['id'])

job_params = _gen_mozart_job_params(filename, s3_url)
tag = f'ingest_file_otello__{filename}'

ingest_job_type.set_input_params(job_params)
job = ingest_job_type.submit_job(tag=tag)
timestamp = datetime.now().isoformat()
logging.info('Submitted to sds: %s', granule['id'])
logger.info('Submitted to sds: %s', granule['id'])

return {
'job_id': job.job_id,
@@ -136,11 +136,11 @@ def _extract_s3_url(cnm_r_message, strict=True):
)

s3_url = urlunsplit(s3_elements)
logging.debug("S3 url: %s", s3_elements)
logger.debug("S3 url: %s", s3_elements)

return (file['name'], s3_url)

logging.debug('Rejected file: %s', file['name'])
logger.debug('Rejected file: %s', file['name'])

if strict:
# Rerun without strict mode
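
The lookup change above projects the stored status and only skips granules whose previous ingest landed in sds_statuses.SUCCESS, so granules whose earlier ingest failed are resubmitted instead of being silently dropped. A minimal sketch of that filter with hypothetical items; the contents of sds_statuses.SUCCESS are assumed here to be {'job-completed'}:

SUCCESS = {'job-completed'}   # stand-in for sds_statuses.SUCCESS (assumed contents)

granules = {
    'granule-a': {'id': 'granule-a'},   # previously ingested successfully
    'granule-b': {'id': 'granule-b'},   # previous ingest failed
}
# Hypothetical batch_get_item responses in DynamoDB attribute-value form
responses = [
    {'granule_id': {'S': 'granule-a'}, 'status': {'S': 'job-completed'}},
    {'granule_id': {'S': 'granule-b'}, 'status': {'S': 'job-failed'}},
]

for item in responses:
    granule_id = item['granule_id']['S']
    status = item['status']['S']
    if granule_id in granules and status in SUCCESS:
        del granules[granule_id]        # already ingested successfully; skip

# granules now holds only 'granule-b', so the failed ingest is retried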
64 changes: 64 additions & 0 deletions podaac/swodlr_ingest_to_sds/utilities.py
@@ -0,0 +1,64 @@
'''Shared utilities for ingest-to-sds lambdas'''
import boto3
from otello.mozart import Mozart

from podaac.swodlr_common.utilities import BaseUtilities


class Utilities(BaseUtilities):
'''Utility functions implemented as a singleton'''
APP_NAME = 'swodlr'
SERVICE_NAME = 'ingest-to-sds'

def __init__(self):
super().__init__(Utilities.APP_NAME, Utilities.SERVICE_NAME)

@property
def mozart_client(self):
'''
Lazily creates a Mozart client
'''
if not hasattr(self, '_mozart_client'):
host = self.get_param('sds_host')
username = self.get_param('sds_username')
cfg = {
'host': host,
'auth': True,
'username': username
}

# pylint: disable=attribute-defined-outside-init
self._mozart_client = Mozart(cfg, session=self._get_sds_session())

return self._mozart_client

@property
def ingest_table(self):
'''
Lazily creates a DynamoDB table resource
'''
if not hasattr(self, '_ingest_table'):
dynamodb = boto3.resource('dynamodb')
# pylint: disable=attribute-defined-outside-init
self._ingest_table = dynamodb.Table(
self.get_param('ingest_table_name')
)

return self._ingest_table

@property
def available_tiles_table(self):
'''
Lazily creates a DynamoDB table resource
'''
if not hasattr(self, '_available_tiles_table'):
dynamodb = boto3.resource('dynamodb')
# pylint: disable=attribute-defined-outside-init
self._available_tiles_table = dynamodb.Table(
self.get_param('available_tiles_table_name')
)

return self._available_tiles_table


utils = Utilities()
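
A hedged usage sketch of the new singleton, mirroring how the lambdas in this changeset consume it. get_param and get_logger are inherited from swodlr-common's BaseUtilities, and the parameter names are the ones already used above; whether the calls resolve depends on the deployed environment (parameter store, SDS credentials, DynamoDB tables), so treat this as illustrative rather than runnable as-is. The tile_id value is made up for the example.

from podaac.swodlr_ingest_to_sds.utilities import utils

logger = utils.get_logger(__name__)                    # logger wired up by swodlr-common
release_tag = utils.get_param('sds_pcm_release_tag')   # parameter lookup (backing store assumed)

job_type = utils.mozart_client.get_job_type(           # Mozart client is created lazily on first use
    f'job-INGEST_STAGED:{release_tag}'
)
job_type.initialize()
utils.available_tiles_table.put_item(                  # DynamoDB Table resources are also lazy
    Item={'tile_id': 'PIXC,7,394,67L'}
)
logger.info('Using PCM release: %s', release_tag)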