Skip to content

Commit

Permalink
refactor: move global refs to main_store
Browse files Browse the repository at this point in the history
  • Loading branch information
eri24816 committed Mar 23, 2024
1 parent 35fb815 commit d92a73b
Show file tree
Hide file tree
Showing 19 changed files with 167 additions and 149 deletions.
105 changes: 49 additions & 56 deletions backend/src/grapycal/core/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from grapycal.sobjects.controls import *
from grapycal.sobjects.editor import Editor
from grapycal.sobjects.workspaceObject import WebcamStream, WorkspaceObject
from grapycal.stores import main_store
from grapycal.utils.httpResource import HttpResource
from grapycal.utils.io import file_exists, read_workspace, write_workspace

Expand Down Expand Up @@ -62,71 +63,45 @@ def __init__(self, port, host, path, workspace_id) -> None:
self.host = host
self.workspace_id = workspace_id # used for exit message file
self.running_module = running_module
""""""

"""
Enable stdout proxy for this process
"""
stdout_helper.enable_proxy(redirect_error=False)
self.redirect = stdout_helper.redirect

self._communication_event_loop: asyncio.AbstractEventLoop | None = None

self.background_runner = BackgroundRunner()

self._objectsync = objectsync.Server(port, host)

self._extention_manager = ExtensionManager(self._objectsync, self)

self.do_after_transition = self._objectsync.do_after_transition

self.clock = Clock(0.1)

self.webcam: WebcamStream | None = None
self._slash_commands_topic = self._objectsync.create_topic("slash_commands", objectsync.DictTopic)
self.slash = SlashCommandManager(self._slash_commands_topic)

self.slash.register("save workspace", lambda ctx: self.save_workspace(self.path))

self.data_yaml = HttpResource(
"https://github.com/Grapycal/grapycal_data/raw/main/data.yaml", dict
)
self.slash.register("save workspace", lambda ctx: self.save_workspace(self.path))

self.grapycal_id_count = 0
self.is_running = False

self._setup_objectsync()

def _communication_thread(self, event_loop_set_event: threading.Event):
asyncio.run(self._async_communication_thread(event_loop_set_event))

async def _async_communication_thread(self, event_loop_set_event: threading.Event):
self._communication_event_loop = asyncio.get_event_loop()
main_store.event_loop = asyncio.get_event_loop()
event_loop_set_event.set()
try:
await asyncio.gather(self._objectsync.serve(), self.clock.run())
await self._objectsync.serve()
except OSError as e:
if e.errno == 10048:
logger.error(
f"Port {self.port} is already in use. Maybe another instance of grapycal is running?"
)
self.get_communication_event_loop().stop()
main_store.event_loop.stop()
# send signal to the main thread to exit
os.kill(os.getpid(), signal.SIGTERM)
else:
raise e

def run(self) -> None:
event_loop_set_event = threading.Event()
t = threading.Thread(
target=self._communication_thread, daemon=True, args=[event_loop_set_event]
) # daemon=True until we have a proper exit strategy

t.start()
event_loop_set_event.wait()

self._extention_manager.start()

self._objectsync.globals.workspace = self


def _setup_objectsync(self):
self._objectsync.register_service("exit", self.exit)
self._objectsync.register_service("interrupt", self.interrupt)

Expand All @@ -150,8 +125,39 @@ def run(self) -> None:
self._objectsync.register(WebcamStream)
self._objectsync.register(LinePlotControl)

def run(self) -> None:
'''
The blocking function that make the workspace start functioning. The main thread will run a background_runner
that runs the background tasks from nodes.
A communication thread will be started to handle the communication between the frontend and the backend.
'''

event_loop_set_event = threading.Event()
t = threading.Thread(
target=self._communication_thread, daemon=True, args=[event_loop_set_event]
).start() # daemon=True until we have a proper exit strategy
event_loop_set_event.wait()
self._extention_manager.start()

"""
Register all built-in node types
Setup the store
"""

