From 82e4042054b40aba4639951652fc6a1199277bf2 Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Fri, 27 Sep 2024 15:51:24 +0200 Subject: [PATCH 1/2] improve(relayer): Permit up to 120s for SpokePoolClient readiness (#1838) The existing limit of 10 loops was somewhat imprecise and was too short. as observed in production. This would occasionally trigger asserts in the BundleDataClient because it requires all SpokePoolClient instances to be updated. Instead, permit up to 60 seconds before proceeding. --- src/relayer/index.ts | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/relayer/index.ts b/src/relayer/index.ts index da8413ee6..1c526c189 100644 --- a/src/relayer/index.ts +++ b/src/relayer/index.ts @@ -61,14 +61,19 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P const ready = await relayer.update(); const activeRelayer = redis ? await redis.get(botIdentifier) : undefined; - // If there is another active relayer, allow up to 10 update cycles for this instance to be ready, - // then proceed unconditionally to protect against any RPC outages blocking the relayer. - if (!ready && activeRelayer && run < 10) { - const runTime = Math.round((performance.now() - tLoopStart) / 1000); - const delta = pollingDelay - runTime; - logger.debug({ at: "Relayer#run", message: `Not ready to relay, waiting ${delta} seconds.` }); - await delay(delta); - continue; + // If there is another active relayer, allow up to 120 seconds for this instance to be ready. + // If this instance can't update, throw an error (for now). + if (!ready && activeRelayer) { + if (run * pollingDelay < 120) { + const runTime = Math.round((performance.now() - tLoopStart) / 1000); + const delta = pollingDelay - runTime; + logger.debug({ at: "Relayer#run", message: `Not ready to relay, waiting ${delta} seconds.` }); + await delay(delta); + continue; + } + + const badChains = Object.values(spokePoolClients).filter(({ isUpdated }) => !isUpdated); + throw new Error(`Unable to start relayer due to chains ${badChains}`); } // Signal to any existing relayer that a handover is underway, or alternatively From 4a39db467d09431a0b06ea379db9a50fa52e8b4c Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Mon, 30 Sep 2024 14:44:14 +0200 Subject: [PATCH 2/2] refactor(relayer): Factor out common listener utils (#1841) This is a pre-emptive change made to make it easier to reuse common components in alternative listener implementations. Examples for a viem-based listener, as well as an eventual Solana listener. There are subsequent changes in the pipeline to factor out evm-specific parts. --- src/libexec/RelayerSpokePoolIndexer.ts | 139 ++++--------------------- src/libexec/types.ts | 9 ++ src/libexec/util/evm/index.ts | 1 + src/libexec/util/evm/util.ts | 69 ++++++++++++ src/libexec/util/ipc.ts | 43 ++++++++ 5 files changed, 144 insertions(+), 117 deletions(-) create mode 100644 src/libexec/types.ts create mode 100644 src/libexec/util/evm/index.ts create mode 100644 src/libexec/util/evm/util.ts create mode 100644 src/libexec/util/ipc.ts diff --git a/src/libexec/RelayerSpokePoolIndexer.ts b/src/libexec/RelayerSpokePoolIndexer.ts index 951f528e7..e6b211734 100644 --- a/src/libexec/RelayerSpokePoolIndexer.ts +++ b/src/libexec/RelayerSpokePoolIndexer.ts @@ -1,10 +1,8 @@ import assert from "assert"; import minimist from "minimist"; -import { Contract, EventFilter, providers as ethersProviders, utils as ethersUtils } from "ethers"; +import { Contract, providers as ethersProviders, utils as ethersUtils } from "ethers"; import { utils as sdkUtils } from "@across-protocol/sdk"; import * as utils from "../../scripts/utils"; -import { Log } from "../interfaces"; -import { SpokePoolClientMessage } from "../clients"; import { disconnectRedisClients, EventManager, @@ -19,20 +17,13 @@ import { getRedisCache, getWSProviders, Logger, - paginatedEventQuery, - sortEventsAscending, winston, } from "../utils"; +import { postEvents, removeEvent } from "./util/ipc"; +import { ScraperOpts } from "./types"; +import { getEventFilter, getEventFilterArgs, scrapeEvents as _scrapeEvents } from "./util/evm"; type WebSocketProvider = ethersProviders.WebSocketProvider; -type EventSearchConfig = sdkUtils.EventSearchConfig; -type ScraperOpts = { - lookback?: number; // Event lookback (in seconds). - deploymentBlock: number; // SpokePool deployment block - maxBlockRange?: number; // Maximum block range for paginated getLogs queries. - filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply. -}; - const { NODE_SUCCESS, NODE_APP_ERR } = utils; const INDEXER_POLLING_PERIOD = 2_000; // ms; time to sleep between checking for exit request via SIGHUP. @@ -45,108 +36,21 @@ let stop = false; let oldestTime = 0; /** - * Given an event name and contract, return the corresponding Ethers EventFilter object. - * @param contract Ethers Constract instance. - * @param eventName The name of the event to be filtered. - * @param filterArgs Optional filter arguments to be applied. - * @returns An Ethers EventFilter instance. - */ -function getEventFilter(contract: Contract, eventName: string, filterArgs?: string[]): EventFilter { - const filter = contract.filters[eventName]; - if (!isDefined(filter)) { - throw new Error(`Event ${eventName} not defined for contract`); - } - - return isDefined(filterArgs) ? filter(...filterArgs) : filter(); -} - -function getEventFilterArgs(relayer?: string): { [event: string]: string[] } { - const FilledV3Relay = !isDefined(relayer) - ? undefined - : [null, null, null, null, null, null, null, null, null, null, relayer]; - - return { FilledV3Relay }; -} - -/** - * Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent - * process (if defined). - * @param blockNumber Block number up to which the update applies. - * @param currentTime The SpokePool timestamp at blockNumber. - * @param events An array of Log objects to be submitted. - * @returns void - */ -function postEvents(blockNumber: number, currentTime: number, events: Log[]): void { - if (!isDefined(process.send) || stop) { - return; - } - - // Drop the array component of event.args and retain the named k/v pairs, - // otherwise stringification tends to retain only the array. - events = sortEventsAscending(events); - - const message: SpokePoolClientMessage = { - blockNumber, - currentTime, - oldestTime, - nEvents: events.length, - data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers), - }; - process.send(JSON.stringify(message)); -} - -/** - * Given an event removal notification, post the message to the parent process. - * @param event Log instance. - * @returns void - */ -function removeEvent(event: Log): void { - if (!isDefined(process.send) || stop) { - return; - } - - const message: SpokePoolClientMessage = { - event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers), - }; - process.send(JSON.stringify(message)); -} - -/** - * Given a SpokePool contract instance and an event name, scrape all corresponding events and submit them to the - * parent process (if defined). + * Aggregate utils/scrapeEvents for a series of event names. * @param spokePool Ethers Constract instance. - * @param eventName The name of the event to be filtered. + * @param eventNames The array of events to be queried. * @param opts Options to configure event scraping behaviour. * @returns void */ -async function scrapeEvents(spokePool: Contract, eventName: string, opts: ScraperOpts): Promise { - const { lookback, deploymentBlock, filterArgs, maxBlockRange } = opts; - const { provider } = spokePool; - const { chainId } = await provider.getNetwork(); - const chain = getNetworkName(chainId); - - let tStart: number, tStop: number; - - const pollEvents = async (filter: EventFilter, searchConfig: EventSearchConfig): Promise => { - tStart = performance.now(); - const events = await paginatedEventQuery(spokePool, filter, searchConfig); - tStop = performance.now(); - logger.debug({ - at: "SpokePoolIndexer::listen", - message: `Indexed ${events.length} ${chain} events in ${Math.round((tStop - tStart) / 1000)} seconds`, - searchConfig, - }); - return events; - }; - - const { number: toBlock, timestamp: currentTime } = await provider.getBlock("latest"); - const fromBlock = Math.max(toBlock - (lookback ?? deploymentBlock), deploymentBlock); - assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`); - const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange }; - - const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]); - const events = await pollEvents(filter, searchConfig); - postEvents(toBlock, currentTime, events); +export async function scrapeEvents(spokePool: Contract, eventNames: string[], opts: ScraperOpts): Promise { + const { number: toBlock, timestamp: currentTime } = await spokePool.provider.getBlock("latest"); + const events = await Promise.all( + eventNames.map((eventName) => _scrapeEvents(spokePool, eventName, { ...opts, toBlock }, logger)) + ); + + if (!stop) { + postEvents(toBlock, oldestTime, currentTime, events.flat()); + } } /** @@ -178,7 +82,9 @@ async function listen( // Post an update to the parent. Do this irrespective of whether there were new events or not, since there's // information in blockNumber and currentTime alone. - postEvents(blockNumber, currentTime, events); + if (!stop) { + postEvents(blockNumber, oldestTime, currentTime, events); + } }); // Add a handler for each new instance of a subscribed event. @@ -191,7 +97,9 @@ async function listen( if (event.removed) { eventMgr.remove(event, host); // Notify the parent immediately in case the event was already submitted. - removeEvent(event); + if (!stop) { + removeEvent(event); + } } else { eventMgr.add(event, host); } @@ -277,10 +185,7 @@ async function run(argv: string[]): Promise { if (latestBlock.number > startBlock) { const events = ["V3FundsDeposited", "FilledV3Relay", "RelayedRootBundle", "ExecutedRelayerRefundRoot"]; const _spokePool = spokePool.connect(quorumProvider); - await Promise.all([ - resolveOldestTime(_spokePool, startBlock), - ...events.map((event) => scrapeEvents(_spokePool, event, opts)), - ]); + await Promise.all([resolveOldestTime(_spokePool, startBlock), scrapeEvents(_spokePool, events, opts)]); } // If no lookback was specified then default to the timestamp of the latest block. diff --git a/src/libexec/types.ts b/src/libexec/types.ts new file mode 100644 index 000000000..c1986dfb0 --- /dev/null +++ b/src/libexec/types.ts @@ -0,0 +1,9 @@ +export { Log } from "../interfaces"; +export { SpokePoolClientMessage } from "../clients"; + +export type ScraperOpts = { + lookback?: number; // Event lookback (in seconds). + deploymentBlock: number; // SpokePool deployment block + maxBlockRange?: number; // Maximum block range for paginated getLogs queries. + filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply. +}; diff --git a/src/libexec/util/evm/index.ts b/src/libexec/util/evm/index.ts new file mode 100644 index 000000000..181e76f24 --- /dev/null +++ b/src/libexec/util/evm/index.ts @@ -0,0 +1 @@ +export * from "./util"; diff --git a/src/libexec/util/evm/util.ts b/src/libexec/util/evm/util.ts new file mode 100644 index 000000000..c19a86199 --- /dev/null +++ b/src/libexec/util/evm/util.ts @@ -0,0 +1,69 @@ +import assert from "assert"; +import { Contract, EventFilter } from "ethers"; +import { getNetworkName, isDefined, paginatedEventQuery, winston } from "../../../utils"; +import { Log, ScraperOpts } from "../../types"; + +/** + * Given an event name and contract, return the corresponding Ethers EventFilter object. + * @param contract Ethers Constract instance. + * @param eventName The name of the event to be filtered. + * @param filterArgs Optional filter arguments to be applied. + * @returns An Ethers EventFilter instance. + */ +export function getEventFilter(contract: Contract, eventName: string, filterArgs?: string[]): EventFilter { + const filter = contract.filters[eventName]; + if (!isDefined(filter)) { + throw new Error(`Event ${eventName} not defined for contract`); + } + + return isDefined(filterArgs) ? filter(...filterArgs) : filter(); +} + +/** + * Get a general event filter mapping to be used for filtering SpokePool contract events. + * This is currently only useful for filtering the relayer address on FilledV3Relay events. + * @param relayer Optional relayer address to filter on. + * @returns An argument array for input to an Ethers EventFilter. + */ +export function getEventFilterArgs(relayer?: string): { [event: string]: (null | string)[] } { + const FilledV3Relay = !isDefined(relayer) + ? undefined + : [null, null, null, null, null, null, null, null, null, null, relayer]; + + return { FilledV3Relay }; +} + +/** + * Given a SpokePool contract instance and an event name, scrape all corresponding events and submit them to the + * parent process (if defined). + * @param spokePool Ethers Constract instance. + * @param eventName The name of the event to be filtered. + * @param opts Options to configure event scraping behaviour. + * @returns void + */ +export async function scrapeEvents( + spokePool: Contract, + eventName: string, + opts: ScraperOpts & { toBlock: number }, + logger: winston.Logger +): Promise { + const { lookback, deploymentBlock, filterArgs, maxBlockRange, toBlock } = opts; + const { chainId } = await spokePool.provider.getNetwork(); + const chain = getNetworkName(chainId); + + const fromBlock = Math.max(toBlock - (lookback ?? deploymentBlock), deploymentBlock); + assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`); + const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange }; + + const tStart = performance.now(); + const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]); + const events = await paginatedEventQuery(spokePool, filter, searchConfig); + const tStop = performance.now(); + logger.debug({ + at: "scrapeEvents", + message: `Scraped ${events.length} ${chain} ${eventName} events in ${Math.round((tStop - tStart) / 1000)} seconds`, + searchConfig, + }); + + return events; +} diff --git a/src/libexec/util/ipc.ts b/src/libexec/util/ipc.ts new file mode 100644 index 000000000..b5bfb81c3 --- /dev/null +++ b/src/libexec/util/ipc.ts @@ -0,0 +1,43 @@ +import { utils as sdkUtils } from "@across-protocol/sdk"; +import { isDefined, sortEventsAscending } from "../../utils"; +import { Log, SpokePoolClientMessage } from "./../types"; + +/** + * Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent + * process (if defined). + * @param blockNumber Block number up to which the update applies. + * @param currentTime The SpokePool timestamp at blockNumber. + * @param events An array of Log objects to be submitted. + * @returns void + */ +export function postEvents(blockNumber: number, oldestTime: number, currentTime: number, events: Log[]): void { + if (!isDefined(process.send)) { + return; + } + + events = sortEventsAscending(events); + const message: SpokePoolClientMessage = { + blockNumber, + currentTime, + oldestTime, + nEvents: events.length, + data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers), + }; + process.send(JSON.stringify(message)); +} + +/** + * Given an event removal notification, post the message to the parent process. + * @param event Log instance. + * @returns void + */ +export function removeEvent(event: Log): void { + if (!isDefined(process.send)) { + return; + } + + const message: SpokePoolClientMessage = { + event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers), + }; + process.send(JSON.stringify(message)); +}