Skip to content

Commit

Permalink
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
Browse files Browse the repository at this point in the history
  • Loading branch information
vatsrahul1001 authored Dec 20, 2024
2 parents 0b6774a + b6e3d1c commit 1a74394
Show file tree
Hide file tree
Showing 299 changed files with 14,301 additions and 5,616 deletions.
28 changes: 28 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
---
version: 2
updates:
- package-ecosystem: pip
directories:
- /clients/python
- /dev/breeze
- /docker_tests
- /task_sdk
- /
schedule:
interval: daily
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,15 @@ jobs:
- name: "Get information about the Workflow"
id: source-run-info
run: breeze ci get-workflow-info 2>> ${GITHUB_OUTPUT}
env:
SKIP_BREEZE_SELF_UPGRADE_CHECK: "true"
- name: Selective checks
id: selective-checks
env:
PR_LABELS: "${{ steps.source-run-info.outputs.pr-labels }}"
COMMIT_REF: "${{ github.sha }}"
VERBOSE: "false"

run: breeze ci selective-check 2>> ${GITHUB_OUTPUT}
- name: env
run: printenv
Expand Down Expand Up @@ -493,6 +496,7 @@ jobs:
needs.build-info.outputs.full-tests-needed == 'true')
with:
test-groups: ${{ needs.build-info.outputs.test-groups }}
default-branch: ${{ needs.build-info.outputs.default-branch }}
runs-on-as-json-default: ${{ needs.build-info.outputs.runs-on-as-json-default }}
image-tag: ${{ needs.build-info.outputs.image-tag }}
core-test-types-list-as-string: ${{ needs.build-info.outputs.core-test-types-list-as-string }}
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ jobs:
- name: "Get information about the Workflow"
id: source-run-info
run: breeze ci get-workflow-info 2>> ${GITHUB_OUTPUT}
env:
SKIP_BREEZE_SELF_UPGRADE_CHECK: "true"
- name: Selective checks
id: selective-checks
env:
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/special-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ on: # yamllint disable-line rule:truthy
description: "The array of labels (in json form) determining default runner used for the build."
required: true
type: string
default-branch:
description: "The default branch for the repository"
required: true
type: string
test-groups:
description: "The json representing the list of test groups to run"
required: true
Expand Down Expand Up @@ -199,6 +203,7 @@ jobs:
include-success-outputs: ${{ inputs.include-success-outputs }}
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}
if: ${{ inputs.default-branch == 'main' }}