main_store.node_types = self._objectsync.create_topic('node_types',objectsync.DictTopic,is_stateful=False)
main_store.clock = Clock(0.1)
main_store.event_loop.create_task(main_store.clock.run())
main_store.redirect = stdout_helper.redirect
main_store.runner = BackgroundRunner()
main_store.send_message = self.send_message
main_store.send_message_to_all = self.send_message_to_all
main_store.clear_edges = self._clear_edges
main_store.open_workspace = self._open_workspace_callback
main_store.data_yaml = HttpResource("https://github.com/Grapycal/grapycal_data/raw/main/data.yaml", dict)
main_store.next_id = self._next_id
main_store.vars = self._vars

"""
Load/initialize workspace
"""

signal.signal(signal.SIGTERM, lambda sig, frame: self.exit())
Expand Down Expand Up @@ -195,18 +201,14 @@ def run(self) -> None:

self.is_running = True

self.background_runner.run()
main_store.runner.run()

def exit(self):
self.background_runner.exit()
main_store.runner.exit()

def interrupt(self):
self.background_runner.interrupt()
self.background_runner.clear_tasks()

def get_communication_event_loop(self) -> asyncio.AbstractEventLoop:
assert self._communication_event_loop is not None
return self._communication_event_loop
main_store.runner.interrupt()
main_store.runner.clear_tasks()

