Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using subgraph to get address of streamers #46

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
45 changes: 0 additions & 45 deletions dags/blocksec_plugin/ethereum_block_to_postgres_operator.py

This file was deleted.

146 changes: 0 additions & 146 deletions dags/blocksec_plugin/ethereum_events_to_postgres_operator.py

This file was deleted.

113 changes: 0 additions & 113 deletions dags/ethereum_block_poll.py

This file was deleted.

33 changes: 13 additions & 20 deletions dags/ricochet_stream_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
from blocksec_plugin.abis import REX_ABI, ERC20_ABI
from datetime import datetime, timedelta
from constants.constants import ScheduleConstants, Utils
import requests


CLOSER_WALLET_ADDRESS = Variable.get("closer-address")
EXCHANGE_ADDRESSES = Variable.get("ricochet-exchange-addresses", deserialize_json=True)
SCHEDULE_INTERVAL = Variable.get("watch-schedule-interval", ScheduleConstants.RICOCHET_STREAM_WATCH)
SUPERFLUID_GRAPHQL_POLYGON_URI = Variable.get("superfluid-graphql-polygon-uri", default_var="https://api.thegraph.com/subgraphs/name/superfluid-finance/protocol-v1-matic")

default_args = Utils.get_DAG_args()

Expand All @@ -43,27 +45,18 @@ def review_streamers_and_trigger_closures(**context):
"""
execution_date = context['execution_date'].isoformat()
exchange_address = context['exchange_address']
print(f"Checking exchange {exchange_address}")
sql = f"""
with streamer_rates as (
select args->>'from' as streamer,
FIRST_VALUE(args->>'newRate') OVER (PARTITION BY args->>'from' ORDER BY block_number DESC) as rate
from ethereum_events
where event = 'UpdatedStream'
and address ='{exchange_address}'
)
select distinct streamer, CAST(rate as float)
from streamer_rates
where CAST(rate as FLOAT) > 0
order by 2 desc
"""
print(sql)
postgres = PostgresHook(postgres_conn_id='data_warehouse')
conn = postgres.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
streamers = [result[0] for result in cursor.fetchall()]

query = '{' +'account(id: "{}")'.format(exchange_address) + ''' {
inflows {
id
}
}
}
'''

request = requests.post(SUPERFLUID_GRAPHQL_POLYGON_URI, json={'query': query})
response = request.json()
streamers = [streamer['id'].split("-")[0] for streamer in response['data']['account']['inflows']]
print("Streamers", streamers)

web3 = Web3Hook(web3_conn_id='infura').http_client
Expand Down