tests-system:
name: "System test: ${{ matrix.test-group }}"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.9
ARG AIRFLOW_UV_VERSION=0.5.10
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
Expand Down
18 changes: 1 addition & 17 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -993,22 +993,6 @@ function determine_airflow_to_use() {
--constraint https://raw.githubusercontent.com/apache/airflow/constraints-main/constraints-${PYTHON_MAJOR_MINOR_VERSION}.txt
# Some packages might leave legacy typing module which causes test issues
pip uninstall -y typing || true
# We need to install `eval-type-backport` to avoid problems with Pydantic 2.10.+ released in
# November 2024 for python 3.8 and 3.9. While Pydantic 2.10.0/2.10.1 completely broke Airflow 2
# installation and Pydantic 2.10.2 fixed the issue for past versions of Airflow, there are still
# Some Typing constructs that are not handled well by Pydantic and in case Pydantic fails with
# those errors, it will STILL fall back to `eval-type-backport` to handle those cases (if
# if `eval-type-backport` is installed. Therefore - until we have Airflow 2.10.3 for backwards
# compatibility tests and we attempt to install "edge" provider that might use such breaking
# constructs, we need to install `eval-type-backport` to avoid problems with Pydantic 2.10.2+
# as well. As soon as we move to Airflow 2.10.4, we can remove this workaround because Airflow
# 2.10.4 adds "eval-type-backport" as a dependency and it will be installed automatically.
if [[ ${PYTHON_MAJOR_MINOR_VERSION} == "3.8" || ${PYTHON_MAJOR_MINOR_VERSION} == "3.9" ]]; then
echo
echo "${COLOR_BLUE}Installing eval-type-backport for Python ${PYTHON_MAJOR_MINOR_VERSION} to workaround Pydantic 2.10.0/2.10.1 issue with new typing style.${COLOR_RESET}"
echo
pip install eval-type-backport>=0.2.0
fi
if [[ ${LINK_PROVIDERS_TO_AIRFLOW_PACKAGE=} == "true" ]]; then
echo
echo "${COLOR_BLUE}Linking providers to airflow package as we are using them from mounted sources.${COLOR_RESET}"
Expand Down Expand Up @@ -1381,7 +1365,7 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.9
ARG AIRFLOW_UV_VERSION=0.5.10
# TODO(potiuk): automate with upgrade check (possibly)
ARG AIRFLOW_PRE_COMMIT_VERSION="4.0.1"
ARG AIRFLOW_PRE_COMMIT_UV_VERSION="4.1.4"
Expand Down
7 changes: 4 additions & 3 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,16 +536,17 @@ def _transform_dag_run_states(states: Iterable[str] | None) -> list[DagRunState

# TI
def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | None] | None:
    """Transform a list of state strings into a list of TaskInstanceState enums handling special 'None' cases.

    :param states: raw state strings from the query; the literal string ``"none"``
        (or an actual ``None`` entry) maps to ``None`` so callers can filter for
        task instances that have no state yet.
    :return: the converted list, or ``None`` when no states were supplied.
    :raises HTTPException: 422 when any entry is not a valid TaskInstanceState.
    """
    if not states:
        return None
    try:
        return [None if s in ("none", None) else TaskInstanceState(s) for s in states]
    except ValueError as err:
        # Surface invalid input as a 422 and keep the original ValueError as the cause.
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid value for state. Valid values are {', '.join(TaskInstanceState)}",
        ) from err


QueryTIStateFilter = Annotated[
Expand Down
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ class DAGRunClearBody(BaseModel):
"""DAG Run serializer for clear endpoint body."""

dry_run: bool = True
only_failed: bool = False


class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""

dag_run_id: str | None = Field(validation_alias="run_id")
dag_run_id: str = Field(validation_alias="run_id")
dag_id: str
logical_date: datetime | None
queued_at: datetime | None
Expand Down
1 change: 1 addition & 0 deletions airflow/api_fastapi/core_api/datamodels/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class XComResponse(BaseModel):
map_index: int
task_id: str
dag_id: str
run_id: str


class XComResponseNative(XComResponse):
Expand Down
20 changes: 17 additions & 3 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7048,6 +7048,10 @@ components:
type: boolean
title: Dry Run
default: true
only_failed:
type: boolean
title: Only Failed
default: false
type: object
title: DAGRunClearBody
description: DAG Run serializer for clear endpoint body.
Expand Down Expand Up @@ -7093,9 +7097,7 @@ components:
DAGRunResponse:
properties:
dag_run_id:
anyOf:
- type: string
- type: 'null'
type: string
title: Dag Run Id
dag_id:
type: string
Expand Down Expand Up @@ -9494,6 +9496,9 @@ components:
dag_id:
type: string
title: Dag Id
run_id:
type: string
title: Run Id
type: object
required:
- key
Expand All @@ -9502,6 +9507,7 @@ components:
- map_index
- task_id
- dag_id
- run_id
title: XComResponse
description: Serializer for a xcom item.
XComResponseNative:
Expand All @@ -9526,6 +9532,9 @@ components:
dag_id:
type: string
title: Dag Id
run_id:
type: string
title: Run Id
value:
title: Value
type: object
Expand All @@ -9536,6 +9545,7 @@ components:
- map_index
- task_id
- dag_id
- run_id
- value
title: XComResponseNative
description: XCom response serializer with native return type.
Expand All @@ -9561,6 +9571,9 @@ components:
dag_id:
type: string
title: Dag Id
run_id:
type: string
title: Run Id
value:
anyOf:
- type: string
Expand All @@ -9574,6 +9587,7 @@ components:
- map_index
- task_id
- dag_id
- run_id
- value
title: XComResponseString
description: XCom response serializer with string return type.
8 changes: 4 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def clear_dag_run(
start_date=start_date,
end_date=end_date,
task_ids=None,
only_failed=False,
only_failed=body.only_failed,
dry_run=True,
session=session,
)
Expand All @@ -245,10 +245,10 @@ def clear_dag_run(
)
else:
dag.clear(
start_date=dag_run.start_date,
end_date=dag_run.end_date,
start_date=start_date,
end_date=end_date,
task_ids=None,
only_failed=False,
only_failed=body.only_failed,
session=session,
)
dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id))
Expand Down
31 changes: 30 additions & 1 deletion airflow/api_fastapi/execution_api/datamodels/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from datetime import timedelta
from typing import Annotated, Any, Literal, Union