"""
Save and load
Expand Down Expand Up @@ -259,13 +261,7 @@ def load_workspace(self, path: str) -> None:

self._objectsync.set_client_id_count(data["client_id_count"])
self._objectsync.set_id_count(data["id_count"])
if (
"grapycal_id_count" in data
): # DEPRECATED from v0.11.0: v0.10.0 and before has no grapycal_id_count
self.grapycal_id_count = data["grapycal_id_count"]
else:
# we cannot know the exact value of grapycal_id_count, so we set it to 032.5, which will never collide. Very smart 🤯
self.grapycal_id_count = 032.5
self.grapycal_id_count = data["grapycal_id_count"]
workspace_serialized = from_dict(
SObjectSerialized, data["workspace_serialized"]
)
Expand Down Expand Up @@ -329,7 +325,7 @@ def get_workspace_object(self) -> WorkspaceObject:
# In case this called in self._objectsync.create_object(WorkspaceObject),
return self._objectsync.get_root_object().get_child_of_type(WorkspaceObject)

def vars(self) -> Dict[str, Any]:
def _vars(self) -> Dict[str, Any]:
return self.running_module.__dict__

def _open_workspace_callback(self, path, no_exist_ok=False):
Expand All @@ -347,9 +343,6 @@ def _open_workspace_callback(self, path, no_exist_ok=False):
f.write(f"open {path}")
self.exit()

def add_task_to_event_loop(self, task):
self._communication_event_loop.create_task(task)

def send_message_to_all(self, message, type=ClientMsgTypes.NOTIFICATION):
if not self.is_running:
return
Expand Down Expand Up @@ -380,11 +373,11 @@ def client_disconnected(self, client_id):
except:
pass # topic may have not been created successfully.

def next_id(self):
def _next_id(self):
self.grapycal_id_count += 1
return self.grapycal_id_count

def clear_edges(self):
def _clear_edges(self):
edges = self.get_workspace_object().top_down_search(type=Edge)
for edge in edges:
edge.clear()
Expand Down
16 changes: 8 additions & 8 deletions backend/src/grapycal/extension/extensionManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from unittest import skip
from grapycal.extension.extensionSearch import get_remote_extensions
from grapycal.extension.utils import get_extension_info, get_package_version, list_to_dict, snap_node
from grapycal.stores import main_store
logger = logging.getLogger(__name__)

from typing import TYPE_CHECKING, Dict, List, Tuple
Expand Down Expand Up @@ -33,7 +34,6 @@ def __init__(self,objectsync_server:objectsync.Server,workspace:'Workspace') ->
self._imported_extensions_topic = self._objectsync.create_topic('imported_extensions',objectsync.DictTopic,is_stateful=False)
self._avaliable_extensions_topic = self._objectsync.create_topic('avaliable_extensions',objectsync.DictTopic,is_stateful=False)
self._not_installed_extensions_topic = self._objectsync.create_topic('not_installed_extensions',objectsync.DictTopic,is_stateful=False)
self._node_types_topic = self._objectsync.create_topic('node_types',objectsync.DictTopic,is_stateful=False)
self._objectsync.on('import_extension',self.import_extension,is_stateful=False)
self._objectsync.on('unimport_extension',self.unimport_extension,is_stateful=False)
self._objectsync.on('update_extension',self.update_extension,is_stateful=False)
Expand All @@ -60,7 +60,7 @@ def import_extension(self, extension_name: str, create_nodes=True, log=True) ->
self._workspace.slash.register(f'unimport: {extension_name}',lambda _: self.unimport_extension(extension_name),source=extension_name)
if log:
logger.info(f'Imported extension {extension_name}')
self._workspace.send_message_to_all(f'Imported extension {extension_name}')
main_store.send_message_to_all(f'Imported extension {extension_name}')
self._objectsync.clear_history_inclusive()
return extension

Expand Down Expand Up @@ -143,7 +143,7 @@ def hit(node:objectsync.SObject) -> bool:
self._update_available_extensions_topic()

logger.info(f'Reloaded extension {extension_name}')
self._workspace.send_message_to_all(f'Reloaded extension {extension_name} {new_version.version}')
main_store.send_message_to_all(f'Reloaded extension {extension_name} {new_version.version}')
self._objectsync.clear_history_inclusive()

def unimport_extension(self, extension_name: str, log=True) -> None:
Expand All @@ -154,7 +154,7 @@ def unimport_extension(self, extension_name: str, log=True) -> None:
self._workspace.slash.unregister_source(extension_name)
if log:
logger.info(f'Unimported extension {extension_name}')
self._workspace.send_message_to_all(f'Unimported extension {extension_name}')
main_store.send_message_to_all(f'Unimported extension {extension_name}')
self._objectsync.clear_history_inclusive()

def _instantiate_singletons(self, extension_name: str) -> None:
Expand All @@ -167,7 +167,7 @@ def _instantiate_singletons(self, extension_name: str) -> None:
self._workspace.get_workspace_object().main_editor.create_node(node_type, translation='9999,9999',is_new = True)

def _update_available_extensions_topic(self) -> None:
self._workspace.add_task_to_event_loop(self._update_available_extensions_topic_async())
main_store.event_loop.create_task(self._update_available_extensions_topic_async())

async def _update_available_extensions_topic_async(self) -> None:
'''
Expand Down Expand Up @@ -225,7 +225,7 @@ async def _check_extension_compatible(self, extension_name: str):
')

def _install_extension(self, extension_name: str) -> None:
self._workspace.add_task_to_event_loop(self._install_extension_async(extension_name))
main_store.event_loop.create_task(self._install_extension_async(extension_name))

async def _install_extension_async(self, extension_name: str) -> None:
# TODO: async this slow stuff
Expand All @@ -252,7 +252,7 @@ def _load_extension(self, name: str) -> Extension:
self._register_extension(name)
self._imported_extensions_topic.add(name,extension.get_info())
for node_type_name, node_type in extension.node_types_d.items():
self._node_types_topic.add(node_type_name,{
main_store.node_types.add(node_type_name,{
'name':node_type_name,
'category':node_type.category,
'description':node_type.__doc__,
Expand Down Expand Up @@ -298,7 +298,7 @@ def _unload_extension(self, name: str) -> None:
self._extensions.pop(name)
self._imported_extensions_topic.pop(name)
for node_type_name in node_types:
self._node_types_topic.pop(node_type_name)
main_store.node_types.pop(node_type_name)

def create_preview_nodes(self, name: str) -> None:
node_types = self._extensions[name].node_types_d
Expand Down
15 changes: 7 additions & 8 deletions backend/src/grapycal/sobjects/editor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import threading

from grapycal.stores import main_store
from grapycal.utils.misc import as_type

logger = logging.getLogger(__name__)
Expand All @@ -22,8 +23,7 @@ class Editor(SObject):
def build(self, old: SObjectSerialized | None = None):
from grapycal.core.workspace import Workspace # avoid circular import

self.workspace: Workspace = self._server.globals.workspace
self.node_types = self.workspace._extention_manager._node_types_topic
self.node_types = main_store.node_types


# used by frontend
Expand All @@ -33,7 +33,7 @@ def build(self, old: SObjectSerialized | None = None):
self._running = set()
self._set_running_lock = threading.Lock()

self.workspace.clock.on_tick += self.check_running_nodes
main_store.clock.on_tick += self.check_running_nodes

if old is not None:
# If the editor is loaded from a save, we need to recreate the nodes and edges.
Expand Down Expand Up @@ -80,7 +80,6 @@ def restore(self, nodes: list[SObjectSerialized], edges: list[SObjectSerialized]
with self._server.record(allow_reentry=True):
new_node_ids, new_edge_ids = self._restore(nodes, edges)
for node in self._restored_nodes:
# the post_create method must called outside of the restore_event to be in auto mode
node.post_create()

self._new_node_ids = new_node_ids # the _paste() method will use this
Expand Down Expand Up @@ -109,14 +108,14 @@ def _restore(
edges: dict[str, SObjectSerialized] = {}
for obj in node_list:
if obj.id in self._server._objects:
new_id = f"r_{self.workspace.next_id()}" # r means restored
new_id = f"r_{main_store.next_id()}" # r means restored
else:
new_id = obj.id # if possible, keep the old id
nodes[new_id] = obj

for obj in edge_list:
if obj.id in self._server._objects:
new_id = f"r_{self.workspace.next_id()}"
new_id = f"r_{main_store.next_id()}"
else:
new_id = obj.id
edges[new_id] = obj
Expand Down Expand Up @@ -252,7 +251,7 @@ def port_id_map(old_port_id: str) -> str | None:

# keep the old id if possible
if obj.id in self._server._objects:
new_edge_id = f"r_{self.workspace.next_id()}"
new_edge_id = f"r_{main_store.next_id()}"
else:
new_edge_id = obj.id
new_edge_id = self.create_edge_from_port_id(
Expand All @@ -275,7 +274,7 @@ def _create_node_service(self, node_type: str, **kwargs):

def create_node(self, node_type: str | type[Node], **kwargs) -> Node | None:
if isinstance(node_type, str):
node_type_cls = as_type(self._server._object_types[node_type], NodeMeta)
node_type_cls = as_type(self._server._object_types[node_type], type[Node])
else:
node_type_cls = node_type
if node_type_cls._is_singleton and hasattr(node_type_cls, "instance"):
Expand Down
9 changes: 5 additions & 4 deletions backend/src/grapycal/sobjects/fileView.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from pathlib import Path
from grapycal.extension.utils import list_to_dict
from grapycal.stores import main_store
from grapycal.utils.httpResource import HttpResource
from matplotlib.style import available
from objectsync import IntTopic, SObject, StringTopic
Expand Down Expand Up @@ -35,7 +36,7 @@ def get_workspace_metadata(self, path):
raise NotImplementedError()

def open_workspace(self, path):
self._server.globals.workspace._open_workspace_callback(path)
main_store.open_workspace(path)


class LocalFileView(FileView):
Expand Down Expand Up @@ -83,7 +84,7 @@ def add_file(self, path):
path = os.path.join(root, path)
if os.path.exists(path):
return False
self._server.globals.workspace._open_workspace_callback(path, no_exist_ok=True)
main_store.open_workspace(path, no_exist_ok=True)

def add_dir(self, path):
root = os.getcwd()
Expand All @@ -92,7 +93,7 @@ def add_dir(self, path):
if os.path.exists(path):
return False
os.mkdir(path)
self._server.globals.workspace.send_status_message(f"Created directory {path}")
main_store.send_message(f"Created directory {path}")
return True

def delete(self, path):
Expand Down Expand Up @@ -243,4 +244,4 @@ async def open_workspace(self, path: str):

# open workspace

self._server.globals.workspace._open_workspace_callback(local_path)
main_store.open_workspace(local_path)
Loading

0 comments on commit d92a73b

Please sign in to comment.