Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate swodlr-common-py and fix issues with available tiles insertion #13

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions podaac/swodlr_ingest_to_sds/bootstrap.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
'''Lambda to bootstrap step function execution'''
import json
import logging
import boto3
from podaac.swodlr_ingest_to_sds.utils import get_param
from podaac.swodlr_ingest_to_sds.utilities import utils

stepfunctions = boto3.client('stepfunctions')
ingest_sf_arn = get_param('stepfunction_arn')
ingest_sf_arn = utils.get_param('stepfunction_arn')
logger = utils.get_logger(__name__)


def lambda_handler(event, _context):
Expand All @@ -16,4 +16,4 @@ def lambda_handler(event, _context):
stateMachineArn=ingest_sf_arn,
input=sf_input
)
logging.info('Started step function execution: %s', result['executionArn'])
logger.info('Started step function execution: %s', result['executionArn'])
50 changes: 27 additions & 23 deletions podaac/swodlr_ingest_to_sds/poll_status.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
'''Lambda to poll SDS for job status and update DynamoDB'''
from datetime import datetime
from copy import deepcopy
import logging
import re
from podaac.swodlr_ingest_to_sds.utils import (
mozart_client, ingest_table, available_tiles_table
)
from podaac.swodlr_common import sds_statuses
from podaac.swodlr_ingest_to_sds.utilities import utils


SUCCESS_STATUSES = {'job-completed'}
FAIL_STATUSES = {'job-failed', 'job-offline', 'job-deduped'}
PRODUCT_REGEX = re.compile(
r'_(?P<product>PIXC(Vec)?)_(?P<cycle>\d{3})_(?P<pass>\d{3})_(?P<tile>\d{3})(?P<direction>(R|L))_' # pylint: disable=line-too-long # noqa: E501
)

logger = utils.get_logger(__name__)


def lambda_handler(event, _context):
'''
Expand All @@ -27,12 +25,12 @@ def lambda_handler(event, _context):
job_id = item['job_id']

try:
job = mozart_client.get_job_by_id(job_id)
job = utils.mozart_client.get_job_by_id(job_id)
info = job.get_info()
status = info['status']
timestamp = datetime.now().isoformat()
logging.debug('granule id: %s; job id: %s; status: %s',
granule_id, job_id, status)
logger.debug('granule id: %s; job id: %s; status: %s',
granule_id, job_id, status)

update_expression = (
'SET #status = :status'
Expand All @@ -52,48 +50,54 @@ def lambda_handler(event, _context):
expression_attribute_names['#traceback'] = 'traceback'
expression_attribute_values[':traceback'] = info['traceback']

ingest_table.update_item(
utils.ingest_table.update_item(
Key={'granule_id': granule_id},
UpdateExpression=update_expression,
ExpressionAttributeNames=expression_attribute_names,
ExpressionAttributeValues=expression_attribute_values
)

if status in FAIL_STATUSES:
logging.error('Job id: %s; status: %s', job_id, status)
if status in sds_statuses.FAIL:
logger.error('Job id: %s; status: %s', job_id, status)
if 'traceback' in info:
logging.error(
logger.error(
'Job id: %s; traceback: %s', job_id, info['traceback']
)

new_event['jobs'].remove(item)
elif status in SUCCESS_STATUSES:
logging.info('Job id: %s; status: %s', job_id, status)
elif status in sds_statuses.SUCCESS:
logger.info('Job id: %s; status: %s', job_id, status)

# Insert into available tiles table
cpt = _extract_cpt(granule_id)
if cpt is not None:
if cpt is None:
logger.error(
'CPT not found: granule_id=%s, job_id=%s',
granule_id, job_id
)
else:
tile_id = f'{cpt["product"]},{cpt["cycle"]},{cpt["pass"]},{cpt["tile"]}' # pylint: disable=line-too-long # noqa: E501
available_tiles_table.put_item(
Item={'tile_id': {'S': tile_id}}
utils.available_tiles_table.put_item(
Item={'tile_id': tile_id}
)

new_event['jobs'].remove(item) # Remove from queue
# Otello raises very generic exceptions
except Exception: # pylint: disable=broad-except
logging.exception('Failed to get status: %s', job_id)
logger.exception('Failed to get status: %s', job_id)

return new_event


def _extract_cpt(granule_id):
parsed_id = PRODUCT_REGEX.match(granule_id)
parsed_id = PRODUCT_REGEX.search(granule_id)
if parsed_id is None:
return None

return {
'product': parsed_id.group('product'),
'cycle': int(parsed_id.group('cycle')),
'pass': int(parsed_id.group('pass')),
'tile': int(parsed_id.group('tile')) + parsed_id.group('direction')
'cycle': str(int(parsed_id.group('cycle'))),
'pass': str(int(parsed_id.group('pass'))),
'tile': str(int(parsed_id.group('tile')))
+ parsed_id.group('direction')
}
40 changes: 20 additions & 20 deletions podaac/swodlr_ingest_to_sds/submit_to_sds.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
'''Lambda to submit granules to the SDS for ingestion'''
from datetime import datetime
import json
import logging
from pathlib import PurePath
from urllib.parse import urlsplit, urlunsplit
import boto3
from podaac.swodlr_common import sds_statuses
from podaac.swodlr_ingest_to_sds.errors import DataNotFoundError
from podaac.swodlr_ingest_to_sds.utils import (
mozart_client, get_param, ingest_table
)
from podaac.swodlr_ingest_to_sds.utilities import utils

ACCEPTED_EXTS = ['nc']
INGEST_QUEUE_URL = get_param('ingest_queue_url')
INGEST_TABLE_NAME = get_param('ingest_table_name')
PCM_RELEASE_TAG = get_param('sds_pcm_release_tag')
INGEST_QUEUE_URL = utils.get_param('ingest_queue_url')
INGEST_TABLE_NAME = utils.get_param('ingest_table_name')
PCM_RELEASE_TAG = utils.get_param('sds_pcm_release_tag')

dynamodb = boto3.client('dynamodb')
sqs = boto3.client('sqs')

ingest_job_type = mozart_client.get_job_type(
logger = utils.get_logger(__name__)
ingest_job_type = utils.mozart_client.get_job_type(
f'job-INGEST_STAGED:{PCM_RELEASE_TAG}'
)

ingest_job_type.initialize()


Expand All @@ -31,35 +29,37 @@ def lambda_handler(event, _context):
not already ingested and inserts the granule and job info into DynamoDB
'''