from pydantic import Discriminator, Field, Tag, WithJsonSchema
from pydantic import AwareDatetime, Discriminator, Field, Tag, TypeAdapter, WithJsonSchema, field_validator

from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.core_api.base import BaseModel
Expand All @@ -30,6 +30,8 @@
from airflow.utils.state import IntermediateTIState, TaskInstanceState as TIState, TerminalTIState
from airflow.utils.types import DagRunType

AwareDatetimeAdapter = TypeAdapter(AwareDatetime)


class TIEnterRunningPayload(BaseModel):
"""Schema for updating TaskInstance to 'RUNNING' state with minimal required fields."""
Expand Down Expand Up @@ -83,6 +85,30 @@ class TIDeferredStatePayload(BaseModel):
next_method: str
trigger_timeout: timedelta | None = None

    @field_validator("trigger_kwargs")
    def validate_moment(cls, v):
        """Coerce a serialized ``moment`` entry back into an aware datetime.

        NOTE(review): assumes ``v`` is a dict-like object supporting ``in`` and
        item assignment — confirm against the ``trigger_kwargs`` field
        declaration, which is outside this view. The value is presumably an
        ISO-8601 string produced on serialization; ``validate_strings`` parses
        it into a timezone-aware datetime.
        """
        if "moment" in v:
            v["moment"] = AwareDatetimeAdapter.validate_strings(v["moment"])
        return v


class TIRescheduleStatePayload(BaseModel):
    """Schema for updating TaskInstance to the up_for_reschedule state."""

    # The state is constrained to exactly UP_FOR_RESCHEDULE; the JSON schema
    # advertises it as a default so generated clients may omit it.
    state: Annotated[
        Literal[IntermediateTIState.UP_FOR_RESCHEDULE],
        # Specify a default in the schema, but not in code, so Pydantic marks it as required.
        WithJsonSchema(
            {
                "type": "string",
                "enum": [IntermediateTIState.UP_FOR_RESCHEDULE],
                "default": IntermediateTIState.UP_FOR_RESCHEDULE,
            }
        ),
    ]
    # When the task instance should be attempted again — presumably UTC per
    # the UtcDateTime type; confirm against its definition.
    reschedule_date: UtcDateTime
    # End timestamp of the current attempt.
    end_date: UtcDateTime


def ti_state_discriminator(v: dict[str, str] | BaseModel) -> str:
"""
Expand All @@ -101,6 +127,8 @@ def ti_state_discriminator(v: dict[str, str] | BaseModel) -> str:
return "_terminal_"
elif state == TIState.DEFERRED:
return "deferred"
elif state == TIState.UP_FOR_RESCHEDULE:
return "up_for_reschedule"
return "_other_"


Expand All @@ -111,6 +139,7 @@ def ti_state_discriminator(v: dict[str, str] | BaseModel) -> str:
Annotated[TITerminalStatePayload, Tag("_terminal_")],
Annotated[TITargetStatePayload, Tag("_other_")],
Annotated[TIDeferredStatePayload, Tag("deferred")],
Annotated[TIRescheduleStatePayload, Tag("up_for_reschedule")],
],
Discriminator(ti_state_discriminator),
]
Expand Down
Loading

0 comments on commit 1a74394

Please sign in to comment.