Skip to content

Commit

Permalink
Align implementation with TaskSDK PR #45121
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl committed Dec 29, 2024
1 parent 4a5d3ad commit 74a839a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 32 deletions.
3 changes: 2 additions & 1 deletion generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@
"edge": {
"deps": [
"apache-airflow>=2.10.0",
"pydantic>=2.10.2"
"pydantic>=2.10.2",
"retryhttp>=1.2.0"
],
"devel-deps": [],
"plugins": [
Expand Down
53 changes: 22 additions & 31 deletions providers/src/airflow/providers/edge/cli/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
from urllib.parse import quote, urljoin

import requests
import tenacity
from requests.exceptions import ConnectionError
from urllib3.exceptions import NewConnectionError
from retryhttp import retry, wait_retry_after
from tenacity import before_log, wait_random_exponential

from airflow.configuration import conf
from airflow.exceptions import AirflowException
Expand All @@ -50,36 +49,28 @@

# Hidden config options for Edge Worker how retries on HTTP requests should be handled
# Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min
AIRFLOW__EDGE__API_RETRIES = int(os.getenv("AIRFLOW__EDGE__API_RETRIES", 10))
AIRFLOW__EDGE__API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MIN", 1))
AIRFLOW__EDGE__API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MAX", 90))


def _is_retryable_exception(exception: BaseException) -> bool:
"""
Evaluate which exception types to retry.
This is especially demanded for cases where an application gateway or Kubernetes ingress can
not find a running instance of a webserver hosting the API (HTTP 502+504) or when the
HTTP request fails in general on network level.
Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop.
"""
retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT)
return (
isinstance(exception, AirflowException)
and exception.status_code in retryable_status_codes
or isinstance(exception, (ConnectionError, NewConnectionError))
)
# So far there is no other config facility in Task SDK we use ENV for the moment
# TODO: Consider these env variables jointly in task sdk together with task_sdk/src/airflow/sdk/api/client.py
API_RETRIES = int(os.getenv("AIRFLOW__EDGE__API_RETRIES", os.getenv("AIRFLOW__WORKERS__API_RETRIES", 10)))
API_RETRY_WAIT_MIN = float(
os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MIN", os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1.0))
)
API_RETRY_WAIT_MAX = float(
os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MAX", os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90.0))
)


_default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)


@tenacity.retry(
stop=tenacity.stop_after_attempt(AIRFLOW__EDGE__API_RETRIES),
wait=tenacity.wait_exponential(
min=AIRFLOW__EDGE__API_RETRY_WAIT_MIN, max=AIRFLOW__EDGE__API_RETRY_WAIT_MAX
),
retry=tenacity.retry_if_exception(_is_retryable_exception),
before_sleep=tenacity.before_log(logger, logging.WARNING),
@retry(
reraise=True,
max_attempt_number=API_RETRIES,
wait_server_errors=_default_wait,
wait_network_errors=_default_wait,
wait_timeouts=_default_wait,
wait_rate_limited=wait_retry_after(fallback=_default_wait), # No infinite timeout on HTTP 429
before_sleep=before_log(logger, logging.WARNING),
)
def _make_generic_request(method: str, rest_path: str, data: str | None = None) -> Any:
signer = jwt_signer()
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ versions:
dependencies:
- apache-airflow>=2.10.0
- pydantic>=2.10.2
- retryhttp>=1.2.0

plugins:
- name: edge_executor
Expand Down

0 comments on commit 74a839a

Please sign in to comment.