From 08289382fd0bb822e5307ee683e80e22ecd51f21 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Wed, 27 Apr 2022 18:13:36 +0100 Subject: [PATCH 01/13] Adding graphQL instead of using postgres for getting streams --- dags/ricochet_stream_watch.py | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/dags/ricochet_stream_watch.py b/dags/ricochet_stream_watch.py index 70471a6..07b634d 100644 --- a/dags/ricochet_stream_watch.py +++ b/dags/ricochet_stream_watch.py @@ -14,11 +14,13 @@ from blocksec_plugin.abis import REX_ABI, ERC20_ABI from datetime import datetime, timedelta from constants.constants import ScheduleConstants, Utils +import requests CLOSER_WALLET_ADDRESS = Variable.get("closer-address") EXCHANGE_ADDRESSES = Variable.get("ricochet-exchange-addresses", deserialize_json=True) SCHEDULE_INTERVAL = Variable.get("watch-schedule-interval", ScheduleConstants.RICOCHET_STREAM_WATCH) +SUPERFLUID_GRAPHQL_POLYGON_URI = Variable.get("superfluid-graphql-polygon-uri", default_var="https://api.thegraph.com/subgraphs/name/superfluid-finance/protocol-v1-matic") default_args = Utils.get_DAG_args() @@ -43,27 +45,18 @@ def review_streamers_and_trigger_closures(**context): """ execution_date = context['execution_date'].isoformat() exchange_address = context['exchange_address'] - print(f"Checking exchange {exchange_address}") - sql = f""" - with streamer_rates as ( - select args->>'from' as streamer, - FIRST_VALUE(args->>'newRate') OVER (PARTITION BY args->>'from' ORDER BY block_number DESC) as rate - from ethereum_events - where event = 'UpdatedStream' - and address ='{exchange_address}' - ) - select distinct streamer, CAST(rate as float) - from streamer_rates - where CAST(rate as FLOAT) > 0 - order by 2 desc - """ - print(sql) - postgres = PostgresHook(postgres_conn_id='data_warehouse') - conn = postgres.get_conn() - cursor = conn.cursor() - cursor.execute(sql) - streamers = [result[0] for result in cursor.fetchall()] + query = '{' +'account(id: "{}")'.format(exchange_address) + ''' { + inflows { + id + } + } + } + ''' + + request = requests.post(SUPERFLUID_GRAPHQL_POLYGON_URI, json={'query': query}) + response = request.json() + streamers = [streamer['id'].split("-")[0] for streamer in response['data']['account']['inflows']] print("Streamers", streamers) web3 = Web3Hook(web3_conn_id='infura').http_client From 555057f9f3bfa234240b23c2cdedf6468dde529b Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Wed, 27 Apr 2022 18:21:09 +0100 Subject: [PATCH 02/13] Deleted ethereum block poll dag --- dags/ethereum_block_poll.py | 113 ------------------------------------ 1 file changed, 113 deletions(-) delete mode 100644 dags/ethereum_block_poll.py diff --git a/dags/ethereum_block_poll.py b/dags/ethereum_block_poll.py deleted file mode 100644 index a88d194..0000000 --- a/dags/ethereum_block_poll.py +++ /dev/null @@ -1,113 +0,0 @@ -""" -Ethereum Block Poll - -- Every minute, check the Ethereum block height -- Save the block height to the data warehouse - -** Requires the following tables exist: - -CREATE TABLE ethereum_events( - id SERIAL PRIMARY KEY, - args JSON, - event VARCHAR(128), - log_index INTEGER, - transaction_index INTEGER, - transaction_hash VARCHAR(68), - address VARCHAR(68), - block_hash VARCHAR(68), - block_number INTEGER, - created_at timestamp DEFAULT CURRENT_TIMESTAMP -); - -CREATE TABLE ethereum_blocks( - id SERIAL PRIMARY KEY, - block_height INTEGER, - mined_at timestamp, - created_at timestamp DEFAULT CURRENT_TIMESTAMP -); - -""" -from airflow import DAG -from airflow.models import Variable -from airflow.operators.bash_operator import BashOperator -from airflow.hooks.postgres_hook import PostgresHook -from airflow.operators.python_operator import PythonOperator -from blocksec_plugin.web3_hook import Web3Hook -from blocksec_plugin.ethereum_block_to_postgres_operator import EthereumBlocktoPostgresOperator -from blocksec_plugin.ethereum_events_to_postgres_operator import EthereumEventstoPostgresOperator -from blocksec_plugin.abis import REX_ABI -from datetime import datetime, timedelta -from json import loads -from constants.constants import ScheduleConstants, Utils - -SCHEDULE_INTERVAL = Variable.get("block-poll-schedule-interval", ScheduleConstants.ETHEREUM_BLOCK_POLL) - -default_args = Utils.get_DAG_args(start_date=datetime(2021, 8, 1, 23, 0)) - -dag = DAG("ethereum_block_poll", catchup=False, default_args=default_args, schedule_interval=SCHEDULE_INTERVAL) - -EXCHANGE_ADDRESSES = Variable.get("ricochet-exchange-addresses", deserialize_json=True) - -def get_from_block_height(**context): - """ - Check the smart_contracts table for the current_block_height - """ - execution_date = context['execution_date'].isoformat() - sql = """ - SELECT block_height - FROM ethereum_blocks - WHERE mined_at < ('{0}'::timestamp - interval '1 hour') - ORDER BY 1 DESC - LIMIT 1 - """.format(execution_date) - print(sql) - postgres = PostgresHook(postgres_conn_id='data_warehouse') - conn = postgres.get_conn() - cursor = conn.cursor() - cursor.execute(sql) - result = cursor.fetchall() - try: - from_block_height = result[0][0] - except IndexError: - # first time running - from_block_height = 17498383 - return from_block_height - -done = BashOperator( - task_id='done', - bash_command='date', - dag=dag, -) - -get_to_block_height = EthereumBlocktoPostgresOperator( - task_id="get_to_block_height", - postgres_conn_id='data_warehouse', - postgres_table='ethereum_blocks', - web3_conn_id='infura', - dag=dag, -) - -get_from_block_height = PythonOperator( - task_id="get_from_block_height", - provide_context=True, - python_callable=get_from_block_height, - dag=dag -) - -for exchange_address in EXCHANGE_ADDRESSES: - - extract_events = EthereumEventstoPostgresOperator( - task_id="record_events_"+exchange_address, - postgres_conn_id='data_warehouse', - postgres_table='ethereum_events', - abi_json=loads(REX_ABI), - contract_address=exchange_address, - from_block="{{task_instance.xcom_pull(task_ids='get_from_block_height')}}", - to_block="{{task_instance.xcom_pull(task_ids='get_to_block_height')}}", - web3_conn_id='infura', - dag=dag, - ) - - extract_events << get_to_block_height - extract_events << get_from_block_height - done << extract_events From 1749d3f5d177aa40b0f103c7e29e47515a4aae14 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Wed, 27 Apr 2022 20:41:43 +0100 Subject: [PATCH 03/13] Replacing exchange_address -> contract_address fixing stream_closure --- dags/blocksec_plugin/ricochet_streamer_close_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/blocksec_plugin/ricochet_streamer_close_operator.py b/dags/blocksec_plugin/ricochet_streamer_close_operator.py index c2bda95..924f985 100644 --- a/dags/blocksec_plugin/ricochet_streamer_close_operator.py +++ b/dags/blocksec_plugin/ricochet_streamer_close_operator.py @@ -7,7 +7,7 @@ class RicochetStreamerCloseOperator(ContractInteractionOperator): """ Closes a streamers stream using `closeStream` """ - template_fields = ['streamer_address', 'exchange_address'] + template_fields = ['streamer_address', 'contract_address'] ui_color = "#ADF5FF" @apply_defaults From 770c1b081e1382555457ac79b469b316df23049e Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Wed, 27 Apr 2022 20:50:42 +0100 Subject: [PATCH 04/13] Replacing exchange_address -> contract_address fixing stream_closure --- dags/ricochet_stream_closure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/ricochet_stream_closure.py b/dags/ricochet_stream_closure.py index 435bbd5..6bf05b3 100644 --- a/dags/ricochet_stream_closure.py +++ b/dags/ricochet_stream_closure.py @@ -40,7 +40,7 @@ web3_conn_id="infura", # Set in Aiflow Connections UI ethereum_wallet=CLOSER_WALLET_ADDRESS, # Set in Airflow Connections UI streamer_address='{{ dag_run.conf["streamer_address"] }}', - contract_address='{{ dag_run.conf["exchange_address"] }}', + contract_address='{{ dag_run.conf["contract_address"] }}', nonce='{{ dag_run.conf["nonce"] }}', gas_multiplier=PriceConstants.GAS_MULTIPLIER_STREAM_CLOSURE, max_gas_price=MAX_GAS_PRICE, From 4fd7991a439186b2b8b19df5a51c500dc4dac4c8 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Fri, 29 Apr 2022 18:32:12 +0100 Subject: [PATCH 05/13] testing --- dags/blocksec_plugin/ricochet_streamer_close_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/blocksec_plugin/ricochet_streamer_close_operator.py b/dags/blocksec_plugin/ricochet_streamer_close_operator.py index 924f985..cb284f0 100644 --- a/dags/blocksec_plugin/ricochet_streamer_close_operator.py +++ b/dags/blocksec_plugin/ricochet_streamer_close_operator.py @@ -7,7 +7,7 @@ class RicochetStreamerCloseOperator(ContractInteractionOperator): """ Closes a streamers stream using `closeStream` """ - template_fields = ['streamer_address', 'contract_address'] + template_fields = ['streamer_address', 'exchange_address', 'nonce'] ui_color = "#ADF5FF" @apply_defaults From a2733e05f007fb74b0562896955d717bf07d48b5 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Fri, 29 Apr 2022 18:33:07 +0100 Subject: [PATCH 06/13] testing --- dags/ricochet_stream_closure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/ricochet_stream_closure.py b/dags/ricochet_stream_closure.py index 6bf05b3..435bbd5 100644 --- a/dags/ricochet_stream_closure.py +++ b/dags/ricochet_stream_closure.py @@ -40,7 +40,7 @@ web3_conn_id="infura", # Set in Aiflow Connections UI ethereum_wallet=CLOSER_WALLET_ADDRESS, # Set in Airflow Connections UI streamer_address='{{ dag_run.conf["streamer_address"] }}', - contract_address='{{ dag_run.conf["contract_address"] }}', + contract_address='{{ dag_run.conf["exchange_address"] }}', nonce='{{ dag_run.conf["nonce"] }}', gas_multiplier=PriceConstants.GAS_MULTIPLIER_STREAM_CLOSURE, max_gas_price=MAX_GAS_PRICE, From e94128a6cdcfe2ae7c04092017307da791f59db3 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Sat, 30 Apr 2022 17:59:18 +0100 Subject: [PATCH 07/13] test --- dags/blocksec_plugin/ricochet_streamer_close_operator.py | 2 ++ dags/ricochet_stream_closure.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dags/blocksec_plugin/ricochet_streamer_close_operator.py b/dags/blocksec_plugin/ricochet_streamer_close_operator.py index cb284f0..3c19b3b 100644 --- a/dags/blocksec_plugin/ricochet_streamer_close_operator.py +++ b/dags/blocksec_plugin/ricochet_streamer_close_operator.py @@ -13,6 +13,7 @@ class RicochetStreamerCloseOperator(ContractInteractionOperator): @apply_defaults def __init__(self, streamer_address, + exhange_address, *args, **kwargs): super().__init__(abi_json=REX_ABI, *args, **kwargs) @@ -21,6 +22,7 @@ def __init__(self, def execute(self, context): self.streamer_address = streamer_address + self.exchange_address = exhange_address self.function = self.contract.functions.closeStream self.function_args = {"streamer": self.streamer_address} return super().execute(context) diff --git a/dags/ricochet_stream_closure.py b/dags/ricochet_stream_closure.py index 435bbd5..76532b9 100644 --- a/dags/ricochet_stream_closure.py +++ b/dags/ricochet_stream_closure.py @@ -40,7 +40,7 @@ web3_conn_id="infura", # Set in Aiflow Connections UI ethereum_wallet=CLOSER_WALLET_ADDRESS, # Set in Airflow Connections UI streamer_address='{{ dag_run.conf["streamer_address"] }}', - contract_address='{{ dag_run.conf["exchange_address"] }}', + exchange_address='{{ dag_run.conf["exchange_address"] }}', nonce='{{ dag_run.conf["nonce"] }}', gas_multiplier=PriceConstants.GAS_MULTIPLIER_STREAM_CLOSURE, max_gas_price=MAX_GAS_PRICE, From 63470b6a10a5a7c0fc5bc67e5c70dc127f75d9d1 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Sat, 30 Apr 2022 18:13:29 +0100 Subject: [PATCH 08/13] tets --- dags/blocksec_plugin/ricochet_streamer_close_operator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/blocksec_plugin/ricochet_streamer_close_operator.py b/dags/blocksec_plugin/ricochet_streamer_close_operator.py index 3c19b3b..819adb9 100644 --- a/dags/blocksec_plugin/ricochet_streamer_close_operator.py +++ b/dags/blocksec_plugin/ricochet_streamer_close_operator.py @@ -7,7 +7,7 @@ class RicochetStreamerCloseOperator(ContractInteractionOperator): """ Closes a streamers stream using `closeStream` """ - template_fields = ['streamer_address', 'exchange_address', 'nonce'] + template_fields = ['streamer_address', 'contract_address'] ui_color = "#ADF5FF" @apply_defaults @@ -22,7 +22,7 @@ def __init__(self, def execute(self, context): self.streamer_address = streamer_address - self.exchange_address = exhange_address + self.exchange_address = contract_address self.function = self.contract.functions.closeStream self.function_args = {"streamer": self.streamer_address} return super().execute(context) From da1d36b47da11f290d61cc7211121cea53458f49 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Sat, 30 Apr 2022 18:20:42 +0100 Subject: [PATCH 09/13] test --- dags/blocksec_plugin/ricochet_streamer_close_operator.py | 2 -- dags/ricochet_stream_closure.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/dags/blocksec_plugin/ricochet_streamer_close_operator.py b/dags/blocksec_plugin/ricochet_streamer_close_operator.py index 819adb9..924f985 100644 --- a/dags/blocksec_plugin/ricochet_streamer_close_operator.py +++ b/dags/blocksec_plugin/ricochet_streamer_close_operator.py @@ -13,7 +13,6 @@ class RicochetStreamerCloseOperator(ContractInteractionOperator): @apply_defaults def __init__(self, streamer_address, - exhange_address, *args, **kwargs): super().__init__(abi_json=REX_ABI, *args, **kwargs) @@ -22,7 +21,6 @@ def __init__(self, def execute(self, context): self.streamer_address = streamer_address - self.exchange_address = contract_address self.function = self.contract.functions.closeStream self.function_args = {"streamer": self.streamer_address} return super().execute(context) diff --git a/dags/ricochet_stream_closure.py b/dags/ricochet_stream_closure.py index 76532b9..435bbd5 100644 --- a/dags/ricochet_stream_closure.py +++ b/dags/ricochet_stream_closure.py @@ -40,7 +40,7 @@ web3_conn_id="infura", # Set in Aiflow Connections UI ethereum_wallet=CLOSER_WALLET_ADDRESS, # Set in Airflow Connections UI streamer_address='{{ dag_run.conf["streamer_address"] }}', - exchange_address='{{ dag_run.conf["exchange_address"] }}', + contract_address='{{ dag_run.conf["exchange_address"] }}', nonce='{{ dag_run.conf["nonce"] }}', gas_multiplier=PriceConstants.GAS_MULTIPLIER_STREAM_CLOSURE, max_gas_price=MAX_GAS_PRICE, From 9316c253bff1d73b35b2ddf26fab4e17c3186f07 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Sat, 30 Apr 2022 18:36:29 +0100 Subject: [PATCH 10/13] test --- dags/ricochet_stream_closure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/ricochet_stream_closure.py b/dags/ricochet_stream_closure.py index 435bbd5..6bf05b3 100644 --- a/dags/ricochet_stream_closure.py +++ b/dags/ricochet_stream_closure.py @@ -40,7 +40,7 @@ web3_conn_id="infura", # Set in Aiflow Connections UI ethereum_wallet=CLOSER_WALLET_ADDRESS, # Set in Airflow Connections UI streamer_address='{{ dag_run.conf["streamer_address"] }}', - contract_address='{{ dag_run.conf["exchange_address"] }}', + contract_address='{{ dag_run.conf["contract_address"] }}', nonce='{{ dag_run.conf["nonce"] }}', gas_multiplier=PriceConstants.GAS_MULTIPLIER_STREAM_CLOSURE, max_gas_price=MAX_GAS_PRICE, From ea92fb3605d7c00c59aff3f07ac90bf16ad1e510 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Sat, 30 Apr 2022 18:46:02 +0100 Subject: [PATCH 11/13] test --- dags/ricochet_stream_closure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/ricochet_stream_closure.py b/dags/ricochet_stream_closure.py index 6bf05b3..435bbd5 100644 --- a/dags/ricochet_stream_closure.py +++ b/dags/ricochet_stream_closure.py @@ -40,7 +40,7 @@ web3_conn_id="infura", # Set in Aiflow Connections UI ethereum_wallet=CLOSER_WALLET_ADDRESS, # Set in Airflow Connections UI streamer_address='{{ dag_run.conf["streamer_address"] }}', - contract_address='{{ dag_run.conf["contract_address"] }}', + contract_address='{{ dag_run.conf["exchange_address"] }}', nonce='{{ dag_run.conf["nonce"] }}', gas_multiplier=PriceConstants.GAS_MULTIPLIER_STREAM_CLOSURE, max_gas_price=MAX_GAS_PRICE, From ca6bcc8db8ffae1067d564113e6a74fc959d9ffd Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Sat, 28 May 2022 14:51:06 +0100 Subject: [PATCH 12/13] contract_address -> exchange_address, as we have closeStream function in v2 --- dags/blocksec_plugin/ricochet_streamer_close_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/blocksec_plugin/ricochet_streamer_close_operator.py b/dags/blocksec_plugin/ricochet_streamer_close_operator.py index 924f985..c2bda95 100644 --- a/dags/blocksec_plugin/ricochet_streamer_close_operator.py +++ b/dags/blocksec_plugin/ricochet_streamer_close_operator.py @@ -7,7 +7,7 @@ class RicochetStreamerCloseOperator(ContractInteractionOperator): """ Closes a streamers stream using `closeStream` """ - template_fields = ['streamer_address', 'contract_address'] + template_fields = ['streamer_address', 'exchange_address'] ui_color = "#ADF5FF" @apply_defaults From b0d3d3131f64cf78de44646b937cb2e04018d951 Mon Sep 17 00:00:00 2001 From: Shreyas Papinwar Date: Sat, 28 May 2022 14:52:26 +0100 Subject: [PATCH 13/13] Delete unused postgres files --- .../ethereum_block_to_postgres_operator.py | 45 ------ .../ethereum_events_to_postgres_operator.py | 146 ------------------ 2 files changed, 191 deletions(-) delete mode 100644 dags/blocksec_plugin/ethereum_block_to_postgres_operator.py delete mode 100644 dags/blocksec_plugin/ethereum_events_to_postgres_operator.py diff --git a/dags/blocksec_plugin/ethereum_block_to_postgres_operator.py b/dags/blocksec_plugin/ethereum_block_to_postgres_operator.py deleted file mode 100644 index aa0acc1..0000000 --- a/dags/blocksec_plugin/ethereum_block_to_postgres_operator.py +++ /dev/null @@ -1,45 +0,0 @@ -from airflow.models.baseoperator import BaseOperator -from airflow.utils.decorators import apply_defaults -from airflow.hooks.postgres_hook import PostgresHook -from blocksec_plugin.web3_hook import Web3Hook -from datetime import datetime, timedelta -from tempfile import NamedTemporaryFile -from web3.middleware import geth_poa_middleware - -class EthereumBlocktoPostgresOperator(BaseOperator): - """ - Checks the current Ethereum block height and save it to a postgres - database table - """ - - @apply_defaults - def __init__(self, - postgres_conn_id='postgres_default', - postgres_table='ethereum_blocks', - web3_conn_id='web3_default', - *args, - **kwargs): - super().__init__(*args, **kwargs) - self.postgres_conn_id = postgres_conn_id - self.postgres_table = postgres_table - self.web3_conn_id = web3_conn_id - - def execute(self, context): - """ - Check the block height on Ethereum and save it to postgres - """ - web3 = Web3Hook(web3_conn_id=self.web3_conn_id).http_client - web3.middleware_onion.inject(geth_poa_middleware, layer=0) - print("Web3", web3) - block_height = web3.eth.blockNumber - print(block_height) - block_data = web3.eth.getBlock(block_height) - print(block_data) - with NamedTemporaryFile(mode='r+') as file: - file.write("{0}\t{1}\n".format(block_height, datetime.fromtimestamp(block_data.timestamp))) - file.seek(0) - postgres = PostgresHook(postgres_conn_id=self.postgres_conn_id) - print(file.name) - result = postgres.bulk_load('ethereum_blocks(block_height,mined_at)', file.name) - print(result) - return block_data.number diff --git a/dags/blocksec_plugin/ethereum_events_to_postgres_operator.py b/dags/blocksec_plugin/ethereum_events_to_postgres_operator.py deleted file mode 100644 index b9086e2..0000000 --- a/dags/blocksec_plugin/ethereum_events_to_postgres_operator.py +++ /dev/null @@ -1,146 +0,0 @@ -from airflow.models.baseoperator import BaseOperator -from airflow.utils.decorators import apply_defaults -from airflow.hooks.postgres_hook import PostgresHook -from blocksec_plugin.web3_hook import Web3Hook -from datetime import datetime -from tempfile import NamedTemporaryFile -from copy import deepcopy -from functools import partial -from eth_abi.exceptions import DecodingError -from eth_utils import event_abi_to_log_topic -from web3._utils.events import get_event_data -from json import dumps -import re - - -class EthereumEventstoPostgresOperator(BaseOperator): - """ - Get the event logs for a Ethereum contract within a range of blocks - """ - template_fields = ['from_block', 'to_block'] - - @apply_defaults - def __init__(self, - web3_conn_id='web3_default', - postgres_conn_id='postgres_default', - postgres_table='ethereum_events', - contract_address=None, - abi_json=None, - event_name=None, - from_block=None, - to_block=None, - *args, - **kwargs): - super().__init__(*args, **kwargs) - self.postgres_conn_id = postgres_conn_id - self.postgres_table = postgres_table - self.web3_conn_id = web3_conn_id - self.contract_address = contract_address - self.abi_json = abi_json - self.event_name = event_name - self.from_block = from_block - self.to_block = to_block - self.web3 = Web3Hook(web3_conn_id=self.web3_conn_id).http_client - - def execute(self, context): - to_block = int(self.to_block) - while int(self.from_block) < to_block: - self.to_block = int(self.from_block) + 10000 - 1 - print("Scanning", self.from_block, self.to_block) - events = self._load_contract_events() - self.from_block = self.to_block - postgres = PostgresHook(postgres_conn_id=self.postgres_conn_id) - # conn = postgres.get_conn() - # cursor = conn.cursor() - # cursor.execute(""" - # INSERT INTO ethereum_events(args,event,log_index,transaction_index,transaction_hash,address,block_hash,block_number) - # VALUES ('{0}','{1}',{2},{3},'{4}','{5}','{6}',{7}) - # """.format(args,event["event"],event["logIndex"],\ - # event["transactionIndex"],event["transactionHash"],\ - # event["address"],event["blockHash"],event["blockNumber"])) - with NamedTemporaryFile(mode='r+') as file: - print(file.name) - for event in events: - print(event["args"]) - try: - args = dumps(dict(event["args"])) - except TypeError: - args = {} - for key, value in event["args"].items(): - pattern = re.compile('[\W_]+', re.UNICODE) - if isinstance(value, bytes): - args[key] = pattern.sub('', value.hex()) - elif isinstance(value, str): - args[key] = pattern.sub('',value.replace("\\","\\\\")) - else: - args[key] = value - - args = dumps(args) - file.write("{0}\t{1}\t{2}\t{3}\t{4}\t{5}\t{6}\t{7}\t{8}\n"\ - .format(args,event["event"],event["logIndex"],\ - event["transactionIndex"],event["transactionHash"].hex(),\ - event["address"],event["blockHash"].hex(),event["blockNumber"], datetime.now())) - file.seek(0) - result = postgres.bulk_load('ethereum_events(args,event,log_index,transaction_index,transaction_hash,address,block_hash,block_number,created_at)', file.name) - - - def _load_contract_events(self): - decoders = self._get_log_decoders(self.abi_json) - print("Filtering Eth logs") - logs = self.web3.eth.getLogs({ - 'address': self.contract_address, - 'fromBlock': int(self.from_block), - 'toBlock': int(self.to_block) - }) - print("Getting all events from filter", self.from_block, self.to_block) - #event_logs = event_filter.get_all_entries() - return self._decode_logs(logs, decoders) - - - def _get_log_decoders(self, contract_abi): - """ - Source: banteg/tellor - """ - decoders = { - event_abi_to_log_topic(abi): partial(get_event_data, self.web3.codec, abi) - for abi in contract_abi if abi['type'] == 'event' - } - # fix for byte nonce in events - # nonce_string_abi = next(x for x in contract_abi if x.get('name') == 'NonceSubmitted') - # nonce_string_topic = event_abi_to_log_topic(nonce_string_abi) - # nonce_bytes_abi = deepcopy(nonce_string_abi) - # nonce_bytes_abi['inputs'][1]['type'] = 'bytes' - # decoders[nonce_string_topic] = partial(self._decode_log_with_fallback, [nonce_string_abi, nonce_bytes_abi]) - return decoders - - - def _decode_log_with_fallback(self, abis_to_try, log): - """ - Source: banteg/tellor - """ - for abi in abis_to_try: - try: - log_with_replaced_topic = deepcopy(log) - log_with_replaced_topic['topics'][0] = event_abi_to_log_topic(abi) - return get_event_data(self.web3.codec, abi, log_with_replaced_topic) - except DecodingError: - print('trying fallback log decoder') - raise DecodingError('could not decode log') - - - def _decode_logs(self, logs, decoders): - """ - Source: banteg/tellor - """ - result = [] - for log in logs: - topic = log['topics'][0] - if topic in decoders: - try: - decoded = decoders[topic](log) - result.append(decoded) - except DecodingError as e: - print('could not decode log') - print(log) - print(e) - return result