
airflow helper does not clean pipeline working folder when DAG is created #1464

Open
rudolfix opened this issue Jun 14, 2024 · 0 comments · May be fixed by #1506

dlt version

0.4.12

Describe the problem

dlt creates a new working folder under /tmp for (1) each DAG creation and (2) each actual DAG execution. In case of (2) we clean the working directory after the run (see the _run method of the task group).
In case of (1) we do not, i.e. in this snippet:

import dlt
import pendulum

from airflow.decorators import dag
from dlt.helpers.airflow_helper import PipelineTasksGroup

# minimal default task args for the example; adjust to your deployment
default_task_args = {
    "owner": "airflow",
    "retries": 0,
}


@dag(
    schedule_interval='@daily',
    start_date=pendulum.datetime(2023, 7, 1),
    catchup=False,
    max_active_runs=1,
    default_args=default_task_args
)
def load_data():
    # set `use_data_folder` to True to store temporary data on the `data` bucket. Use only when it does not fit on the local storage
    tasks = PipelineTasksGroup("pipeline_decomposed", use_data_folder=False, wipe_local_data=True)

    # import your source from the pipeline script
    from github import github_repo_events

    source = github_repo_events("apache", "airflow")

    # modify the pipeline parameters
    pipeline = dlt.pipeline(
        pipeline_name='pipeline_name',
        dataset_name='dataset_name',
        destination='duckdb',
        full_refresh=False  # must be False if we decompose
    )
    # create the run; the "serialize" decompose option converts dlt resources into Airflow tasks. Use "none" to disable it
    tasks.add_run(pipeline, source, decompose="serialize", trigger_rule="all_done", retries=0, provide_context=True)


load_data()

The pipeline creates a temp working dir just to create the DAG; the code ends and the working dir is left behind.

Expected behavior

dlt should clean the working folder created during DAG creation, honoring wipe_local_data=True of PipelineTasksGroup.
To fix it we could turn our task group into a context manager:

with PipelineTasksGroup("pipeline_decomposed", use_data_folder=False, wipe_local_data=True) as tasks:
    ...
    tasks.add_run(pipeline, source, decompose="serialize", trigger_rule="all_done", retries=0, provide_context=True)

What happens here:

  1. tasks.add_run keeps the pipeline instance(s) so it can use them to wipe the folders
  2. on __exit__ all working folders are wiped for all pipelines (flag permitting); a sketch follows below
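
For illustration, a minimal sketch of what such an exit handler could do. This is not the actual dlt implementation: the pipelines list, the wipe_local_data flag and the working_dir attribute used below are assumptions about the helper's internals. Also note that Airflow's TaskGroup is already a context manager, so the real fix would extend its __exit__ rather than replace it.

import os
import shutil


class PipelineTasksGroupSketch:
    # Sketch only: pipelines added via add_run are remembered so their
    # working folders can be wiped when the `with` block ends.
    def __init__(self, name, wipe_local_data=True):
        self.name = name
        self.wipe_local_data = wipe_local_data
        self.pipelines = []

    def add_run(self, pipeline, data, **kwargs):
        self.pipelines.append(pipeline)
        # ... build the Airflow tasks as the real helper does ...

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.wipe_local_data:
            for pipeline in self.pipelines:
                # `working_dir` is assumed to point at the per-pipeline temp folder
                if pipeline.working_dir and os.path.isdir(pipeline.working_dir):
                    shutil.rmtree(pipeline.working_dir, ignore_errors=True)
        return False  # never swallow exceptions raised while building the DAG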

Steps to reproduce

Please write a test that reproduces this behavior
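
A rough sketch of such a test, assuming the airflow helper places the per-pipeline working folders directly under the system temp dir; the DAG id, pipeline names and the inline resource are illustrative only. With the current behavior the final assertion fails, which reproduces the leak.

import os
import tempfile

import dlt
import pendulum
from airflow.decorators import dag
from dlt.helpers.airflow_helper import PipelineTasksGroup


def test_dag_creation_wipes_working_folder() -> None:
    tmp_dir = tempfile.gettempdir()
    before = set(os.listdir(tmp_dir))

    @dag(schedule_interval=None, start_date=pendulum.datetime(2023, 7, 1), catchup=False)
    def leak_check_dag():
        tasks = PipelineTasksGroup("leak_check", use_data_folder=False, wipe_local_data=True)
        pipeline = dlt.pipeline(pipeline_name="leak_check", dataset_name="leak_check", destination="duckdb")
        tasks.add_run(pipeline, dlt.resource([1, 2, 3], name="numbers"), decompose="none")

    leak_check_dag()  # building the DAG is enough to create the working folder

    after = set(os.listdir(tmp_dir))
    # expected: nothing new is left under the temp dir once the DAG is built
    assert after - before == set()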

Operating system

Linux

Runtime environment

Airflow

Python version

3.11

dlt data source

No response

dlt destination

No response

Other deployment details

No response

Additional information

No response

rudolfix added the bug label on Jun 14, 2024
IlyaFaer linked pull request #1506 on Jun 21, 2024 that will close this issue