diff --git a/environment.yml b/environment.yml index f26fbbb..a9f5e0e 100644 --- a/environment.yml +++ b/environment.yml @@ -16,7 +16,6 @@ dependencies: - boto3 - python-multipart - PyYAML - - apscheduler - git+https://github.com/hummingbot/hbot-remote-client-py.git - flake8 - isort diff --git a/routers/manage_accounts.py b/routers/manage_accounts.py index 91b634e..7751f7c 100644 --- a/routers/manage_accounts.py +++ b/routers/manage_accounts.py @@ -17,6 +17,11 @@ async def startup_event(): accounts_service.start_update_account_state_loop() +@router.on_event("shutdown") +async def shutdown_event(): + accounts_service.stop_update_account_state_loop() + + @router.get("/accounts-state", response_model=Dict[str, Dict[str, List[Dict]]]) async def get_all_accounts_state(): return accounts_service.get_accounts_state() diff --git a/routers/manage_broker_messages.py b/routers/manage_broker_messages.py index 8116184..e31d1f3 100644 --- a/routers/manage_broker_messages.py +++ b/routers/manage_broker_messages.py @@ -1,4 +1,3 @@ -from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import APIRouter, HTTPException from config import BROKER_HOST, BROKER_PASSWORD, BROKER_PORT, BROKER_USERNAME @@ -6,26 +5,20 @@ from services.bots_orchestrator import BotsManager # Initialize the scheduler -scheduler = AsyncIOScheduler() router = APIRouter(tags=["Manage Broker Messages"]) bots_manager = BotsManager(broker_host=BROKER_HOST, broker_port=BROKER_PORT, broker_username=BROKER_USERNAME, broker_password=BROKER_PASSWORD) -def update_active_bots(): - bots_manager.update_active_bots() - - @router.on_event("startup") async def startup_event(): - # Add the job to the scheduler - scheduler.add_job(update_active_bots, 'interval', seconds=10) + bots_manager.start_update_active_bots_loop() @router.on_event("shutdown") async def shutdown_event(): # Shutdown the scheduler on application exit - scheduler.shutdown() + bots_manager.stop_update_active_bots_loop() @router.get("/get-active-bots-status") diff --git a/services/accounts_service.py b/services/accounts_service.py index 7f0a4d6..be05f58 100644 --- a/services/accounts_service.py +++ b/services/accounts_service.py @@ -3,6 +3,7 @@ import logging from datetime import datetime, timedelta from decimal import Decimal +from typing import Optional from fastapi import HTTPException from hummingbot.client.config.client_config_map import ClientConfigMap @@ -40,6 +41,8 @@ def __init__(self, self.default_quote = default_quote self.history_file = account_history_file self.account_history_dump_interval = account_history_dump_interval_minutes + self._update_account_state_task: Optional[asyncio.Task] = None + self._dump_account_state_task: Optional[asyncio.Task] = None def get_accounts_state(self): return self.accounts_state @@ -52,8 +55,20 @@ def start_update_account_state_loop(self): Start the loop that updates the balances of all the accounts at a fixed interval. :return: """ - asyncio.create_task(self.update_account_state_loop()) - asyncio.create_task(self.dump_account_state_loop()) + self._update_account_state_task = asyncio.create_task(self.update_account_state_loop()) + self._dump_account_state_task = asyncio.create_task(self.dump_account_state_loop()) + + def stop_update_account_state_loop(self): + """ + Stop the loop that updates the balances of all the accounts at a fixed interval. + :return: + """ + if self._update_account_state_task: + self._update_account_state_task.cancel() + if self._dump_account_state_task: + self._dump_account_state_task.cancel() + self._update_account_state_task = None + self._dump_account_state_task = None async def update_account_state_loop(self): """ diff --git a/services/bots_orchestrator.py b/services/bots_orchestrator.py index ea777d9..24ae656 100644 --- a/services/bots_orchestrator.py +++ b/services/bots_orchestrator.py @@ -1,11 +1,11 @@ -import logging +import asyncio from collections import deque +from typing import Optional import docker from hbotrc import BotCommands from hbotrc.listener import BotListener from hbotrc.spec import TopicSpecs -from hummingbot.connector.connector_base import Decimal class HummingbotPerformanceListener(BotListener): @@ -58,6 +58,7 @@ def __init__(self, broker_host, broker_port, broker_username, broker_password): self.broker_password = broker_password self.docker_client = docker.from_env() self.active_bots = {} + self._update_bots_task: Optional[asyncio.Task] = None @staticmethod def hummingbot_containers_fiter(container): @@ -70,28 +71,38 @@ def get_active_containers(self): return [container.name for container in self.docker_client.containers.list() if container.status == 'running' and self.hummingbot_containers_fiter(container)] - def update_active_bots(self): - active_hbot_containers = self.get_active_containers() - # Remove bots that are no longer active - for bot in list(self.active_bots): - if bot not in active_hbot_containers: - del self.active_bots[bot] - - # Add new bots or update existing ones - for bot in active_hbot_containers: - if bot not in self.active_bots: - hbot_listener = HummingbotPerformanceListener(host=self.broker_host, port=self.broker_port, - username=self.broker_username, - password=self.broker_password, - bot_id=bot) - hbot_listener.start() - self.active_bots[bot] = { - "bot_name": bot, - "broker_client": BotCommands(host=self.broker_host, port=self.broker_port, - username=self.broker_username, password=self.broker_password, - bot_id=bot), - "broker_listener": hbot_listener, - } + def start_update_active_bots_loop(self): + self._update_bots_task = asyncio.create_task(self.update_active_bots()) + + def stop_update_active_bots_loop(self): + if self._update_bots_task: + self._update_bots_task.cancel() + self._update_bots_task = None + + async def update_active_bots(self, sleep_time=1): + while True: + active_hbot_containers = self.get_active_containers() + # Remove bots that are no longer active + for bot in list(self.active_bots): + if bot not in active_hbot_containers: + del self.active_bots[bot] + + # Add new bots or update existing ones + for bot in active_hbot_containers: + if bot not in self.active_bots: + hbot_listener = HummingbotPerformanceListener(host=self.broker_host, port=self.broker_port, + username=self.broker_username, + password=self.broker_password, + bot_id=bot) + hbot_listener.start() + self.active_bots[bot] = { + "bot_name": bot, + "broker_client": BotCommands(host=self.broker_host, port=self.broker_port, + username=self.broker_username, password=self.broker_password, + bot_id=bot), + "broker_listener": hbot_listener, + } + await asyncio.sleep(sleep_time) # Interact with a specific bot def start_bot(self, bot_name, **kwargs): @@ -104,7 +115,6 @@ def stop_bot(self, bot_name, **kwargs): self.active_bots[bot_name]["broker_listener"].stop() return self.active_bots[bot_name]["broker_client"].stop(**kwargs) - def import_strategy_for_bot(self, bot_name, strategy, **kwargs): if bot_name in self.active_bots: return self.active_bots[bot_name]["broker_client"].import_strategy(strategy, **kwargs) @@ -131,7 +141,7 @@ def determine_controller_performance(controllers_performance): except Exception as e: cleaned_performance[controller] = { "status": "error", - "error": "Some metrics are not numeric, check logs and restart controller", + "error": f"Some metrics are not numeric, check logs and restart controller: {e}", } return cleaned_performance