Skip to content

Commit

Permalink
Add support for custom celery configs (#45038)
Browse files Browse the repository at this point in the history
* #45037: Support for additional celery config directly from airflow.cfg file

* Added unit test for the additional config and addressed comments

* Added unit test for the additional config and addressed comments

* Addressed comments: Added sample config in the docs as well as reference link for all the celery configs.

* Addressed comments: Added sample config in the docs as well as reference link for all the celery configs.

* Fixed the Unit tests

* Fixed the Unit tests

* Fixed the Unit tests

---------

Co-authored-by: Sachin Arora <[email protected]>
  • Loading branch information
arorasachin9 and sachin-arora-cashfree authored Dec 23, 2024
1 parent 545bb84 commit f315149
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def _broker_supports_visibility_timeout(url):
log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.")
result_backend = f'db+{conf.get("database", "SQL_ALCHEMY_CONN")}'

extra_celery_config = conf.getjson("celery", "extra_celery_config", fallback={})

DEFAULT_CELERY_CONFIG = {
"accept_content": ["json"],
"event_serializer": "json",
Expand All @@ -85,6 +87,7 @@ def _broker_supports_visibility_timeout(url):
),
"worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY", fallback=16),
"worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control", fallback=True),
**(extra_celery_config if isinstance(extra_celery_config, dict) else {}),
}


Expand Down
11 changes: 11 additions & 0 deletions providers/src/airflow/providers/celery/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,17 @@ config:
type: string
example: ~
default: "False"
extra_celery_config:
description: |
Extra celery configs to include in the celery worker.
Any of the celery config can be added to this config and it
will be applied while starting the celery worker. e.g. {"worker_max_tasks_per_child": 10}
See also:
https://docs.celeryq.dev/en/stable/userguide/configuration.html#configuration-and-defaults
version_added: ~
type: string
example: ~
default: "{{}}"
celery_broker_transport_options:
description: |
This section is for specifying options which can be passed to the
Expand Down
9 changes: 9 additions & 0 deletions providers/tests/celery/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,3 +399,12 @@ def test_celery_task_acks_late_loaded_from_string():
# reload celery conf to apply the new config
importlib.reload(default_celery)
assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is False


@conf_vars({("celery", "extra_celery_config"): '{"worker_max_tasks_per_child": 10}'})
def test_celery_extra_celery_config_loaded_from_string():
import importlib

# reload celery conf to apply the new config
importlib.reload(default_celery)
assert default_celery.DEFAULT_CELERY_CONFIG["worker_max_tasks_per_child"] == 10

0 comments on commit f315149

Please sign in to comment.