From 03b481d7d16594026ffd062d65779b2f22ce3e5d Mon Sep 17 00:00:00 2001 From: drupman Date: Sat, 21 Sep 2024 17:16:24 -0300 Subject: [PATCH 01/24] (feat) add database endpoints --- bots/conf/controllers/.gitignore | 1 + main.py | 4 +- routers/manage_databases.py | 47 +++++ utils/etl_databases.py | 352 +++++++++++++++++++++++++++++++ utils/file_system.py | 22 ++ 5 files changed, 425 insertions(+), 1 deletion(-) create mode 100644 routers/manage_databases.py create mode 100644 utils/etl_databases.py diff --git a/bots/conf/controllers/.gitignore b/bots/conf/controllers/.gitignore index e69de29..f59ec20 100644 --- a/bots/conf/controllers/.gitignore +++ b/bots/conf/controllers/.gitignore @@ -0,0 +1 @@ +* \ No newline at end of file diff --git a/main.py b/main.py index 5e790e6..4d37db8 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,7 @@ from dotenv import load_dotenv from fastapi import FastAPI -from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, manage_market_data +from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, manage_market_data, manage_databases load_dotenv() app = FastAPI() @@ -12,3 +12,5 @@ app.include_router(manage_market_data.router) app.include_router(manage_backtesting.router) app.include_router(manage_accounts.router) +# app.include_router(manage_performance.router) +app.include_router(manage_databases.router) diff --git a/routers/manage_databases.py b/routers/manage_databases.py new file mode 100644 index 0000000..175a656 --- /dev/null +++ b/routers/manage_databases.py @@ -0,0 +1,47 @@ +import json + +from typing import List, Dict, Any + +from utils.etl_databases import HummingbotDatabase +from fastapi import APIRouter + +from utils.file_system import FileSystemUtil + +router = APIRouter(tags=["Database Management"]) +file_system = FileSystemUtil() + + +@router.post("/list-databases", response_model=List[str]) +async def list_databases(): + return file_system.list_databases() + + +@router.post("/read-databases", response_model=List[Dict[str, Any]]) +async def read_databases(db_paths: List[str] = None): + dbs = [] + for db_path in db_paths: + db = HummingbotDatabase(db_path) + try: + db_content = { + "db_name": db.db_name, + "db_path": db.db_path, + "healthy": db.status["general_status"], + "status": db.status, + "tables": { + "order": json.dumps(db.get_orders().to_dict()), + "trade_fill": json.dumps(db.get_trade_fills().to_dict()), + "executor": json.dumps(db.get_executors_data().to_dict()), + "order_status": json.dumps(db.get_order_status().to_dict()) + } + } + except Exception as e: + print(f"Error reading database: {str(e)}") + db_content = { + "db_name": "", + "db_path": db_path, + "healthy": False, + "status": db.status, + "tables": {} + } + dbs.append(db_content) + return dbs diff --git a/utils/etl_databases.py b/utils/etl_databases.py new file mode 100644 index 0000000..21fbf2d --- /dev/null +++ b/utils/etl_databases.py @@ -0,0 +1,352 @@ +import os +import pandas as pd +import json +from typing import List + +from hummingbot.strategy_v2.models.executors_info import ExecutorInfo +from sqlalchemy import create_engine, insert, text, MetaData, Table, Column, VARCHAR, INT, FLOAT, Integer, String, Float +from sqlalchemy.orm import sessionmaker + + +class HummingbotDatabase: + def __init__(self, db_path: str): + self.db_name = os.path.basename(db_path) + self.db_path = db_path + self.db_path = f'sqlite:///{os.path.join(db_path)}' + self.engine = create_engine(self.db_path, connect_args={'check_same_thread': False}) + self.session_maker = sessionmaker(bind=self.engine) + + @staticmethod + def _get_table_status(table_loader): + try: + data = table_loader() + return "Correct" if len(data) > 0 else f"Error - No records matched" + except Exception as e: + return f"Error - {str(e)}" + + @property + def status(self): + trade_fill_status = self._get_table_status(self.get_trade_fills) + orders_status = self._get_table_status(self.get_orders) + order_status_status = self._get_table_status(self.get_order_status) + executors_status = self._get_table_status(self.get_executors_data) + general_status = all(status == "Correct" for status in + [trade_fill_status, orders_status, order_status_status, executors_status]) + status = {"db_name": self.db_name, + "db_path": self.db_path, + "trade_fill": trade_fill_status, + "orders": orders_status, + "order_status": order_status_status, + "executors": executors_status, + "general_status": general_status + } + return status + + def get_orders(self): + with self.session_maker() as session: + query = "SELECT * FROM 'Order'" + orders = pd.read_sql_query(text(query), session.connection()) + orders["market"] = orders["market"] + orders["amount"] = orders["amount"] / 1e6 + orders["price"] = orders["price"] / 1e6 + # orders['creation_timestamp'] = pd.to_datetime(orders['creation_timestamp'], unit="ms") + # orders['last_update_timestamp'] = pd.to_datetime(orders['last_update_timestamp'], unit="ms") + return orders + + def get_trade_fills(self): + groupers = ["config_file_path", "market", "symbol"] + float_cols = ["amount", "price", "trade_fee_in_quote"] + with self.session_maker() as session: + query = "SELECT * FROM TradeFill" + trade_fills = pd.read_sql_query(text(query), session.connection()) + trade_fills[float_cols] = trade_fills[float_cols] / 1e6 + trade_fills["cum_fees_in_quote"] = trade_fills.groupby(groupers)["trade_fee_in_quote"].cumsum() + trade_fills["trade_fee"] = trade_fills.groupby(groupers)["cum_fees_in_quote"].diff() + # trade_fills["timestamp"] = pd.to_datetime(trade_fills["timestamp"], unit="ms") + return trade_fills + + def get_order_status(self): + with self.session_maker() as session: + query = "SELECT * FROM OrderStatus" + order_status = pd.read_sql_query(text(query), session.connection()) + return order_status + + def get_executors_data(self) -> pd.DataFrame: + with self.session_maker() as session: + query = "SELECT * FROM Executors" + executors = pd.read_sql_query(text(query), session.connection()) + return executors + + +class ETLPerformance: + def __init__(self, + db_path: str): + self.db_path = f'sqlite:///{os.path.join(db_path)}' + self.engine = create_engine(self.db_path, connect_args={'check_same_thread': False}) + self.session_maker = sessionmaker(bind=self.engine) + self.metadata = MetaData() + + @property + def executors_table(self): + return Table('executors', + MetaData(), + Column('id', String), + Column('timestamp', Integer), + Column('type', String), + Column('close_type', Integer), + Column('close_timestamp', Integer), + Column('status', String), + Column('config', String), + Column('net_pnl_pct', Float), + Column('net_pnl_quote', Float), + Column('cum_fees_quote', Float), + Column('filled_amount_quote', Float), + Column('is_active', Integer), + Column('is_trading', Integer), + Column('custom_info', String), + Column('controller_id', String)) + + @property + def trade_fill_table(self): + return Table( + 'trades', MetaData(), + Column('config_file_path', VARCHAR(255)), + Column('strategy', VARCHAR(255)), + Column('market', VARCHAR(255)), + Column('symbol', VARCHAR(255)), + Column('base_asset', VARCHAR(255)), + Column('quote_asset', VARCHAR(255)), + Column('timestamp', INT), + Column('order_id', VARCHAR(255)), + Column('trade_type', VARCHAR(255)), + Column('order_type', VARCHAR(255)), + Column('price', FLOAT), + Column('amount', FLOAT), + Column('leverage', INT), + Column('trade_fee', VARCHAR(255)), + Column('trade_fee_in_quote', FLOAT), + Column('exchange_trade_id', VARCHAR(255)), + Column('position', VARCHAR(255)), + ) + + @property + def orders_table(self): + return Table( + 'orders', MetaData(), + Column('client_order_id', VARCHAR(255)), + Column('config_file_path', VARCHAR(255)), + Column('strategy', VARCHAR(255)), + Column('market', VARCHAR(255)), + Column('symbol', VARCHAR(255)), + Column('base_asset', VARCHAR(255)), + Column('quote_asset', VARCHAR(255)), + Column('creation_timestamp', INT), + Column('order_type', VARCHAR(255)), + Column('amount', FLOAT), + Column('leverage', INT), + Column('price', FLOAT), + Column('last_status', VARCHAR(255)), + Column('last_update_timestamp', INT), + Column('exchange_order_id', VARCHAR(255)), + Column('position', VARCHAR(255)), + ) + + @property + def tables(self): + return [self.executors_table, self.trade_fill_table, self.orders_table] + + def create_tables(self): + with self.engine.connect(): + for table in self.tables: + table.create(self.engine) + + def insert_data(self, data): + if "executors" in data: + self.insert_executors(data["executors"]) + if "trade_fill" in data: + self.insert_trade_fill(data["trade_fill"]) + if "orders" in data: + self.insert_orders(data["orders"]) + + def insert_executors(self, executors): + with self.engine.connect() as conn: + for _, row in executors.iterrows(): + ins = self.executors_table.insert().values( + id=row["id"], + timestamp=row["timestamp"], + type=row["type"], + close_type=row["close_type"], + close_timestamp=row["close_timestamp"], + status=row["status"], + config=row["config"], + net_pnl_pct=row["net_pnl_pct"], + net_pnl_quote=row["net_pnl_quote"], + cum_fees_quote=row["cum_fees_quote"], + filled_amount_quote=row["filled_amount_quote"], + is_active=row["is_active"], + is_trading=row["is_trading"], + custom_info=row["custom_info"], + controller_id=row["controller_id"]) + conn.execute(ins) + conn.commit() + + def insert_trade_fill(self, trade_fill): + with self.engine.connect() as conn: + for _, row in trade_fill.iterrows(): + ins = insert(self.trade_fill_table).values( + config_file_path=row["config_file_path"], + strategy=row["strategy"], + market=row["market"], + symbol=row["symbol"], + base_asset=row["base_asset"], + quote_asset=row["quote_asset"], + timestamp=row["timestamp"], + order_id=row["order_id"], + trade_type=row["trade_type"], + order_type=row["order_type"], + price=row["price"], + amount=row["amount"], + leverage=row["leverage"], + trade_fee=row["trade_fee"], + trade_fee_in_quote=row["trade_fee_in_quote"], + exchange_trade_id=row["exchange_trade_id"], + position=row["position"], + ) + conn.execute(ins) + conn.commit() + + def insert_orders(self, orders): + with self.engine.connect() as conn: + for _, row in orders.iterrows(): + ins = insert(self.orders_table).values( + client_order_id=row["id"], + config_file_path=row["config_file_path"], + strategy=row["strategy"], + market=row["market"], + symbol=row["symbol"], + base_asset=row["base_asset"], + quote_asset=row["quote_asset"], + creation_timestamp=row["creation_timestamp"], + order_type=row["order_type"], + amount=row["amount"], + leverage=row["leverage"], + price=row["price"], + last_status=row["last_status"], + last_update_timestamp=row["last_update_timestamp"], + exchange_order_id=row["exchange_order_id"], + position=row["position"], + ) + conn.execute(ins) + conn.commit() + + def load_executors(self): + with self.session_maker() as session: + query = "SELECT * FROM executors" + executors = pd.read_sql_query(text(query), session.connection()) + return executors + + def load_trade_fill(self): + with self.session_maker() as session: + query = "SELECT * FROM trades" + trade_fill = pd.read_sql_query(text(query), session.connection()) + return trade_fill + + def load_orders(self): + with self.session_maker() as session: + query = "SELECT * FROM orders" + orders = pd.read_sql_query(text(query), session.connection()) + return orders + + @staticmethod + def parse_executors(executors: pd.DataFrame) -> List[ExecutorInfo]: + executor_values = [] + for _, row in executors.iterrows(): + executor_values.append(ExecutorInfo( + id=row["id"], + timestamp=row["timestamp"], + type=row["type"], + close_timestamp=row["close_timestamp"], + close_type=row["close_type"], + status=row["status"], + config=json.loads(row["config"]), + net_pnl_pct=row["net_pnl_pct"], + net_pnl_quote=row["net_pnl_quote"], + cum_fees_quote=row["cum_fees_quote"], + filled_amount_quote=row["filled_amount_quote"], + is_active=row["is_active"], + is_trading=row["is_trading"], + custom_info=json.loads(row["custom_info"]), + controller_id=row["controller_id"], + side=row["side"], + )) + return executor_values + + @staticmethod + def dump_executors(executors: List[ExecutorInfo]) -> List[dict]: + executor_values = [] + for executor in executors: + executor_to_append = { + "id": executor.id, + "timestamp": executor.timestamp, + "type": executor.type, + "close_timestamp": executor.close_timestamp, + "close_type": executor.close_type.value, + "status": executor.status.value, + "config": executor.config.dict(), + "net_pnl_pct": executor.net_pnl_pct, + "net_pnl_quote": executor.net_pnl_quote, + "cum_fees_quote": executor.cum_fees_quote, + "filled_amount_quote": executor.filled_amount_quote, + "is_active": executor.is_active, + "is_trading": executor.is_trading, + "custom_info": json.dumps(executor.custom_info), + "controller_id": executor.controller_id, + "side": executor.side, + } + executor_to_append["config"]["mode"] = executor_to_append["config"]["mode"].value + executor_to_append["config"]["side"] = executor_to_append["config"]["side"].value + executor_values.append(executor_to_append) + return executor_values + + @staticmethod + def get_executors_with_orders(executors, orders): + df = pd.DataFrame(executors['custom_info'].tolist(), index=executors['id'], + columns=["custom_info"]).reset_index() + df["custom_info"] = df["custom_info"].apply(lambda x: json.loads(x)) + df["orders"] = df["custom_info"].apply(lambda x: x["order_ids"]) + df.rename(columns={"id": "executor_id"}, inplace=True) + exploded_df = df.explode("orders").rename(columns={"orders": "order_id"}) + exec_with_orders = exploded_df.merge(orders, left_on="order_id", right_on="client_order_id", how="inner") + exec_with_orders = exec_with_orders[exec_with_orders["last_status"].isin(["SellOrderCompleted", "BuyOrderCompleted"])] + return exec_with_orders[["executor_id", "order_id", "last_status", "last_update_timestamp", "price", "amount", "position"]] + + @staticmethod + def get_enum_by_value(enum_class, value): + for member in enum_class: + if member.value == value: + return member + raise ValueError(f"No enum member with value {value}") + + @staticmethod + def ensure_timestamp_in_seconds(timestamp: float) -> float: + """ + Ensure the given timestamp is in seconds. + Args: + - timestamp (int): The input timestamp which could be in seconds, milliseconds, or microseconds. + Returns: + - int: The timestamp in seconds. + Raises: + - ValueError: If the timestamp is not in a recognized format. + """ + timestamp_int = int(float(timestamp)) + if timestamp_int >= 1e18: # Nanoseconds + return timestamp_int / 1e9 + elif timestamp_int >= 1e15: # Microseconds + return timestamp_int / 1e6 + elif timestamp_int >= 1e12: # Milliseconds + return timestamp_int / 1e3 + elif timestamp_int >= 1e9: # Seconds + return timestamp_int + else: + raise ValueError( + "Timestamp is not in a recognized format. Must be in seconds, milliseconds, microseconds or nanoseconds.") \ No newline at end of file diff --git a/utils/file_system.py b/utils/file_system.py index d93bfec..0d549be 100644 --- a/utils/file_system.py +++ b/utils/file_system.py @@ -193,6 +193,7 @@ def ensure_file_and_dump_text(file_path, text): def get_connector_keys_path(account_name: str, connector_name: str) -> Path: return Path(f"bots/credentials/{account_name}/connectors/{connector_name}.yml") + @staticmethod def save_model_to_yml(yml_path: Path, cm: ClientConfigAdapter): try: cm_yml_str = cm.generate_yml_output_str_with_comments() @@ -200,3 +201,24 @@ def save_model_to_yml(yml_path: Path, cm: ClientConfigAdapter): outfile.write(cm_yml_str) except Exception as e: logging.error("Error writing configs: %s" % (str(e),), exc_info=True) + + def list_databases(self): + archived_path = os.path.join(self.base_path, "archived") + archived_instances = self.list_folders("archived") + archived_databases = [] + for archived_instance in archived_instances: + db_path = os.path.join(archived_path, archived_instance, "data") + archived_databases += [os.path.join(db_path, db_file) for db_file in os.listdir(db_path) + if db_file.endswith(".sqlite")] + return archived_databases + + def list_checkpoints(self, full_path: bool): + dir_path = os.path.join(self.base_path, "data") + if full_path: + checkpoints = [os.path.join(dir_path, f) for f in os.listdir(dir_path) if + os.path.isfile(os.path.join(dir_path, f)) + and f.startswith("checkpoint") and f.endswith(".sqlite")] + else: + checkpoints = [f for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, f)) + and f.startswith("checkpoint") and f.endswith(".sqlite")] + return checkpoints From 14068e7a96f179c207b6788d3bb1bc0161df996d Mon Sep 17 00:00:00 2001 From: drupman Date: Sat, 21 Sep 2024 19:05:55 -0300 Subject: [PATCH 02/24] (feat) add checkpoint management endpoints + add controllers table to etl --- routers/manage_databases.py | 61 ++++++++++++++++++++++++++++++++++--- utils/etl_databases.py | 37 ++++++++++++++++++++-- 2 files changed, 91 insertions(+), 7 deletions(-) diff --git a/routers/manage_databases.py b/routers/manage_databases.py index 175a656..8c39c44 100644 --- a/routers/manage_databases.py +++ b/routers/manage_databases.py @@ -1,8 +1,11 @@ import json +import time from typing import List, Dict, Any -from utils.etl_databases import HummingbotDatabase +import pandas as pd + +from utils.etl_databases import HummingbotDatabase, ETLPerformance from fastapi import APIRouter from utils.file_system import FileSystemUtil @@ -28,14 +31,15 @@ async def read_databases(db_paths: List[str] = None): "healthy": db.status["general_status"], "status": db.status, "tables": { - "order": json.dumps(db.get_orders().to_dict()), + "orders": json.dumps(db.get_orders().to_dict()), "trade_fill": json.dumps(db.get_trade_fills().to_dict()), - "executor": json.dumps(db.get_executors_data().to_dict()), - "order_status": json.dumps(db.get_order_status().to_dict()) + "executors": json.dumps(db.get_executors_data().to_dict()), + "order_status": json.dumps(db.get_order_status().to_dict()), + "controllers": json.dumps(db.get_controllers_data().to_dict()) } } except Exception as e: - print(f"Error reading database: {str(e)}") + print(f"Error reading database {db_path}: {str(e)}") db_content = { "db_name": "", "db_path": db_path, @@ -45,3 +49,50 @@ async def read_databases(db_paths: List[str] = None): } dbs.append(db_content) return dbs + + +@router.post("/create-checkpoint", response_model=Dict[str, Any]) +async def create_checkpoint(db_paths: List[str]): + try: + dbs = await read_databases(db_paths) + + healthy_dbs = [db for db in dbs if db["healthy"]] + + table_names = ["trade_fill", "orders", "order_status", "executors", "controllers"] + tables_dict = {name: pd.DataFrame() for name in table_names} + + for db in healthy_dbs: + for table_name in table_names: + new_data = pd.DataFrame(json.loads(db["tables"][table_name])) + new_data["db_path"] = db["db_path"] + new_data["db_name"] = db["db_name"] + tables_dict[table_name] = pd.concat([tables_dict[table_name], new_data]) + + etl = ETLPerformance(db_path=f"bots/data/checkpoint_{str(int(time.time()))}.sqlite") + etl.create_tables() + etl.insert_data(tables_dict) + return {"message": "Checkpoint created successfully."} + except Exception as e: + return {"message": f"Error: {str(e)}"} + + +@router.post("/list-checkpoints", response_model=List[str]) +async def list_checkpoints(full_path: bool): + return file_system.list_checkpoints(full_path) + + +@router.post("/load-checkpoint") +async def load_checkpoint(checkpoint_path: str): + try: + etl = ETLPerformance(checkpoint_path) + executor = etl.load_executors() + order = etl.load_orders() + trade_fill = etl.load_trade_fill() + checkpoint_data = { + "executor": json.dumps(executor.to_dict()), + "order": json.dumps(order.to_dict()), + "trade_fill": json.dumps(trade_fill.to_dict()), + } + return checkpoint_data + except Exception as e: + return {"message": f"Error: {str(e)}"} diff --git a/utils/etl_databases.py b/utils/etl_databases.py index 21fbf2d..b2f7baf 100644 --- a/utils/etl_databases.py +++ b/utils/etl_databases.py @@ -30,8 +30,9 @@ def status(self): orders_status = self._get_table_status(self.get_orders) order_status_status = self._get_table_status(self.get_order_status) executors_status = self._get_table_status(self.get_executors_data) + controller_status = self._get_table_status(self.get_controllers_data) general_status = all(status == "Correct" for status in - [trade_fill_status, orders_status, order_status_status, executors_status]) + [trade_fill_status, orders_status, order_status_status, executors_status, controller_status]) status = {"db_name": self.db_name, "db_path": self.db_path, "trade_fill": trade_fill_status, @@ -77,6 +78,12 @@ def get_executors_data(self) -> pd.DataFrame: executors = pd.read_sql_query(text(query), session.connection()) return executors + def get_controllers_data(self) -> pd.DataFrame: + with self.session_maker() as session: + query = "SELECT * FROM Controllers" + controllers = pd.read_sql_query(text(query), session.connection()) + return controllers + class ETLPerformance: def __init__(self, @@ -151,9 +158,20 @@ def orders_table(self): Column('position', VARCHAR(255)), ) + @property + def controllers_table(self): + return Table( + 'controllers', MetaData(), + Column('id', VARCHAR(255)), + Column('controller_id', INT), + Column('timestamp', FLOAT), + Column('type', VARCHAR(255)), + Column('config', String), + ) + @property def tables(self): - return [self.executors_table, self.trade_fill_table, self.orders_table] + return [self.executors_table, self.trade_fill_table, self.orders_table, self.controllers_table] def create_tables(self): with self.engine.connect(): @@ -167,6 +185,8 @@ def insert_data(self, data): self.insert_trade_fill(data["trade_fill"]) if "orders" in data: self.insert_orders(data["orders"]) + if "controllers" in data: + self.insert_controllers(data["controllers"]) def insert_executors(self, executors): with self.engine.connect() as conn: @@ -239,6 +259,19 @@ def insert_orders(self, orders): conn.execute(ins) conn.commit() + def insert_controllers(self, controllers): + with self.engine.connect() as conn: + for _, row in controllers.iterrows(): + ins = insert(self.controllers_table).values( + id=row["id"], + controller_id=row["controller_id"], + timestamp=row["timestamp"], + type=row["type"], + config=row["config"], + ) + conn.execute(ins) + conn.commit() + def load_executors(self): with self.session_maker() as session: query = "SELECT * FROM executors" From ceb0f5b0349109b283d624b58504b7904207aece Mon Sep 17 00:00:00 2001 From: drupman Date: Sat, 21 Sep 2024 19:23:10 -0300 Subject: [PATCH 03/24] (feat) load controllers --- routers/manage_databases.py | 6 ++++-- utils/etl_databases.py | 6 ++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/routers/manage_databases.py b/routers/manage_databases.py index 8c39c44..6b91bd8 100644 --- a/routers/manage_databases.py +++ b/routers/manage_databases.py @@ -88,10 +88,12 @@ async def load_checkpoint(checkpoint_path: str): executor = etl.load_executors() order = etl.load_orders() trade_fill = etl.load_trade_fill() + controllers = etl.load_controllers() checkpoint_data = { - "executor": json.dumps(executor.to_dict()), - "order": json.dumps(order.to_dict()), + "executors": json.dumps(executor.to_dict()), + "orders": json.dumps(order.to_dict()), "trade_fill": json.dumps(trade_fill.to_dict()), + "controllers": json.dumps(controllers.to_dict()) } return checkpoint_data except Exception as e: diff --git a/utils/etl_databases.py b/utils/etl_databases.py index b2f7baf..fb6ab5b 100644 --- a/utils/etl_databases.py +++ b/utils/etl_databases.py @@ -290,6 +290,12 @@ def load_orders(self): orders = pd.read_sql_query(text(query), session.connection()) return orders + def load_controllers(self): + with self.session_maker() as session: + query = "SELECT * FROM controllers" + controllers = pd.read_sql_query(text(query), session.connection()) + return controllers + @staticmethod def parse_executors(executors: pd.DataFrame) -> List[ExecutorInfo]: executor_values = [] From 5fdbaba3ae71f054dcd0453b91f9c98cd15e8533 Mon Sep 17 00:00:00 2001 From: drupman Date: Sat, 21 Sep 2024 20:50:56 -0300 Subject: [PATCH 04/24] (feat) add performance endpoints --- main.py | 5 +++-- routers/manage_performance.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 routers/manage_performance.py diff --git a/main.py b/main.py index 4d37db8..9494502 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,8 @@ from dotenv import load_dotenv from fastapi import FastAPI -from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, manage_market_data, manage_databases +from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, \ + manage_market_data, manage_databases, manage_performance load_dotenv() app = FastAPI() @@ -12,5 +13,5 @@ app.include_router(manage_market_data.router) app.include_router(manage_backtesting.router) app.include_router(manage_accounts.router) -# app.include_router(manage_performance.router) +app.include_router(manage_performance.router) app.include_router(manage_databases.router) diff --git a/routers/manage_performance.py b/routers/manage_performance.py new file mode 100644 index 0000000..1ec97fd --- /dev/null +++ b/routers/manage_performance.py @@ -0,0 +1,34 @@ +from decimal import Decimal +from typing import List + +import json +from hummingbot.strategy_v2.models.executors_info import ExecutorInfo +from hummingbot.strategy_v2.backtesting.backtesting_engine_base import BacktestingEngineBase + +from routers.manage_backtesting import BACKTESTING_ENGINES + +from fastapi import APIRouter + +router = APIRouter(tags=["Market Performance"]) + + +@router.post("/get-performance-results") +async def get_performance_results(executors_and_controller_type: dict): + executors = executors_and_controller_type.get("executors") + controller_type = executors_and_controller_type.get("controller_type") + performance_results = {} + try: + for executor in executors: + if isinstance(executor["custom_info"], str): + executor["custom_info"] = json.loads(executor["custom_info"]) + parsed_executors = [ExecutorInfo(**executor) for executor in executors] + backtesting_engine = BACKTESTING_ENGINES[controller_type] + performance_results["results"] = backtesting_engine.summarize_results(parsed_executors) + results = performance_results["results"] + results["sharpe_ratio"] = results["sharpe_ratio"] if results["sharpe_ratio"] is not None else 0 + return { + "results": performance_results["results"], + } + + except Exception as e: + return {"error": str(e)} From 503c86e5a98d665baed9f5e27e7273c959b20636 Mon Sep 17 00:00:00 2001 From: drupman Date: Mon, 30 Sep 2024 12:45:23 -0300 Subject: [PATCH 05/24] (feat) improve etl performance + remove controller type from performance endpoint --- routers/manage_performance.py | 22 +++---- utils/etl_databases.py | 115 ++++++++++++++++------------------ 2 files changed, 64 insertions(+), 73 deletions(-) diff --git a/routers/manage_performance.py b/routers/manage_performance.py index 1ec97fd..92d1e34 100644 --- a/routers/manage_performance.py +++ b/routers/manage_performance.py @@ -1,32 +1,28 @@ -from decimal import Decimal -from typing import List +from typing import List, Any, Dict -import json -from hummingbot.strategy_v2.models.executors_info import ExecutorInfo from hummingbot.strategy_v2.backtesting.backtesting_engine_base import BacktestingEngineBase from routers.manage_backtesting import BACKTESTING_ENGINES from fastapi import APIRouter +from utils.etl_databases import PerformanceDataSource + router = APIRouter(tags=["Market Performance"]) @router.post("/get-performance-results") -async def get_performance_results(executors_and_controller_type: dict): - executors = executors_and_controller_type.get("executors") - controller_type = executors_and_controller_type.get("controller_type") +async def get_performance_results(payload: Dict[str, Any]): + executors = payload.get("executors") + data_source = PerformanceDataSource(executors) performance_results = {} try: - for executor in executors: - if isinstance(executor["custom_info"], str): - executor["custom_info"] = json.loads(executor["custom_info"]) - parsed_executors = [ExecutorInfo(**executor) for executor in executors] - backtesting_engine = BACKTESTING_ENGINES[controller_type] - performance_results["results"] = backtesting_engine.summarize_results(parsed_executors) + backtesting_engine = BacktestingEngineBase() + performance_results["results"] = backtesting_engine.summarize_results(data_source.executor_info_list) results = performance_results["results"] results["sharpe_ratio"] = results["sharpe_ratio"] if results["sharpe_ratio"] is not None else 0 return { + "executors": executors, "results": performance_results["results"], } diff --git a/utils/etl_databases.py b/utils/etl_databases.py index fb6ab5b..1d1a862 100644 --- a/utils/etl_databases.py +++ b/utils/etl_databases.py @@ -1,8 +1,11 @@ import os import pandas as pd import json -from typing import List +from typing import List, Dict, Any +from hummingbot.connector.connector_base import TradeType +from hummingbot.strategy_v2.models.base import RunnableStatus +from hummingbot.strategy_v2.models.executors import CloseType from hummingbot.strategy_v2.models.executors_info import ExecutorInfo from sqlalchemy import create_engine, insert, text, MetaData, Table, Column, VARCHAR, INT, FLOAT, Integer, String, Float from sqlalchemy.orm import sessionmaker @@ -276,7 +279,7 @@ def load_executors(self): with self.session_maker() as session: query = "SELECT * FROM executors" executors = pd.read_sql_query(text(query), session.connection()) - return executors + return executors def load_trade_fill(self): with self.session_maker() as session: @@ -296,68 +299,60 @@ def load_controllers(self): controllers = pd.read_sql_query(text(query), session.connection()) return controllers - @staticmethod - def parse_executors(executors: pd.DataFrame) -> List[ExecutorInfo]: - executor_values = [] - for _, row in executors.iterrows(): - executor_values.append(ExecutorInfo( - id=row["id"], - timestamp=row["timestamp"], - type=row["type"], - close_timestamp=row["close_timestamp"], - close_type=row["close_type"], - status=row["status"], - config=json.loads(row["config"]), - net_pnl_pct=row["net_pnl_pct"], - net_pnl_quote=row["net_pnl_quote"], - cum_fees_quote=row["cum_fees_quote"], - filled_amount_quote=row["filled_amount_quote"], - is_active=row["is_active"], - is_trading=row["is_trading"], - custom_info=json.loads(row["custom_info"]), - controller_id=row["controller_id"], - side=row["side"], - )) - return executor_values - @staticmethod - def dump_executors(executors: List[ExecutorInfo]) -> List[dict]: - executor_values = [] - for executor in executors: - executor_to_append = { - "id": executor.id, - "timestamp": executor.timestamp, - "type": executor.type, - "close_timestamp": executor.close_timestamp, - "close_type": executor.close_type.value, - "status": executor.status.value, - "config": executor.config.dict(), - "net_pnl_pct": executor.net_pnl_pct, - "net_pnl_quote": executor.net_pnl_quote, - "cum_fees_quote": executor.cum_fees_quote, - "filled_amount_quote": executor.filled_amount_quote, - "is_active": executor.is_active, - "is_trading": executor.is_trading, - "custom_info": json.dumps(executor.custom_info), - "controller_id": executor.controller_id, - "side": executor.side, - } - executor_to_append["config"]["mode"] = executor_to_append["config"]["mode"].value - executor_to_append["config"]["side"] = executor_to_append["config"]["side"].value - executor_values.append(executor_to_append) +class PerformanceDataSource: + def __init__(self, executors_dict: Dict[str, Any]): + self.executors_dict = executors_dict + + @property + def executors_df(self): + executors = pd.DataFrame(self.executors_dict) + executors["custom_info"] = executors["custom_info"].apply( + lambda x: json.loads(x) if isinstance(x, str) else x) + executors["config"] = executors["config"].apply(lambda x: json.loads(x) if isinstance(x, str) else x) + executors["timestamp"] = executors["timestamp"].apply(lambda x: self.ensure_timestamp_in_seconds(x)) + executors["close_timestamp"] = executors["close_timestamp"].apply( + lambda x: self.ensure_timestamp_in_seconds(x)) + executors["trading_pair"] = executors["config"].apply(lambda x: x["trading_pair"]) + executors["exchange"] = executors["config"].apply(lambda x: x["connector_name"]) + executors["level_id"] = executors["config"].apply( + lambda x: x.get("level_id") if x.get("level_id") is not None else 0) + executors["bep"] = executors["custom_info"].apply(lambda x: x["current_position_average_price"]) + executors["order_ids"] = executors["custom_info"].apply(lambda x: x.get("order_ids")) + executors["close_price"] = executors["custom_info"].apply(lambda x: x["close_price"]) + executors["sl"] = executors["config"].apply(lambda x: x["stop_loss"]).fillna(0) + executors["tp"] = executors["config"].apply(lambda x: x["take_profit"]).fillna(0) + executors["tl"] = executors["config"].apply(lambda x: x["time_limit"]).fillna(0) + return executors + + @property + def executor_info_list(self) -> List[ExecutorInfo]: + executors = self.apply_special_data_types(self.executors_df) + required_columns = [ + "id", "timestamp", "type", "close_timestamp", "close_type", "status", + "net_pnl_pct", "net_pnl_quote", "cum_fees_quote", "filled_amount_quote", + "is_active", "is_trading", "controller_id", "side", "config", "custom_info" + ] + filtered_df = executors[required_columns] + executor_values = filtered_df.apply(lambda row: ExecutorInfo(**row.to_dict()), axis=1).tolist() return executor_values + def apply_special_data_types(self, executors): + executors["status"] = executors["status"].apply(lambda x: self.get_enum_by_value(RunnableStatus, int(x))) + executors["side"] = executors["config"].apply(lambda x: self.get_enum_by_value(TradeType, int(x["side"]))) + executors["close_type"] = executors["close_type"].apply(lambda x: self.get_enum_by_value(CloseType, int(x))) + executors["close_type_name"] = executors["close_type"].apply(lambda x: x.name) + executors["datetime"] = pd.to_datetime(executors.timestamp, unit="s") + executors["close_datetime"] = pd.to_datetime(executors["close_timestamp"], unit="s") + return executors + @staticmethod - def get_executors_with_orders(executors, orders): - df = pd.DataFrame(executors['custom_info'].tolist(), index=executors['id'], - columns=["custom_info"]).reset_index() - df["custom_info"] = df["custom_info"].apply(lambda x: json.loads(x)) - df["orders"] = df["custom_info"].apply(lambda x: x["order_ids"]) - df.rename(columns={"id": "executor_id"}, inplace=True) - exploded_df = df.explode("orders").rename(columns={"orders": "order_id"}) - exec_with_orders = exploded_df.merge(orders, left_on="order_id", right_on="client_order_id", how="inner") - exec_with_orders = exec_with_orders[exec_with_orders["last_status"].isin(["SellOrderCompleted", "BuyOrderCompleted"])] - return exec_with_orders[["executor_id", "order_id", "last_status", "last_update_timestamp", "price", "amount", "position"]] + def remove_special_data_types(executors): + executors["status"] = executors["status"].apply(lambda x: x.value) + executors["side"] = executors["side"].apply(lambda x: x.value) + executors["close_type"] = executors["close_type"].apply(lambda x: x.value) + executors.drop(columns=["close_type_name", "datetime", "close_datetime"], inplace=True) + return executors @staticmethod def get_enum_by_value(enum_class, value): From c1211705d98b7f82b08e40ce7561717080944577 Mon Sep 17 00:00:00 2001 From: drupman Date: Sat, 12 Oct 2024 10:28:19 -0300 Subject: [PATCH 06/24] (feat) replace unpacking method by dict definition for executor info list + general refactor --- routers/manage_performance.py | 10 ++++----- utils/etl_databases.py | 41 ++++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/routers/manage_performance.py b/routers/manage_performance.py index 92d1e34..bbd7e01 100644 --- a/routers/manage_performance.py +++ b/routers/manage_performance.py @@ -1,11 +1,8 @@ -from typing import List, Any, Dict +from fastapi import APIRouter +from typing import Any, Dict from hummingbot.strategy_v2.backtesting.backtesting_engine_base import BacktestingEngineBase -from routers.manage_backtesting import BACKTESTING_ENGINES - -from fastapi import APIRouter - from utils.etl_databases import PerformanceDataSource router = APIRouter(tags=["Market Performance"]) @@ -18,7 +15,8 @@ async def get_performance_results(payload: Dict[str, Any]): performance_results = {} try: backtesting_engine = BacktestingEngineBase() - performance_results["results"] = backtesting_engine.summarize_results(data_source.executor_info_list) + executor_info_list = data_source.executor_info_list + performance_results["results"] = backtesting_engine.summarize_results(executor_info_list ) results = performance_results["results"] results["sharpe_ratio"] = results["sharpe_ratio"] if results["sharpe_ratio"] is not None else 0 return { diff --git a/utils/etl_databases.py b/utils/etl_databases.py index 1d1a862..f0b4719 100644 --- a/utils/etl_databases.py +++ b/utils/etl_databases.py @@ -3,7 +3,7 @@ import json from typing import List, Dict, Any -from hummingbot.connector.connector_base import TradeType +from hummingbot.core.data_type.common import TradeType from hummingbot.strategy_v2.models.base import RunnableStatus from hummingbot.strategy_v2.models.executors import CloseType from hummingbot.strategy_v2.models.executors_info import ExecutorInfo @@ -315,8 +315,7 @@ def executors_df(self): lambda x: self.ensure_timestamp_in_seconds(x)) executors["trading_pair"] = executors["config"].apply(lambda x: x["trading_pair"]) executors["exchange"] = executors["config"].apply(lambda x: x["connector_name"]) - executors["level_id"] = executors["config"].apply( - lambda x: x.get("level_id") if x.get("level_id") is not None else 0) + executors["level_id"] = executors["config"].apply(lambda x: x.get("level_id")) executors["bep"] = executors["custom_info"].apply(lambda x: x["current_position_average_price"]) executors["order_ids"] = executors["custom_info"].apply(lambda x: x.get("order_ids")) executors["close_price"] = executors["custom_info"].apply(lambda x: x["close_price"]) @@ -328,13 +327,27 @@ def executors_df(self): @property def executor_info_list(self) -> List[ExecutorInfo]: executors = self.apply_special_data_types(self.executors_df) - required_columns = [ - "id", "timestamp", "type", "close_timestamp", "close_type", "status", - "net_pnl_pct", "net_pnl_quote", "cum_fees_quote", "filled_amount_quote", - "is_active", "is_trading", "controller_id", "side", "config", "custom_info" - ] - filtered_df = executors[required_columns] - executor_values = filtered_df.apply(lambda row: ExecutorInfo(**row.to_dict()), axis=1).tolist() + executor_values = [] + for index, row in executors.iterrows(): + executor_to_append = ExecutorInfo( + id=row["id"], + timestamp=row["timestamp"], + type=row["type"], + close_timestamp=row["close_timestamp"], + close_type=row["close_type"], + status=row["status"], + config=row["config"], + net_pnl_pct=row["net_pnl_pct"], + net_pnl_quote=row["net_pnl_quote"], + cum_fees_quote=row["cum_fees_quote"], + filled_amount_quote=row["filled_amount_quote"], + is_active=row["is_active"], + is_trading=row["is_trading"], + custom_info=row["custom_info"], + controller_id=row["controller_id"] + ) + executor_to_append.custom_info["side"] = row["side"] + executor_values.append(executor_to_append) return executor_values def apply_special_data_types(self, executors): @@ -346,14 +359,6 @@ def apply_special_data_types(self, executors): executors["close_datetime"] = pd.to_datetime(executors["close_timestamp"], unit="s") return executors - @staticmethod - def remove_special_data_types(executors): - executors["status"] = executors["status"].apply(lambda x: x.value) - executors["side"] = executors["side"].apply(lambda x: x.value) - executors["close_type"] = executors["close_type"].apply(lambda x: x.value) - executors.drop(columns=["close_type_name", "datetime", "close_datetime"], inplace=True) - return executors - @staticmethod def get_enum_by_value(enum_class, value): for member in enum_class: From 8e15d7be92ba3cb0f5b2c28123155144a0704137 Mon Sep 17 00:00:00 2001 From: david-hummingbot <85695272+david-hummingbot@users.noreply.github.com> Date: Mon, 22 Jul 2024 12:04:21 +0800 Subject: [PATCH 07/24] Update Makefile --- Makefile | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 07226d1..7e9e1e1 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,26 @@ .ONESHELL: +.SHELLFLAGS := -c + .PHONY: run .PHONY: uninstall .PHONY: install +.PHONY: install-pre-commit +.PHONY: docker_build +.PHONY: docker_run + + +detect_conda_bin := $(shell bash -c 'if [ "${CONDA_EXE} " == " " ]; then \ + CONDA_EXE=$$((find /opt/conda/bin/conda || find ~/anaconda3/bin/conda || \ + find /usr/local/anaconda3/bin/conda || find ~/miniconda3/bin/conda || \ + find /root/miniconda/bin/conda || find ~/Anaconda3/Scripts/conda || \ + find $$CONDA/bin/conda) 2>/dev/null); fi; \ + if [ "${CONDA_EXE}_" == "_" ]; then \ + echo "Please install Anaconda w/ Python 3.10+ first"; \ + echo "See: https://www.anaconda.com/distribution/"; \ + exit 1; fi; \ + echo $$(dirname $${CONDA_EXE})') + +CONDA_BIN := $(detect_conda_bin) run: uvicorn main:app --reload @@ -10,10 +29,21 @@ uninstall: conda env remove -n backend-api install: - conda env create -f environment.yml + if conda env list | grep -q '^backend-api '; then \ + echo "Environment already exists."; \ + else \ + conda env create -f environment_conda.yml; \ + fi + $(MAKE) install-pre-commit + +install-pre-commit: + /bin/bash -c 'source "${CONDA_BIN}/activate" backend-api && \ + if ! conda list pre-commit | grep pre-commit &> /dev/null; then \ + pip install pre-commit; \ + fi && pre-commit install' docker_build: docker build -t hummingbot/backend-api:latest . docker_run: - docker compose up -d \ No newline at end of file + docker compose up -d From 1ab63e720ffe464cb2132651dfe0446d8879f70c Mon Sep 17 00:00:00 2001 From: david-hummingbot <85695272+david-hummingbot@users.noreply.github.com> Date: Mon, 14 Oct 2024 16:15:03 +0800 Subject: [PATCH 08/24] fix-conda-env --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 7e9e1e1..b240cd2 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ install: if conda env list | grep -q '^backend-api '; then \ echo "Environment already exists."; \ else \ - conda env create -f environment_conda.yml; \ + conda env create -f environment.yml; \ fi $(MAKE) install-pre-commit From 90b7366a67fb14bd46a112026e1052288779da8a Mon Sep 17 00:00:00 2001 From: david-hummingbot <85695272+david-hummingbot@users.noreply.github.com> Date: Fri, 25 Oct 2024 18:03:04 +0800 Subject: [PATCH 09/24] update python-multipart version --- environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index a9f5e0e..69b97ef 100644 --- a/environment.yml +++ b/environment.yml @@ -14,7 +14,7 @@ dependencies: - git+https://github.com/felixfontein/docker-py - python-dotenv - boto3 - - python-multipart + - python-multipart==0.0.12 - PyYAML - git+https://github.com/hummingbot/hbot-remote-client-py.git - flake8 From b286bc1432c286e1e94b6c86cf4257f81375ac27 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Sun, 13 Oct 2024 18:37:27 -0300 Subject: [PATCH 10/24] (feat) add logfire dependency --- environment.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/environment.yml b/environment.yml index 69b97ef..3881426 100644 --- a/environment.yml +++ b/environment.yml @@ -20,3 +20,4 @@ dependencies: - flake8 - isort - pre-commit + - logfire[fastapi] From 2d9a462358d8ef7951d83c247139e9522c267797 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Sun, 13 Oct 2024 18:37:44 -0300 Subject: [PATCH 11/24] (feat) add logfire config --- main.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/main.py b/main.py index 9494502..92815ec 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +import logfire from dotenv import load_dotenv from fastapi import FastAPI @@ -5,7 +6,10 @@ manage_market_data, manage_databases, manage_performance load_dotenv() + app = FastAPI() +logfire.configure() +logfire.instrument_fastapi(app) app.include_router(manage_docker.router) app.include_router(manage_broker_messages.router) From 2d7b6d04a32b1733b2a842b59bd49a16a83bf295 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Sun, 13 Oct 2024 18:38:00 -0300 Subject: [PATCH 12/24] (feat) add connector name to the logs --- services/accounts_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/accounts_service.py b/services/accounts_service.py index be05f58..ae0f9c2 100644 --- a/services/accounts_service.py +++ b/services/accounts_service.py @@ -259,7 +259,7 @@ async def _safe_get_last_traded_prices(self, connector, trading_pairs, timeout=5 logging.error(f"Timeout getting last traded prices for trading pairs {trading_pairs}") return {pair: Decimal("0") for pair in trading_pairs} except Exception as e: - logging.error(f"Error getting last traded prices for trading pairs {trading_pairs}: {e}") + logging.error(f"Error getting last traded prices in connector {connector} for trading pairs {trading_pairs}: {e}") return {pair: Decimal("0") for pair in trading_pairs} @staticmethod From 790d30036fe1f289170e07bda8ef16ba725291dc Mon Sep 17 00:00:00 2001 From: cardosofede Date: Tue, 29 Oct 2024 18:26:43 -0300 Subject: [PATCH 13/24] (feat) add user and pass for the api --- set_environment.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/set_environment.sh b/set_environment.sh index ee59d61..a38ba9a 100644 --- a/set_environment.sh +++ b/set_environment.sh @@ -6,8 +6,12 @@ By default, the current working directory will be used as the BOTS_PATH and the # Asking for CONFIG_PASSWORD and BOTS_PATH CONFIG_PASSWORD=a +USERNAME=admin +PASSWORD=admin BOTS_PATH=$(pwd) # Write to .env file echo "CONFIG_PASSWORD=$CONFIG_PASSWORD" > .env echo "BOTS_PATH=$BOTS_PATH" >> .env +echo "USERNAME=$USER" >> .env +echo "PASSWORD=$PASSWORD" >> .env From f1341c52cdbcb7f9cef7721ca05ad433a593f7f0 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Tue, 29 Oct 2024 18:33:06 -0300 Subject: [PATCH 14/24] (feat) add grid strike --- bots/controllers/generic/grid_strike.py | 214 ++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 bots/controllers/generic/grid_strike.py diff --git a/bots/controllers/generic/grid_strike.py b/bots/controllers/generic/grid_strike.py new file mode 100644 index 0000000..902810f --- /dev/null +++ b/bots/controllers/generic/grid_strike.py @@ -0,0 +1,214 @@ +from decimal import Decimal +from typing import Dict, List, Optional, Set + +from hummingbot.client.config.config_data_types import ClientFieldData +from hummingbot.core.data_type.common import OrderType, PositionMode, PriceType, TradeType +from hummingbot.core.data_type.trade_fee import TokenAmount +from hummingbot.data_feed.candles_feed.data_types import CandlesConfig +from hummingbot.strategy_v2.controllers import ControllerBase, ControllerConfigBase +from hummingbot.strategy_v2.executors.position_executor.data_types import PositionExecutorConfig, TripleBarrierConfig +from hummingbot.strategy_v2.models.executor_actions import CreateExecutorAction, ExecutorAction, StopExecutorAction +from hummingbot.strategy_v2.models.executors_info import ExecutorInfo +from hummingbot.strategy_v2.utils.distributions import Distributions +from pydantic import BaseModel, Field + + +class GridRange(BaseModel): + id: str + start_price: Decimal + end_price: Decimal + total_amount_pct: Decimal + side: TradeType = TradeType.BUY + open_order_type: OrderType = OrderType.LIMIT_MAKER + take_profit_order_type: OrderType = OrderType.LIMIT + active: bool = True + + +class GridStrikeConfig(ControllerConfigBase): + """ + Configuration required to run the GridStrike strategy for one connector and trading pair. + """ + controller_name: str = "grid_strike" + candles_config: List[CandlesConfig] = [] + connector_name: str = "binance" + trading_pair: str = "BTC-USDT" + total_amount_quote: Decimal = Field(default=Decimal("1000"), client_data=ClientFieldData(is_updatable=True)) + grid_ranges: List[GridRange] = Field(default=[GridRange(id="R0", start_price=Decimal("40000"), + end_price=Decimal("60000"), total_amount_pct=Decimal("0.1"))], + client_data=ClientFieldData(is_updatable=True)) + position_mode: PositionMode = PositionMode.HEDGE + leverage: int = 1 + time_limit: Optional[int] = Field(default=60 * 60 * 24 * 2, client_data=ClientFieldData(is_updatable=True)) + activation_bounds: Decimal = Field(default=Decimal("0.01"), client_data=ClientFieldData(is_updatable=True)) + min_spread_between_orders: Optional[Decimal] = Field(default=None, + client_data=ClientFieldData(is_updatable=True)) + min_order_amount: Optional[Decimal] = Field(default=Decimal("1"), + client_data=ClientFieldData(is_updatable=True)) + max_open_orders: int = Field(default=5, client_data=ClientFieldData(is_updatable=True)) + grid_range_update_interval: int = Field(default=60, client_data=ClientFieldData(is_updatable=True)) + extra_balance_base_usd: Decimal = Decimal("10") + + def update_markets(self, markets: Dict[str, Set[str]]) -> Dict[str, Set[str]]: + if self.connector_name not in markets: + markets[self.connector_name] = set() + markets[self.connector_name].add(self.trading_pair) + return markets + + +class GridLevel(BaseModel): + id: str + price: Decimal + amount: Decimal + step: Decimal + side: TradeType + open_order_type: OrderType + take_profit_order_type: OrderType + + +class GridStrike(ControllerBase): + def __init__(self, config: GridStrikeConfig, *args, **kwargs): + super().__init__(config, *args, **kwargs) + self.config = config + self.trading_rules = self.market_data_provider.get_trading_rules(self.config.connector_name, + self.config.trading_pair) + self._last_grid_levels_update = 0 + + def _calculate_grid_config(self): + grid_levels = [] + if self.config.min_spread_between_orders: + spread_between_orders = self.config.min_spread_between_orders * self.get_mid_price() + step_proposed = max(self.trading_rules.min_price_increment, spread_between_orders) + else: + step_proposed = self.trading_rules.min_price_increment + amount_proposed = max(self.trading_rules.min_notional_size, self.config.min_order_amount) if \ + self.config.min_order_amount else self.trading_rules.min_order_size + for grid_range in self.config.grid_ranges: + if grid_range.active: + total_amount = grid_range.total_amount_pct * self.config.total_amount_quote + theoretical_orders_by_step = (grid_range.end_price - grid_range.start_price) / step_proposed + theoretical_orders_by_amount = total_amount / amount_proposed + orders = int(min(theoretical_orders_by_step, theoretical_orders_by_amount)) + prices = Distributions.linear(orders, float(grid_range.start_price), float(grid_range.end_price)) + step = (grid_range.end_price - grid_range.start_price) / grid_range.end_price / orders + amount_quote = total_amount / orders + for i, price in enumerate(prices): + price_quantized = self.market_data_provider.quantize_order_price(self.config.connector_name, + self.config.trading_pair, price) + # amount_quantized = self.market_data_provider.quantize_order_amount(self.config.connector_name, + # self.config.trading_pair, amount_quote / self.get_mid_price()) + amount_quantized = amount_quote / self.get_mid_price() + grid_levels.append(GridLevel(id=f"{grid_range.id}_P{i}", + price=price_quantized, + amount=amount_quantized, + step=step, side=grid_range.side, + open_order_type=grid_range.open_order_type, + take_profit_order_type=grid_range.take_profit_order_type, + )) + return grid_levels + + def get_balance_requirements(self) -> List[TokenAmount]: + if "perpetual" in self.config.connector_name: + return [] + base_currency = self.config.trading_pair.split("-")[0] + return [TokenAmount(base_currency, self.config.extra_balance_base_usd / self.get_mid_price())] + + def get_mid_price(self) -> Decimal: + return self.market_data_provider.get_price_by_type( + self.config.connector_name, + self.config.trading_pair, + PriceType.MidPrice + ) + + def active_executors(self, is_trading: bool) -> List[ExecutorInfo]: + return [ + executor for executor in self.executors_info + if executor.is_active and executor.is_trading == is_trading + ] + + def determine_executor_actions(self) -> List[ExecutorAction]: + if self.market_data_provider.time() - self._last_grid_levels_update > 60: + self._last_grid_levels_update = self.market_data_provider.time() + self.grid_levels = self._calculate_grid_config() + return self.determine_create_executor_actions() + self.determine_stop_executor_actions() + + async def update_processed_data(self): + mid_price = self.get_mid_price() + self.processed_data.update({ + "mid_price": mid_price, + "active_executors_order_placed": self.active_executors(is_trading=False), + "active_executors_order_trading": self.active_executors(is_trading=True), + "long_activation_bounds": mid_price * (1 - self.config.activation_bounds), + "short_activation_bounds": mid_price * (1 + self.config.activation_bounds), + }) + + def determine_create_executor_actions(self) -> List[ExecutorAction]: + mid_price = self.processed_data["mid_price"] + long_activation_bounds = self.processed_data["long_activation_bounds"] + short_activation_bounds = self.processed_data["short_activation_bounds"] + levels_allowed = [] + for level in self.grid_levels: + if (level.side == TradeType.BUY and level.price >= long_activation_bounds) or \ + (level.side == TradeType.SELL and level.price <= short_activation_bounds): + levels_allowed.append(level) + active_executors = self.processed_data["active_executors_order_placed"] + \ + self.processed_data["active_executors_order_trading"] + active_executors_level_id = [executor.custom_info["level_id"] for executor in active_executors] + levels_allowed = sorted([level for level in levels_allowed if level.id not in active_executors_level_id], + key=lambda level: abs(level.price - mid_price)) + levels_allowed = levels_allowed[:self.config.max_open_orders] + create_actions = [] + for level in levels_allowed: + if level.side == TradeType.BUY and level.price > mid_price: + entry_price = mid_price + take_profit = max(level.step * 2, ((level.price - mid_price) / mid_price) + level.step) + trailing_stop = None + # trailing_stop_ap = max(level.step * 2, ((mid_price - level.price) / mid_price) + level.step) + # trailing_stop = TrailingStop(activation_price=trailing_stop_ap, trailing_delta=level.step / 2) + elif level.side == TradeType.SELL and level.price < mid_price: + entry_price = mid_price + take_profit = max(level.step * 2, ((mid_price - level.price) / mid_price) + level.step) + # trailing_stop_ap = max(level.step * 2, ((mid_price - level.price) / mid_price) + level.step) + # trailing_stop = TrailingStop(activation_price=trailing_stop_ap, trailing_delta=level.step / 2) + trailing_stop = None + else: + entry_price = level.price + take_profit = level.step + trailing_stop = None + create_actions.append(CreateExecutorAction(controller_id=self.config.id, + executor_config=PositionExecutorConfig( + timestamp=self.market_data_provider.time(), + connector_name=self.config.connector_name, + trading_pair=self.config.trading_pair, + entry_price=entry_price, + amount=level.amount, + leverage=self.config.leverage, + side=level.side, + level_id=level.id, + activation_bounds=[self.config.activation_bounds, + self.config.activation_bounds], + triple_barrier_config=TripleBarrierConfig( + take_profit=take_profit, + time_limit=self.config.time_limit, + open_order_type=OrderType.LIMIT_MAKER, + take_profit_order_type=level.take_profit_order_type, + trailing_stop=trailing_stop, + )))) + return create_actions + + def determine_stop_executor_actions(self) -> List[ExecutorAction]: + long_activation_bounds = self.processed_data["long_activation_bounds"] + short_activation_bounds = self.processed_data["short_activation_bounds"] + active_executors_order_placed = self.processed_data["active_executors_order_placed"] + non_active_ranges = [grid_range.id for grid_range in self.config.grid_ranges if not grid_range.active] + active_executor_of_non_active_ranges = [executor.id for executor in self.executors_info if + executor.is_active and + executor.custom_info["level_id"].split("_")[0] in non_active_ranges] + long_executors_to_stop = [executor.id for executor in active_executors_order_placed if + executor.side == TradeType.BUY and + executor.config.entry_price <= long_activation_bounds] + short_executors_to_stop = [executor.id for executor in active_executors_order_placed if + executor.side == TradeType.SELL and + executor.config.entry_price >= short_activation_bounds] + executors_id_to_stop = set(active_executor_of_non_active_ranges + long_executors_to_stop + short_executors_to_stop) + return [StopExecutorAction(controller_id=self.config.id, executor_id=executor) for executor in + list(executors_id_to_stop)] From e98e77ce0a908419c7c03ea3a10a5218d851ba31 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Tue, 29 Oct 2024 18:33:14 -0300 Subject: [PATCH 15/24] (feat) update environment --- environment.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/environment.yml b/environment.yml index 3881426..69b97ef 100644 --- a/environment.yml +++ b/environment.yml @@ -20,4 +20,3 @@ dependencies: - flake8 - isort - pre-commit - - logfire[fastapi] From 197dac763c18b0daac02edcc28ece75d15abe9e4 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Tue, 29 Oct 2024 18:33:25 -0300 Subject: [PATCH 16/24] (feat) add security --- main.py | 55 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/main.py b/main.py index 92815ec..dfde0ee 100644 --- a/main.py +++ b/main.py @@ -1,21 +1,46 @@ -import logfire +import os +import secrets +from typing import Annotated + from dotenv import load_dotenv -from fastapi import FastAPI +from fastapi import Depends, FastAPI, HTTPException, status +from fastapi.security import HTTPBasic, HTTPBasicCredentials -from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, \ - manage_market_data, manage_databases, manage_performance +from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, manage_market_data load_dotenv() +security = HTTPBasic() + +username = os.getenv("USERNAME", "admin") +password = os.getenv("PASSWORD", "admin") app = FastAPI() -logfire.configure() -logfire.instrument_fastapi(app) - -app.include_router(manage_docker.router) -app.include_router(manage_broker_messages.router) -app.include_router(manage_files.router) -app.include_router(manage_market_data.router) -app.include_router(manage_backtesting.router) -app.include_router(manage_accounts.router) -app.include_router(manage_performance.router) -app.include_router(manage_databases.router) + + +def auth_user( + credentials: Annotated[HTTPBasicCredentials, Depends(security)], +): + current_username_bytes = credentials.username.encode("utf8") + correct_username_bytes = f"{username}".encode("utf8") + is_correct_username = secrets.compare_digest( + current_username_bytes, correct_username_bytes + ) + current_password_bytes = credentials.password.encode("utf8") + correct_password_bytes = f"{password}".encode("utf8") + is_correct_password = secrets.compare_digest( + current_password_bytes, correct_password_bytes + ) + if not (is_correct_username and is_correct_password): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Basic"}, + ) + + +app.include_router(manage_docker.router, dependencies=[Depends(auth_user)]) +app.include_router(manage_broker_messages.router, dependencies=[Depends(auth_user)]) +app.include_router(manage_files.router, dependencies=[Depends(auth_user)]) +app.include_router(manage_market_data.router, dependencies=[Depends(auth_user)]) +app.include_router(manage_backtesting.router, dependencies=[Depends(auth_user)]) +app.include_router(manage_accounts.router, dependencies=[Depends(auth_user)]) From 353ee91e2131dd78edfd966af09fdfd8c07dee38 Mon Sep 17 00:00:00 2001 From: cardosofede Date: Thu, 31 Oct 2024 11:36:08 -0300 Subject: [PATCH 17/24] (fix) typo --- set_environment.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/set_environment.sh b/set_environment.sh index a38ba9a..1850198 100644 --- a/set_environment.sh +++ b/set_environment.sh @@ -13,5 +13,5 @@ BOTS_PATH=$(pwd) # Write to .env file echo "CONFIG_PASSWORD=$CONFIG_PASSWORD" > .env echo "BOTS_PATH=$BOTS_PATH" >> .env -echo "USERNAME=$USER" >> .env +echo "USERNAME=$USERNAME" >> .env echo "PASSWORD=$PASSWORD" >> .env From 6364add9229af2e5dfb9e51c9dccda3ec7c6887c Mon Sep 17 00:00:00 2001 From: drupman Date: Sat, 21 Sep 2024 17:16:24 -0300 Subject: [PATCH 18/24] (feat) add database endpoints --- main.py | 3 +- routers/manage_databases.py | 63 ++-------------- utils/etl_databases.py | 139 +++++++++++++----------------------- 3 files changed, 57 insertions(+), 148 deletions(-) diff --git a/main.py b/main.py index dfde0ee..d707375 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,7 @@ from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import HTTPBasic, HTTPBasicCredentials -from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, manage_market_data +from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, manage_market_data, manage_databases load_dotenv() security = HTTPBasic() @@ -44,3 +44,4 @@ def auth_user( app.include_router(manage_market_data.router, dependencies=[Depends(auth_user)]) app.include_router(manage_backtesting.router, dependencies=[Depends(auth_user)]) app.include_router(manage_accounts.router, dependencies=[Depends(auth_user)]) +app.include_router(manage_databases.router, dependencies=[Depends(auth_user)]) diff --git a/routers/manage_databases.py b/routers/manage_databases.py index 6b91bd8..175a656 100644 --- a/routers/manage_databases.py +++ b/routers/manage_databases.py @@ -1,11 +1,8 @@ import json -import time from typing import List, Dict, Any -import pandas as pd - -from utils.etl_databases import HummingbotDatabase, ETLPerformance +from utils.etl_databases import HummingbotDatabase from fastapi import APIRouter from utils.file_system import FileSystemUtil @@ -31,15 +28,14 @@ async def read_databases(db_paths: List[str] = None): "healthy": db.status["general_status"], "status": db.status, "tables": { - "orders": json.dumps(db.get_orders().to_dict()), + "order": json.dumps(db.get_orders().to_dict()), "trade_fill": json.dumps(db.get_trade_fills().to_dict()), - "executors": json.dumps(db.get_executors_data().to_dict()), - "order_status": json.dumps(db.get_order_status().to_dict()), - "controllers": json.dumps(db.get_controllers_data().to_dict()) + "executor": json.dumps(db.get_executors_data().to_dict()), + "order_status": json.dumps(db.get_order_status().to_dict()) } } except Exception as e: - print(f"Error reading database {db_path}: {str(e)}") + print(f"Error reading database: {str(e)}") db_content = { "db_name": "", "db_path": db_path, @@ -49,52 +45,3 @@ async def read_databases(db_paths: List[str] = None): } dbs.append(db_content) return dbs - - -@router.post("/create-checkpoint", response_model=Dict[str, Any]) -async def create_checkpoint(db_paths: List[str]): - try: - dbs = await read_databases(db_paths) - - healthy_dbs = [db for db in dbs if db["healthy"]] - - table_names = ["trade_fill", "orders", "order_status", "executors", "controllers"] - tables_dict = {name: pd.DataFrame() for name in table_names} - - for db in healthy_dbs: - for table_name in table_names: - new_data = pd.DataFrame(json.loads(db["tables"][table_name])) - new_data["db_path"] = db["db_path"] - new_data["db_name"] = db["db_name"] - tables_dict[table_name] = pd.concat([tables_dict[table_name], new_data]) - - etl = ETLPerformance(db_path=f"bots/data/checkpoint_{str(int(time.time()))}.sqlite") - etl.create_tables() - etl.insert_data(tables_dict) - return {"message": "Checkpoint created successfully."} - except Exception as e: - return {"message": f"Error: {str(e)}"} - - -@router.post("/list-checkpoints", response_model=List[str]) -async def list_checkpoints(full_path: bool): - return file_system.list_checkpoints(full_path) - - -@router.post("/load-checkpoint") -async def load_checkpoint(checkpoint_path: str): - try: - etl = ETLPerformance(checkpoint_path) - executor = etl.load_executors() - order = etl.load_orders() - trade_fill = etl.load_trade_fill() - controllers = etl.load_controllers() - checkpoint_data = { - "executors": json.dumps(executor.to_dict()), - "orders": json.dumps(order.to_dict()), - "trade_fill": json.dumps(trade_fill.to_dict()), - "controllers": json.dumps(controllers.to_dict()) - } - return checkpoint_data - except Exception as e: - return {"message": f"Error: {str(e)}"} diff --git a/utils/etl_databases.py b/utils/etl_databases.py index f0b4719..21fbf2d 100644 --- a/utils/etl_databases.py +++ b/utils/etl_databases.py @@ -1,11 +1,8 @@ import os import pandas as pd import json -from typing import List, Dict, Any +from typing import List -from hummingbot.core.data_type.common import TradeType -from hummingbot.strategy_v2.models.base import RunnableStatus -from hummingbot.strategy_v2.models.executors import CloseType from hummingbot.strategy_v2.models.executors_info import ExecutorInfo from sqlalchemy import create_engine, insert, text, MetaData, Table, Column, VARCHAR, INT, FLOAT, Integer, String, Float from sqlalchemy.orm import sessionmaker @@ -33,9 +30,8 @@ def status(self): orders_status = self._get_table_status(self.get_orders) order_status_status = self._get_table_status(self.get_order_status) executors_status = self._get_table_status(self.get_executors_data) - controller_status = self._get_table_status(self.get_controllers_data) general_status = all(status == "Correct" for status in - [trade_fill_status, orders_status, order_status_status, executors_status, controller_status]) + [trade_fill_status, orders_status, order_status_status, executors_status]) status = {"db_name": self.db_name, "db_path": self.db_path, "trade_fill": trade_fill_status, @@ -81,12 +77,6 @@ def get_executors_data(self) -> pd.DataFrame: executors = pd.read_sql_query(text(query), session.connection()) return executors - def get_controllers_data(self) -> pd.DataFrame: - with self.session_maker() as session: - query = "SELECT * FROM Controllers" - controllers = pd.read_sql_query(text(query), session.connection()) - return controllers - class ETLPerformance: def __init__(self, @@ -161,20 +151,9 @@ def orders_table(self): Column('position', VARCHAR(255)), ) - @property - def controllers_table(self): - return Table( - 'controllers', MetaData(), - Column('id', VARCHAR(255)), - Column('controller_id', INT), - Column('timestamp', FLOAT), - Column('type', VARCHAR(255)), - Column('config', String), - ) - @property def tables(self): - return [self.executors_table, self.trade_fill_table, self.orders_table, self.controllers_table] + return [self.executors_table, self.trade_fill_table, self.orders_table] def create_tables(self): with self.engine.connect(): @@ -188,8 +167,6 @@ def insert_data(self, data): self.insert_trade_fill(data["trade_fill"]) if "orders" in data: self.insert_orders(data["orders"]) - if "controllers" in data: - self.insert_controllers(data["controllers"]) def insert_executors(self, executors): with self.engine.connect() as conn: @@ -262,24 +239,11 @@ def insert_orders(self, orders): conn.execute(ins) conn.commit() - def insert_controllers(self, controllers): - with self.engine.connect() as conn: - for _, row in controllers.iterrows(): - ins = insert(self.controllers_table).values( - id=row["id"], - controller_id=row["controller_id"], - timestamp=row["timestamp"], - type=row["type"], - config=row["config"], - ) - conn.execute(ins) - conn.commit() - def load_executors(self): with self.session_maker() as session: query = "SELECT * FROM executors" executors = pd.read_sql_query(text(query), session.connection()) - return executors + return executors def load_trade_fill(self): with self.session_maker() as session: @@ -293,71 +257,68 @@ def load_orders(self): orders = pd.read_sql_query(text(query), session.connection()) return orders - def load_controllers(self): - with self.session_maker() as session: - query = "SELECT * FROM controllers" - controllers = pd.read_sql_query(text(query), session.connection()) - return controllers - - -class PerformanceDataSource: - def __init__(self, executors_dict: Dict[str, Any]): - self.executors_dict = executors_dict - - @property - def executors_df(self): - executors = pd.DataFrame(self.executors_dict) - executors["custom_info"] = executors["custom_info"].apply( - lambda x: json.loads(x) if isinstance(x, str) else x) - executors["config"] = executors["config"].apply(lambda x: json.loads(x) if isinstance(x, str) else x) - executors["timestamp"] = executors["timestamp"].apply(lambda x: self.ensure_timestamp_in_seconds(x)) - executors["close_timestamp"] = executors["close_timestamp"].apply( - lambda x: self.ensure_timestamp_in_seconds(x)) - executors["trading_pair"] = executors["config"].apply(lambda x: x["trading_pair"]) - executors["exchange"] = executors["config"].apply(lambda x: x["connector_name"]) - executors["level_id"] = executors["config"].apply(lambda x: x.get("level_id")) - executors["bep"] = executors["custom_info"].apply(lambda x: x["current_position_average_price"]) - executors["order_ids"] = executors["custom_info"].apply(lambda x: x.get("order_ids")) - executors["close_price"] = executors["custom_info"].apply(lambda x: x["close_price"]) - executors["sl"] = executors["config"].apply(lambda x: x["stop_loss"]).fillna(0) - executors["tp"] = executors["config"].apply(lambda x: x["take_profit"]).fillna(0) - executors["tl"] = executors["config"].apply(lambda x: x["time_limit"]).fillna(0) - return executors - - @property - def executor_info_list(self) -> List[ExecutorInfo]: - executors = self.apply_special_data_types(self.executors_df) + @staticmethod + def parse_executors(executors: pd.DataFrame) -> List[ExecutorInfo]: executor_values = [] - for index, row in executors.iterrows(): - executor_to_append = ExecutorInfo( + for _, row in executors.iterrows(): + executor_values.append(ExecutorInfo( id=row["id"], timestamp=row["timestamp"], type=row["type"], close_timestamp=row["close_timestamp"], close_type=row["close_type"], status=row["status"], - config=row["config"], + config=json.loads(row["config"]), net_pnl_pct=row["net_pnl_pct"], net_pnl_quote=row["net_pnl_quote"], cum_fees_quote=row["cum_fees_quote"], filled_amount_quote=row["filled_amount_quote"], is_active=row["is_active"], is_trading=row["is_trading"], - custom_info=row["custom_info"], - controller_id=row["controller_id"] - ) - executor_to_append.custom_info["side"] = row["side"] + custom_info=json.loads(row["custom_info"]), + controller_id=row["controller_id"], + side=row["side"], + )) + return executor_values + + @staticmethod + def dump_executors(executors: List[ExecutorInfo]) -> List[dict]: + executor_values = [] + for executor in executors: + executor_to_append = { + "id": executor.id, + "timestamp": executor.timestamp, + "type": executor.type, + "close_timestamp": executor.close_timestamp, + "close_type": executor.close_type.value, + "status": executor.status.value, + "config": executor.config.dict(), + "net_pnl_pct": executor.net_pnl_pct, + "net_pnl_quote": executor.net_pnl_quote, + "cum_fees_quote": executor.cum_fees_quote, + "filled_amount_quote": executor.filled_amount_quote, + "is_active": executor.is_active, + "is_trading": executor.is_trading, + "custom_info": json.dumps(executor.custom_info), + "controller_id": executor.controller_id, + "side": executor.side, + } + executor_to_append["config"]["mode"] = executor_to_append["config"]["mode"].value + executor_to_append["config"]["side"] = executor_to_append["config"]["side"].value executor_values.append(executor_to_append) return executor_values - def apply_special_data_types(self, executors): - executors["status"] = executors["status"].apply(lambda x: self.get_enum_by_value(RunnableStatus, int(x))) - executors["side"] = executors["config"].apply(lambda x: self.get_enum_by_value(TradeType, int(x["side"]))) - executors["close_type"] = executors["close_type"].apply(lambda x: self.get_enum_by_value(CloseType, int(x))) - executors["close_type_name"] = executors["close_type"].apply(lambda x: x.name) - executors["datetime"] = pd.to_datetime(executors.timestamp, unit="s") - executors["close_datetime"] = pd.to_datetime(executors["close_timestamp"], unit="s") - return executors + @staticmethod + def get_executors_with_orders(executors, orders): + df = pd.DataFrame(executors['custom_info'].tolist(), index=executors['id'], + columns=["custom_info"]).reset_index() + df["custom_info"] = df["custom_info"].apply(lambda x: json.loads(x)) + df["orders"] = df["custom_info"].apply(lambda x: x["order_ids"]) + df.rename(columns={"id": "executor_id"}, inplace=True) + exploded_df = df.explode("orders").rename(columns={"orders": "order_id"}) + exec_with_orders = exploded_df.merge(orders, left_on="order_id", right_on="client_order_id", how="inner") + exec_with_orders = exec_with_orders[exec_with_orders["last_status"].isin(["SellOrderCompleted", "BuyOrderCompleted"])] + return exec_with_orders[["executor_id", "order_id", "last_status", "last_update_timestamp", "price", "amount", "position"]] @staticmethod def get_enum_by_value(enum_class, value): From f232b1aa4d9fba65652fb868e31113c6585a16a3 Mon Sep 17 00:00:00 2001 From: drupman Date: Sat, 21 Sep 2024 20:50:56 -0300 Subject: [PATCH 19/24] (feat) add performance endpoints --- main.py | 11 ++++++++++- routers/manage_performance.py | 26 ++++++++++++++++---------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/main.py b/main.py index d707375..1bcfccb 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,8 @@ from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import HTTPBasic, HTTPBasicCredentials -from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, manage_market_data, manage_databases +from routers import manage_accounts, manage_backtesting, manage_broker_messages, manage_docker, manage_files, \ + manage_market_data, manage_databases, manage_performance load_dotenv() security = HTTPBasic() @@ -16,6 +17,14 @@ app = FastAPI() +app.include_router(manage_docker.router) +app.include_router(manage_broker_messages.router) +app.include_router(manage_files.router) +app.include_router(manage_market_data.router) +app.include_router(manage_backtesting.router) +app.include_router(manage_accounts.router) +app.include_router(manage_performance.router) +app.include_router(manage_databases.router) def auth_user( credentials: Annotated[HTTPBasicCredentials, Depends(security)], diff --git a/routers/manage_performance.py b/routers/manage_performance.py index bbd7e01..1ec97fd 100644 --- a/routers/manage_performance.py +++ b/routers/manage_performance.py @@ -1,26 +1,32 @@ -from fastapi import APIRouter -from typing import Any, Dict +from decimal import Decimal +from typing import List +import json +from hummingbot.strategy_v2.models.executors_info import ExecutorInfo from hummingbot.strategy_v2.backtesting.backtesting_engine_base import BacktestingEngineBase -from utils.etl_databases import PerformanceDataSource +from routers.manage_backtesting import BACKTESTING_ENGINES + +from fastapi import APIRouter router = APIRouter(tags=["Market Performance"]) @router.post("/get-performance-results") -async def get_performance_results(payload: Dict[str, Any]): - executors = payload.get("executors") - data_source = PerformanceDataSource(executors) +async def get_performance_results(executors_and_controller_type: dict): + executors = executors_and_controller_type.get("executors") + controller_type = executors_and_controller_type.get("controller_type") performance_results = {} try: - backtesting_engine = BacktestingEngineBase() - executor_info_list = data_source.executor_info_list - performance_results["results"] = backtesting_engine.summarize_results(executor_info_list ) + for executor in executors: + if isinstance(executor["custom_info"], str): + executor["custom_info"] = json.loads(executor["custom_info"]) + parsed_executors = [ExecutorInfo(**executor) for executor in executors] + backtesting_engine = BACKTESTING_ENGINES[controller_type] + performance_results["results"] = backtesting_engine.summarize_results(parsed_executors) results = performance_results["results"] results["sharpe_ratio"] = results["sharpe_ratio"] if results["sharpe_ratio"] is not None else 0 return { - "executors": executors, "results": performance_results["results"], } From 86f405df0efe034f4e7c3d1a4922e8fb2ac806be Mon Sep 17 00:00:00 2001 From: drupman Date: Wed, 13 Nov 2024 11:05:43 -0300 Subject: [PATCH 20/24] (fix) fix main --- main.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/main.py b/main.py index 1bcfccb..3657ef3 100644 --- a/main.py +++ b/main.py @@ -17,14 +17,6 @@ app = FastAPI() -app.include_router(manage_docker.router) -app.include_router(manage_broker_messages.router) -app.include_router(manage_files.router) -app.include_router(manage_market_data.router) -app.include_router(manage_backtesting.router) -app.include_router(manage_accounts.router) -app.include_router(manage_performance.router) -app.include_router(manage_databases.router) def auth_user( credentials: Annotated[HTTPBasicCredentials, Depends(security)], @@ -54,3 +46,4 @@ def auth_user( app.include_router(manage_backtesting.router, dependencies=[Depends(auth_user)]) app.include_router(manage_accounts.router, dependencies=[Depends(auth_user)]) app.include_router(manage_databases.router, dependencies=[Depends(auth_user)]) +app.include_router(manage_performance.router, dependencies=[Depends(auth_user)]) From 9bf3bd7d6ca1d68dbe6d4bbcaaaf79f2b16f5e0d Mon Sep 17 00:00:00 2001 From: drupman Date: Wed, 13 Nov 2024 11:44:14 -0300 Subject: [PATCH 21/24] (fix) remove * from .gitignore --- bots/conf/controllers/.gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/bots/conf/controllers/.gitignore b/bots/conf/controllers/.gitignore index f59ec20..e69de29 100644 --- a/bots/conf/controllers/.gitignore +++ b/bots/conf/controllers/.gitignore @@ -1 +0,0 @@ -* \ No newline at end of file From 2dbaa1710b20c3339972a2400778ccf8efe87fa0 Mon Sep 17 00:00:00 2001 From: drupman Date: Thu, 14 Nov 2024 22:42:53 -0300 Subject: [PATCH 22/24] (feat) add lost lines from bad github management --- routers/manage_databases.py | 63 +++++++++++++-- routers/manage_performance.py | 28 +++---- utils/etl_databases.py | 139 ++++++++++++++++++++++------------ 3 files changed, 158 insertions(+), 72 deletions(-) diff --git a/routers/manage_databases.py b/routers/manage_databases.py index 175a656..ae90dce 100644 --- a/routers/manage_databases.py +++ b/routers/manage_databases.py @@ -1,8 +1,11 @@ import json +import time from typing import List, Dict, Any -from utils.etl_databases import HummingbotDatabase +import pandas as pd + +from utils.etl_databases import HummingbotDatabase, ETLPerformance from fastapi import APIRouter from utils.file_system import FileSystemUtil @@ -28,14 +31,15 @@ async def read_databases(db_paths: List[str] = None): "healthy": db.status["general_status"], "status": db.status, "tables": { - "order": json.dumps(db.get_orders().to_dict()), + "orders": json.dumps(db.get_orders().to_dict()), "trade_fill": json.dumps(db.get_trade_fills().to_dict()), - "executor": json.dumps(db.get_executors_data().to_dict()), - "order_status": json.dumps(db.get_order_status().to_dict()) + "executors": json.dumps(db.get_executors_data().to_dict()), + "order_status": json.dumps(db.get_order_status().to_dict()), + "controllers": json.dumps(db.get_controllers_data().to_dict()) } } except Exception as e: - print(f"Error reading database: {str(e)}") + print(f"Error reading database {db_path}: {str(e)}") db_content = { "db_name": "", "db_path": db_path, @@ -45,3 +49,52 @@ async def read_databases(db_paths: List[str] = None): } dbs.append(db_content) return dbs + + +@router.post("/create-checkpoint", response_model=Dict[str, Any]) +async def create_checkpoint(db_paths: List[str]): + try: + dbs = await read_databases(db_paths) + + healthy_dbs = [db for db in dbs if db["healthy"]] + + table_names = ["trade_fill", "orders", "order_status", "executors", "controllers"] + tables_dict = {name: pd.DataFrame() for name in table_names} + + for db in healthy_dbs: + for table_name in table_names: + new_data = pd.DataFrame(json.loads(db["tables"][table_name])) + new_data["db_path"] = db["db_path"] + new_data["db_name"] = db["db_name"] + tables_dict[table_name] = pd.concat([tables_dict[table_name], new_data]) + + etl = ETLPerformance(db_path=f"bots/data/checkpoint_{str(int(time.time()))}.sqlite") + etl.create_tables() + etl.insert_data(tables_dict) + return {"message": "Checkpoint created successfully."} + except Exception as e: + return {"message": f"Error: {str(e)}"} + + +@router.post("/list-checkpoints", response_model=List[str]) +async def list_checkpoints(full_path: bool): + return file_system.list_checkpoints(full_path) + + +@router.post("/load-checkpoint") +async def load_checkpoint(checkpoint_path: str): + try: + etl = ETLPerformance(checkpoint_path) + executor = etl.load_executors() + order = etl.load_orders() + trade_fill = etl.load_trade_fill() + controllers = etl.load_controllers() + checkpoint_data = { + "executors": json.dumps(executor.to_dict()), + "orders": json.dumps(order.to_dict()), + "trade_fill": json.dumps(trade_fill.to_dict()), + "controllers": json.dumps(controllers.to_dict()) + } + return checkpoint_data + except Exception as e: + return {"message": f"Error: {str(e)}"} \ No newline at end of file diff --git a/routers/manage_performance.py b/routers/manage_performance.py index 1ec97fd..01bc316 100644 --- a/routers/manage_performance.py +++ b/routers/manage_performance.py @@ -1,34 +1,28 @@ -from decimal import Decimal -from typing import List +from fastapi import APIRouter +from typing import Any, Dict -import json -from hummingbot.strategy_v2.models.executors_info import ExecutorInfo from hummingbot.strategy_v2.backtesting.backtesting_engine_base import BacktestingEngineBase -from routers.manage_backtesting import BACKTESTING_ENGINES - -from fastapi import APIRouter +from utils.etl_databases import PerformanceDataSource router = APIRouter(tags=["Market Performance"]) @router.post("/get-performance-results") -async def get_performance_results(executors_and_controller_type: dict): - executors = executors_and_controller_type.get("executors") - controller_type = executors_and_controller_type.get("controller_type") +async def get_performance_results(payload: Dict[str, Any]): + executors = payload.get("executors") + data_source = PerformanceDataSource(executors) performance_results = {} try: - for executor in executors: - if isinstance(executor["custom_info"], str): - executor["custom_info"] = json.loads(executor["custom_info"]) - parsed_executors = [ExecutorInfo(**executor) for executor in executors] - backtesting_engine = BACKTESTING_ENGINES[controller_type] - performance_results["results"] = backtesting_engine.summarize_results(parsed_executors) + backtesting_engine = BacktestingEngineBase() + executor_info_list = data_source.executor_info_list + performance_results["results"] = backtesting_engine.summarize_results(executor_info_list ) results = performance_results["results"] results["sharpe_ratio"] = results["sharpe_ratio"] if results["sharpe_ratio"] is not None else 0 return { + "executors": executors, "results": performance_results["results"], } except Exception as e: - return {"error": str(e)} + return {"error": str(e)} \ No newline at end of file diff --git a/utils/etl_databases.py b/utils/etl_databases.py index 21fbf2d..85efe9e 100644 --- a/utils/etl_databases.py +++ b/utils/etl_databases.py @@ -1,8 +1,11 @@ import os import pandas as pd import json -from typing import List +from typing import List, Dict, Any +from hummingbot.core.data_type.common import TradeType +from hummingbot.strategy_v2.models.base import RunnableStatus +from hummingbot.strategy_v2.models.executors import CloseType from hummingbot.strategy_v2.models.executors_info import ExecutorInfo from sqlalchemy import create_engine, insert, text, MetaData, Table, Column, VARCHAR, INT, FLOAT, Integer, String, Float from sqlalchemy.orm import sessionmaker @@ -30,8 +33,9 @@ def status(self): orders_status = self._get_table_status(self.get_orders) order_status_status = self._get_table_status(self.get_order_status) executors_status = self._get_table_status(self.get_executors_data) + controller_status = self._get_table_status(self.get_controllers_data) general_status = all(status == "Correct" for status in - [trade_fill_status, orders_status, order_status_status, executors_status]) + [trade_fill_status, orders_status, order_status_status, executors_status, controller_status]) status = {"db_name": self.db_name, "db_path": self.db_path, "trade_fill": trade_fill_status, @@ -77,6 +81,12 @@ def get_executors_data(self) -> pd.DataFrame: executors = pd.read_sql_query(text(query), session.connection()) return executors + def get_controllers_data(self) -> pd.DataFrame: + with self.session_maker() as session: + query = "SELECT * FROM Controllers" + controllers = pd.read_sql_query(text(query), session.connection()) + return controllers + class ETLPerformance: def __init__(self, @@ -151,9 +161,20 @@ def orders_table(self): Column('position', VARCHAR(255)), ) + @property + def controllers_table(self): + return Table( + 'controllers', MetaData(), + Column('id', VARCHAR(255)), + Column('controller_id', INT), + Column('timestamp', FLOAT), + Column('type', VARCHAR(255)), + Column('config', String), + ) + @property def tables(self): - return [self.executors_table, self.trade_fill_table, self.orders_table] + return [self.executors_table, self.trade_fill_table, self.orders_table, self.controllers_table] def create_tables(self): with self.engine.connect(): @@ -167,6 +188,8 @@ def insert_data(self, data): self.insert_trade_fill(data["trade_fill"]) if "orders" in data: self.insert_orders(data["orders"]) + if "controllers" in data: + self.insert_controllers(data["controllers"]) def insert_executors(self, executors): with self.engine.connect() as conn: @@ -239,11 +262,24 @@ def insert_orders(self, orders): conn.execute(ins) conn.commit() + def insert_controllers(self, controllers): + with self.engine.connect() as conn: + for _, row in controllers.iterrows(): + ins = insert(self.controllers_table).values( + id=row["id"], + controller_id=row["controller_id"], + timestamp=row["timestamp"], + type=row["type"], + config=row["config"], + ) + conn.execute(ins) + conn.commit() + def load_executors(self): with self.session_maker() as session: query = "SELECT * FROM executors" executors = pd.read_sql_query(text(query), session.connection()) - return executors + return executors def load_trade_fill(self): with self.session_maker() as session: @@ -257,68 +293,71 @@ def load_orders(self): orders = pd.read_sql_query(text(query), session.connection()) return orders - @staticmethod - def parse_executors(executors: pd.DataFrame) -> List[ExecutorInfo]: + def load_controllers(self): + with self.session_maker() as session: + query = "SELECT * FROM controllers" + controllers = pd.read_sql_query(text(query), session.connection()) + return controllers + + +class PerformanceDataSource: + def __init__(self, executors_dict: Dict[str, Any]): + self.executors_dict = executors_dict + + @property + def executors_df(self): + executors = pd.DataFrame(self.executors_dict) + executors["custom_info"] = executors["custom_info"].apply( + lambda x: json.loads(x) if isinstance(x, str) else x) + executors["config"] = executors["config"].apply(lambda x: json.loads(x) if isinstance(x, str) else x) + executors["timestamp"] = executors["timestamp"].apply(lambda x: self.ensure_timestamp_in_seconds(x)) + executors["close_timestamp"] = executors["close_timestamp"].apply( + lambda x: self.ensure_timestamp_in_seconds(x)) + executors["trading_pair"] = executors["config"].apply(lambda x: x["trading_pair"]) + executors["exchange"] = executors["config"].apply(lambda x: x["connector_name"]) + executors["level_id"] = executors["config"].apply(lambda x: x.get("level_id")) + executors["bep"] = executors["custom_info"].apply(lambda x: x["current_position_average_price"]) + executors["order_ids"] = executors["custom_info"].apply(lambda x: x.get("order_ids")) + executors["close_price"] = executors["custom_info"].apply(lambda x: x.get("close_price", x["current_position_average_price"])) + executors["sl"] = executors["config"].apply(lambda x: x.get("stop_loss")).fillna(0) + executors["tp"] = executors["config"].apply(lambda x: x.get("take_profit")).fillna(0) + executors["tl"] = executors["config"].apply(lambda x: x.get("time_limit")).fillna(0) + return executors + + @property + def executor_info_list(self) -> List[ExecutorInfo]: + executors = self.apply_special_data_types(self.executors_df) executor_values = [] - for _, row in executors.iterrows(): - executor_values.append(ExecutorInfo( + for index, row in executors.iterrows(): + executor_to_append = ExecutorInfo( id=row["id"], timestamp=row["timestamp"], type=row["type"], close_timestamp=row["close_timestamp"], close_type=row["close_type"], status=row["status"], - config=json.loads(row["config"]), + config=row["config"], net_pnl_pct=row["net_pnl_pct"], net_pnl_quote=row["net_pnl_quote"], cum_fees_quote=row["cum_fees_quote"], filled_amount_quote=row["filled_amount_quote"], is_active=row["is_active"], is_trading=row["is_trading"], - custom_info=json.loads(row["custom_info"]), - controller_id=row["controller_id"], - side=row["side"], - )) - return executor_values - - @staticmethod - def dump_executors(executors: List[ExecutorInfo]) -> List[dict]: - executor_values = [] - for executor in executors: - executor_to_append = { - "id": executor.id, - "timestamp": executor.timestamp, - "type": executor.type, - "close_timestamp": executor.close_timestamp, - "close_type": executor.close_type.value, - "status": executor.status.value, - "config": executor.config.dict(), - "net_pnl_pct": executor.net_pnl_pct, - "net_pnl_quote": executor.net_pnl_quote, - "cum_fees_quote": executor.cum_fees_quote, - "filled_amount_quote": executor.filled_amount_quote, - "is_active": executor.is_active, - "is_trading": executor.is_trading, - "custom_info": json.dumps(executor.custom_info), - "controller_id": executor.controller_id, - "side": executor.side, - } - executor_to_append["config"]["mode"] = executor_to_append["config"]["mode"].value - executor_to_append["config"]["side"] = executor_to_append["config"]["side"].value + custom_info=row["custom_info"], + controller_id=row["controller_id"] + ) + executor_to_append.custom_info["side"] = row["side"] executor_values.append(executor_to_append) return executor_values - @staticmethod - def get_executors_with_orders(executors, orders): - df = pd.DataFrame(executors['custom_info'].tolist(), index=executors['id'], - columns=["custom_info"]).reset_index() - df["custom_info"] = df["custom_info"].apply(lambda x: json.loads(x)) - df["orders"] = df["custom_info"].apply(lambda x: x["order_ids"]) - df.rename(columns={"id": "executor_id"}, inplace=True) - exploded_df = df.explode("orders").rename(columns={"orders": "order_id"}) - exec_with_orders = exploded_df.merge(orders, left_on="order_id", right_on="client_order_id", how="inner") - exec_with_orders = exec_with_orders[exec_with_orders["last_status"].isin(["SellOrderCompleted", "BuyOrderCompleted"])] - return exec_with_orders[["executor_id", "order_id", "last_status", "last_update_timestamp", "price", "amount", "position"]] + def apply_special_data_types(self, executors): + executors["status"] = executors["status"].apply(lambda x: self.get_enum_by_value(RunnableStatus, int(x))) + executors["side"] = executors["config"].apply(lambda x: self.get_enum_by_value(TradeType, int(x["side"]))) + executors["close_type"] = executors["close_type"].apply(lambda x: self.get_enum_by_value(CloseType, int(x))) + executors["close_type_name"] = executors["close_type"].apply(lambda x: x.name) + executors["datetime"] = pd.to_datetime(executors.timestamp, unit="s") + executors["close_datetime"] = pd.to_datetime(executors["close_timestamp"], unit="s") + return executors @staticmethod def get_enum_by_value(enum_class, value): From 23c028685a3508cc0f85fc203de3c0b0865d8e3d Mon Sep 17 00:00:00 2001 From: drupman Date: Thu, 14 Nov 2024 22:43:04 -0300 Subject: [PATCH 23/24] (feat) add debug mode --- main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/main.py b/main.py index ac50467..b5ce723 100644 --- a/main.py +++ b/main.py @@ -14,6 +14,7 @@ username = os.getenv("USERNAME", "admin") password = os.getenv("PASSWORD", "admin") +debug_mode = os.getenv("DEBUG_MODE", False) app = FastAPI() @@ -31,7 +32,7 @@ def auth_user( is_correct_password = secrets.compare_digest( current_password_bytes, correct_password_bytes ) - if not (is_correct_username and is_correct_password): + if not (is_correct_username and is_correct_password) and not debug_mode: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", From 394d6ecb0334fc9fac502e20b2ae3272189ffe30 Mon Sep 17 00:00:00 2001 From: drupman Date: Thu, 14 Nov 2024 22:43:12 -0300 Subject: [PATCH 24/24] (feat) update compose file --- docker-compose.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 03e1163..5758652 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,17 +2,18 @@ version: "3.9" services: backend-api: - build: . + container_name: backend-api + image: hummingbot/backend-api:latest ports: - "8000:8000" volumes: - ./bots:/backend-api/bots - /var/run/docker.sock:/var/run/docker.sock - env_file: - - .env environment: - BROKER_HOST=emqx - BROKER_PORT=1883 + - USERNAME=admin + - PASSWORD=admin networks: - emqx-bridge emqx: