Update API #253

Merged
merged 36 commits into from
Dec 12, 2023

Conversation

ruthenian8
Member

Description

Implement asynchronous execution of actor handlers, processing functions, and response functions.

Checklist

  • I have covered the code with tests
  • I have added comments to my code to help others understand it
  • I have updated the documentation to reflect the changes
  • I have performed a self-review of the changes

@RLKRo RLKRo changed the base branch from master to dev October 13, 2023 13:23
@ruthenian8 ruthenian8 self-assigned this Oct 13, 2023
@RLKRo
Member

RLKRo commented Oct 16, 2023

Since this PR changes the processing function API, maybe we should also remove or deprecate overwrite_current_node_in_processing here?

@RLKRo
Member

RLKRo commented Oct 20, 2023

Maybe make all user-defined functions async?
#237 (comment) (the list of functions)

Also, maybe rename wrap_sync_function_in_async to something like handle_user_function and let it also accept a function type (e.g. "response" or "stats_extractor"), so that it can handle exceptions and incorrect return types for that kind of function.
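
To make the suggestion concrete, here is a minimal sketch of what such a wrapper could look like. It is only an illustration under assumptions: the signature, the `function_type` and `expected_type` parameters, and the error policy are hypothetical and are not the API that was merged in this PR.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


async def handle_user_function(
    func: Callable[..., Any],
    *args: Any,
    function_type: str = "response",
    expected_type: Optional[type] = None,
) -> Any:
    """Call a sync or async user function and validate its result.

    Hypothetical helper: names and error policy are illustrative only.
    """
    name = getattr(func, "__name__", repr(func))
    try:
        result = func(*args)
        # An async function returns an awaitable that still has to be awaited.
        if inspect.isawaitable(result):
            result = await result
    except Exception as exc:
        raise RuntimeError(f"{function_type} function {name!r} failed") from exc
    # Optional return type check, reported with the function type for context.
    if expected_type is not None and not isinstance(result, expected_type):
        raise TypeError(
            f"{function_type} function {name!r} returned "
            f"{type(result).__name__}, expected {expected_type.__name__}"
        )
    return result
```

With a helper of this shape, the caller does not need to know whether a user-defined function is synchronous or asynchronous, and every failure message names the kind of function that failed.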

dff/pipeline/pipeline/actor.py Outdated Show resolved Hide resolved
(
    ctx.framework_states["actor"]["global_true_label"],
    ctx.framework_states["actor"]["local_true_label"],
    ctx.framework_states["actor"]["node_true_label"],
) = await asyncio.gather(*[global_transitions_coro, local_transitions_coro, node_transitions_coro])
Member

I think we should discuss how we want the components to execute (parallel vs sequential).
Currently:

  • Pre-response and pre-transition functions depend on pipeline.parallelize_processing
  • Script conditions are executed in parallel
  • Labels are executed sequentially within each of the groups GLOBAL, LOCAL and NODE (two labels from the same group run sequentially, while labels from different groups run in parallel)
  • Actor handlers are executed in parallel
  • Service handlers depend on ServiceGroup.asynchronous
  • Extra handlers depend on the asynchronous option of the extra handlers

Once we have decided how to handle the newly asynchronous functions, we should record the decision in the documentation and in #252. Besides the asynchronous option, the guide should also mention other options, such as the timeout option for extra handlers, @pseusys.
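
The label behaviour described in the list above (sequential within a group, parallel across groups) can be sketched with asyncio. This is only an illustration under assumptions: `run_group`, `run_all` and `make_label` are hypothetical names and do not come from the actor code in this PR.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Label = Callable[[], Awaitable[str]]


def make_label(value: str, delay: float) -> Label:
    """Build a toy label coroutine function (hypothetical stand-in)."""
    async def label() -> str:
        await asyncio.sleep(delay)  # stands in for real asynchronous work
        return value
    return label


async def run_group(name: str, labels: List[Label], log: List[str]) -> List[str]:
    """Await the labels of one group one after another (sequential)."""
    results = []
    for label in labels:
        results.append(await label())
        log.append(f"{name}:{results[-1]}")
    return results


async def run_all(groups: Dict[str, List[Label]], log: List[str]) -> Dict[str, List[str]]:
    """Run the groups concurrently; gather returns results in input order."""
    results = await asyncio.gather(
        *(run_group(name, labels, log) for name, labels in groups.items())
    )
    return dict(zip(groups, results))


log: List[str] = []
groups = {
    "GLOBAL": [make_label("g1", 0.02), make_label("g2", 0.02)],
    "LOCAL": [make_label("l1", 0.01)],
    "NODE": [make_label("n1", 0.01)],
}
results = asyncio.run(run_all(groups, log))
```

Within GLOBAL, g2 only starts after g1 has finished, while LOCAL and NODE make progress during that time. Whichever policy we settle on, a small model like this may help when writing the documentation.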

Collaborator

I agree

dff/pipeline/pipeline/actor.py Outdated Show resolved Hide resolved
dff/pipeline/service/service.py Show resolved Hide resolved
dff/pipeline/pipeline/actor.py Show resolved Hide resolved
@RLKRo RLKRo marked this pull request as ready for review November 20, 2023 11:03
self, pipeline: Pipeline, ctx: Optional[Union[Context, dict, str]] = None, *args, **kwargs
) -> Union[Context, dict, str]:
# context init
ctx = self._context_init(ctx, *args, **kwargs)
- self._run_handlers(ctx, pipeline, ActorStage.CONTEXT_INIT, *args, **kwargs)
+ await self._run_handlers(ctx, pipeline, ActorStage.CONTEXT_INIT, *args, **kwargs)

# get previous node
ctx = self._get_previous_node(ctx, *args, **kwargs)
Collaborator

Don't we want to make transition functions asynchronous as well?

Member Author

We can leave methods such as get_previous_node and get_next_node synchronous for two reasons:

  1. all they do is get values from dictionaries, so making them asynchronous won't result in a performance improvement
  2. we would have to await them inside the actor call, i.e. use them like regular blocking functions
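
Both reasons can be seen in a small sketch. The function bodies and signatures below are assumptions made for illustration; only the name get_next_node is taken from the discussion.

```python
import asyncio
from typing import Dict


def get_next_node(script: Dict[str, str], label: str) -> str:
    """Plain dictionary lookup: no I/O, so there is nothing to overlap."""
    return script[label]


async def get_next_node_async(script: Dict[str, str], label: str) -> str:
    """Async twin (hypothetical): it never yields to the event loop,
    so awaiting it behaves just like the blocking call above."""
    return script[label]


async def actor_call(script: Dict[str, str], label: str) -> bool:
    """Use both variants the way the actor call would use them."""
    sync_result = get_next_node(script, label)
    # The await adds coroutine overhead but no concurrency.
    async_result = await get_next_node_async(script, label)
    return sync_result == async_result
```

Running `asyncio.run(actor_call({"start": "greeting"}, "start"))` gives the same node from both variants, which is why the synchronous form is kept.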

(
    ctx.framework_states["actor"]["global_true_label"],
    ctx.framework_states["actor"]["local_true_label"],
    ctx.framework_states["actor"]["node_true_label"],
) = await asyncio.gather(*[global_transitions_coro, local_transitions_coro, node_transitions_coro])
Collaborator

I agree

dff/pipeline/service/service.py Show resolved Hide resolved
@@ -28,3 +30,11 @@
def test_tutorials(tutorial_module_name: str):
tutorial_module = importlib.import_module(f"tutorials.{dot_path_to_addon}.{tutorial_module_name}")
check_happy_path(tutorial_module.pipeline, tutorial_module.happy_path)
random.seed(31415)
Collaborator

Why should we do it here?
I think it was more obvious the way it was.
Also, this PR doesn't remove any seed calls from the tests themselves (if there were any).

Member Author

Line removed

Member

@RLKRo RLKRo left a comment

Could you deprecate overwrite_current_node_in_processing instead of removing it?

The deprecation warning should include info that this function is to be removed in 1.0.

Also, we should either make this function do nothing (which is what it currently does) or fix the bug preventing it from doing something.

The bug in question:
Node.model_validate(processed_node) doesn't work because Node here is pydantic.BaseModel instead of dff.script.core.script.Node.

Fixing this bug would be difficult without the pipeline typing fixes from refactor/use_type_checking, so I would suggest making this function do nothing except call warnings.warn. In that case we should also change the documentation for this function (and wherever else it is mentioned) to say that it does not work and never has, that it should no longer be used, and that it will be removed in 1.0.
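
A minimal sketch of that suggestion, using only the standard warnings module. The `Context` class here is a stand-in written for this example, and the warning text is an assumption rather than the wording used in the PR.

```python
import warnings


class Context:
    """Stand-in for the real context class; only the deprecated method is shown."""

    def overwrite_current_node_in_processing(self, processed_node: object) -> None:
        """Deprecated no-op: this method never worked and will be removed in 1.0."""
        warnings.warn(
            "overwrite_current_node_in_processing never worked, does nothing, "
            "and will be removed in 1.0.",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller, not at this method
        )
```

Calling the method then leaves the context untouched and only emits a DeprecationWarning.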

@RLKRo
Member

RLKRo commented Nov 27, 2023

ToDo:

Collaborator

@pseusys pseusys left a comment

Looks good to me!

removed_in="1.0.0",
details="This method is deprecated and will be removed in future versions. "
"The earlier implementation was not functional due to typing issues.",
)
def overwrite_current_node_in_processing(self, processed_node: Node):
Collaborator

I think before 1.0.0 we can safely remove methods without deprecation.

RLKRo added 10 commits November 30, 2023 17:23
Special tests for parallel processing will be added instead.
I decided that the next release will be 1.0, so we can remove instead of deprecating.
As I said before it doesn't make much sense that they run in parallel while conditions in them run sequentially.
Also includes some doc and typing updates
@RLKRo RLKRo requested a review from pseusys December 7, 2023 03:13
dff/pipeline/pipeline/pipeline.py Outdated Show resolved Hide resolved
docs/source/user_guides/context_guide.rst Outdated Show resolved Hide resolved
tutorials/script/core/9_pre_transitions_processing.py Outdated Show resolved Hide resolved
# Conflicts:
#	dff/messengers/telegram/interface.py
@RLKRo RLKRo merged commit 677ee7a into dev Dec 12, 2023
16 checks passed
@RLKRo RLKRo deleted the feat/async_handlers branch December 12, 2023 00:27
@RLKRo RLKRo mentioned this pull request Mar 1, 2024
1 task
@RLKRo RLKRo mentioned this pull request Mar 28, 2024
5 tasks
3 participants