logging.debug('Records received: %d', len(event['Records']))
logger.debug('Records received: %d', len(event['Records']))

granules = {}
for record in event['Records']:
try:
granule = _parse_record(record)
granules[granule['id']] = granule
except (DataNotFoundError, json.JSONDecodeError):
logging.exception('Failed to parse record')
logger.exception('Failed to parse record')

lookup_results = dynamodb.batch_get_item(
RequestItems={
INGEST_TABLE_NAME: {
'Keys': [{'granule_id': {'S': granule['id']}}
for granule in granules.values()],
'ProjectionExpression': 'granule_id'
'ProjectionExpression': 'granule_id, #status',
'ExpressionAttributeNames': {'#status': 'status'}
}
},
ReturnConsumedCapacity='NONE'
)

for item in lookup_results['Responses'][INGEST_TABLE_NAME]:
granule_id = item['granule_id']['S']
if granule_id in granules:
logging.info('Granule already ingested: %s', granule_id)
status = item['status']['S']
if granule_id in granules and status in sds_statuses.SUCCESS:
logger.info('Granule already ingested: %s', granule_id)
del granules[granule_id]

jobs = []
with ingest_table.batch_writer() as batch:
with utils.ingest_table.batch_writer() as batch:
for granule in granules.values():
try:
job = _ingest_granule(granule)
Expand All @@ -79,7 +79,7 @@ def lambda_handler(event, _context):
)
# Otello throws generic Exceptions
except Exception: # pylint: disable=broad-exception-caught
logging.exception('Failed to ingest granule')
logger.exception('Failed to ingest granule')

return {'jobs': jobs}

Expand All @@ -100,15 +100,15 @@ def _ingest_granule(granule):
filename = granule['filename']
s3_url = granule['s3_url']

logging.debug('Ingesting granule id: %s', granule['id'])
logger.debug('Ingesting granule id: %s', granule['id'])

job_params = _gen_mozart_job_params(filename, s3_url)
tag = f'ingest_file_otello__{filename}'

ingest_job_type.set_input_params(job_params)
job = ingest_job_type.submit_job(tag=tag)
timestamp = datetime.now().isoformat()
logging.info('Submitted to sds: %s', granule['id'])
logger.info('Submitted to sds: %s', granule['id'])

return {
'job_id': job.job_id,
Expand Down Expand Up @@ -136,11 +136,11 @@ def _extract_s3_url(cnm_r_message, strict=True):
)

s3_url = urlunsplit(s3_elements)
logging.debug("S3 url: %s", s3_elements)
logger.debug("S3 url: %s", s3_elements)

return (file['name'], s3_url)

logging.debug('Rejected file: %s', file['name'])
logger.debug('Rejected file: %s', file['name'])

if strict:
# Rerun without strict mode
Expand Down
64 changes: 64 additions & 0 deletions podaac/swodlr_ingest_to_sds/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
'''Shared utilities for ingest-to-sds lambdas'''
import boto3
from otello.mozart import Mozart

from podaac.swodlr_common.utilities import BaseUtilities


class Utilities(BaseUtilities):
'''Utility functions implemented as a singleton'''
APP_NAME = 'swodlr'
SERVICE_NAME = 'ingest-to-sds'

def __init__(self):
super().__init__(Utilities.APP_NAME, Utilities.SERVICE_NAME)

@property
def mozart_client(self):
'''
Lazily creates a Mozart client
'''
if not hasattr(self, '_mozart_client'):
host = self.get_param('sds_host')
username = self.get_param('sds_username')
cfg = {
'host': host,
'auth': True,
'username': username
}

# pylint: disable=attribute-defined-outside-init
self._mozart_client = Mozart(cfg, session=self._get_sds_session())

return self._mozart_client

@property
def ingest_table(self):
'''
Lazily creates a DynamoDB table resource
'''
if not hasattr(self, '_ingest_table'):
dynamodb = boto3.resource('dynamodb')
# pylint: disable=attribute-defined-outside-init
self._ingest_table = dynamodb.Table(
self.get_param('ingest_table_name')
)

return self._ingest_table

@property
def available_tiles_table(self):
'''
Lazily creates a DynamoDB table resource
'''
if not hasattr(self, '_available_tiles_table'):
dynamodb = boto3.resource('dynamodb')
# pylint: disable=attribute-defined-outside-init
self._available_tiles_table = dynamodb.Table(
self.get_param('available_tiles_table_name')
)

return self._available_tiles_table


utils = Utilities()
Loading