How does 'getting_time' work in a dag in Airflow? #40155
-
So, I wanted to make a file name variable using a timestamp as part of the name. My idea was to create the timestamp variable in the DAG file, outside the DAG declaration. I expected to get the same file name across all the tasks I put into the DAG, since I thought the DAG file would run once, and that each task coming up would only execute the task at hand, not the whole DAG file. Here's a small code snippet for reference:

```python
import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

now = datetime.datetime.now()

# This will show the 2 timestamps in the log, and make 2 text files
# with timestamped names in test_folder, if you have one
def print_time():
    TEXT_FILE_NAME = now.strftime('%m-%d-%Y_%H-%M-%S') + '.txt'
    with open('test_folder/' + TEXT_FILE_NAME, 'w+') as f:
        f.write('1234')
    print(TEXT_FILE_NAME)

dag = DAG(
    dag_id='test_dag',
    default_args={
        'start_date': days_ago(1),
    },
    schedule_interval='*/2 * * * *',
    catchup=False,
)

print_task_1 = PythonOperator(
    task_id='print1',
    python_callable=print_time,
    dag=dag,
)

print_task_2 = PythonOperator(
    task_id='print2',
    python_callable=print_time,
    dag=dag,
)

print_task_1 >> print_task_2
```

Oh, and I already knew about XCom while I was working on the idea above; I got it to work and delete the XCom variable after each DAG run. Still, this keeps me wondering. Thank you for the help, wish you all clean coding and a clear mind.
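To see why two different timestamps show up, the re-parsing behaviour can be simulated without Airflow at all. This is just an illustrative sketch: `simulate_parse` and the inline source string are made up for the demo, not Airflow APIs.

```python
import time

# Stand-in for the top-level code of a DAG file.
DAG_FILE_SOURCE = """
import datetime
now = datetime.datetime.now()
"""

def simulate_parse():
    """Execute the 'DAG file' top-level code in a fresh namespace,
    the way each parse (scheduler scan, or a worker running a task)
    re-runs it."""
    namespace = {}
    exec(DAG_FILE_SOURCE, namespace)
    return namespace['now']

first = simulate_parse()
time.sleep(0.01)
second = simulate_parse()
# Each "parse" captures its own 'now', so the two values differ.
```

Every execution of the file's top-level code gets a fresh `now`, which is exactly what happens when two tasks each cause the DAG file to be parsed on a worker.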
-
Airflow parses the DAG file multiple times - every 30 seconds by default when the DAG file processor scans it, plus every time a task gets executed, the file is parsed again on the worker. There are good reasons for that, but it means your `now` will be different on every parse, so you should use one of the dates available in the task context - for example `execution_date`, `end_date`, or `start_date` - see all available context variables here: https://airflow.apache.org/docs/apache-airflow/2.9.2/templates-ref.html
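For example, deriving the file name from the run's date rather than from a module-level `datetime.now()` makes every task in the same DAG run compute the same name. A minimal, Airflow-free sketch: `filename_for_run` is a hypothetical helper, and in a real task you would pass in the run's `execution_date`/`logical_date` from the context.

```python
import datetime

def filename_for_run(logical_date: datetime.datetime) -> str:
    # Derived from the run's logical date, so every task (and every
    # re-parse of the DAG file) computes the identical name for a given run.
    return logical_date.strftime('%m-%d-%Y_%H-%M-%S') + '.txt'

# Two "tasks" in the same run agree on the name:
run_date = datetime.datetime(2024, 6, 10, 12, 0, 0)
print(filename_for_run(run_date))  # 06-10-2024_12-00-00.txt
```

The key design point is that the name is a pure function of the run's date, so it no longer matters how many times, or in how many processes, the DAG file gets parsed.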
-
Nice question - I still don't think you've quite got it yet, hehe