From 4a39db467d09431a0b06ea379db9a50fa52e8b4c Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Mon, 30 Sep 2024 14:44:14 +0200 Subject: [PATCH] refactor(relayer): Factor out common listener utils (#1841) This is a pre-emptive change made to make it easier to reuse common components in alternative listener implementations. Examples for a viem-based listener, as well as an eventual Solana listener. There are subsequent changes in the pipeline to factor out evm-specific parts. --- src/libexec/RelayerSpokePoolIndexer.ts | 139 ++++--------------------- src/libexec/types.ts | 9 ++ src/libexec/util/evm/index.ts | 1 + src/libexec/util/evm/util.ts | 69 ++++++++++++ src/libexec/util/ipc.ts | 43 ++++++++ 5 files changed, 144 insertions(+), 117 deletions(-) create mode 100644 src/libexec/types.ts create mode 100644 src/libexec/util/evm/index.ts create mode 100644 src/libexec/util/evm/util.ts create mode 100644 src/libexec/util/ipc.ts diff --git a/src/libexec/RelayerSpokePoolIndexer.ts b/src/libexec/RelayerSpokePoolIndexer.ts index 951f528e7..e6b211734 100644 --- a/src/libexec/RelayerSpokePoolIndexer.ts +++ b/src/libexec/RelayerSpokePoolIndexer.ts @@ -1,10 +1,8 @@ import assert from "assert"; import minimist from "minimist"; -import { Contract, EventFilter, providers as ethersProviders, utils as ethersUtils } from "ethers"; +import { Contract, providers as ethersProviders, utils as ethersUtils } from "ethers"; import { utils as sdkUtils } from "@across-protocol/sdk"; import * as utils from "../../scripts/utils"; -import { Log } from "../interfaces"; -import { SpokePoolClientMessage } from "../clients"; import { disconnectRedisClients, EventManager, @@ -19,20 +17,13 @@ import { getRedisCache, getWSProviders, Logger, - paginatedEventQuery, - sortEventsAscending, winston, } from "../utils"; +import { postEvents, removeEvent } from "./util/ipc"; +import { ScraperOpts } from "./types"; +import { getEventFilter, getEventFilterArgs, scrapeEvents as _scrapeEvents } from "./util/evm"; type WebSocketProvider = ethersProviders.WebSocketProvider; -type EventSearchConfig = sdkUtils.EventSearchConfig; -type ScraperOpts = { - lookback?: number; // Event lookback (in seconds). - deploymentBlock: number; // SpokePool deployment block - maxBlockRange?: number; // Maximum block range for paginated getLogs queries. - filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply. -}; - const { NODE_SUCCESS, NODE_APP_ERR } = utils; const INDEXER_POLLING_PERIOD = 2_000; // ms; time to sleep between checking for exit request via SIGHUP. @@ -45,108 +36,21 @@ let stop = false; let oldestTime = 0; /** - * Given an event name and contract, return the corresponding Ethers EventFilter object. - * @param contract Ethers Constract instance. - * @param eventName The name of the event to be filtered. - * @param filterArgs Optional filter arguments to be applied. - * @returns An Ethers EventFilter instance. - */ -function getEventFilter(contract: Contract, eventName: string, filterArgs?: string[]): EventFilter { - const filter = contract.filters[eventName]; - if (!isDefined(filter)) { - throw new Error(`Event ${eventName} not defined for contract`); - } - - return isDefined(filterArgs) ? filter(...filterArgs) : filter(); -} - -function getEventFilterArgs(relayer?: string): { [event: string]: string[] } { - const FilledV3Relay = !isDefined(relayer) - ? undefined - : [null, null, null, null, null, null, null, null, null, null, relayer]; - - return { FilledV3Relay }; -} - -/** - * Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent - * process (if defined). - * @param blockNumber Block number up to which the update applies. - * @param currentTime The SpokePool timestamp at blockNumber. - * @param events An array of Log objects to be submitted. - * @returns void - */ -function postEvents(blockNumber: number, currentTime: number, events: Log[]): void { - if (!isDefined(process.send) || stop) { - return; - } - - // Drop the array component of event.args and retain the named k/v pairs, - // otherwise stringification tends to retain only the array. - events = sortEventsAscending(events); - - const message: SpokePoolClientMessage = { - blockNumber, - currentTime, - oldestTime, - nEvents: events.length, - data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers), - }; - process.send(JSON.stringify(message)); -} - -/** - * Given an event removal notification, post the message to the parent process. - * @param event Log instance. - * @returns void - */ -function removeEvent(event: Log): void { - if (!isDefined(process.send) || stop) { - return; - } - - const message: SpokePoolClientMessage = { - event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers), - }; - process.send(JSON.stringify(message)); -} - -/** - * Given a SpokePool contract instance and an event name, scrape all corresponding events and submit them to the - * parent process (if defined). + * Aggregate utils/scrapeEvents for a series of event names. * @param spokePool Ethers Constract instance. - * @param eventName The name of the event to be filtered. + * @param eventNames The array of events to be queried. * @param opts Options to configure event scraping behaviour. * @returns void */ -async function scrapeEvents(spokePool: Contract, eventName: string, opts: ScraperOpts): Promise { - const { lookback, deploymentBlock, filterArgs, maxBlockRange } = opts; - const { provider } = spokePool; - const { chainId } = await provider.getNetwork(); - const chain = getNetworkName(chainId); - - let tStart: number, tStop: number; - - const pollEvents = async (filter: EventFilter, searchConfig: EventSearchConfig): Promise => { - tStart = performance.now(); - const events = await paginatedEventQuery(spokePool, filter, searchConfig); - tStop = performance.now(); - logger.debug({ - at: "SpokePoolIndexer::listen", - message: `Indexed ${events.length} ${chain} events in ${Math.round((tStop - tStart) / 1000)} seconds`, - searchConfig, - }); - return events; - }; - - const { number: toBlock, timestamp: currentTime } = await provider.getBlock("latest"); - const fromBlock = Math.max(toBlock - (lookback ?? deploymentBlock), deploymentBlock); - assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`); - const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange }; - - const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]); - const events = await pollEvents(filter, searchConfig); - postEvents(toBlock, currentTime, events); +export async function scrapeEvents(spokePool: Contract, eventNames: string[], opts: ScraperOpts): Promise { + const { number: toBlock, timestamp: currentTime } = await spokePool.provider.getBlock("latest"); + const events = await Promise.all( + eventNames.map((eventName) => _scrapeEvents(spokePool, eventName, { ...opts, toBlock }, logger)) + ); + + if (!stop) { + postEvents(toBlock, oldestTime, currentTime, events.flat()); + } } /** @@ -178,7 +82,9 @@ async function listen( // Post an update to the parent. Do this irrespective of whether there were new events or not, since there's // information in blockNumber and currentTime alone. - postEvents(blockNumber, currentTime, events); + if (!stop) { + postEvents(blockNumber, oldestTime, currentTime, events); + } }); // Add a handler for each new instance of a subscribed event. @@ -191,7 +97,9 @@ async function listen( if (event.removed) { eventMgr.remove(event, host); // Notify the parent immediately in case the event was already submitted. - removeEvent(event); + if (!stop) { + removeEvent(event); + } } else { eventMgr.add(event, host); } @@ -277,10 +185,7 @@ async function run(argv: string[]): Promise { if (latestBlock.number > startBlock) { const events = ["V3FundsDeposited", "FilledV3Relay", "RelayedRootBundle", "ExecutedRelayerRefundRoot"]; const _spokePool = spokePool.connect(quorumProvider); - await Promise.all([ - resolveOldestTime(_spokePool, startBlock), - ...events.map((event) => scrapeEvents(_spokePool, event, opts)), - ]); + await Promise.all([resolveOldestTime(_spokePool, startBlock), scrapeEvents(_spokePool, events, opts)]); } // If no lookback was specified then default to the timestamp of the latest block. diff --git a/src/libexec/types.ts b/src/libexec/types.ts new file mode 100644 index 000000000..c1986dfb0 --- /dev/null +++ b/src/libexec/types.ts @@ -0,0 +1,9 @@ +export { Log } from "../interfaces"; +export { SpokePoolClientMessage } from "../clients"; + +export type ScraperOpts = { + lookback?: number; // Event lookback (in seconds). + deploymentBlock: number; // SpokePool deployment block + maxBlockRange?: number; // Maximum block range for paginated getLogs queries. + filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply. +}; diff --git a/src/libexec/util/evm/index.ts b/src/libexec/util/evm/index.ts new file mode 100644 index 000000000..181e76f24 --- /dev/null +++ b/src/libexec/util/evm/index.ts @@ -0,0 +1 @@ +export * from "./util"; diff --git a/src/libexec/util/evm/util.ts b/src/libexec/util/evm/util.ts new file mode 100644 index 000000000..c19a86199 --- /dev/null +++ b/src/libexec/util/evm/util.ts @@ -0,0 +1,69 @@ +import assert from "assert"; +import { Contract, EventFilter } from "ethers"; +import { getNetworkName, isDefined, paginatedEventQuery, winston } from "../../../utils"; +import { Log, ScraperOpts } from "../../types"; + +/** + * Given an event name and contract, return the corresponding Ethers EventFilter object. + * @param contract Ethers Constract instance. + * @param eventName The name of the event to be filtered. + * @param filterArgs Optional filter arguments to be applied. + * @returns An Ethers EventFilter instance. + */ +export function getEventFilter(contract: Contract, eventName: string, filterArgs?: string[]): EventFilter { + const filter = contract.filters[eventName]; + if (!isDefined(filter)) { + throw new Error(`Event ${eventName} not defined for contract`); + } + + return isDefined(filterArgs) ? filter(...filterArgs) : filter(); +} + +/** + * Get a general event filter mapping to be used for filtering SpokePool contract events. + * This is currently only useful for filtering the relayer address on FilledV3Relay events. + * @param relayer Optional relayer address to filter on. + * @returns An argument array for input to an Ethers EventFilter. + */ +export function getEventFilterArgs(relayer?: string): { [event: string]: (null | string)[] } { + const FilledV3Relay = !isDefined(relayer) + ? undefined + : [null, null, null, null, null, null, null, null, null, null, relayer]; + + return { FilledV3Relay }; +} + +/** + * Given a SpokePool contract instance and an event name, scrape all corresponding events and submit them to the + * parent process (if defined). + * @param spokePool Ethers Constract instance. + * @param eventName The name of the event to be filtered. + * @param opts Options to configure event scraping behaviour. + * @returns void + */ +export async function scrapeEvents( + spokePool: Contract, + eventName: string, + opts: ScraperOpts & { toBlock: number }, + logger: winston.Logger +): Promise { + const { lookback, deploymentBlock, filterArgs, maxBlockRange, toBlock } = opts; + const { chainId } = await spokePool.provider.getNetwork(); + const chain = getNetworkName(chainId); + + const fromBlock = Math.max(toBlock - (lookback ?? deploymentBlock), deploymentBlock); + assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`); + const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange }; + + const tStart = performance.now(); + const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]); + const events = await paginatedEventQuery(spokePool, filter, searchConfig); + const tStop = performance.now(); + logger.debug({ + at: "scrapeEvents", + message: `Scraped ${events.length} ${chain} ${eventName} events in ${Math.round((tStop - tStart) / 1000)} seconds`, + searchConfig, + }); + + return events; +} diff --git a/src/libexec/util/ipc.ts b/src/libexec/util/ipc.ts new file mode 100644 index 000000000..b5bfb81c3 --- /dev/null +++ b/src/libexec/util/ipc.ts @@ -0,0 +1,43 @@ +import { utils as sdkUtils } from "@across-protocol/sdk"; +import { isDefined, sortEventsAscending } from "../../utils"; +import { Log, SpokePoolClientMessage } from "./../types"; + +/** + * Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent + * process (if defined). + * @param blockNumber Block number up to which the update applies. + * @param currentTime The SpokePool timestamp at blockNumber. + * @param events An array of Log objects to be submitted. + * @returns void + */ +export function postEvents(blockNumber: number, oldestTime: number, currentTime: number, events: Log[]): void { + if (!isDefined(process.send)) { + return; + } + + events = sortEventsAscending(events); + const message: SpokePoolClientMessage = { + blockNumber, + currentTime, + oldestTime, + nEvents: events.length, + data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers), + }; + process.send(JSON.stringify(message)); +} + +/** + * Given an event removal notification, post the message to the parent process. + * @param event Log instance. + * @returns void + */ +export function removeEvent(event: Log): void { + if (!isDefined(process.send)) { + return; + } + + const message: SpokePoolClientMessage = { + event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers), + }; + process.send(JSON.stringify(message)); +}