Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jpuerto/example8 dag #841

Open
wants to merge 43 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
12b3187
DAGs: Add DAG for generate_today_json
Sep 11, 2023
78ac2c8
DAGs: Add add_path class
Sep 11, 2023
d53f0d9
DAGs: Import airflow_conf
Sep 11, 2023
ffc8229
DAGs: Add kwargs back
Sep 11, 2023
b73509f
DAGs: Add kwargs back
Sep 11, 2023
9a66f01
DAGs: Add kwargs back
Sep 11, 2023
6adc929
DAGs: Add kwargs back
Sep 11, 2023
a325ed5
DAGs: Add kwargs back
Sep 11, 2023
d426b32
DAGs: Add kwargs back
Sep 11, 2023
443756f
Submodule: Update py-hubmapbags
Sep 11, 2023
ff564af
Submodules: Remove non-existent import
Sep 11, 2023
377fd53
Submodules: Remove non-existent import
Sep 11, 2023
6d51a94
Submodules: Remove non-existent import
Sep 11, 2023
4c346eb
Submodules: Remove non-existent import
Sep 11, 2023
342048a
DAGs: Change start_date to today
Sep 11, 2023
7d3d67d
DAGs: Change start_date to today
Sep 11, 2023
7ff8065
DAGs: Print ('finished') for testing
Sep 11, 2023
2239fad
DAGs: Print working directory
Sep 11, 2023
366a7d6
DAGs: Remove working directory print
Sep 11, 2023
1327aaf
Update IVT to latest commit for validation release
Dec 15, 2023
f60dddb
DAGs: Update python_callable
Jan 12, 2024
6cd7e5b
DAGs: Update dag_ids
Jan 12, 2024
447c672
DAGs: Update default_args
Jan 12, 2024
cf60624
Utils: Try going around the add_task function
Jan 12, 2024
577b80a
DAGs: Modify conditional to support datasets with no matching rules
Jan 16, 2024
c9fa06d
DAGs: Bring changes from env into repo
Jan 16, 2024
756c6be
DAGs: Clear out processed_dataset_metadata DAG to avoid confusion
Jan 16, 2024
3e6774d
Utils: Update the return type to be Optional
Jan 17, 2024
eda2bfd
DAG: Update the get_dataset_state invocation to just pass the lambda
Jan 17, 2024
2c9f261
DAG: Print when we don't get any soft_data back for a dataset
Jan 17, 2024
9508f7a
Merge pull request #821 from hubmapconsortium/NIHDEV-465-Update-rebui…
jpuerto-psc Jan 19, 2024
04bfd64
Merge branch 'master' of https://github.com/hubmapconsortium/ingest-p…
Jan 19, 2024
699af50
Utils: Update dataset_type_callable description
Jan 19, 2024
499161f
Merge pull request #830 from hubmapconsortium/sunset666/bumps
jpuerto-psc Jan 19, 2024
2429e17
Update to portal-container-anndata-to-ui:0.0.4
lchoy Jan 19, 2024
e65f445
Merge pull request #832 from hubmapconsortium/lchoy/update-portal-con…
sunset666 Jan 19, 2024
f1e4797
Revert "Update portal containers submodule"
sunset666 Jan 19, 2024
e29a898
Merge pull request #833 from hubmapconsortium/revert-832-lchoy/update…
sunset666 Jan 19, 2024
c6b6bad
Bump portal-containers
sunset666 Jan 19, 2024
7110eac
Merge remote-tracking branch 'origin/master'
sunset666 Feb 1, 2024
a952f69
Merge branch 'master' of https://github.com/hubmapconsortium/ingest-p…
Feb 6, 2024
9a19d15
Update hubmapbags
Feb 6, 2024
f9a8ada
General: Fix issues
Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions src/ingest-pipeline/airflow/dags/generate_bdbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@
hubmapbags_id_namespace as id_namespace,
hubmapbags_biosample_in_collection as biosample_in_collection,
hubmapbags_file_in_collection as file_in_collection,
hubmapbags_primary_dcc_contact as primary_dcc_contact,
hubmapbags_biosample as biosample,
hubmapbags_projects as projects,
hubmapbags_collection as collection,
hubmapbags_anatomy as anatomy,
hubmapbags_file as files,
Expand Down Expand Up @@ -284,9 +282,6 @@ def generate_bdbag(**kwargs):
print('Making biosample_in_collection.tsv')
biosample_in_collection.create_manifest(biosample_id, hubmap_id, output_directory)

print('Making project.tsv')
projects.create_manifest(data_provider, output_directory)

