Extensions support for subscriptions. #2097

Open
1 of 3 tasks
nrbnlulu opened this issue Aug 14, 2022 · 5 comments · May be fixed by #3554

Comments

@nrbnlulu
Member

nrbnlulu commented Aug 14, 2022

Provide extensions support for subscriptions.

Feature Request Type

  • Core functionality
  • Alteration (enhancement/optimization) of existing feature(s)
  • New behavior

Description

Currently, subscriptions do not trigger any of the following extension hooks:

  • on_request_start / on_request_end
  • resolve
  • get_result
  • on_validation_start / on_validation_end
  • on_parsing_start / on_parsing_end
  • on_executing_start / on_executing_end

It would be nice if subscriptions could hook into these too.
Maybe on_request_start / on_request_end could hook into the connect / disconnect of the application.
For the others, I am not quite sure.

@ArtemConstantinov

In my project, I also need a symmetrical extensions interface between schema.execute(...) and schema.subscribe(...). I came up with a non-ideal solution that works for my project. The extensions interface definitely needs more discussion to answer questions such as:

  • "Is it relevant to use on_request_... if we want unified extensions for subscriptions and queries?"
  • "Do we need to add hooks such as before_subscribe_result and after_subscribe_result?"
  • ...
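
To make the second question more concrete, here is a minimal stdlib-only sketch of what per-result hooks could look like. The names before_subscribe_result and after_subscribe_result come from the question above; the RecordingHooks class, the numbers stream, and the with_result_hooks wrapper are hypothetical and are not part of strawberry.

```python
import asyncio


class RecordingHooks:
    """Hypothetical hook holder; these hooks do not exist in strawberry."""

    def __init__(self):
        self.calls = []

    def before_subscribe_result(self, result):
        self.calls.append(f"before:{result}")

    def after_subscribe_result(self, result):
        self.calls.append(f"after:{result}")


async def numbers(limit):
    # Stand-in for a subscription result stream.
    for i in range(limit):
        yield i


async def with_result_hooks(stream, hooks):
    # Wrap every yielded result in a before/after hook pair.
    async for result in stream:
        hooks.before_subscribe_result(result)
        try:
            yield result
        finally:
            # Runs when the consumer asks for the next result
            # or closes the stream.
            hooks.after_subscribe_result(result)


async def collect(limit):
    hooks = RecordingHooks()
    results = [r async for r in with_result_hooks(numbers(limit), hooks)]
    return results, hooks.calls


if __name__ == "__main__":
    print(asyncio.run(collect(2)))
```

In this sketch the "after" hook fires only once the consumer moves on from a result, which is one possible interpretation; whether that is the desired semantics is exactly the kind of thing that would need discussion.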

Because of the open issue #1864, I will not submit a pull request with my solution.

Module structure

.../strawberry_patch/
    |- ./__init__.py
    |- ./subscribe.py
    |- ./schema.py
# subscribe.py

from typing import (
    AsyncGenerator, 
    Sequence, 
    Type, 
    Union,
)
from strawberry.extensions import Extension
from strawberry.schema.exceptions import InvalidOperationTypeError
from strawberry.schema.execute import _run_validation
from strawberry.types.graphql import OperationType
from strawberry.types import ExecutionContext
from graphql.subscription import subscribe as original_subscribe
from graphql import (
    parse,
    GraphQLSchema,
    GraphQLError,
    ExecutionResult as GraphQLExecutionResult,
)
from strawberry.extensions.runner import ExtensionsRunner

async def subscribe( 
    schema: GraphQLSchema,
    query: str,
    *,
    extensions: Sequence[Union[Type[Extension], Extension]],
    execution_context: ExecutionContext,
) -> AsyncGenerator[GraphQLExecutionResult, None]:

    extensions_runner = ExtensionsRunner(
        execution_context=execution_context,
        extensions=list(extensions),
    )
    async with extensions_runner.request():
        async with extensions_runner.parsing():
            if not execution_context.graphql_document:
                try:
                    execution_context.graphql_document = parse(query)
                except Exception as err:
                    error = GraphQLError(str(err), original_error=err) if not isinstance(err, GraphQLError) else err
                    execution_context.errors = [error]
                    yield GraphQLExecutionResult(
                        data=None,
                        errors=[error],
                        extensions=await extensions_runner.get_extensions_results(),
                    )
                    return

        async with extensions_runner.validation():
            if execution_context.operation_type is not OperationType.SUBSCRIPTION:
                # Other subscriptions may be running over the same WS connection,
                # which is why we do not raise InvalidOperationTypeError as schema.execute does.
                err = InvalidOperationTypeError(execution_context.operation_type)
                execution_context.errors = [GraphQLError(str(err), original_error=err)]
                yield GraphQLExecutionResult(data=None, errors=execution_context.errors)
                return

            _run_validation(execution_context)
            if execution_context.errors:
                yield GraphQLExecutionResult(data=None, errors=execution_context.errors)
                return

        async with extensions_runner.executing():
            result_source = await original_subscribe(
                schema=schema,
                document=execution_context.graphql_document,
                root_value=execution_context.root_value,
                context_value=execution_context.context,
                variable_values=execution_context.variables,
                operation_name=execution_context.operation_name,
            )
            if isinstance(result_source, GraphQLExecutionResult):
                # An error occurred
                yield result_source
                return
            async for exec_result in result_source:
                yield exec_result
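
One consequence of yielding from inside the nested `async with` blocks is that the "end" side of each phase runs only when the stream is exhausted or closed by the consumer. The following stdlib-only sketch illustrates that ordering. The phase context manager and the events list are hypothetical stand-ins for the extension runner's context managers, not strawberry APIs.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def phase(name):
    # Hypothetical stand-in for extensions_runner.request(), .parsing(), etc.
    events.append(f"{name}:start")
    try:
        yield
    finally:
        events.append(f"{name}:end")


async def source():
    for i in range(3):
        yield i


async def subscribe_with_hooks():
    async with phase("request"):
        async with phase("parsing"):
            pass
        async with phase("validation"):
            pass
        async with phase("executing"):
            async for item in source():
                yield item


async def run_to_completion():
    events.clear()
    return [item async for item in subscribe_with_hooks()]


async def stop_early():
    events.clear()
    stream = subscribe_with_hooks()
    first = await stream.__anext__()
    # Closing the stream unwinds the open context managers.
    await stream.aclose()
    return first


if __name__ == "__main__":
    print(asyncio.run(run_to_completion()), events)
    print(asyncio.run(stop_early()), events)
```

In both cases the "executing" and "request" phases close only after the consumer is done, so with this layout on_request_end would effectively fire when the client stops the subscription.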
# schema.py

from typing import (
    Any, 
    AsyncGenerator, 
    Dict, 
    Optional,
)

import strawberry
from graphql import ExecutionResult as GraphQLExecutionResult
from strawberry.types import ExecutionContext

from .subscribe import subscribe


class Schema(strawberry.Schema):
    async def subscribe(
        self,
        query: str,
        variable_values: Optional[Dict[str, Any]] = None,
        context_value: Optional[Any] = None,
        root_value: Optional[Any] = None,
        operation_name: Optional[str] = None,
    ) -> AsyncGenerator[GraphQLExecutionResult, None]:
        # Create execution context
        execution_context = ExecutionContext(
            query=query,
            schema=self,
            context=context_value,
            root_value=root_value,
            variables=variable_values,
            provided_operation_name=operation_name,
        )
        return subscribe(
            self._schema,
            query,
            extensions=self.get_extensions(),
            execution_context=execution_context,
        )

Usage

Instead of the Schema class from strawberry, import our patched version.

import asyncio
from typing import AsyncGenerator

import strawberry
from strawberry.extensions import Extension
from mydb import get_db_session

from .strawberry_patch.schema import Schema

class MyExtension(Extension):
    def on_request_start(self):
        self.execution_context.context["db"] = get_db_session()

    def on_request_end(self):
        self.execution_context.context["db"].close()

@strawberry.type
class Query:
    @strawberry.field
    def hello(self) -> str:
        return "Hello, World"


@strawberry.type
class Subscription:
    @strawberry.subscription
    async def count(self, target: int = 10) -> AsyncGenerator[int, None]:
        for i in range(target):
            yield i
            await asyncio.sleep(0.5)

SCHEMA = Schema(
    query=Query,
    subscription=Subscription,
    extensions=[MyExtension]
)

@Bingdom

Bingdom commented Dec 15, 2022

Thank you for that one

I've noticed that if we interrupt the Async Generator by stopping the query the following exceptions are thrown:

Dec 15 14:18:10 raspberrypi python3.10[6464]: Error handling request
Dec 15 14:18:10 raspberrypi python3.10[6464]: Traceback (most recent call last):
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/strawberry/aiohttp/handlers/graphql_ws_handler.py", line 48, in handle_request
Dec 15 14:18:10 raspberrypi python3.10[6464]:     await self.handle_message(message)
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/strawberry/subscriptions/protocols/graphql_ws/handlers.py", line 82, in handle_message
Dec 15 14:18:10 raspberrypi python3.10[6464]:     await self.handle_stop(message)
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/strawberry/subscriptions/protocols/graphql_ws/handlers.py", line 135, in handle_stop
Dec 15 14:18:10 raspberrypi python3.10[6464]:     await self.cleanup_operation(operation_id)
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/strawberry/subscriptions/protocols/graphql_ws/handlers.py", line 177, in cleanup_operation
Dec 15 14:18:10 raspberrypi python3.10[6464]:     await self.subscriptions[operation_id].aclose()
Dec 15 14:18:10 raspberrypi python3.10[6464]: RuntimeError: aclose(): asynchronous generator is already running
Dec 15 14:18:10 raspberrypi python3.10[6464]: During handling of the above exception, another exception occurred:
Dec 15 14:18:10 raspberrypi python3.10[6464]: Traceback (most recent call last):
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/aiohttp/web_protocol.py", line 433, in _handle_request
Dec 15 14:18:10 raspberrypi python3.10[6464]:     resp = await request_handler(request)
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/aiohttp/web_app.py", line 504, in _handle
Dec 15 14:18:10 raspberrypi python3.10[6464]:     resp = await handler(request)
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/strawberry/aiohttp/views.py", line 65, in __call__
Dec 15 14:18:10 raspberrypi python3.10[6464]:     return await self.graphql_ws_handler_class(
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/strawberry/subscriptions/protocols/graphql_ws/handlers.py", line 67, in handle
Dec 15 14:18:10 raspberrypi python3.10[6464]:     return await self.handle_request()
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/strawberry/aiohttp/handlers/graphql_ws_handler.py", line 56, in handle_request
Dec 15 14:18:10 raspberrypi python3.10[6464]:     await self.cleanup_operation(operation_id)
Dec 15 14:18:10 raspberrypi python3.10[6464]:   File "/home/pi/.local/lib/python3.10/site-packages/strawberry/subscriptions/protocols/graphql_ws/handlers.py", line 177, in cleanup_operation
Dec 15 14:18:10 raspberrypi python3.10[6464]:     await self.subscriptions[operation_id].aclose()
Dec 15 14:18:10 raspberrypi python3.10[6464]: RuntimeError: aclose(): asynchronous generator is already running

@ArtemConstantinov

The error is related to changes in Python itself.

You can experiment by overriding the method in your GraphQLWSHandler, for example:

import asyncio
from contextlib import suppress

from strawberry.aiohttp.views import (
    GraphQLWSHandler,
    GraphQLView,
)


class MyGraphQLWSHandler(GraphQLWSHandler):
    async def cleanup_operation(self, operation_id: str) -> None:
        # Cancel the task that consumes the generator first,
        # so the generator is no longer running when it is closed.
        task_ = self.tasks.pop(operation_id)
        task_.cancel()
        with suppress(asyncio.CancelledError):
            await task_

        generator = self.subscriptions.pop(operation_id)
        # comment out the next 2 lines if you still get the issue
        with suppress(RuntimeError):
            await generator.aclose()


class MyGraphQLView(GraphQLView):
    graphql_ws_handler_class = MyGraphQLWSHandler

Or use subscription_protocols = (GRAPHQL_TRANSPORT_WS_PROTOCOL,)

This is as much as I can say based on the provided logs. Please also check the answer on Stack Overflow related to RuntimeError: aclose().
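
For reference, the same RuntimeError can be reproduced with only the standard library. This is a minimal sketch assuming Python 3.8 or later, where closing an async generator that is suspended inside an in-flight `__anext__()` call is rejected. The ticker and pull helpers are hypothetical, and this is not confirmed to be the exact path taken inside the strawberry handler.

```python
import asyncio
from contextlib import suppress


async def ticker():
    # Stand-in for a subscription resolver that waits between results.
    while True:
        await asyncio.sleep(3600)
        yield "tick"


async def pull(gen):
    return await gen.__anext__()


async def demo():
    gen = ticker()
    task = asyncio.create_task(pull(gen))
    # Let the task start; it is now suspended inside the generator.
    await asyncio.sleep(0)

    error_message = None
    try:
        # The generator is still "running" on behalf of the task.
        await gen.aclose()
    except RuntimeError as exc:
        error_message = str(exc)

    # Cancelling the consumer first makes the close succeed.
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    await gen.aclose()
    return error_message


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sketch suggests why cancelling the consuming task before calling aclose() avoids the error: once the task is cancelled, the generator is no longer in the middle of producing a value.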

@nrbnlulu
Member Author

nrbnlulu commented Jan 3, 2023

@ArtemConstantinov

  1. One gql-ws connection can execute many operations, so on_request_start
    is quite misleading...
  2. The get_result hook can't work here, since there may be infinitely many results.
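
Both points can be illustrated with a small stdlib-only sketch: one connection multiplexes several operations, and each operation is an unbounded stream that can only be consumed incrementally. The hook names written to the log (connection_start, operation_start, and so on) are hypothetical and are not strawberry hooks.

```python
import asyncio
import itertools

log = []


async def counter(start):
    # An unbounded stream: there is no final "result" to collect.
    for n in itertools.count(start):
        await asyncio.sleep(0)
        yield n


async def run_operation(op_id, stream, take):
    log.append(f"operation_start:{op_id}")
    results = []
    try:
        async for value in stream:
            results.append(value)
            if len(results) == take:
                break  # the client stops this operation
    finally:
        await stream.aclose()
        log.append(f"operation_end:{op_id}")
    return results


async def connection():
    # One connection, many operations.
    log.clear()
    log.append("connection_start")
    try:
        return await asyncio.gather(
            run_operation("a", counter(0), 2),
            run_operation("b", counter(10), 3),
        )
    finally:
        log.append("connection_end")


if __name__ == "__main__":
    print(asyncio.run(connection()))
    print(log)
```

A single pair of connection-level events surrounds several pairs of operation-level events, which is why mapping on_request_start to either level alone could be confusing.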

@ArtemConstantinov

@nrbnlulu I agree with you. Extra hooks are also missing here, such as on_message_received and on_message_sent, which would be relevant only for the WS connection. From what I have observed, making this part more explicit will require standardizing the interface for handling HTTP requests and also creating asymmetrical extensions relevant to HTTP / WS / SSE, because some extension methods are common to all delivery methods while others are unique to one.

We need to wait for issue #2389 to be completed before thinking about this improvement, because at the moment each supported framework integration would require a custom implementation to solve the issue...
