-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for custom celery configs #45038
base: main
Are you sure you want to change the base?
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
1a172e5
to
28d1806
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR - the contribution looks really small and is efficient. Thanks for taking care for the backwards compatibility.
I assume the fallback handling in default_celery.py:72
is not needed if the config entry is correctly registered as a default in providers/src/airflow/providers/celery/provider.yaml
- There all configs for the provider should be registered. This also would add missing documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also add a test for this?
Addressed the comments. CI is failing not able to debug why CI pipeline is failing. |
26a981b
to
b719528
Compare
Except some minor comment I think it is good, let me check what is broken on CI. Nothing related to your code. Then I think... LGTM! |
4d67fe8
to
2c56333
Compare
…nce link for all the celery configs.
…nce link for all the celery configs.
2c56333
to
382d45e
Compare
Somthing is wrong with this PR and CI - I don't see the reason but I am very sure it is not related to the changes. I am on it... |
version_added: ~ | ||
type: string | ||
example: ~ | ||
default: "{}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I also did not know but defaults seem to be Format-String formatted and the string "{}" is an invalid sequence. So to effectively have an {}
as default we need to use {{}}
here to quote the brackets.
default: "{}" | |
default: "{{}}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay ... interesting, this also resolves the CI problem - it seems the constrains also can not be built as the templating fails there. I did a PR with the fixes applied as test and this is turning green: #45160
As with the other proposal a check is being made that the settings are a dict, you can even lave an empty string as default...
default: "{}" | |
default: "" |
@@ -85,6 +87,7 @@ def _broker_supports_visibility_timeout(url): | |||
), | |||
"worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY", fallback=16), | |||
"worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control", fallback=True), | |||
**extra_celery_config, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As CI did not get to this stage (yet) as failing earlier, there will be a problem reported by mypy for providers like:
providers/src/airflow/providers/celery/executors/default_celery.py:72: error:
Incompatible types in assignment (expression has type
"Union[dict[Any, Any], list[Any], str, int, float, None]", variable has type
"dict[Any, Any]") [assignment]
extra_celery_config: dict = conf.getjson("celery", "extra_celery_confi...
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~...
Found 1 error in 1 file (checked 1 source file)
Error 1 returned
As the JSON result can be other values than a dict (irrespective of the type declaration above, you need to use:
**extra_celery_config, | |
**(extra_celery_config if isinstance(extra_celery_config, dict) else {}), |
closes: #45037
Description:
Currently Airflow support limited celery options only. This PR adds the support for the additional celery config for celery workers.