diff --git a/cli/__init__.py b/cli/__init__.py
index d45a9f66..687facf8 100644
--- a/cli/__init__.py
+++ b/cli/__init__.py
@@ -6,6 +6,7 @@
 from cli.api import api
 from cli.load import load
 from cli.reorg import reorg
+from cli.schedule import schedule
 from cli.stream import stream
 from indexer.utils.logging_utils import logging_basic_config
@@ -29,3 +30,4 @@ def cli(ctx):
 cli.add_command(api, "api")
 cli.add_command(aggregates, "aggregates")
 cli.add_command(reorg, "reorg")
+cli.add_command(schedule, "schedule")
diff --git a/cli/aggregates.py b/cli/aggregates.py
index c2bdb347..17711bc3 100644
--- a/cli/aggregates.py
+++ b/cli/aggregates.py
@@ -1,12 +1,24 @@
+import datetime
+
 import click
 
 from common.services.postgresql_service import PostgreSQLService
-from indexer.aggr_jobs.utils import DateType, check_data_completeness, get_yesterday_date
+from indexer.aggr_jobs.utils import DateType, get_yesterday_date, parse_job_list
 from indexer.controller.aggregates_controller import AggregatesController
 from indexer.controller.dispatcher.aggregates_dispatcher import AggregatesDispatcher
 
 
 @click.command(context_settings=dict(help_option_names=["-h", "--help"]))
+@click.option(
+    "-jn",
+    "--job-name",
+    default="all",
+    show_default=True,
+    type=str,
+    help="Job list to aggregate data for, e.g. 'all', 'FBTC', 'EXPLORE', 'FBTC,EXPLORE'",
+    envvar="JOB_NAME",
+)
 @click.option(
     "-pg",
     "--postgres-url",
@@ -23,8 +35,8 @@
     type=str,
     envvar="PROVIDER_URI",
     help="The URI of the web3 provider e.g. "
-         "file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io"
-         "It helps determine whether the latest synchronized data meets the required execution date range.",
+    "file://$HOME/Library/Ethereum/geth.ipc or https://mainnet.infura.io. "
+    "It helps determine whether the latest synchronized data meets the required execution date range.",
 )
 @click.option(
     "-sd",
     "--start-date",
@@ -53,18 +65,29 @@
     envvar="DATE_BATCH_SIZE",
     help="How many DATEs to batch in single sync round",
 )
-def aggregates(postgres_url, provider_uri, start_date, end_date, date_batch_size):
+@click.option(
+    "--config-file",
+    default="/app/aggr_schedule_config.yaml",
+    show_default=True,
+    type=str,
+    envvar="CONFIG_FILE",
+    help="Path to the YAML file that defines the aggregate jobs and their schedules.",
+)
+def aggregates(job_name, postgres_url, provider_uri, start_date, end_date, date_batch_size, config_file):
    if not start_date and not end_date:
        start_date, end_date = get_yesterday_date()
    elif not end_date:
        _, end_date = get_yesterday_date()
 
+    # check_data_completeness(db_service, provider_uri, end_date)
    db_service = PostgreSQLService(postgres_url)
+    version = int(datetime.datetime.now().timestamp())
 
-    check_data_completeness(db_service, provider_uri, end_date)
+    config = {"db_service": db_service, "version": version}
 
-    config = {"db_service": db_service}
-    dispatcher = AggregatesDispatcher(config)
+    job_list = parse_job_list(job_name, config_file)
+    dispatcher = AggregatesDispatcher(config, job_list)
    controller = AggregatesController(job_dispatcher=dispatcher)
    controller.action(start_date, end_date, date_batch_size)
diff --git a/cli/schedule.py b/cli/schedule.py
new file mode 100644
index 00000000..6ac932e3
--- /dev/null
+++ b/cli/schedule.py
@@ -0,0 +1,47 @@
+import os
+import sys
+
+import click
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.triggers.cron import CronTrigger
+
+from indexer.schedule_jobs.aggregates_jobs import aggregates_yesterday_job, parse_aggregate_schedule
+
+
+@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
+@click.option(
+    "-pg",
+    "--postgres-url",
+    type=str,
+    required=True,
+    envvar="POSTGRES_URL",
+    help="The required postgres connection url, "
+    "e.g. postgresql+psycopg2://postgres:admin@127.0.0.1:5432/ethereum",
+)
+@click.option(
+    "--config-file",
+    default="/app/aggr_schedule_config.yaml",
+    show_default=True,
+    type=str,
+    envvar="CONFIG_FILE",
+    help="Path to the YAML file that defines the aggregate jobs and their schedules.",
+)
+def schedule(postgres_url, config_file) -> None:
+    sys.stdout = os.fdopen(sys.stdout.fileno(), "w", buffering=1)  # Line-buffered stdout
+    sys.stderr = os.fdopen(sys.stderr.fileno(), "w", buffering=1)  # Line-buffered stderr
+
+    jobs = parse_aggregate_schedule(config_file)
+
+    scheduler = BlockingScheduler()
+
+    for job in jobs:
+        schedule_time = job["schedule_time"]
+        trigger = CronTrigger.from_crontab(schedule_time)
+
+        job_list_generator = job["job_list_generator"]
+
+        job_args = (job_list_generator, postgres_url)
+
+        scheduler.add_job(func=aggregates_yesterday_job, trigger=trigger, args=job_args)
+
+    scheduler.start()
diff --git a/docker-compose/aggr_schedule_config.yaml b/docker-compose/aggr_schedule_config.yaml
new file mode 100644
index 00000000..d940100f
--- /dev/null
+++ b/docker-compose/aggr_schedule_config.yaml
@@ -0,0 +1,44 @@
+common_config:
+  schedule_time: 0 1 * * *
+
+jobs:
+  FBTC:
+    initialization_tasks:
+      - init_token_price
+      - init_period_address_token_balance
+
+    regular_tasks:
+      - daily_feature_holding_balance_staked_fbtc_detail.sql
+      - daily_feature_holding_balance_uniswap_v3.sql
+      - daily_address_token_balances.sql
+      - daily_feature_erc20_token_supply_records.sql
+
+    ordered_tasks:
+      - period_address_token_balances.sql
+      - period_feature_holding_balance_uniswap_v3.sql:
+          agin_position_token_address: "0x218bf598d1453383e2f4aa7b14ffb9bfb102d637"
+          cleoexchange_position_token_address: "0xaaa78e8c4241990b4ce159e105da08129345946a"
+      - period_feature_staked_fbtc_detail_records.sql
+      - period_feature_holding_balance_staked_fbtc_detail.sql
+      - period_feature_erc1155_token_supply_records.sql
+      - period_feature_holding_balance_merchantmoe.sql
+      - period_feature_erc20_token_supply_records.sql
+      - period_feature_holding_balance_dodo.sql:
+          token_address: "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599"
+      - period_feature_holding_balance_lendle.sql
+      - period_feature_defi_wallet_fbtc_aggregates.py
+
+  EXPLORE:
+    config:
+      schedule_time: 0 2 * * *  # default 0 1 * * *
+    regular_tasks:
+      - daily_explore_aggregates.sql
\ No newline at end of file
diff --git a/docker-compose/docker-compose.yaml b/docker-compose/docker-compose.yaml
index 9e4e73d2..f2fece2e 100644
--- a/docker-compose/docker-compose.yaml
+++ b/docker-compose/docker-compose.yaml
@@ -44,6 +44,13 @@ services:
     ports:
       - 8082:8082
 
+  hemera-schedule:
+    <<: *common-settings
+    container_name: hemera-schedule
+    command: [ "schedule" ]
+    volumes:
+      - ./aggr_schedule_config.yaml:/app/aggr_schedule_config.yaml
+
   postgresql:
     env_file:
       - hemera-indexer.env
diff --git a/indexer/aggr_jobs/aggr_base_job.py b/indexer/aggr_jobs/aggr_base_job.py
index 6efde17a..3f5661d1 100644
--- a/indexer/aggr_jobs/aggr_base_job.py
+++ b/indexer/aggr_jobs/aggr_base_job.py
@@ -8,7 +8,7 @@ class AggrBaseJob:
     def run(self, **kwargs):
         pass
 
-    def get_sql_content(self, file_name, start_date, end_date):
+    def get_sql_content(self, file_name, start_date, end_date, **kwargs):
         base_dir = os.path.dirname(__file__)
         if not file_name.endswith(".sql"):
             file_name += ".sql"
file_name += ".sql" @@ -16,9 +16,24 @@ def get_sql_content(self, file_name, start_date, end_date): with open(file_path, "r") as f: sql_template = f.read() - sql = sql_template.format(start_date=start_date, end_date=end_date) + + template_dict = { + 'start_date': start_date, + 'end_date': end_date, + 'start_date_previous': self.get_previous(start_date), + **kwargs + } + + sql = sql_template.format(**template_dict) return sql + @staticmethod + def get_previous(date_str): + date_obj = datetime.strptime(date_str, "%Y-%m-%d") + previous_day = date_obj - timedelta(days=1) + previous_day_str = previous_day.strftime("%Y-%m-%d") + return previous_day_str + @staticmethod def generate_date_pairs(start_date, end_date): start_date_obj = datetime.strptime(start_date, "%Y-%m-%d") diff --git a/indexer/aggr_jobs/aggr_job_scheduler.py b/indexer/aggr_jobs/aggr_job_scheduler.py index 54b490d8..d59f80ba 100644 --- a/indexer/aggr_jobs/aggr_job_scheduler.py +++ b/indexer/aggr_jobs/aggr_job_scheduler.py @@ -3,24 +3,35 @@ AggrDisorderJob -> AggrOrderJob """ -from indexer.aggr_jobs.disorder_jobs.disorder_job import AggrDisorderJob -from indexer.aggr_jobs.order_jobs.order_job import AggrOrderJob +from indexer.aggr_jobs.regular_tasks.regular_task_dispatch_job import AggrRegularTaskDispatchJob +from indexer.aggr_jobs.initialization_tasks.initialization_task_dispatch_job import InitializationTaskDispatchJob +from indexer.aggr_jobs.ordered_tasks.ordered_task_dispatch_job import AggrOrderedTaskDispatchJob class AggrJobScheduler: - def __init__(self, config): + def __init__(self, config, job_list): self.config = config - self.jobs = self.instantiate_jobs() + self.job_list = job_list + self.init_job = {} + self.jobs = {} + self.instantiate_jobs() + pass - def run_jobs(self, start_date, end_date): - for job in self.jobs: + def run_init_job(self, start_date, end_date): + for job_name, job in self.init_job.items(): job.run(start_date=start_date, end_date=end_date) + def run_jobs(self, start_date, end_date): + for job_name, jobs in self.jobs.items(): + for job in jobs: + job.run(start_date=start_date, end_date=end_date) + def instantiate_jobs(self): - jobs = [] - for job_class in [AggrDisorderJob, AggrOrderJob]: - job = job_class( - config=self.config, - ) - jobs.append(job) - return jobs + for job_name, tasks_dict in self.job_list.items(): + self.init_job[job_name] = InitializationTaskDispatchJob(config=self.config, tasks_dict=tasks_dict) + + jobs = [] + for job_class in [AggrRegularTaskDispatchJob, AggrOrderedTaskDispatchJob]: + job = job_class(config=self.config, tasks_dict=tasks_dict) + jobs.append(job) + self.jobs[job_name] = jobs diff --git a/indexer/aggr_jobs/initialization_tasks/__init__.py b/indexer/aggr_jobs/initialization_tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/indexer/aggr_jobs/initialization_tasks/initialization_task_dispatch_job.py b/indexer/aggr_jobs/initialization_tasks/initialization_task_dispatch_job.py new file mode 100644 index 00000000..4641d2f2 --- /dev/null +++ b/indexer/aggr_jobs/initialization_tasks/initialization_task_dispatch_job.py @@ -0,0 +1,31 @@ +import time + +from sqlalchemy import text + +from indexer.aggr_jobs.aggr_base_job import AggrBaseJob + + +class InitializationTaskDispatchJob(AggrBaseJob): + sql_folder = "initialization_tasks" + + def __init__(self, **kwargs): + config = kwargs["config"] + tasks_dict = kwargs["tasks_dict"] + self.tasks_list = tasks_dict.get('initialization_tasks', []) + + self.db_service = config["db_service"] + + def run(self, 
+        self.start_date = kwargs["start_date"]
+        self.end_date = kwargs["end_date"]
+
+        session = self.db_service.Session()
+        for job_name in self.tasks_list:
+            start_time = time.time()
+            sql_content = self.get_sql_content(job_name, self.start_date, self.end_date)
+            session.execute(text(sql_content))
+            session.commit()
+            execution_time = time.time() - start_time
+            print(f"----------- executed in {execution_time:.2f} seconds: JOB {job_name}")
+        print("======== finished date", self.start_date)
+        session.close()
diff --git a/indexer/aggr_jobs/job_list_generator.py b/indexer/aggr_jobs/job_list_generator.py
new file mode 100644
index 00000000..e12288df
--- /dev/null
+++ b/indexer/aggr_jobs/job_list_generator.py
@@ -0,0 +1,15 @@
+class JobListGenerator(object):
+    def __init__(self, job_name, initialization_jobs, disorder_jobs, order_jobs):
+        self.job_name = job_name
+        self.initialization_jobs = initialization_jobs
+        self.disorder_jobs = disorder_jobs
+        self.order_jobs = order_jobs
+
+    def get_initialization_jobs(self):
+        return self.initialization_jobs
+
+    def get_disorder_jobs(self):
+        return self.disorder_jobs
+
+    def get_order_jobs(self):
+        return self.order_jobs
diff --git a/indexer/aggr_jobs/order_jobs/order_job.py b/indexer/aggr_jobs/order_jobs/order_job.py
deleted file mode 100644
index ea944934..00000000
--- a/indexer/aggr_jobs/order_jobs/order_job.py
+++ /dev/null
@@ -1,24 +0,0 @@
-from sqlalchemy import text
-
-from indexer.aggr_jobs.aggr_base_job import AggrBaseJob
-
-
-class AggrOrderJob(AggrBaseJob):
-    sql_folder = "order_jobs"
-
-    def __init__(self, **kwargs):
-        config = kwargs["config"]
-        self.db_service = config["db_service"]
-
-    def run(self, **kwargs):
-        start_date = kwargs["start_date"]
-        end_date = kwargs["end_date"]
-
-        session = self.db_service.Session()
-
-        date_pairs = self.generate_date_pairs(start_date, end_date)
-        for date_pair in date_pairs:
-            start_date, end_date = date_pair
-            sql_content = self.get_sql_content("period_wallet_addresses_aggregates", start_date, end_date)
-            session.execute(text(sql_content))
-            session.commit()
diff --git a/indexer/aggr_jobs/ordered_tasks/__init__.py b/indexer/aggr_jobs/ordered_tasks/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/indexer/aggr_jobs/ordered_tasks/models/__init__.py b/indexer/aggr_jobs/ordered_tasks/models/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/indexer/aggr_jobs/ordered_tasks/models/period_feature_holding_balance_lendle.py b/indexer/aggr_jobs/ordered_tasks/models/period_feature_holding_balance_lendle.py
new file mode 100644
index 00000000..678bc368
--- /dev/null
+++ b/indexer/aggr_jobs/ordered_tasks/models/period_feature_holding_balance_lendle.py
@@ -0,0 +1,21 @@
+from sqlalchemy import DATE, TIMESTAMP, Column, func
+from sqlalchemy.dialects.postgresql import BYTEA, NUMERIC, VARCHAR
+
+from common.models import HemeraModel
+
+
+class PeriodFeatureHoldingBalanceLendle(HemeraModel):
+    __tablename__ = "period_feature_holding_balance_lendle"
+
+    period_date = Column(DATE, primary_key=True, nullable=False)
+    wallet_address = Column(BYTEA, primary_key=True)
+    protocol_id = Column(VARCHAR, primary_key=True)
+    contract_address = Column(BYTEA, primary_key=True)
+
+    token_symbol = Column(VARCHAR)
+    token_address = Column(BYTEA)
+
+    balance = Column(NUMERIC(100, 18))
+
+    create_time = Column(TIMESTAMP, server_default=func.now())
diff --git a/indexer/aggr_jobs/order_jobs/models/period_feature_holding_balance_merchantmoe.py b/indexer/aggr_jobs/ordered_tasks/models/period_feature_holding_balance_merchantmoe.py
similarity index 100%
rename from indexer/aggr_jobs/order_jobs/models/period_feature_holding_balance_merchantmoe.py
rename to indexer/aggr_jobs/ordered_tasks/models/period_feature_holding_balance_merchantmoe.py
diff --git a/indexer/aggr_jobs/order_jobs/models/period_feature_holding_balance_uniswap_v3.py b/indexer/aggr_jobs/ordered_tasks/models/period_feature_holding_balance_uniswap_v3.py
similarity index 100%
rename from indexer/aggr_jobs/order_jobs/models/period_feature_holding_balance_uniswap_v3.py
rename to indexer/aggr_jobs/ordered_tasks/models/period_feature_holding_balance_uniswap_v3.py
diff --git a/indexer/aggr_jobs/order_jobs/models/period_feature_merchant_moe_token_bin.py b/indexer/aggr_jobs/ordered_tasks/models/period_feature_merchant_moe_token_bin.py
similarity index 100%
rename from indexer/aggr_jobs/order_jobs/models/period_feature_merchant_moe_token_bin.py
rename to indexer/aggr_jobs/ordered_tasks/models/period_feature_merchant_moe_token_bin.py
diff --git a/indexer/aggr_jobs/order_jobs/models/period_feature_uniswap_v3_pool_prices.py b/indexer/aggr_jobs/ordered_tasks/models/period_feature_uniswap_v3_pool_prices.py
similarity index 100%
rename from indexer/aggr_jobs/order_jobs/models/period_feature_uniswap_v3_pool_prices.py
rename to indexer/aggr_jobs/ordered_tasks/models/period_feature_uniswap_v3_pool_prices.py
diff --git a/indexer/aggr_jobs/order_jobs/models/period_feature_uniswap_v3_token_details.py b/indexer/aggr_jobs/ordered_tasks/models/period_feature_uniswap_v3_token_details.py
similarity index 100%
rename from indexer/aggr_jobs/order_jobs/models/period_feature_uniswap_v3_token_details.py
rename to indexer/aggr_jobs/ordered_tasks/models/period_feature_uniswap_v3_token_details.py
diff --git a/indexer/aggr_jobs/ordered_tasks/ordered_task_dispatch_job.py b/indexer/aggr_jobs/ordered_tasks/ordered_task_dispatch_job.py
new file mode 100644
index 00000000..a199c6d0
--- /dev/null
+++ b/indexer/aggr_jobs/ordered_tasks/ordered_task_dispatch_job.py
@@ -0,0 +1,50 @@
+import time
+
+from sqlalchemy import text
+
+from indexer.aggr_jobs.aggr_base_job import AggrBaseJob
+from indexer.aggr_jobs.ordered_tasks.py_jobs.period_feature_defi_wallet_fbtc_aggregates import (
+    PeriodFeatureDefiWalletFbtcAggregates,
+)
+
+
+class AggrOrderedTaskDispatchJob(AggrBaseJob):
+    sql_folder = "ordered_tasks"
+
+    def __init__(self, **kwargs):
+        config = kwargs["config"]
+        tasks_dict = kwargs["tasks_dict"]
+        self.tasks_list = tasks_dict.get("ordered_tasks", [])
+        self.db_service = config["db_service"]
+        self.chain_name = config.get("chain_name")  # only needed by the FBTC python job
+
+    def generator_py_jobs(self, job_name, start_date, end_date):
+        if job_name == "period_feature_defi_wallet_fbtc_aggregates.py":
+            period_feature_defi_wallet_fbtc_aggregates_job = PeriodFeatureDefiWalletFbtcAggregates(
+                self.chain_name, self.db_service, start_date
+            )
+            period_feature_defi_wallet_fbtc_aggregates_job.run()
+
+    def run(self, **kwargs):
+        start_date_limit = kwargs["start_date"]
+        end_date_limit = kwargs["end_date"]
+
+        session = self.db_service.Session()
+
+        date_pairs = self.generate_date_pairs(start_date_limit, end_date_limit)
+        for date_pair in date_pairs:
+            start_date, end_date = date_pair
+
+            # Tasks are either plain file names or a mapping of file name -> template parameters.
+            for task in self.tasks_list:
+                if isinstance(task, str):
+                    job_name, value_dict = task, {}
+                else:
+                    job_name, value_dict = next(iter(task.items()))
+
+                start_time = time.time()
+                if job_name.endswith(".py"):
+                    self.generator_py_jobs(job_name, start_date, end_date)
+                else:
+                    sql_content = self.get_sql_content(job_name, start_date, end_date, **value_dict)
+                    session.execute(text(sql_content))
+                    session.commit()
+                execution_time = time.time() - start_time
+                print(f"----------- executed in {execution_time:.2f} seconds: JOB {job_name}")
+
+            print("======== finished date", start_date)
+
+        session.close()
diff --git a/indexer/aggr_jobs/ordered_tasks/period_feature_holding_balance_lendle.sql b/indexer/aggr_jobs/ordered_tasks/period_feature_holding_balance_lendle.sql
new file mode 100644
index 00000000..0564362a
--- /dev/null
+++ b/indexer/aggr_jobs/ordered_tasks/period_feature_holding_balance_lendle.sql
@@ -0,0 +1,41 @@
+delete
+from period_feature_holding_balance_lendle
+where period_date >= '{start_date}'
+  and period_date < '{end_date}';
+
+
+with tokens_table as (
+    select *,
+           CASE
+               WHEN address = '\xdef3542bb1b2969c1966dd91ebc504f4b37462fe' THEN 1
+               WHEN address = '\x874712c653aaaa7cfb201317f46e00238c2649bb' THEN -1
+               WHEN address = '\x08fc23af290d538647aa2836c5b3cf2fb3313759' THEN -1
+               ELSE 0
+               END AS voucher_value
+    from tokens
+    where address in (
+                      '\xdef3542bb1b2969c1966dd91ebc504f4b37462fe',
+                      '\x874712c653aaaa7cfb201317f46e00238c2649bb',
+                      '\x08fc23af290d538647aa2836c5b3cf2fb3313759'
+        )
+)
+
+insert
+into period_feature_holding_balance_lendle (period_date, wallet_address, protocol_id, contract_address,
+                                            token_symbol, token_address, balance)
+select date('{start_date}'),
+       d1.address,
+       'lendle',
+       d1.token_address,
+       d3.symbol  as token_symbol,
+       d3.address as token_address,
+       d1.balance / pow(10, d2.decimals) * voucher_value
+
+from period_address_token_balances d1
+         inner join tokens_table d2 on d1.token_address = d2.address
+         inner join tokens d3 on d3.address = '\xC96DE26018A54D51C097160568752C4E3BD6C364'
+
+where d1.token_address in (
+                           '\xdef3542bb1b2969c1966dd91ebc504f4b37462fe',
+                           '\x874712c653aaaa7cfb201317f46e00238c2649bb',
+                           '\x08fc23af290d538647aa2836c5b3cf2fb3313759'
+    );
+
diff --git a/indexer/aggr_jobs/order_jobs/period_feature_holding_balance_merchantmoe.sql b/indexer/aggr_jobs/ordered_tasks/period_feature_holding_balance_merchantmoe.sql
similarity index 100%
rename from indexer/aggr_jobs/order_jobs/period_feature_holding_balance_merchantmoe.sql
rename to indexer/aggr_jobs/ordered_tasks/period_feature_holding_balance_merchantmoe.sql
diff --git a/indexer/aggr_jobs/order_jobs/period_feature_holding_balance_uniswap_v3.sql b/indexer/aggr_jobs/ordered_tasks/period_feature_holding_balance_uniswap_v3.sql
similarity index 100%
rename from indexer/aggr_jobs/order_jobs/period_feature_holding_balance_uniswap_v3.sql
rename to indexer/aggr_jobs/ordered_tasks/period_feature_holding_balance_uniswap_v3.sql
diff --git a/indexer/aggr_jobs/order_jobs/period_wallet_addresses_aggregates.sql b/indexer/aggr_jobs/ordered_tasks/period_wallet_addresses_aggregates.sql
similarity index 100%
rename from indexer/aggr_jobs/order_jobs/period_wallet_addresses_aggregates.sql
rename to indexer/aggr_jobs/ordered_tasks/period_wallet_addresses_aggregates.sql
diff --git a/indexer/aggr_jobs/regular_tasks/__init__.py b/indexer/aggr_jobs/regular_tasks/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/indexer/aggr_jobs/regular_tasks/daily_feature_defi_wallet_fbtc_detail.sql b/indexer/aggr_jobs/regular_tasks/daily_feature_defi_wallet_fbtc_detail.sql
new file mode 100644
index 00000000..e69de29b
diff --git a/indexer/aggr_jobs/disorder_jobs/daily_feature_holding_balance_uniswap_v3.sql b/indexer/aggr_jobs/regular_tasks/daily_feature_holding_balance_uniswap_v3.sql
similarity index 100%
rename from indexer/aggr_jobs/disorder_jobs/daily_feature_holding_balance_uniswap_v3.sql
rename to indexer/aggr_jobs/regular_tasks/daily_feature_holding_balance_uniswap_v3.sql
diff --git a/indexer/aggr_jobs/disorder_jobs/daily_wallet_addresses_aggregates.sql b/indexer/aggr_jobs/regular_tasks/daily_wallet_addresses_aggregates.sql
similarity index 100%
rename from indexer/aggr_jobs/disorder_jobs/daily_wallet_addresses_aggregates.sql
rename to indexer/aggr_jobs/regular_tasks/daily_wallet_addresses_aggregates.sql
diff --git a/indexer/aggr_jobs/regular_tasks/models/__init__.py b/indexer/aggr_jobs/regular_tasks/models/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/indexer/aggr_jobs/disorder_jobs/models/daily_feature_uniswap_v3_pool_prices.py b/indexer/aggr_jobs/regular_tasks/models/daily_feature_uniswap_v3_pool_prices.py
similarity index 100%
rename from indexer/aggr_jobs/disorder_jobs/models/daily_feature_uniswap_v3_pool_prices.py
rename to indexer/aggr_jobs/regular_tasks/models/daily_feature_uniswap_v3_pool_prices.py
diff --git a/indexer/aggr_jobs/disorder_jobs/models/daily_feature_uniswap_v3_token_details.py b/indexer/aggr_jobs/regular_tasks/models/daily_feature_uniswap_v3_token_details.py
similarity index 100%
rename from indexer/aggr_jobs/disorder_jobs/models/daily_feature_uniswap_v3_token_details.py
rename to indexer/aggr_jobs/regular_tasks/models/daily_feature_uniswap_v3_token_details.py
diff --git a/indexer/aggr_jobs/disorder_jobs/disorder_job.py b/indexer/aggr_jobs/regular_tasks/regular_task_dispatch_job.py
similarity index 53%
rename from indexer/aggr_jobs/disorder_jobs/disorder_job.py
rename to indexer/aggr_jobs/regular_tasks/regular_task_dispatch_job.py
index a168dc5f..5a47c2a4 100644
--- a/indexer/aggr_jobs/disorder_jobs/disorder_job.py
+++ b/indexer/aggr_jobs/regular_tasks/regular_task_dispatch_job.py
@@ -4,13 +4,15 @@
 from indexer.executors.batch_work_executor import BatchWorkExecutor
 
 
-class AggrDisorderJob(AggrBaseJob):
-    sql_folder = "disorder_jobs"
+class AggrRegularTaskDispatchJob(AggrBaseJob):
+    sql_folder = "regular_tasks"
 
     def __init__(self, **kwargs):
         config = kwargs["config"]
+        tasks_dict = kwargs["tasks_dict"]
+        self.tasks_list = tasks_dict.get("regular_tasks", [])
         self.db_service = config["db_service"]
-        self._batch_work_executor = BatchWorkExecutor(5, 5)
+        self._batch_work_executor = BatchWorkExecutor(10, 10)
 
     def run(self, **kwargs):
         start_date = kwargs["start_date"]
@@ -20,11 +22,20 @@ def run(self, **kwargs):
         date_pairs = self.generate_date_pairs(start_date, end_date)
         for date_pair in date_pairs:
             start_date, end_date = date_pair
-            sql_content = self.get_sql_content("daily_wallet_addresses_aggregates", start_date, end_date)
-            execute_sql_list.append(sql_content)
+            # Tasks are either plain file names or a mapping of file name -> template parameters.
+            for task in self.tasks_list:
+                if isinstance(task, str):
+                    sql_name = task
+                    sql_content = self.get_sql_content(sql_name, start_date, end_date)
+                else:
+                    sql_name, value_dict = next(iter(task.items()))
+                    sql_content = self.get_sql_content(sql_name, start_date, end_date, **value_dict)
+
+                execute_sql_list.append(sql_content)
 
         self._batch_work_executor.execute(execute_sql_list, self.execute_sql, total_items=len(execute_sql_list))
         self._batch_work_executor.wait()
+        print(f"finish regular tasks job {start_date}")
 
     def execute_sql(self, sql_contents):
         session = self.db_service.Session()
diff --git a/indexer/aggr_jobs/utils.py b/indexer/aggr_jobs/utils.py
index 8a630290..c835b305 100644
--- a/indexer/aggr_jobs/utils.py
+++ b/indexer/aggr_jobs/utils.py
@@ -14,7 +14,7 @@ def get_yesterday_date():
     today_str = now.strftime("%Y-%m-%d")
     yesterday_str = yesterday_datetime.strftime("%Y-%m-%d")
 
-    return today_str, yesterday_str
+    return yesterday_str, today_str
 
 
 class DateType(click.ParamType):
@@ -77,3 +77,18 @@ def read_sync_record(db_service) -> int:
     if not record:
         record = read_sync_record_from_pg(db_service)
     return record
+
+
+def parse_job_list(job_name, config_file):
+    # Resolve the --job-name selector ('all', 'FBTC', 'FBTC,EXPLORE', ...) against
+    # the jobs section of the schedule config file.
+    import yaml
+
+    with open(config_file, "r") as file:
+        jobs = yaml.safe_load(file)["jobs"]
+
+    if job_name == "all":
+        return jobs
+
+    selected = [name.strip() for name in job_name.split(",") if name.strip()]
+    return {name: jobs[name] for name in selected}
diff --git a/indexer/controller/aggregates_controller.py b/indexer/controller/aggregates_controller.py
index 714c0b17..bd9c9cf8 100644
--- a/indexer/controller/aggregates_controller.py
+++ b/indexer/controller/aggregates_controller.py
@@ -7,11 +7,15 @@ class AggregatesController(BaseController):
     def __init__(self, job_dispatcher):
         self.job_dispatcher = job_dispatcher
 
-    def action(self, start_date, end_date, date_batch_size):
+    def action(self, start_date, end_date, date_batch_size=None):
+        self.job_dispatcher.run_initialization_task_dispatch_job(start_date, end_date)
+
+        # Without a batch size, run the whole date range in a single pass.
+        if date_batch_size is None:
+            self.job_dispatcher.run(start_date, end_date)
+            return
+
         date_batches = self.split_date_range(start_date, end_date, date_batch_size)
         for date_batch in date_batches:
-            start_date, end_date = date_batch
-            self.job_dispatcher.run(start_date, end_date)
+            start_date_, end_date_ = date_batch
+            self.job_dispatcher.run(start_date_, end_date_)
 
     @staticmethod
     def split_date_range(start_date, end_date, batch_size):
@@ -19,9 +23,11 @@ def split_date_range(start_date, end_date, batch_size):
         end_date_obj = datetime.strptime(end_date, "%Y-%m-%d")
 
         date_ranges = []
+
+        # Split the date range into batches so that each batch's end date is also
+        # the next batch's start date.
         while start_date_obj < end_date_obj:
-            batch_end_date = min(start_date_obj + timedelta(days=batch_size - 1), end_date_obj)
+            batch_end_date = min(start_date_obj + timedelta(days=batch_size), end_date_obj)
             date_ranges.append((start_date_obj.strftime("%Y-%m-%d"), batch_end_date.strftime("%Y-%m-%d")))
-            start_date_obj = batch_end_date + timedelta(days=1)
+            start_date_obj = batch_end_date  # the next batch starts where this one ends
 
         return date_ranges
diff --git a/indexer/controller/dispatcher/aggregates_dispatcher.py b/indexer/controller/dispatcher/aggregates_dispatcher.py
index 97560581..625a43b4 100644
--- a/indexer/controller/dispatcher/aggregates_dispatcher.py
+++ b/indexer/controller/dispatcher/aggregates_dispatcher.py
@@ -3,9 +3,12 @@
 
 
 class AggregatesDispatcher(BaseDispatcher):
-    def __init__(self, config):
+    def __init__(self, config, job_list):
         super().__init__()
-        self._job_scheduler = AggrJobScheduler(config=config)
+        self._job_scheduler = AggrJobScheduler(config=config, job_list=job_list)
+
+    def run_initialization_task_dispatch_job(self, start_date, end_date):
+        self._job_scheduler.run_init_job(start_date, end_date)
 
     def run(self, start_date, end_date):
         self._job_scheduler.run_jobs(start_date=start_date, end_date=end_date)
diff --git a/indexer/schedule_jobs/__init__.py b/indexer/schedule_jobs/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/indexer/schedule_jobs/aggregates_jobs.py b/indexer/schedule_jobs/aggregates_jobs.py
new file mode 100644
index 00000000..aa2b5309
--- /dev/null
+++ b/indexer/schedule_jobs/aggregates_jobs.py
@@ -0,0 +1,79 @@
+import logging
+
+import yaml
+
+from common.services.postgresql_service import PostgreSQLService
+from indexer.aggr_jobs.job_list_generator import JobListGenerator
+from indexer.aggr_jobs.utils import get_yesterday_date
+from indexer.controller.aggregates_controller import AggregatesController
+from indexer.controller.dispatcher.aggregates_dispatcher import AggregatesDispatcher
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+    handlers=[logging.StreamHandler()],
+)
+
+logger = logging.getLogger(__name__)
+
+
+def parse_crontab(expression):
+    fields = expression.split()
+
+    if len(fields) != 5:
+        raise ValueError("Invalid crontab expression, it must contain 5 fields.")
+
+    minute, hour, day, month, day_of_week = fields
+
+    return {"minute": minute, "hour": hour, "day": day, "month": month, "day_of_week": day_of_week}
+
+
+def parse_aggregate_schedule(config_file):
+    with open(config_file, "r") as file:
+        config = yaml.safe_load(file)
+
+    common_config = config["common_config"]
+    default_schedule_time = common_config["schedule_time"]
+
+    jobs = config["jobs"]
+
+    result_jobs = []
+    for job_name, job_config in jobs.items():
+        initialization_jobs = job_config.get("initialization_tasks", [])
+        disorder_jobs = job_config.get("regular_tasks", [])
+        order_jobs = job_config.get("ordered_tasks", [])
+
+        job_list_generator = JobListGenerator(
+            job_name=job_name,
+            initialization_jobs=initialization_jobs,
+            disorder_jobs=disorder_jobs,
+            order_jobs=order_jobs,
+        )
+
+        job_config_config = job_config.get("config")
+        if job_config_config:
+            schedule_time = job_config_config.get("schedule_time", default_schedule_time)
+        else:
+            schedule_time = default_schedule_time
+
+        result_jobs.append({"schedule_time": schedule_time, "job_list_generator": job_list_generator})
+
+    return result_jobs
+
+
+def aggregates_yesterday_job(job_list_generator, postgres_url):
+    job_name = job_list_generator.job_name
+
+    logger.info(f"Job {job_name} executed")
+
+    start_date, end_date = get_yesterday_date()
+    db_service = PostgreSQLService(postgres_url)
+
+    config = {"db_service": db_service}
+
+    # AggrJobScheduler expects a mapping of job name -> tasks dict, so rebuild it
+    # from the generator's task lists.
+    job_list = {
+        job_name: {
+            "initialization_tasks": job_list_generator.get_initialization_jobs(),
+            "regular_tasks": job_list_generator.get_disorder_jobs(),
+            "ordered_tasks": job_list_generator.get_order_jobs(),
+        }
+    }
+
+    dispatcher = AggregatesDispatcher(config, job_list)
+    controller = AggregatesController(job_dispatcher=dispatcher)
+    controller.action(start_date, end_date)
diff --git a/pyproject.toml b/pyproject.toml
index f0c75a53..e0612059 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -50,6 +50,7 @@ dependencies = [
     "orjson==3.10.7",
     "mpire==2.10.2",
     "PyYAML==6.0.2",
+    "apscheduler==3.10.4",
 ]
 
 [tool.setuptools.dynamic]
diff --git a/scheduler/__init__.py b/scheduler/__init__.py
new file mode 100644
index 00000000..e69de29b
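
For reviewers, a minimal sketch of how the pieces added here compose; it mirrors what the new "schedule" command does. The config path and connection URL below are placeholder values, and parse_aggregate_schedule / aggregates_yesterday_job are the helpers introduced in indexer/schedule_jobs/aggregates_jobs.py:

    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.triggers.cron import CronTrigger

    from indexer.schedule_jobs.aggregates_jobs import aggregates_yesterday_job, parse_aggregate_schedule

    # Placeholder values; in the container these come from CONFIG_FILE / POSTGRES_URL.
    config_file = "docker-compose/aggr_schedule_config.yaml"
    postgres_url = "postgresql+psycopg2://postgres:admin@127.0.0.1:5432/ethereum"

    # parse_aggregate_schedule yields one entry per job section in the YAML:
    # a crontab string plus a JobListGenerator holding that job's task lists.
    scheduler = BlockingScheduler()
    for job in parse_aggregate_schedule(config_file):
        trigger = CronTrigger.from_crontab(job["schedule_time"])  # e.g. "0 1 * * *" -> daily at 01:00
        scheduler.add_job(
            func=aggregates_yesterday_job,
            trigger=trigger,
            args=(job["job_list_generator"], postgres_url),
        )
    scheduler.start()  # blocks until interrupted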