add schedule entrance and job name para #107

Open
wants to merge 9 commits into base: pre-release/v0.3.0
Changes from 2 commits
2 changes: 2 additions & 0 deletions cli/__init__.py
@@ -6,6 +6,7 @@
from cli.api import api
from cli.load import load
from cli.reorg import reorg
from cli.schedule import schedule
from cli.stream import stream
from indexer.utils.logging_utils import logging_basic_config

@@ -29,3 +30,4 @@ def cli(ctx):
cli.add_command(api, "api")
cli.add_command(aggregates, "aggregates")
cli.add_command(reorg, "reorg")
cli.add_command(schedule, "schedule")
40 changes: 35 additions & 5 deletions cli/aggregates.py
@@ -1,12 +1,31 @@
import click

from common.services.postgresql_service import PostgreSQLService
from indexer.aggr_jobs.job_list_generator import JobListGenerator
from indexer.aggr_jobs.utils import DateType, check_data_completeness, get_yesterday_date
from indexer.controller.aggregates_controller import AggregatesController
from indexer.controller.dispatcher.aggregates_dispatcher import AggregatesDispatcher


@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option(
"-cn",
"--chain-name",
default=None,
show_default=True,
type=str,
help="The chain name of the chain to aggregate data for",
envvar="CHAIN_NAME",
)
@click.option(
"-jn",
"--job-name",
default=None,
show_default=True,
type=str,
help="Job list to aggregate data for",
envvar="JOB_NAME",
)
@click.option(
"-pg",
"--postgres-url",
@@ -47,24 +66,35 @@
@click.option(
"-D",
"--date-batch-size",
default=30,
default=5,
show_default=True,
type=int,
envvar="DATE_BATCH_SIZE",
help="How many DATEs to batch in single sync round",
)
def aggregates(postgres_url, provider_uri, start_date, end_date, date_batch_size):
@click.option(
"-du",
"--dblink-url",
default=None,
show_default=True,
type=str,
envvar="DBLINK_URL",
help="dblink to take token price, maybe moved to other replace later",
)
def aggregates(chain_name, job_name, postgres_url, provider_uri, start_date, end_date, date_batch_size, dblink_url):
if not start_date and not end_date:
start_date, end_date = get_yesterday_date()
elif not end_date:
_, end_date = get_yesterday_date()

db_service = PostgreSQLService(postgres_url)

check_data_completeness(db_service, provider_uri, end_date)
# check_data_completeness(db_service, provider_uri, end_date)

config = {"db_service": db_service, "chain_name": chain_name, "dblink_url": dblink_url}
job_list = JobListGenerator(job_name)

config = {"db_service": db_service}
dispatcher = AggregatesDispatcher(config)
dispatcher = AggregatesDispatcher(config, job_list)

controller = AggregatesController(job_dispatcher=dispatcher)
controller.action(start_date, end_date, date_batch_size)
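For reference, here is a hedged sketch of how the new --chain-name, --job-name and --dblink-url options could be exercised with click's test runner. The chain name and URLs are placeholders, and any required options defined in the unchanged portion of the command (for example the provider URI) would still need to be supplied for a real run.

# Illustration only, not part of this PR. Values are placeholders; required options
# from the unchanged portion of the command (e.g. the provider URI) are omitted here.
from click.testing import CliRunner

from cli.aggregates import aggregates

runner = CliRunner()
result = runner.invoke(
    aggregates,
    [
        "--chain-name", "mantle",  # placeholder chain name
        "--job-name", "FBTC",      # selects the FBTC job list (see job_list_generator.py)
        "--postgres-url", "postgresql+psycopg2://postgres:admin@localhost:5432/ethereum",
        "--dblink-url", "host=localhost dbname=prices user=postgres",  # placeholder dblink string
    ],
)
print(result.exit_code, result.output)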
83 changes: 83 additions & 0 deletions cli/schedule.py
@@ -0,0 +1,83 @@
import os
import sys
from datetime import datetime

import click
from apscheduler.schedulers.blocking import BlockingScheduler

from indexer.schedule_jobs.aggregates_jobs import aggregates_yesterday_job, parse_crontab


@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option(
"-cn",
Contributor: why do you need chain name?

Contributor Author: Because some configurations depend on the chain, such as the selection of the protocol ID in Uniswap v3, and some table fields include the chain name. That said, this will be removed once the address configuration changes.

"--chain-name",
default=None,
show_default=True,
type=str,
help="The chain name of the chain to aggregate data for",
envvar="CHAIN_NAME",
)
@click.option(
"-jn",
"--job-name",
default=None,
show_default=True,
type=str,
help="Job list to aggregate data for",
envvar="JOB_NAME",
)
@click.option(
"-pg",
"--postgres-url",
type=str,
required=True,
envvar="POSTGRES_URL",
help="The required postgres connection url." "e.g. postgresql+psycopg2://postgres:[email protected]:5432/ethereum",
)
@click.option(
Contributor: I suggest not adding this. You can assume the token price table is already available in the database at postgres-url.

Contributor Author: Sure

"-du",
"--dblink-url",
default=None,
show_default=True,
type=str,
envvar="DBLINK_URL",
help="dblink to take token price, maybe moved to other replace later",
)
@click.option(
Contributor: You can only run one job at a time?

Contributor Author: Yes, but it has changed now.

"-st",
"--schedule-time",
default="0 1 * * *",
show_default=True,
type=str,
envvar="SCHEDULE_TIME",
help="schedule time by crontab expression: default: 0 1 * * *",
)
def schedule(schedule_time, job_name, chain_name, postgres_url, dblink_url) -> None:
sys.stdout = os.fdopen(sys.stdout.fileno(), "w", buffering=1) # Line-buffered stdout
sys.stderr = os.fdopen(sys.stderr.fileno(), "w", buffering=1) # Line-buffered stderr

parsed_crontab = parse_crontab(schedule_time)
minute = parsed_crontab["minute"]
hour = parsed_crontab["hour"]

day = parsed_crontab["day"]
month = parsed_crontab["month"]
day_of_week = parsed_crontab["day_of_week"]

scheduler = BlockingScheduler()
job_args = (chain_name, job_name, postgres_url, dblink_url)
scheduler.add_job(
aggregates_yesterday_job,
"cron",
hour=hour,
minute=minute,
day=day,
month=month,
day_of_week=day_of_week,
args=job_args,
)

scheduler.start()
# current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# print(f'Job started {current_time}, schedule time is {hour}:{minute} daily')
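The parse_crontab helper lives in indexer/schedule_jobs/aggregates_jobs.py and is not shown in this diff. A minimal sketch of the contract the schedule command relies on — a standard five-field crontab string split into the keyword arguments APScheduler's cron trigger accepts — might look like this:

# Sketch only; the real implementation is in indexer/schedule_jobs/aggregates_jobs.py
# and may differ. It assumes a standard five-field crontab expression.
def parse_crontab(expression: str) -> dict:
    minute, hour, day, month, day_of_week = expression.split()
    return {
        "minute": minute,
        "hour": hour,
        "day": day,
        "month": month,
        "day_of_week": day_of_week,
    }

parse_crontab("0 1 * * *")
# {'minute': '0', 'hour': '1', 'day': '*', 'month': '*', 'day_of_week': '*'}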
11 changes: 10 additions & 1 deletion indexer/aggr_jobs/aggr_base_job.py
@@ -16,9 +16,18 @@ def get_sql_content(self, file_name, start_date, end_date):

with open(file_path, "r") as f:
sql_template = f.read()
sql = sql_template.format(start_date=start_date, end_date=end_date)
sql = sql_template.format(
start_date=start_date, end_date=end_date, start_date_previous=self.get_previous(start_date)
)
return sql

@staticmethod
def get_previous(date_str):
date_obj = datetime.strptime(date_str, "%Y-%m-%d")
previous_day = date_obj - timedelta(days=1)
previous_day_str = previous_day.strftime("%Y-%m-%d")
return previous_day_str

@staticmethod
def generate_date_pairs(start_date, end_date):
start_date_obj = datetime.strptime(start_date, "%Y-%m-%d")
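The new start_date_previous placeholder lets a SQL template reference the day before the window start (for example, to carry forward the previous day's balances). The *.sql files named in the job lists are not part of this diff, so the template below is only an illustration:

# Illustration only; the real *.sql templates referenced by the job lists are not part
# of this diff. Extra keyword arguments passed to str.format are simply ignored, so
# templates that do not use start_date_previous keep working unchanged.
from datetime import datetime, timedelta

def get_previous(date_str):
    # mirrors AggrBaseJob.get_previous above
    return (datetime.strptime(date_str, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")

sql_template = (
    "SELECT * FROM daily_address_token_balances "
    "WHERE block_date >= '{start_date_previous}' AND block_date < '{end_date}'"
)
print(sql_template.format(
    start_date="2024-09-02",
    end_date="2024-09-03",
    start_date_previous=get_previous("2024-09-02"),  # -> 2024-09-01
))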
11 changes: 6 additions & 5 deletions indexer/aggr_jobs/aggr_job_scheduler.py
@@ -4,12 +4,14 @@
"""

from indexer.aggr_jobs.disorder_jobs.disorder_job import AggrDisorderJob
from indexer.aggr_jobs.initialization_jobs.initialization_job import InitializationJob
from indexer.aggr_jobs.order_jobs.order_job import AggrOrderJob


class AggrJobScheduler:
def __init__(self, config):
def __init__(self, config, job_list):
self.config = config
self.job_list = job_list
self.jobs = self.instantiate_jobs()

def run_jobs(self, start_date, end_date):
@@ -18,9 +20,8 @@ def run_jobs(self, start_date, end_date):

def instantiate_jobs(self):
jobs = []
for job_class in [AggrDisorderJob, AggrOrderJob]:
job = job_class(
config=self.config,
)
# InitializationJob should be executed once only
for job_class in [InitializationJob, AggrDisorderJob, AggrOrderJob]:
job = job_class(config=self.config, job_list=self.job_list)
jobs.append(job)
return jobs
Empty file.
11 changes: 8 additions & 3 deletions indexer/aggr_jobs/disorder_jobs/disorder_job.py
@@ -9,8 +9,10 @@ class AggrDisorderJob(AggrBaseJob):

def __init__(self, **kwargs):
config = kwargs["config"]
job_list = kwargs["job_list"]
self.job_list = job_list.get_disordered_jobs()
self.db_service = config["db_service"]
self._batch_work_executor = BatchWorkExecutor(5, 5)
self._batch_work_executor = BatchWorkExecutor(10, 10)

def run(self, **kwargs):
start_date = kwargs["start_date"]
@@ -20,11 +22,14 @@ def run(self, **kwargs):
date_pairs = self.generate_date_pairs(start_date, end_date)
for date_pair in date_pairs:
start_date, end_date = date_pair
sql_content = self.get_sql_content("daily_wallet_addresses_aggregates", start_date, end_date)
execute_sql_list.append(sql_content)
# Could be replaced to auto and selected
for sql_name in self.job_list:
sql_content = self.get_sql_content(sql_name, start_date, end_date)
execute_sql_list.append(sql_content)

self._batch_work_executor.execute(execute_sql_list, self.execute_sql, total_items=len(execute_sql_list))
self._batch_work_executor.wait()
print(f"finish disorder job {start_date}")

def execute_sql(self, sql_contents):
session = self.db_service.Session()
Empty file.
Empty file.
68 changes: 68 additions & 0 deletions indexer/aggr_jobs/initialization_jobs/initialization_job.py
@@ -0,0 +1,68 @@
import time

from sqlalchemy import text


class InitializationJob:
def __init__(self, **kwargs):
config = kwargs["config"]
job_list = kwargs["job_list"]
self.job_list = job_list.get_initialization_jobs()

self.db_service = config["db_service"]
self.dblink_url = config["dblink_url"]

def init_token_price(self):
token_price_sql_template = """
CREATE EXTENSION if not exists dblink;

DELETE FROM token_price WHERE timestamp >= :start_date;

INSERT INTO token_price
SELECT * FROM dblink(:dblink_url,
'SELECT * FROM w3w_commons.token_hourly_prices WHERE timestamp >= ''{start_date}'' ')
AS t(symbol varchar, timestamp timestamp, price numeric);
"""

sql = token_price_sql_template.format(start_date=self.start_date)

session = self.db_service.Session()

start_time = time.time()

session.execute(text(sql), {"start_date": self.start_date, "dblink_url": self.dblink_url})

session.commit()
execution_time = time.time() - start_time
print(f"----------- executed in {execution_time:.2f} seconds: init token price")

session.close()

def init_period_address_token_balance(self):
session = self.db_service.Session()

sql_template = """
delete from period_address_token_balances where period_date >= :start_date;
"""
start_time = time.time()
session.execute(text(sql_template), {"start_date": self.start_date})

session.commit()
execution_time = time.time() - start_time
print(f"----------- executed in {execution_time:.2f} seconds: init period_address_token_balances")

session.close()

def run(self, **kwargs):
self.start_date = kwargs["start_date"]
self.end_date = kwargs["end_date"]

# self.init_token_price()
# self.init_period_address_token_balance()

for function_name in self.job_list:
func = getattr(self, function_name, None)
if callable(func):
func()
else:
print(f"Function {function_name} does not exist")
48 changes: 48 additions & 0 deletions indexer/aggr_jobs/job_list_generator.py
@@ -0,0 +1,48 @@
class JobListGenerator(object):
def __init__(self, job_name):
self.job_name = job_name

def get_initialization_jobs(self):
job_list = []
if self.job_name == "FBTC":
job_list = ["init_token_price", "init_period_address_token_balance"]
return job_list

def get_disordered_jobs(self):
job_list = []

if self.job_name == "FBTC":
job_list = [
"daily_feature_holding_balance_staked_fbtc_detail.sql",
"daily_feature_holding_balance_uniswap_v3.sql",
"daily_address_token_balances",
"daily_feature_erc20_token_supply_records.sql",
# 'daily_feature_erc1155_token_holdings.sql',
# 'daily_feature_erc1155_token_supply_records.sql'
]
elif self.job_name == "EXPLORE":
job_list = [
"daily_explore_aggregates.sql",
]

return job_list

def get_order_jobs(self):
job_list = []

if self.job_name == "FBTC":
job_list = [
"period_address_token_balances",
"period_feature_holding_balance_uniswap_v3.sql",
"period_feature_staked_fbtc_detail_records.sql",
"period_feature_holding_balance_staked_fbtc_detail.sql",
# 'period_feature_erc1155_token_holdings.sql',
"period_feature_erc1155_token_supply_records.sql",
"period_feature_holding_balance_merchantmoe.sql",
"period_feature_erc20_token_supply_records.sql",
"period_feature_holding_balance_dodo.sql",
"period_feature_holding_balance_lendle.sql",
"period_feature_defi_wallet_fbtc_aggregates.py",
]

return job_list
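For quick reference, the lists the generator returns for the two supported job names (values taken from this diff):

from indexer.aggr_jobs.job_list_generator import JobListGenerator

fbtc = JobListGenerator("FBTC")
print(fbtc.get_initialization_jobs())
# ['init_token_price', 'init_period_address_token_balance']
print(fbtc.get_disordered_jobs()[0])
# 'daily_feature_holding_balance_staked_fbtc_detail.sql'
print(fbtc.get_order_jobs()[-1])
# 'period_feature_defi_wallet_fbtc_aggregates.py'

explore = JobListGenerator("EXPLORE")
print(explore.get_disordered_jobs())
# ['daily_explore_aggregates.sql']
print(explore.get_order_jobs())
# []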
Empty file.
Empty file.
@@ -0,0 +1,21 @@
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import DATE, TIMESTAMP, Column, Index, func
from sqlalchemy.dialects.postgresql import BYTEA, NUMERIC, VARCHAR

from common.models import HemeraModel, general_converter


class PeriodFeatureHoldingBalanceLendle(HemeraModel):
__tablename__ = "period_feature_holding_balance_lendle"

period_date = Column(DATE, primary_key=True, nullable=False)
wallet_address = Column(BYTEA, primary_key=True)
protocol_id = Column(VARCHAR, primary_key=True)
contract_address = Column(BYTEA, primary_key=True)

token_symbol = Column(VARCHAR)
token_address = Column(BYTEA)

balance = Column(NUMERIC(100, 18))

create_time = Column(TIMESTAMP, server_default=func.now())
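Not part of the diff, but for orientation: a minimal sketch of querying the new table through the same session factory the aggregation jobs use. The model's import path is an assumption, since the file header for this new model is not visible in the diff.

# Sketch only; the module path of the new model is an assumption, and the URL is a
# placeholder. PostgreSQLService.Session is used the same way as in the jobs above.
from common.services.postgresql_service import PostgreSQLService
from common.models.period_feature_holding_balance_lendle import (  # hypothetical path
    PeriodFeatureHoldingBalanceLendle,
)

service = PostgreSQLService("postgresql+psycopg2://postgres:admin@localhost:5432/ethereum")
session = service.Session()
rows = (
    session.query(PeriodFeatureHoldingBalanceLendle)
    .filter(PeriodFeatureHoldingBalanceLendle.period_date == "2024-09-02")
    .limit(10)
    .all()
)
session.close()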