Custom XCOM orm_deserialize_value throws sqlalchemy.exc.InvalidRequestError #45286
Unanswered
mamdouhtawfik asked this question in Q&A
Replies: 0 comments
On Amazon Managed Airflow v2.8.1, when I override orm_deserialize_value in my custom XCom backend, I get a SQLAlchemy error whenever I try to view an XCom value, either from the Admin menu or from the task-specific tab. Stack trace below:
r/xcomEntries [GET]
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
         ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/connexion/decorators/validation.py", line 399, in wrapper
    return function(request)
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/connexion/decorators/response.py", line 112, in wrapper
    response = function(request)
               ^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/connexion/decorators/parameter.py", line 120, in wrapper
    return function(**kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 164, in decorated
    return _requires_access(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/api_connexion/security.py", line 92, in _requires_access
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/api_connexion/parameters.py", line 104, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/api_connexion/endpoints/xcom_endpoint.py", line 77, in get_xcom_entries
    return xcom_collection_schema.dump(XComCollection(xcom_entries=query, total_entries=total_entries))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/marshmallow/schema.py", line 550, in dump
    result = self._serialize(processed_obj, many=many)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/marshmallow/schema.py", line 518, in _serialize
    value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/marshmallow/fields.py", line 342, in serialize
    return self._serialize(value, attr, obj, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/marshmallow/fields.py", line 776, in _serialize
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/marshmallow/fields.py", line 776, in <listcomp>
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 382, in iterrows
    for row in self._fetchiter_impl():
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 151, in chunks
    rows = [proc(row) for row in fetch]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 151, in <listcomp>
    rows = [proc(row) for row in fetch]
            ^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 940, in _instance
    session_identity_map._add_unpresent(state, identitykey)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/identity.py", line 249, in _killed
    raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Object <CustomXComS3Backend at 0x7fef84ab2350> cannot be converted to 'persistent' state, as this identity map is no longer valid. Has the owning Session been closed? (Background on this error at: https://sqlalche.me/e/14/lkrp)
I tried multiple approaches to avoid this, but nothing worked. Once I comment out orm_deserialize_value the error goes away, but then of course I no longer see the XCom values in the UI.
Any ideas on how to resolve this, or what I am doing wrong?
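For context, the trace shows the error being raised while SQLAlchemy is still loading XCom rows (loading.py, in _instance), which is the point at which orm_deserialize_value runs for each row. One possible explanation, stated as an assumption because the override itself is not shown here: if the override fetches the payload from S3 through a hook, the hook's connection lookup may open and close Airflow's shared session in the middle of the load. A minimal sketch of the alternative is to keep the override free of any database or S3 access and return only a display string. The names lightweight_repr and S3_PREFIX are hypothetical, and the sketch assumes the stored value is a JSON-encoded S3 URI:

```python
import json

# Hypothetical marker: assumes the backend stores a JSON-encoded "s3://..." URI
# in the XCom table instead of the payload itself.
S3_PREFIX = "s3://"


def lightweight_repr(raw_value: bytes) -> str:
    """Build a display-only string without touching S3 or the metadata DB."""
    try:
        decoded = json.loads(raw_value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return "<unreadable XCom payload>"
    if isinstance(decoded, str) and decoded.startswith(S3_PREFIX):
        # Show where the payload lives rather than downloading it.
        return f"XCom stored in S3 at {decoded}"
    return repr(decoded)


# Assumed wiring inside the custom backend (not executed here):
#
# class CustomXComS3Backend(BaseXCom):
#     def orm_deserialize_value(self):
#         return lightweight_repr(self.value)
```

With this shape the UI shows the S3 location instead of the full payload; whether that is acceptable depends on what needs to be visible there.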
Side question: is there an easy way to test changes like this quickly in Docker? Currently I have to bring the container down and back up after every change, which takes quite some time with mwaa-local-runner.