diff --git a/crawlers/mooncrawl/mooncrawl/blockchain.py b/crawlers/mooncrawl/mooncrawl/blockchain.py index f5f45b706..1e9a84924 100644 --- a/crawlers/mooncrawl/mooncrawl/blockchain.py +++ b/crawlers/mooncrawl/mooncrawl/blockchain.py @@ -48,19 +48,21 @@ def connect( blockchain_type: AvailableBlockchainType, web3_uri: Optional[str] = None, access_id: Optional[UUID] = None, + request_timeout: Optional[int] = None, ) -> Web3: web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider() - request_kwargs: Any = None + request_kwargs = {} if access_id is not None: - request_kwargs = { - "headers": { - NB_ACCESS_ID_HEADER: str(access_id), - NB_DATA_SOURCE_HEADER: "blockchain", - "Content-Type": "application/json", - } + request_kwargs["headers"] = { + NB_ACCESS_ID_HEADER: str(access_id), + NB_DATA_SOURCE_HEADER: "blockchain", + "Content-Type": "application/json", } + if request_timeout is not None: + request_kwargs["timeout"] = request_timeout + if web3_uri is None: if blockchain_type == AvailableBlockchainType.ETHEREUM: web3_uri = MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI @@ -73,10 +75,11 @@ def connect( if web3_uri.startswith("http://") or web3_uri.startswith("https://"): web3_provider = Web3.HTTPProvider(web3_uri, request_kwargs=request_kwargs) + elif web3_uri.startswith("wss://"): + web3_provider = Web3.WebsocketProvider(web3_uri) else: web3_provider = Web3.IPCProvider(web3_uri) web3_client = Web3(web3_provider) - # Inject --dev middleware if it is not Ethereum mainnet # Docs: https://web3py.readthedocs.io/en/stable/middleware.html#geth-style-proof-of-authority if blockchain_type != AvailableBlockchainType.ETHEREUM: diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py index d4a93d92a..e7f1aed43 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/base.py @@ -2,8 +2,10 @@ import logging import time from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Set, Union - +from typing import Any, Dict, List, Optional, Set, Union, Callable +from eth_abi.codec import ABICodec +from web3._utils.events import get_event_data +from web3._utils.filters import construct_event_filter_params import web3 from eth_typing import ChecksumAddress from hexbytes.main import HexBytes @@ -19,7 +21,6 @@ ContractFunctionCall, utfy_dict, ) -from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore from sqlalchemy.orm.session import Session from tqdm import tqdm from web3 import Web3 @@ -44,13 +45,21 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# TODO: ADD VALUE!!! + @dataclass -class ExtededFunctionCall(ContractFunctionCall): +class ExtededFunctionCall: + block_number: int + block_timestamp: int + transaction_hash: str + contract_address: str + caller_address: str + function_name: str + function_args: Dict[str, Any] gas_price: int + value: int = 0 max_fee_per_gas: Optional[int] = None max_priority_fee_per_gas: Optional[int] = None - value: int = 0 + status: Optional[str] = None def _function_call_with_gas_price_to_label( @@ -69,8 +78,6 @@ def _function_call_with_gas_price_to_label( "name": function_call.function_name, "caller": function_call.caller_address, "args": function_call.function_args, - "status": function_call.status, - "gasUsed": function_call.gas_used, "gasPrice": function_call.gas_price, "maxFeePerGas": function_call.max_fee_per_gas, "maxPriorityFeePerGas": function_call.max_priority_fee_per_gas, @@ -96,7 +103,7 @@ def add_function_calls_with_gas_price_to_session( transactions_hashes_to_save = [ function_call.transaction_hash for function_call in function_calls ] - + logger.info(f"Querrying existing labels (function call)") existing_labels = ( db_session.query(label_model.transaction_hash) .filter( @@ -106,6 +113,7 @@ def add_function_calls_with_gas_price_to_session( ) .all() ) + logger.info(f"Querry finished") existing_labels_transactions = [label[0] for label in existing_labels] @@ -152,6 +160,76 @@ def _transform_to_w3_tx( return tx +def _fetch_events_chunk( + web3, + event_abi, + from_block: int, + to_block: int, + addresses: Optional[List[ChecksumAddress]] = None, + on_decode_error: Optional[Callable[[Exception], None]] = None, + address_block_list: Optional[List[ChecksumAddress]] = None, +) -> List[Any]: + """Get events using eth_getLogs API. + + Event structure: + { + "event": Event name, + "args": dictionary of event arguments, + "address": contract address, + "blockNumber": block number, + "transactionHash": transaction hash, + "logIndex": log index + } + + """ + + if from_block is None: + raise TypeError("Missing mandatory keyword argument to getLogs: fromBlock") + + # Depending on the Solidity version used to compile + # the contract that uses the ABI, + # it might have Solidity ABI encoding v1 or v2. + # We just assume the default that you set on Web3 object here. + # More information here https://eth-abi.readthedocs.io/en/latest/index.html + codec: ABICodec = web3.codec + + _, event_filter_params = construct_event_filter_params( + event_abi, + codec, + fromBlock=from_block, + toBlock=to_block, + ) + if addresses: + event_filter_params["address"] = addresses + + logs = web3.eth.get_logs(event_filter_params) + logger.info(f"Fetched {len(logs)} raw logs") + # Convert raw binary data to Python proxy objects as described by ABI + all_events = [] + for log in logs: + if address_block_list and log["address"] in address_block_list: + continue + try: + raw_event = get_event_data(codec, event_abi, log) + event = { + "event": raw_event["event"], + "args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))), + "address": raw_event["address"], + "blockNumber": raw_event["blockNumber"], + "transactionHash": raw_event["transactionHash"].hex(), + "logIndex": raw_event["logIndex"], + } + all_events.append(event) + except Exception as e: + if address_block_list is not None: + address_block_list.append(log["address"]) + if on_decode_error: + on_decode_error(e) + continue + logger.info(f"Decoded {len(all_events)} logs") + return all_events + + def process_transaction( db_session: Session, web3: Web3, @@ -160,19 +238,20 @@ def process_transaction( secondary_abi: List[Dict[str, Any]], transaction: Dict[str, Any], blocks_cache: Dict[int, int], + skip_decoding: bool = False, ): + selector = transaction["input"][:10] + function_name = selector + function_args = "unknown" + if not skip_decoding: + try: + raw_function_call = contract.decode_function_input(transaction["input"]) + function_name = raw_function_call[0].fn_name + function_args = utfy_dict(raw_function_call[1]) + except Exception as e: + pass + # logger.error(f"Failed to decode transaction : {str(e)}") - try: - raw_function_call = contract.decode_function_input(transaction["input"]) - function_name = raw_function_call[0].fn_name - function_args = utfy_dict(raw_function_call[1]) - except Exception as e: - # logger.error(f"Failed to decode transaction : {str(e)}") - selector = transaction["input"][:10] - function_name = selector - function_args = "unknown" - - transaction_reciept = web3.eth.getTransactionReceipt(transaction["hash"]) block_timestamp = get_block_timestamp( db_session, web3, @@ -190,8 +269,6 @@ def process_transaction( caller_address=transaction["from"], function_name=function_name, function_args=function_args, - status=transaction_reciept["status"], - gas_used=transaction_reciept["gasUsed"], gas_price=transaction["gasPrice"], max_fee_per_gas=transaction.get( "maxFeePerGas", @@ -200,28 +277,7 @@ def process_transaction( value=transaction["value"], ) - secondary_logs = [] - for log in transaction_reciept["logs"]: - for abi in secondary_abi: - try: - raw_event = get_event_data(web3.codec, abi, log) - event = { - "event": raw_event["event"], - "args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))), - "address": raw_event["address"], - "blockNumber": raw_event["blockNumber"], - "transactionHash": raw_event["transactionHash"].hex(), - "logIndex": raw_event["logIndex"], - "blockTimestamp": block_timestamp, - } - processed_event = _processEvent(event) - secondary_logs.append(processed_event) - - break - except: - pass - - return function_call, secondary_logs + return function_call, [] def _get_transactions( @@ -350,6 +406,7 @@ def crawl( crawl_transactions: bool = True, addresses: Optional[List[ChecksumAddress]] = None, batch_size: int = 100, + skip_decoding_transactions: bool = False, ) -> None: current_block = from_block @@ -371,6 +428,7 @@ def crawl( logger.info(f"Crawling blocks {current_block}-{current_block + batch_size}") events = [] logger.info("Fetching events") + block_list = [] for event_abi in events_abi: raw_events = _fetch_events_chunk( web3, @@ -378,6 +436,7 @@ def crawl( current_block, batch_end, addresses, + address_block_list=block_list, ) for raw_event in raw_events: raw_event["blockTimestamp"] = get_block_timestamp( @@ -386,7 +445,7 @@ def crawl( blockchain_type, raw_event["blockNumber"], blocks_cache=db_blocks_cache, - max_blocks_batch=1000, + max_blocks_batch=100, ) event = _processEvent(raw_event) events.append(event) @@ -401,6 +460,7 @@ def crawl( ) logger.info(f"Fetched {len(transactions)} transactions") + logger.info(f"Processing transactions") function_calls = [] for tx in transactions: processed_tx, secondary_logs = process_transaction( @@ -411,9 +471,12 @@ def crawl( secondary_abi, tx, db_blocks_cache, + skip_decoding=skip_decoding_transactions, ) function_calls.append(processed_tx) events.extend(secondary_logs) + logger.info(f"Processed {len(function_calls)} transactions") + add_function_calls_with_gas_price_to_session( db_session, function_calls, diff --git a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py index ed7845bbf..46dce14b0 100644 --- a/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py @@ -11,8 +11,8 @@ from mooncrawl.data import AvailableBlockchainType # type: ignore from ..blockchain import connect -from .base import crawl, get_checkpoint, populate_with_events from ..settings import NB_CONTROLLER_ACCESS_ID +from .base import crawl, get_checkpoint, populate_with_events logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -43,12 +43,12 @@ def handle_nft_crawler(args: argparse.Namespace) -> None: web3 = connect(blockchain_type, access_id=args.access_id) else: logger.info(f"Using web3 provider URL: {args.web3}") - web3 = Web3( - Web3.HTTPProvider(args.web3), + web3 = connect( + blockchain_type, + access_id=args.access_id, + web3_uri=args.web3, + request_timeout=60, ) - if args.poa: - logger.info("Using PoA middleware") - web3.middleware_onion.inject(geth_poa_middleware, layer=0) last_crawled_block = get_checkpoint( db_session, blockchain_type, from_block, to_block, label ) @@ -64,6 +64,7 @@ def handle_nft_crawler(args: argparse.Namespace) -> None: from_block=last_crawled_block, to_block=to_block, batch_size=args.max_blocks_batch, + skip_decoding_transactions=True, ) @@ -95,12 +96,12 @@ def populate_with_erc20_transfers(args: argparse.Namespace) -> None: web3 = connect(blockchain_type, access_id=args.access_id) else: logger.info(f"Using web3 provider URL: {args.web3}") - web3 = Web3( - Web3.HTTPProvider(args.web3), + web3 = connect( + blockchain_type, + access_id=args.access_id, + web3_uri=args.web3, + request_timeout=60, ) - if args.poa: - logger.info("Using PoA middleware") - web3.middleware_onion.inject(geth_poa_middleware, layer=0) last_crawled_block = get_checkpoint( db_session, blockchain_type, from_block, to_block, label ) @@ -120,6 +121,8 @@ def populate_with_erc20_transfers(args: argparse.Namespace) -> None: def handle_crawl(args: argparse.Namespace) -> None: + # TODO(yhtiyar): fix it + raise NotImplementedError("Deprecated for now, since blocklist is added") logger.info(f"Starting generic crawler") label = args.label_name @@ -141,12 +144,10 @@ def handle_crawl(args: argparse.Namespace) -> None: web3 = connect(blockchain_type, access_id=args.access_id) else: logger.info(f"Using web3 provider URL: {args.web3}") - web3 = Web3( - Web3.HTTPProvider(args.web3), + + web3 = connect( + blockchain_type, access_id=args.access_id, web3_uri=args.web3 ) - if args.poa: - logger.info("Using PoA middleware") - web3.middleware_onion.inject(geth_poa_middleware, layer=0) last_crawled_block = get_checkpoint( db_session, blockchain_type, from_block, to_block, label ) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index a4bb03a5f..05128ec2b 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -17,7 +17,8 @@ make_event_crawl_jobs, make_function_call_crawl_jobs, ) -from .db import get_last_labeled_block_number +from .db import get_first_labeled_block_number, get_last_labeled_block_number +from .historical_crawler import historical_crawler logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -114,6 +115,121 @@ def handle_crawl(args: argparse.Namespace) -> None: ) +def handle_historical_crawl(args: argparse.Namespace) -> None: + blockchain_type = AvailableBlockchainType(args.blockchain_type) + subscription_type = blockchain_type_to_subscription_type(blockchain_type) + + addresses_filter = [args.address] + all_event_jobs = make_event_crawl_jobs( + get_crawl_job_entries( + subscription_type, + "event", + MOONSTREAM_MOONWORM_TASKS_JOURNAL, + ) + ) + filtered_event_jobs = [] + for job in all_event_jobs: + intersection = [ + address for address in job.contracts if address in addresses_filter + ] + if intersection: + job.contracts = intersection + filtered_event_jobs.append(job) + + logger.info(f"Filtered event crawl jobs count: {len(filtered_event_jobs)}") + + all_function_call_jobs = make_function_call_crawl_jobs( + get_crawl_job_entries( + subscription_type, + "function", + MOONSTREAM_MOONWORM_TASKS_JOURNAL, + ) + ) + filtered_function_call_jobs = [ + job + for job in all_function_call_jobs + if job.contract_address in addresses_filter + ] + + if args.only_events: + filtered_function_call_jobs = [] + logger.info(f"Removing function call crawl jobs since --only-events is set") + + logger.info( + f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}" + ) + + logger.info(f"Blockchain type: {blockchain_type.value}") + with yield_db_session_ctx() as db_session: + web3: Optional[Web3] = None + if args.web3 is None: + logger.info( + "No web3 provider URL provided, using default (blockchan.py: connect())" + ) + web3 = _retry_connect_web3(blockchain_type, access_id=args.access_id) + else: + logger.info(f"Using web3 provider URL: {args.web3}") + web3 = Web3( + Web3.HTTPProvider( + args.web3, + ) + ) + if args.poa: + logger.info("Using PoA middleware") + web3.middleware_onion.inject(geth_poa_middleware, layer=0) + + last_labeled_block = get_first_labeled_block_number( + db_session, blockchain_type, args.address, only_events=args.only_events + ) + logger.info(f"Last labeled block: {last_labeled_block}") + + start_block = args.start + if start_block is None: + logger.info("No start block provided") + if last_labeled_block is not None: + start_block = last_labeled_block + logger.info(f"Using last labeled block as start: {start_block}") + else: + logger.info( + "No last labeled block found, using start block (web3.eth.blockNumber - 300)" + ) + raise ValueError( + "No start block provided and no last labeled block found" + ) + elif last_labeled_block is not None: + if start_block > last_labeled_block and not args.force: + logger.info( + f"Start block is less than last labeled block, using last labeled block: {last_labeled_block}" + ) + logger.info( + f"Use --force to override this and start from the start block: {start_block}" + ) + + start_block = last_labeled_block + else: + logger.info(f"Using start block: {start_block}") + else: + logger.info(f"Using start block: {start_block}") + + if start_block < args.end: + raise ValueError( + f"Start block {start_block} is less than end block {args.end}. This crawler crawls in the reverse direction." + ) + + historical_crawler( + db_session, + blockchain_type, + web3, + filtered_event_jobs, + filtered_function_call_jobs, + start_block, + args.end, + args.max_blocks_batch, + args.min_sleep_time, + access_id=args.access_id, + ) + + def main() -> None: parser = argparse.ArgumentParser() parser.set_defaults(func=lambda _: parser.print_help()) @@ -127,7 +243,10 @@ def main() -> None: subparsers = parser.add_subparsers() - crawl_parser = subparsers.add_parser("crawl") + crawl_parser = subparsers.add_parser( + "crawl", + help="continuous crawling the event/function call jobs from bugout journal", + ) crawl_parser.add_argument( "--start", @@ -211,6 +330,76 @@ def main() -> None: crawl_parser.set_defaults(func=handle_crawl) + historical_crawl_parser = subparsers.add_parser( + "historical-crawl", help="Crawl historical data" + ) + historical_crawl_parser.add_argument( + "--address", + "-a", + required=True, + type=str, + ) + historical_crawl_parser.add_argument( + "--start", + "-s", + type=int, + default=None, + ) + historical_crawl_parser.add_argument( + "--end", + "-e", + type=int, + required=True, + ) + historical_crawl_parser.add_argument( + "--blockchain-type", + "-b", + type=str, + help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}", + ) + historical_crawl_parser.add_argument( + "--web3", + type=str, + default=None, + help="Web3 provider URL", + ) + historical_crawl_parser.add_argument( + "--poa", + action="store_true", + default=False, + help="Use PoA middleware", + ) + + historical_crawl_parser.add_argument( + "--max-blocks-batch", + "-m", + type=int, + default=80, + help="Maximum number of blocks to crawl in a single batch", + ) + + historical_crawl_parser.add_argument( + "--min-sleep-time", + "-t", + type=float, + default=0.1, + help="Minimum time to sleep between crawl step", + ) + + historical_crawl_parser.add_argument( + "--force", + action="store_true", + default=False, + help="Force start from the start block", + ) + historical_crawl_parser.add_argument( + "--only-events", + action="store_true", + default=False, + help="Only crawl events", + ) + historical_crawl_parser.set_defaults(func=handle_historical_crawl) + args = parser.parse_args() args.func(args) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index d7cfb6229..1ac12665e 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -17,6 +17,7 @@ from .crawler import ( EventCrawlJob, FunctionCallCrawlJob, + _retry_connect_web3, blockchain_type_to_subscription_type, get_crawl_job_entries, heartbeat, @@ -82,34 +83,6 @@ def _refetch_new_jobs( return event_crawl_jobs, function_call_crawl_jobs -def _retry_connect_web3( - blockchain_type: AvailableBlockchainType, - retry_count: int = 10, - sleep_time: float = 5, - access_id: Optional[UUID] = None, -) -> Web3: - """ - Retry connecting to the blockchain. - """ - while retry_count > 0: - retry_count -= 1 - try: - web3 = connect(blockchain_type, access_id=access_id) - web3.eth.block_number - logger.info(f"Connected to {blockchain_type}") - return web3 - except Exception as e: - if retry_count == 0: - error = e - break - logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}") - logger.info(f"Retrying in {sleep_time} seconds") - time.sleep(sleep_time) - raise Exception( - f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" - ) - - def continuous_crawler( db_session: Session, blockchain_type: AvailableBlockchainType, @@ -180,8 +153,6 @@ def continuous_crawler( try: while True: try: - # query db with limit 1, to avoid session closing - db_session.execute("SELECT 1") time.sleep(current_sleep_time) end_block = min( diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index f740aaeea..ce1cd48f3 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -6,6 +6,7 @@ from datetime import datetime from enum import Enum from typing import Any, Callable, Dict, List, Optional, cast +from uuid import UUID from bugout.data import BugoutSearchResult from eth_typing.evm import ChecksumAddress @@ -15,6 +16,7 @@ from mooncrawl.data import AvailableBlockchainType +from ..blockchain import connect from ..reporter import reporter from ..settings import ( MOONSTREAM_ADMIN_ACCESS_TOKEN, @@ -93,6 +95,34 @@ def reporter_callback(error: Exception) -> None: return reporter_callback +def _retry_connect_web3( + blockchain_type: AvailableBlockchainType, + retry_count: int = 10, + sleep_time: float = 5, + access_id: Optional[UUID] = None, +) -> Web3: + """ + Retry connecting to the blockchain. + """ + while retry_count > 0: + retry_count -= 1 + try: + web3 = connect(blockchain_type, access_id=access_id) + web3.eth.block_number + logger.info(f"Connected to {blockchain_type}") + return web3 + except Exception as e: + if retry_count == 0: + error = e + break + logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}") + logger.info(f"Retrying in {sleep_time} seconds") + time.sleep(sleep_time) + raise Exception( + f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" + ) + + def blockchain_type_to_subscription_type( blockchain_type: AvailableBlockchainType, ) -> SubscriptionTypes: diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 80615c676..c82541896 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -1,24 +1,13 @@ import logging from typing import Any, Dict, List, Optional, Union -from eth_typing.evm import ChecksumAddress -from hexbytes.main import HexBytes -from moonstreamdb.db import yield_db_session_ctx -from moonstreamdb.models import ( - Base, - EthereumLabel, - EthereumTransaction, - PolygonLabel, - PolygonTransaction, -) +from moonstreamdb.models import Base from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore from sqlalchemy.orm import Session -from sqlalchemy.sql.expression import label -from ..blockchain import connect, get_block_model, get_label_model +from ..blockchain import get_label_model from ..data import AvailableBlockchainType from ..settings import CRAWLER_LABEL -from .crawler import FunctionCallCrawlJob, _generate_reporter_callback from .event_crawler import Event logging.basicConfig(level=logging.INFO) @@ -93,6 +82,44 @@ def get_last_labeled_block_number( return block_number[0] if block_number else None +def get_first_labeled_block_number( + db_session: Session, + blockchain_type: AvailableBlockchainType, + address: str, + label_name=CRAWLER_LABEL, + only_events: bool = False, +) -> Optional[int]: + label_model = get_label_model(blockchain_type) + block_number_query = ( + db_session.query(label_model.block_number) + .filter(label_model.label == label_name) + .filter(label_model.address == address) + ) + + function_call_block_numbers = ( + block_number_query.filter(label_model.log_index == None) + .order_by(label_model.block_number) + .limit(50) + .all() + ) + event_block_numbers = ( + block_number_query.filter(label_model.log_index != None) + .order_by(label_model.block_number) + .limit(50) + .all() + ) + + if only_events: + return event_block_numbers[0][0] if event_block_numbers else None + else: + event_block_number = event_block_numbers[0][0] if event_block_numbers else -1 + function_call_block_number = ( + function_call_block_numbers[0][0] if function_call_block_numbers else -1 + ) + max_block_number = max(event_block_number, function_call_block_number) + return max_block_number if max_block_number != -1 else None + + def commit_session(db_session: Session) -> None: """ Save labels in the database. @@ -116,6 +143,7 @@ def add_events_to_session( events_hashes_to_save = [event.transaction_hash for event in events] + logger.info(f"Querying database for existing events") existing_labels = ( db_session.query(label_model.transaction_hash, label_model.log_index) .filter( @@ -126,6 +154,8 @@ def add_events_to_session( .all() ) + logger.info(f"Querry finished") + existing_labels_transactions = [] existing_log_index_by_tx_hash: Dict[str, List[int]] = {} for label in existing_labels: diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py new file mode 100644 index 000000000..2fb519280 --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -0,0 +1,127 @@ +import logging +import time +from typing import Dict, List, Optional, Tuple +from uuid import UUID + +from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore + MoonstreamEthereumStateProvider, +) +from moonworm.crawler.networks import Network # type: ignore +from sqlalchemy.orm.session import Session +from web3 import Web3 + +from ..data import AvailableBlockchainType +from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3 +from .db import add_events_to_session, add_function_calls_to_session, commit_session +from .event_crawler import _crawl_events +from .function_call_crawler import _crawl_functions + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def historical_crawler( + db_session: Session, + blockchain_type: AvailableBlockchainType, + web3: Optional[Web3], + event_crawl_jobs: List[EventCrawlJob], + function_call_crawl_jobs: List[FunctionCallCrawlJob], + start_block: int, + end_block: int, + max_blocks_batch: int = 100, + min_sleep_time: float = 0.1, + access_id: Optional[UUID] = None, +): + assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0" + assert min_sleep_time > 0, "min_sleep_time must be greater than 0" + assert start_block >= end_block, "start_block must be greater than end_block" + assert end_block > 0, "end_block must be greater than 0" + + if web3 is None: + web3 = _retry_connect_web3(blockchain_type, access_id=access_id) + + assert ( + web3.eth.block_number >= start_block + ), "start_block must be less than current block" + + network = ( + Network.ethereum + if blockchain_type == AvailableBlockchainType.ETHEREUM + else Network.polygon + ) + ethereum_state_provider = MoonstreamEthereumStateProvider( + web3, + network, + db_session, + ) + + logger.info(f"Starting historical event crawler start_block={start_block}") + + blocks_cache: Dict[int, int] = {} + failed_count = 0 + + while start_block >= end_block: + try: + + time.sleep(min_sleep_time) + + batch_end_block = max( + start_block - max_blocks_batch, + end_block, + ) + + logger.info(f"Crawling events from {start_block} to {batch_end_block}") + all_events = _crawl_events( + db_session=db_session, + blockchain_type=blockchain_type, + web3=web3, + jobs=event_crawl_jobs, + from_block=batch_end_block, + to_block=start_block, + blocks_cache=blocks_cache, + db_block_query_batch=max_blocks_batch, + ) + logger.info( + f"Crawled {len(all_events)} events from {start_block} to {batch_end_block}." + ) + + add_events_to_session(db_session, all_events, blockchain_type) + + logger.info( + f"Crawling function calls from {start_block} to {batch_end_block}" + ) + if function_call_crawl_jobs: + all_function_calls = _crawl_functions( + blockchain_type, + ethereum_state_provider, + function_call_crawl_jobs, + batch_end_block, + start_block, + ) + logger.info( + f"Crawled {len(all_function_calls)} function calls from {start_block} to {batch_end_block}." + ) + + add_function_calls_to_session( + db_session, all_function_calls, blockchain_type + ) + + # Commiting to db + commit_session(db_session) + + start_block = batch_end_block - 1 + failed_count = 0 + except Exception as e: + + logger.error(f"Internal error: {e}") + logger.exception(e) + failed_count += 1 + if failed_count > 10: + logger.error("Too many failures, exiting") + raise e + try: + web3 = _retry_connect_web3(blockchain_type, access_id=access_id) + except Exception as err: + logger.error(f"Failed to reconnect: {err}") + logger.exception(err) + raise err diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py index cb4d3b2ec..8785902f0 100644 --- a/datasets/nfts/nfts/cli.py +++ b/datasets/nfts/nfts/cli.py @@ -93,11 +93,12 @@ def handle_materialize(args: argparse.Namespace) -> None: crawl_erc721_labels( db_session, - moonstream_datastore, + args.datastore, label_model, start_block=bounds.starting_block, end_block=bounds.ending_block, batch_size=args.batch_size, + blockchain_type=args.blockchain, ) diff --git a/datasets/nfts/nfts/data.py b/datasets/nfts/nfts/data.py index 9a5debeaf..1ca9d1dc2 100644 --- a/datasets/nfts/nfts/data.py +++ b/datasets/nfts/nfts/data.py @@ -23,10 +23,8 @@ class NftTransaction: caller_address: str function_name: str function_args: Union[Dict[str, Any], str] - gas_used: int gas_price: int value: int - status: int max_fee_per_gas: Optional[int] = None max_priority_fee_per_gas: Optional[int] = None diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py index 566fa8dae..c52dc2a27 100644 --- a/datasets/nfts/nfts/datastore.py +++ b/datasets/nfts/nfts/datastore.py @@ -34,7 +34,6 @@ def create_transactions_table_query(tabel_name) -> str: functionName TEXT NOT NULL, functionArgs JSON NOT NULL, value INTEGER NOT NULL, - gasUsed INTEGER NOT NULL, gasPrice INTEGER NOT NULL, maxFeePerGas INTEGER, maxPriorityFeePerGas INTEGER, @@ -125,14 +124,13 @@ def insertTransactionQuery(tabel_name): functionName, functionArgs, value, - gasUsed, gasPrice, maxFeePerGas, maxPriorityFeePerGas ) VALUES ( - ?,?,?,?,?,?,?,?,?,?,?,?,? + ?,?,?,?,?,?,?,?,?,?,?,? ); """ return query @@ -240,7 +238,6 @@ def nft_transaction_to_tuple(nft_transaction: NftTransaction) -> Tuple[Any]: nft_transaction.function_name, json.dumps(nft_transaction.function_args), str(nft_transaction.value), - str(nft_transaction.gas_used), str(nft_transaction.gas_price), str(nft_transaction.max_fee_per_gas), str(nft_transaction.max_priority_fee_per_gas), @@ -356,7 +353,7 @@ def insert_events( raise ValueError(f"Unknown event type: {type(event)}") if len(nft_transfers) > 0: - query = insert_nft_transfers_query("transfers") + query = insert_nft_transfers_query("erc721_transfers") cur.executemany( query, nft_transfers, @@ -405,13 +402,13 @@ def setup_database(conn: sqlite3.Connection) -> None: cur.execute(create_transactions_table_query("transactions")) cur.execute(create_approvals_table_query("approvals")) cur.execute(create_approval_for_all_table_query("approvals_for_all")) - cur.execute(create_transfers_table_query("transfers")) + cur.execute(create_transfers_table_query("erc721_transfers")) cur.execute(create_erc20_transfers_table_query("erc20_transfers")) - cur.execute(create_blockchain_type_index_query("transactions")) - cur.execute(create_blockchain_type_index_query("approvals")) - cur.execute(create_blockchain_type_index_query("approvals_for_all")) - cur.execute(create_blockchain_type_index_query("transfers")) - cur.execute(create_blockchain_type_index_query("erc20_transfers")) + # cur.execute(create_blockchain_type_index_query("transactions")) + # cur.execute(create_blockchain_type_index_query("approvals")) + # cur.execute(create_blockchain_type_index_query("approvals_for_all")) + # cur.execute(create_blockchain_type_index_query("erc721_transfers")) + # cur.execute(create_blockchain_type_index_query("erc20_transfers")) conn.commit() diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py index 30791434f..1f3a6ab24 100644 --- a/datasets/nfts/nfts/materialize.py +++ b/datasets/nfts/nfts/materialize.py @@ -1,8 +1,16 @@ +import contextlib import logging import sqlite3 -from typing import Any, Dict, Union, cast, Iterator, List, Optional, Set +from typing import Any, Dict, Tuple, Union, cast, Iterator, List, Optional, Set import json -from attr import dataclass +from concurrent.futures import ( + ProcessPoolExecutor, + as_completed, + ThreadPoolExecutor, + Future, + wait, +) +from dataclasses import dataclass from moonstreamdb.models import ( EthereumLabel, @@ -26,7 +34,7 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -ERC721_LABEL = "erc721" +ERC721_LABEL = "erc721-v2" ERC20_LABEL = "test-erc20" @@ -45,16 +53,8 @@ def _get_last_labeled_erc721_block( def parse_transaction_label( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> NftTransaction: - assert ( - label_model.label_data["type"] == "tx_call" - ), "Expected label to be of type 'tx_call'" - - if isinstance(label_model, EthereumLabel): - blockchain_type = "ethereum" - else: - blockchain_type = "polygon" # TODO: this is done because I forgot to add value in polygon labels value = 0 @@ -70,29 +70,17 @@ def parse_transaction_label( caller_address=label_model.label_data["caller"], function_name=label_model.label_data["name"], function_args=label_model.label_data["args"], - gas_used=label_model.label_data["gasUsed"], gas_price=label_model.label_data["gasPrice"], value=value, - status=label_model.label_data["status"], max_fee_per_gas=label_model.label_data["maxFeePerGas"], max_priority_fee_per_gas=label_model.label_data["maxPriorityFeePerGas"], ) def _parse_transfer_event( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> NftTransferEvent: - assert ( - label_model.label_data["type"] == "event" - ), "Expected label to be of type 'event'" - assert ( - label_model.label_data["name"] == "Transfer" - ), "Expected label to be of type 'Transfer'" - - if isinstance(label_model, EthereumLabel): - blockchain_type = "ethereum" - else: - blockchain_type = "polygon" + if label_model.label_data["args"].get("tokenId") is not None: return NftTransferEvent( blockchain_type=blockchain_type, @@ -116,19 +104,9 @@ def _parse_transfer_event( def _parse_approval_event( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> NftApprovalEvent: - assert ( - label_model.label_data["type"] == "event" - ), "Expected label to be of type 'event'" - assert ( - label_model.label_data["name"] == "Approval" - ), "Expected label to be of type 'Approval'" - - if isinstance(label_model, EthereumLabel): - blockchain_type = "ethereum" - else: - blockchain_type = "polygon" + return NftApprovalEvent( blockchain_type=blockchain_type, token_address=label_model.address, @@ -141,19 +119,8 @@ def _parse_approval_event( def _parse_approval_for_all_event( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> NftApprovalForAllEvent: - assert ( - label_model.label_data["type"] == "event" - ), "Expected label to be of type 'event'" - assert ( - label_model.label_data["name"] == "ApprovalForAll" - ), "Expected label to be of type 'ApprovalForAll'" - - if isinstance(label_model, EthereumLabel): - blockchain_type = "ethereum" - else: - blockchain_type = "polygon" return NftApprovalForAllEvent( blockchain_type=blockchain_type, token_address=label_model.address, @@ -166,24 +133,25 @@ def _parse_approval_for_all_event( def parse_event( - label_model: Union[EthereumLabel, PolygonLabel] + label_model: Union[EthereumLabel, PolygonLabel], blockchain_type: str ) -> Union[NftTransferEvent, NftApprovalEvent, NftApprovalForAllEvent]: if label_model.label_data["name"] == "Transfer": - return _parse_transfer_event(label_model) + return _parse_transfer_event(label_model, blockchain_type) elif label_model.label_data["name"] == "Approval": - return _parse_approval_event(label_model) + return _parse_approval_event(label_model, blockchain_type) elif label_model.label_data["name"] == "ApprovalForAll": - return _parse_approval_for_all_event(label_model) + return _parse_approval_for_all_event(label_model, blockchain_type) else: raise ValueError(f"Unknown label type: {label_model.label_data['name']}") def crawl_erc721_labels( db_session: Session, - conn: sqlite3.Connection, + datastore: str, label_model: Union[EthereumLabel, PolygonLabel], start_block: int, end_block: int, + blockchain_type: str, batch_size: int = 10000, ): logger.info( @@ -193,35 +161,69 @@ def crawl_erc721_labels( pbar.set_description( f"Crawling {label_model.__tablename__} blocks {start_block}-{end_block}" ) - current_block = start_block - while current_block <= end_block: - batch_end = min(current_block + batch_size, end_block) - logger.info(f"Crawling {current_block}-{batch_end}") - labels = db_session.query(label_model).filter( - and_( - label_model.block_number >= current_block, - label_model.block_number <= batch_end, - or_( - label_model.label == ERC721_LABEL, label_model.label == ERC20_LABEL - ), + + def _crawl(from_block, to_block) -> bool: + labels = ( + db_session.query(label_model) + .filter( + and_( + label_model.block_number >= from_block, + label_model.block_number <= to_block, + or_( + label_model.label == ERC721_LABEL, + label_model.label == ERC20_LABEL, + ), + ) ) + .all() ) - - logger.info(f"Found {labels.count()} labels") - transactions = [] events = [] for label in labels: - if label.label_data["type"] == "tx_call": - transactions.append(parse_transaction_label(label)) + transactions.append(parse_transaction_label(label, blockchain_type)) else: - events.append(parse_event(label)) + events.append(parse_event(label, blockchain_type)) logger.info(f"Parsed {len(events)} events and {len(transactions)} transactions") - insert_transactions(conn, transactions) - insert_events(conn, events) + with contextlib.closing(sqlite3.connect(datastore, timeout=200)) as conn: + insert_transactions(conn, transactions) + insert_events(conn, events) logger.info(f"Saved {len(events)} events and {len(transactions)} transactions") - pbar.update(batch_end - current_block + 1) + return True + + def crawl_with_threads(ranges: List[Tuple[int, int]]): + futures: Dict[Future, Tuple[int, int]] = {} + with ThreadPoolExecutor(max_workers=30) as executor: + for from_block, to_block in ranges: + future = executor.submit(_crawl, from_block, to_block) + futures[future] = (from_block, to_block) + + for future in as_completed(futures): + from_block, to_block = futures[future] + logger.info(f"Crawled {from_block}-{to_block}") + if future.exception() is not None: + logger.error( + f"Error crawling {from_block}-{to_block}", future.exception() + ) + wait(list(futures.keys())) + + current_block = start_block + + while current_block <= end_block: + batch_end = min(current_block + batch_size * 10, end_block) + + # divide into batches with batch_size + ranges = [] + batch_start = current_block + while batch_start <= batch_end: + ranges.append((batch_start, min(batch_start + batch_size, batch_end))) + batch_start += batch_size + 1 + + print(f"Crawling {len(ranges)} ranges") + print(ranges) + crawl_with_threads(ranges) + print(f"Crawled") + pbar.update(batch_end - current_block + 1) current_block = batch_end + 1 diff --git a/datasets/nfts/setup.py b/datasets/nfts/setup.py index a641f8423..251116940 100644 --- a/datasets/nfts/setup.py +++ b/datasets/nfts/setup.py @@ -39,7 +39,6 @@ "requests", "scipy", "tqdm", - "web3", ], extras_require={ "dev": ["black", "mypy", "types-requests"],