Skip to content

Commit

Permalink
Support deferrable operators in dag test (apache#34585)
Browse files Browse the repository at this point in the history
You need to run triggerer separately, but now at least dag test will resume the task after deferral.
  • Loading branch information
dstandish authored Sep 25, 2023
1 parent 97916ba commit 2035dc7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
16 changes: 14 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import pathlib
import pickle
import sys
import time
import traceback
import warnings
import weakref
Expand Down Expand Up @@ -123,7 +124,7 @@
tuple_in_condition,
with_row_locks,
)
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import NOTSET, ArgNotSet, DagRunType, EdgeInfoType

Expand Down Expand Up @@ -2811,8 +2812,19 @@ def add_logger_if_needed(ti: TaskInstance):
# for task readiness and dependency management. This is notably faster
# than creating a BackfillJob and allows us to surface logs to the user
while dr.state == DagRunState.RUNNING:
session.expire_all()
schedulable_tis, _ = dr.update_state(session=session)
for ti in schedulable_tis:
for s in schedulable_tis:
s.state = TaskInstanceState.SCHEDULED
session.commit()
# triggerer may mark tasks scheduled so we read from DB
all_tis = set(dr.get_task_instances(session=session))
scheduled_tis = {x for x in all_tis if x.state == TaskInstanceState.SCHEDULED}
ids_unrunnable = {x for x in all_tis if x.state not in State.finished} - scheduled_tis
if not scheduled_tis and ids_unrunnable:
self.log.warning("No tasks to run. unrunnable tasks: %s", ids_unrunnable)
time.sleep(1)
for ti in scheduled_tis:
try:
add_logger_if_needed(ti)
ti.task = tasks[ti.task_id]
Expand Down
3 changes: 3 additions & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,9 @@ def __init__(
# can be changed when calling 'run'
self.test_mode = False

def __hash__(self):
return hash((self.task_id, self.dag_id, self.run_id, self.map_index))

@property
def stats_tags(self) -> dict[str, str]:
"""Returns task instance tags."""
Expand Down

0 comments on commit 2035dc7

Please sign in to comment.