
Merge Delete from BQ and Insert from GCS to BQ into a Single Atomic Operation for achieving Idempotency #421

Merged
16 commits merged into master on Jul 12, 2024

Conversation

harsha-stellar-data
Contributor

@harsha-stellar-data harsha-stellar-data commented Jul 8, 2024

PR Checklist
  • Created a custom Python task (build_del_ins_from_gcs_to_bq_task) to combine the Delete from BQ and Insert from GCS to BQ tasks.
  • Updated the History Table Export and State Table Export DAGs to handle variable assignments, parameter exchange, and task ordering updates after the changes.
  • Updated the Airflow variables docs to add task_sla and task_timeout for the new task.
  • Updated the images of the history_table_export and state table export DAGs to show the new task flow.
  • Updated the documentation page to refer to the new task.

PR Structure

  • This PR has reasonably narrow scope (if not, break it down into smaller PRs).
  • This PR avoids mixing refactoring changes with feature changes (split into two PRs
    otherwise).
  • This PR's title starts with the JIRA ticket associated with the PR.

Thoroughness

  • This PR adds tests for the most critical parts of the new functionality or fixes.
  • I've updated the README with the added features, breaking changes, new instructions on how to use the repository.

What

https://stellarorg.atlassian.net/browse/HUBBLE-398

Why

This PR changes the way DELETE and INSERT operations are handled in the history_table_export DAG. The existing method causes duplicated data during restarts; this PR addresses the issue by combining both operators into one atomic operation, ensuring a clean-up is always performed before an insert to avoid potential duplication.
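
For orientation, here is a minimal sketch of what a combined delete-then-insert callable could look like with the BigQuery client. The function name, parameters, and delete predicate are illustrative assumptions for this writeup, not the actual contents of build_del_ins_from_gcs_to_bq_task.py.

# Illustrative sketch only; names, parameters, and the delete predicate are assumptions.
from google.cloud import bigquery


def del_ins_from_gcs_to_bq(project, dataset, table_id, gcs_uri, batch_id, **context):
    """Delete rows left over from a prior attempt of this batch, then load the
    fresh export from GCS, so reruns of the task stay idempotent."""
    client = bigquery.Client(project=project)
    table_ref = f"{project}.{dataset}.{table_id}"

    # Step 1: clean up rows written by an earlier attempt of the same batch.
    delete_sql = f"DELETE FROM `{table_ref}` WHERE batch_id = @batch_id"
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("batch_id", "STRING", batch_id)]
    )
    client.query(delete_sql, job_config=job_config).result()

    # Step 2: append the freshly exported file; create the table if it does not exist.
    load_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
    )
    client.load_table_from_uri(gcs_uri, table_ref, job_config=load_config).result()

Because both steps run inside one Airflow task, a retry always re-runs the delete before the load instead of leaving a half-finished insert behind.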

Known limitations

[TODO]
enriched_history_operations still uses build_delete_data_task because it relies on a different task, build_bq_insert_job, for the insert operation. We may need to review these and assess whether there could be an impact from those operations remaining non-atomic.

@harsha-stellar-data harsha-stellar-data requested a review from a team as a code owner July 8, 2024 17:02
@harsha-stellar-data harsha-stellar-data changed the title Test commit Merge Delete from BQ and Insert from GCS to BQ into a Single Atomic Operation for achieving Idempotency Jul 8, 2024
dags/history_tables_dag.py (outdated; resolved)
catchup=True,
description="This DAG exports trades and operations from the history archive using CaptiveCore. This supports parsing sponsorship and AMMs.",
description="This DAG exports trades, asstes, ledgers, operations, transactions, effects from the history archive using CaptiveCore. This supports parsing sponsorship and AMMs.",
Contributor

@chowbao chowbao Jul 8, 2024

Suggested change
description="This DAG exports trades, asstes, ledgers, operations, transactions, effects from the history archive using CaptiveCore. This supports parsing sponsorship and AMMs.",
description="This DAG exports information for the trades, assets, ledgers, operations, transactions, and effects history tables.",

Contributor

FYI, in the future you can just click "Commit suggestion" instead of copying and pasting them locally. It makes a new commit, and all you'd have to do is a git pull on your branch locally.

README.md Outdated
@@ -668,6 +669,10 @@ This section contains information about the Airflow setup. It includes our DAG d

[This file](https://github.com/stellar/stellar-etl-airflow/blob/master/dags/stellar_etl_airflow/build_gcs_to_bq_task.py) contains methods for creating tasks that appends information from a Google Cloud Storage file to a BigQuery table. These tasks will create a new table if one does not exist. These tasks are used for history archive data structures, as Stellar wants to keep a complete record of the ledger's entire history.

### **build_del_ins_from_gcs_to_bq_task**

[This file](https://github.com/stellar/stellar-etl-airflow/blob/master/dags/stellar_etl_airflow/build_del_ins_from_gcs_to_bq_task.py) contains methods for deleting data from old partitions if the table exists and also import fresh data from gcs to the corresponding Big Query table. These tasks will create a new table if one does not exist. These tasks are used for history archive data structures, as Stellar wants to keep a complete record of the ledger's entire history.
Contributor

Suggested change
[This file](https://github.com/stellar/stellar-etl-airflow/blob/master/dags/stellar_etl_airflow/build_del_ins_from_gcs_to_bq_task.py) contains methods for deleting data from old partitions if the table exists and also import fresh data from gcs to the corresponding Big Query table. These tasks will create a new table if one does not exist. These tasks are used for history archive data structures, as Stellar wants to keep a complete record of the ledger's entire history.
[This file](https://github.com/stellar/stellar-etl-airflow/blob/master/dags/stellar_etl_airflow/build_del_ins_from_gcs_to_bq_task.py) contains methods for deleting data from a specified BigQuery table according to the batch interval and also imports data from gcs to the corresponding BigQuery table. These tasks will create a new table if one does not exist. These tasks are used for history and state data structures, as Stellar wants to keep a complete record of the ledger's entire history.
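
As a rough usage illustration only (the operator wiring, op_kwargs keys, and Airflow variables below are assumptions, not the repository's actual code), a task built from this file would typically be a single PythonOperator so the delete and the load succeed or fail together:

from airflow.operators.python import PythonOperator

# Hypothetical wiring; variable names and templated values are placeholders.
del_ins_operations = PythonOperator(
    task_id="del_ins_operations_task",
    python_callable=build_del_ins_from_gcs_to_bq_task,
    op_kwargs={
        "project": "{{ var.value.bq_project }}",
        "dataset": "{{ var.value.bq_dataset }}",
        "table_id": "history_operations",
        "gcs_uri": "gs://{{ var.value.gcs_exported_data_bucket_name }}/operations.json",
        "batch_id": "{{ run_id }}",
    },
    dag=dag,
)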

dags/history_tables_dag.py (outdated; resolved)
dags/history_tables_dag.py (outdated; resolved)
@harsha-stellar-data
Contributor Author

@chowbao All the suggested changes are taken care of. Please review and let me know if anything else needs to be looked at. Thanks!

@@ -47,10 +57,14 @@
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
# sla_miss_callback=alert_sla_miss,
sla_miss_callback=alert_sla_miss,
Contributor

This should be left commented out. The SLA alerts still don't work as intended and are too noisy right now.

Contributor Author

Will comment it out.

@@ -59,37 +73,7 @@
use_captive_core = literal_eval(Variable.get("use_captive_core"))
txmeta_datastore_path = "{{ var.value.txmeta_datastore_path }}"


Contributor

What was the purpose of deleting and moving existing tasks around?

For example, time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) was moved to line 188 and its comment was replaced with just # Define time task

Contributor Author

I will add the proper comments back. I took care of some sections, but this one and the export task were missed when I merged the changes back from my test script to the original.

Comment on lines +63 to +65
# Initialize batch metadata variables
batch_id = macros.get_batch_id()
batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
Contributor

@chowbao chowbao Jul 10, 2024

nit: This could be moved to the function in https://github.com/stellar/stellar-etl-airflow/pull/421/files#diff-62ce6084a9fcd6873489fbc5700bff8413d481cb580f0fdd2d641b744c68b909

Not sure if macros will create a circular dependency. You might need to only import the macros that are needed in each file.

Contributor Author

@harsha-stellar-data harsha-stellar-data Jul 10, 2024

batch_id and batch_date are DAG-level. So I thought it would be best to leave them as params coming from the DAG rather than as part of a reusable piece of code.

Contributor

That's probably fine.

My nit comes from the perspective that batch_id and batch_date are only used in the new functions for the delete/insert task, and the delete/insert task will always need batch_id and batch_date anyway, so why bother passing them as parameters?
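
A sketch of what that suggestion could look like (the helper name, signature, and import path are assumptions): compute the batch metadata inside the helper rather than passing it in from the DAG.

from stellar_etl_airflow import macros  # assumed import path


def initialize_task_vars(table_id, **overrides):
    task_vars = {
        "table_id": table_id,
        # Derived here rather than passed from the DAG, since every
        # delete/insert task needs them anyway.
        "batch_id": macros.get_batch_id(),
        "batch_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}",
    }
    task_vars.update(overrides)
    return task_vars

Whether importing macros here introduces the circular dependency mentioned above would still need to be checked.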

}


def create_del_ins_task(dag, task_id, task_vars, del_ins_callable):
Contributor

nit: create_del_ins_task could probably be merged into initialize_task_vars and renamed.

Right now all create_del_ins_task is doing is passing through variables. So in initialize_task_vars, why return a dict of variables when you could instead pass the variables directly to the PythonOperator?

Contributor Author

Implemented them as separate functions so that either can also be used on its own if needed, for initializing in any other flows too if required.

Whereas the Del Ins task serves a specific purpose, so separating them seems logical.

Contributor

I'm not sure I agree, but I think that's just an opinion.

If that's the case, why even have create_del_ins_task? All it's doing is returning a PythonOperator. You could just call PythonOperator with the same params.

Contributor Author

Sure. We can discuss in office hours, but my understanding is that the whole point of separating these out into a separate script is to provide a level of abstraction. Combining them may be okay for now while the code base is small, but long term I think we should keep them separated.
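
For reference, a sketch of the two shapes being debated; signatures are taken from the diff where visible and assumed otherwise.

from airflow.operators.python import PythonOperator


# Current split: one helper builds the task variables, another wraps the
# operator, so the variable initialization can be reused by other flows.
def create_del_ins_task(dag, task_id, task_vars, del_ins_callable):
    return PythonOperator(
        task_id=task_id,
        python_callable=del_ins_callable,
        op_kwargs=task_vars,
        dag=dag,
    )


# Reviewer's alternative: drop the wrapper and declare the operator directly
# where the task is defined, e.g.
# del_ins_tasks[data_type] = PythonOperator(
#     task_id=f"del_ins_{data_type}_task",
#     python_callable=build_del_ins_from_gcs_to_bq_task,
#     op_kwargs=initialize_task_vars(data_type, ...),
#     dag=dag,
# )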

public_dataset,
)
del_ins_tasks[data_type] = create_del_ins_task(
dag, f"del_ins_{data_type}_task", task_vars, build_del_ins_from_gcs_to_bq_task
Contributor

@chowbao chowbao Jul 10, 2024

nit: The f-string for the task name can be done within initialize_task_vars.

return PythonOperator(
task_id=task_id,
python_callable=del_ins_callable,
op_args=[
Contributor

nit: this could have been op_kwargs instead of op_args

op_kwargs=task_vars,

Contributor Author

Will update to kwargs. I usually prefer the params explicitly mentioned, but in this case, since the task vars initialization is right above, use of kwargs should be clear.
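
A compact illustration of the op_args vs. op_kwargs difference, using a hypothetical callable:

def load_batch(table_id, batch_id):
    ...  # hypothetical callable used only for illustration

# op_args passes values positionally, so the list order must match the signature:
#   PythonOperator(task_id="t", python_callable=load_batch, op_args=[tid, bid])
#
# op_kwargs passes values by name, so a dict such as task_vars maps directly
# onto the callable's keyword parameters:
#   PythonOperator(task_id="t", python_callable=load_batch, op_kwargs={"table_id": tid, "batch_id": bid})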

Initialize task variables for data export and import.

Args:
data_type (str): Type of data (e.g., operations, trades).
Contributor

Suggested change
data_type (str): Type of data (e.g., operations, trades).
data_type (str): Type of data (e.g. operations, trades).

Contributor Author

I did not get this suggestion. (e.g.,) is the proper form, right?

chowbao previously approved these changes Jul 11, 2024
@chowbao chowbao self-requested a review July 11, 2024 03:02
@chowbao chowbao dismissed their stale review July 11, 2024 03:03

New code updates

…rences between history and state tables.

- Modified the reusable Python functions to account for the naming convention changes in schema files between the history and state DAGs
- Updated the code to use table_id and table_name as parameters instead of data_type, which was confusing
Contributor

Refresh the DAG image: I think it's stale.

Contributor Author

Will take care of this


# filter for only the required tables pertaining to the DAG
table_id_and_suffixes = {
key: suffix
Contributor

nit: I don't think this needs to create a new dict. source_object_suffix_mapping and table_id_and_suffixes are effectively the same thing, I think.

Also, this kind of fails silently if the key is not in table_names, whereas right now if "accounts" wasn't defined in the airflow var for table_names it would give us an error.

Contributor Author

I will remove this and add a check if the keys are missing in the table_names variable.
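
A small sketch of what that check could look like (names assumed from the discussion above):

def check_required_tables(source_object_suffix_mapping, table_names):
    """Raise if any table needed by the DAG is missing from the table_names Airflow variable."""
    missing = set(source_object_suffix_mapping) - set(table_names)
    if missing:
        raise KeyError(f"table_names is missing entries for: {sorted(missing)}")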

staging_table_suffix = "_staging"

if table_id in history_tables:
schema_fields = read_local_schema(f"history_{table_id}")
Contributor

@chowbao chowbao Jul 11, 2024

nit: I think this can be updated to use table_name.
Also, funnily enough, setting schema_fields can now be done outside the if/else logic.

Suggested change
schema_fields = read_local_schema(f"history_{table_id}")
schema_fields = read_local_schema(f"{table_name}")

Contributor Author

Agreed. Moved it outside the conditional statements and defined it once.
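
For reference, a sketch of the resulting shape (surrounding names such as read_local_schema, table_name, table_id, and history_tables are taken from the diff; the branch contents are placeholders):

# Schema lookup no longer depends on the history/state branch.
schema_fields = read_local_schema(table_name)

if table_id in history_tables:
    ...  # history-table-specific settings (placeholder)
else:
    ...  # state-table-specific settings (placeholder)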

@harsha-stellar-data harsha-stellar-data merged commit 1475901 into master Jul 12, 2024
4 checks passed