Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Edge API retries configurable #44536

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@
"edge": {
"deps": [
"apache-airflow>=2.10.0",
"pydantic>=2.10.2"
"pydantic>=2.10.2",
"retryhttp>=1.2.0"
],
"devel-deps": [],
"plugins": [
Expand Down
8 changes: 8 additions & 0 deletions providers/src/airflow/providers/edge/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

0.9.7pre0
.........

* ``Make API retries configurable via ENV. Connection loss is tolerated for 5 min by default.``

Misc
~~~~

0.9.6pre0
.........

Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "0.9.6pre0"
__version__ = "0.9.7pre0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
45 changes: 23 additions & 22 deletions providers/src/airflow/providers/edge/cli/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@

import json
import logging
import os
from datetime import datetime
from http import HTTPStatus
from pathlib import Path
from typing import TYPE_CHECKING, Any
from urllib.parse import quote, urljoin

import requests
import tenacity
from requests.exceptions import ConnectionError
from urllib3.exceptions import NewConnectionError
from retryhttp import retry, wait_retry_after
from tenacity import before_log, wait_random_exponential

from airflow.configuration import conf
from airflow.exceptions import AirflowException
Expand All @@ -47,29 +47,30 @@
logger = logging.getLogger(__name__)


def _is_retryable_exception(exception: BaseException) -> bool:
"""
Evaluate which exception types to retry.
# Hidden config options for the Edge Worker that control how retries on HTTP requests are handled.
# Note: The given defaults make attempts after 1, 3, 7, 15, 31 seconds, 1:03, 2:07, 3:37 min and fail after 5:07 min.
# So far there is no other config facility in the Task SDK, so we use ENV variables for the moment.
# TODO: Consider these env variables jointly in task sdk together with task_sdk/src/airflow/sdk/api/client.py
# Each retry setting is looked up in the Edge-specific environment variable first, then in the
# generic workers variable, and only then falls back to the literal default given here.
API_RETRIES = int(
    os.environ.get(
        "AIRFLOW__EDGE__API_RETRIES",
        os.environ.get("AIRFLOW__WORKERS__API_RETRIES", 10),
    )
)
API_RETRY_WAIT_MIN = float(
    os.environ.get(
        "AIRFLOW__EDGE__API_RETRY_WAIT_MIN",
        os.environ.get("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1.0),
    )
)
API_RETRY_WAIT_MAX = float(
    os.environ.get(
        "AIRFLOW__EDGE__API_RETRY_WAIT_MAX",
        os.environ.get("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90.0),
    )
)
Comment on lines +50 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think this is a fair set of values; it is configurable in any case, so people shouldn't have a reason to scream.


This is especially needed for cases where an application gateway or Kubernetes ingress cannot
find a running instance of a webserver hosting the API (HTTP 502 or 504) or when the
HTTP request fails in general at the network level.

Note that we want to fail on other general webserver errors so that bad requests are not sent in an endless loop.
"""
retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT)
return (
isinstance(exception, AirflowException)
and exception.status_code in retryable_status_codes
or isinstance(exception, (ConnectionError, NewConnectionError))
)
# Wait strategy reused for every retry category of the request decorator: a randomized
# exponential wait configured with API_RETRY_WAIT_MIN / API_RETRY_WAIT_MAX as its min/max arguments.
_default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)


@tenacity.retry(
stop=tenacity.stop_after_attempt(10), # TODO: Make this configurable
wait=tenacity.wait_exponential(min=1), # TODO: Make this configurable
retry=tenacity.retry_if_exception(_is_retryable_exception),
before_sleep=tenacity.before_log(logger, logging.WARNING),
@retry(
reraise=True,
max_attempt_number=API_RETRIES,
wait_server_errors=_default_wait,
wait_network_errors=_default_wait,
wait_timeouts=_default_wait,
wait_rate_limited=wait_retry_after(fallback=_default_wait), # No infinite timeout on HTTP 429
before_sleep=before_log(logger, logging.WARNING),
)
def _make_generic_request(method: str, rest_path: str, data: str | None = None) -> Any:
signer = jwt_signer()
Expand Down
3 changes: 2 additions & 1 deletion providers/src/airflow/providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ source-date-epoch: 1729683247

# note that those versions are maintained by release manager - do not update them manually
versions:
- 0.9.6pre0
- 0.9.7pre0

dependencies:
- apache-airflow>=2.10.0
- pydantic>=2.10.2
- retryhttp>=1.2.0

plugins:
- name: edge_executor
Expand Down