print('Making project_in_project.tsv')
project_in_project.create_manifest(data_provider, output_directory)

Expand All @@ -305,9 +300,6 @@ def generate_bdbag(**kwargs):
print('Making file_describes_collection.tsv')
file_describes_collection.create_manifest(hubmap_id, data_directory, output_directory)

print('Making dcc.tsv')
primary_dcc_contact.create_manifest(output_directory)

print('Making id_namespace.tsv')
id_namespace.create_manifest(output_directory)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import os
import sys
from datetime import datetime, timedelta

from airflow.operators.python import PythonOperator
from airflow.configuration import conf as airflow_conf

from utils import (
HMDAG,
get_queue_resource,
get_auth_tok,
encrypt_tok,
)

class add_path:
    """Temporarily place a directory at the front of ``sys.path``.

    Intended for use as a context manager around an import statement.
    On exit the entry is removed again; if something else already took
    it out of ``sys.path``, the removal is silently skipped.
    Thanks to Eugene Yarmash https://stackoverflow.com/a/39855753
    """

    def __init__(self, path):
        # Directory to expose on sys.path for the duration of the context.
        self.path = path

    def __enter__(self):
        # Prepend so this directory wins over any same-named modules that
        # appear later on the search path.
        sys.path.insert(0, self.path)

    def __exit__(self, exc_type, exc_value, traceback):
        # Best-effort removal: the entry may already have been removed by
        # other code while the context was active.
        if self.path in sys.path:
            sys.path.remove(self.path)

# Make the configured source tree importable just long enough to pull in the
# hubmapbags submodule. SRC_PATH may be stored with surrounding quote
# characters in the airflow config, hence the strip("'").strip('"') calls.
with add_path(airflow_conf.as_dict()["connections"]["SRC_PATH"].strip("'").strip('"')):
    from submodules import hubmapbags

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'hubmap',
    'depends_on_past': False,
    # NOTE(review): datetime.now() makes start_date dynamic (re-evaluated on
    # every DAG parse); Airflow guidance recommends a fixed start_date —
    # confirm this is intentional for this daily report DAG.
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'xcom_push': True,
    # Route this DAG's tasks to the queue configured for 'generate_today_json'.
    'queue': get_queue_resource('generate_today_json'),
}


with HMDAG('generate_today_json',
           # Run daily at midnight (cron syntax).
           schedule_interval="0 0 * * *",
           is_paused_upon_creation=False,
           default_args=default_args,
           user_defined_macros={
           }) as dag:

    def generate_report(**kwargs):
        """Clean hubmapbags state, then generate the daily report.

        Reads 'crypt_auth_tok' from the task's op_kwargs via get_auth_tok,
        which yields the decrypted auth token passed to the report call.
        """
        token = get_auth_tok(**kwargs)
        hubmapbags.utilities.clean()
        hubmapbags.reports.daily(token)
        print('Finished')

    t_generate_report = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        provide_context=True,
        op_kwargs={
            # App client secret is passed encrypted and decrypted at task
            # run time inside generate_report (via get_auth_tok).
            "crypt_auth_tok": (
                encrypt_tok(airflow_conf.as_dict()["connections"]["APP_CLIENT_SECRET"]).decode()
            ),
        },
    )

    # Single-task DAG: no dependencies to wire, just register the task.
    t_generate_report
78 changes: 33 additions & 45 deletions src/ingest-pipeline/airflow/dags/rebuild_multiple_metadata.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,18 @@
import os
import yaml
import utils
from pprint import pprint

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.exceptions import AirflowException
from airflow.configuration import conf as airflow_conf
from datetime import datetime, timedelta
from datetime import datetime
from airflow import DAG

from utils import (
HMDAG,
get_queue_resource,
get_preserve_scratch_resource,
get_soft_data,
create_dataset_state_error_callback,
pythonop_md_consistency_tests,
make_send_status_msg_function,
get_tmp_dir_path,
localized_assert_json_matches_schema as assert_json_matches_schema,
pythonop_get_dataset_state,
encrypt_tok,
)

from hubmap_operators.common_operators import (
CreateTmpDirOperator,
CleanupTmpDirOperator,
pythonop_get_dataset_state,
)


Expand All @@ -37,30 +23,11 @@ def get_uuid_for_error(**kwargs):
return None


def get_dataset_uuid(**kwargs):
return kwargs['dag_run'].conf['uuid']


def get_dataset_lz_path(**kwargs):
ctx = kwargs['dag_run'].conf
return ctx['lz_path']


