Skip to content

Commit

Permalink
Make Edge API retries configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl committed Dec 14, 2024
1 parent 8e2e1fa commit 0c2bbf3
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 4 deletions.
8 changes: 8 additions & 0 deletions providers/src/airflow/providers/edge/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

0.9.6pre0
.........

Misc
~~~~

* ``Make API retries configurable via ENV. Connection loss is sustained for 5min by default.``

0.9.5pre0
.........

Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "0.9.5pre0"
__version__ = "0.9.6pre0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
14 changes: 12 additions & 2 deletions providers/src/airflow/providers/edge/cli/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import json
import logging
import os
from datetime import datetime
from http import HTTPStatus
from pathlib import Path
Expand Down Expand Up @@ -47,6 +48,13 @@
logger = logging.getLogger(__name__)


# Hidden config options for Edge Worker how retries on HTTP requests should be handled
# Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min
AIRFLOW__EDGE__API_RETRIES = int(os.getenv("AIRFLOW__EDGE__API_RETRIES", 10))
AIRFLOW__EDGE__API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MIN", 1))
AIRFLOW__EDGE__API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__EDGE__API_RETRY_WAIT_MAX", 90))


def _is_retryable_exception(exception: BaseException) -> bool:
"""
Evaluate which exception types to retry.
Expand All @@ -66,8 +74,10 @@ def _is_retryable_exception(exception: BaseException) -> bool:


@tenacity.retry(
stop=tenacity.stop_after_attempt(10), # TODO: Make this configurable
wait=tenacity.wait_exponential(min=1), # TODO: Make this configurable
stop=tenacity.stop_after_attempt(AIRFLOW__EDGE__API_RETRIES),
wait=tenacity.wait_exponential(
min=AIRFLOW__EDGE__API_RETRY_WAIT_MIN, max=AIRFLOW__EDGE__API_RETRY_WAIT_MAX
),
retry=tenacity.retry_if_exception(_is_retryable_exception),
before_sleep=tenacity.before_log(logger, logging.WARNING),
)
Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ source-date-epoch: 1729683247

# note that those versions are maintained by release manager - do not update them manually
versions:
- 0.9.5pre0
- 0.9.6pre0

dependencies:
- apache-airflow>=2.10.0
Expand Down

0 comments on commit 0c2bbf3

Please sign in to comment.