default_args = {
'owner': 'hubmap',
'depends_on_past': False,
'start_date': datetime(2019, 1, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'xcom_push': True,
'queue': get_queue_resource('rebuild_metadata'),
'on_failure_callback': create_dataset_state_error_callback(get_uuid_for_error)
}

with HMDAG('rebuild_multiple_metadata',
with DAG('rebuild_multiple_metadata',
schedule_interval=None,
is_paused_upon_creation=False,
default_args=default_args,
Expand All @@ -77,15 +44,30 @@ def build_dataset_lists(**kwargs):
pprint(kwargs['dag_run'].conf)
for uuid in kwargs['dag_run'].conf['uuids']:
soft_data = get_soft_data(uuid, **kwargs)
if soft_data.get('primary'):
kwargs['dag_run'].conf['primary_datasets'].append(uuid)

# If we got nothing back from soft_data, then let's try to determine using entity_api
if soft_data:
if soft_data.get('primary'):
kwargs['dag_run'].conf['primary_datasets'].append(uuid)
else:
kwargs['dag_run'].conf['processed_datasets'].append(uuid)
else:
kwargs['dag_run'].conf['processed_datasets'].append(uuid)
print(f'No matching soft data returned for {uuid}')
ds_rslt = pythonop_get_dataset_state(dataset_uuid_callable=lambda **kwargs: uuid, **kwargs)
if ds_rslt.get("dataset_info"):
# dataset_info should only be populated for processed_datasets
print(ds_rslt.get("dataset_info"))
kwargs['dag_run'].conf['processed_datasets'].append(uuid)
else:
kwargs['dag_run'].conf['primary_datasets'].append(uuid)



t_build_dataset_lists = PythonOperator(
task_id='build_dataset_lists',
python_callable=build_dataset_lists,
provide_context=True,
queue= get_queue_resource('rebuild_metadata'),
op_kwargs={
'crypt_auth_tok': encrypt_tok(airflow_conf.as_dict()
['connections']['APP_CLIENT_SECRET']).decode(),
Expand All @@ -97,7 +79,8 @@ def get_primary_dataset_uuids(**kwargs):

t_get_primary_dataset_uuids = PythonOperator(
task_id='get_primary_dataset_uuids',
python_callable=get_primary_dataset_uuids(),
python_callable=get_primary_dataset_uuids,
queue=get_queue_resource('rebuild_metadata'),
provide_context=True
)

Expand All @@ -106,22 +89,27 @@ def get_processed_dataset_uuids(**kwargs):

t_get_processed_dataset_uuids = PythonOperator(
task_id='get_processed_dataset_uuids',
python_callable=get_processed_dataset_uuids(),
python_callable=get_processed_dataset_uuids,
queue=get_queue_resource('rebuild_metadata'),
provide_context=True
)

t_launch_rebuild_primary_dataset_metadata = TriggerDagRunOperator().partial(
t_launch_rebuild_primary_dataset_metadata = TriggerDagRunOperator.partial(
task_id="trigger_rebuild_primary_dataset_metadata",
trigger_dag_id="rebuild_primary_dataset_metadata",
queue=get_queue_resource('rebuild_metadata'),
).expand(
conf=t_get_primary_dataset_uuids.output
)

t_launch_rebuild_processed_dataset_metadata = TriggerDagRunOperator().partial(
t_launch_rebuild_processed_dataset_metadata = TriggerDagRunOperator.partial(
task_id="trigger_rebuild_processed_dataset_metadata",
trigger_dag_id="rebuild_processed_dataset_metadata",
queue=get_queue_resource('rebuild_metadata'),
).expand(
conf=t_get_processed_dataset_uuids.output
)

t_build_dataset_lists >> t_get_primary_dataset_uuids >> t_get_processed_dataset_uuids >> t_launch_rebuild_primary_dataset_metadata >> t_launch_rebuild_processed_dataset_metadata
t_build_dataset_lists >> [t_get_primary_dataset_uuids, t_get_processed_dataset_uuids]
t_get_primary_dataset_uuids >> t_launch_rebuild_primary_dataset_metadata
t_get_processed_dataset_uuids >> t_launch_rebuild_processed_dataset_metadata
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def get_dataset_lz_path(**kwargs):
'on_failure_callback': create_dataset_state_error_callback(get_uuid_for_error)
}

with HMDAG('rebuild_metadata',
with HMDAG('rebuild_primary_dataset_metadata',
schedule_interval=None,
is_paused_upon_creation=False,
default_args=default_args,
Expand Down
Loading