Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(relayer): Factor out common listener utils #1841

Merged
merged 10 commits into from
Sep 30, 2024
59 changes: 4 additions & 55 deletions src/libexec/RelayerSpokePoolIndexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import minimist from "minimist";
import { Contract, EventFilter, providers as ethersProviders, utils as ethersUtils } from "ethers";
import { utils as sdkUtils } from "@across-protocol/sdk";
import * as utils from "../../scripts/utils";
import { Log } from "../interfaces";
import { SpokePoolClientMessage } from "../clients";
import {
disconnectRedisClients,
EventManager,
Expand All @@ -20,19 +18,13 @@ import {
getWSProviders,
Logger,
paginatedEventQuery,
sortEventsAscending,
winston,
} from "../utils";
import { postEvents, removeEvent } from "./util";
import { Log, ScraperOpts } from "./types";

type WebSocketProvider = ethersProviders.WebSocketProvider;
type EventSearchConfig = sdkUtils.EventSearchConfig;
type ScraperOpts = {
lookback?: number; // Event lookback (in seconds).
deploymentBlock: number; // SpokePool deployment block
maxBlockRange?: number; // Maximum block range for paginated getLogs queries.
filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply.
};

const { NODE_SUCCESS, NODE_APP_ERR } = utils;

const INDEXER_POLLING_PERIOD = 2_000; // ms; time to sleep between checking for exit request via SIGHUP.
Expand Down Expand Up @@ -68,49 +60,6 @@ function getEventFilterArgs(relayer?: string): { [event: string]: string[] } {
return { FilledV3Relay };
}

/**
* Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent
* process (if defined).
* @param blockNumber Block number up to which the update applies.
* @param currentTime The SpokePool timestamp at blockNumber.
* @param events An array of Log objects to be submitted.
* @returns void
*/
function postEvents(blockNumber: number, currentTime: number, events: Log[]): void {
if (!isDefined(process.send) || stop) {
return;
}

// Drop the array component of event.args and retain the named k/v pairs,
// otherwise stringification tends to retain only the array.
events = sortEventsAscending(events);

const message: SpokePoolClientMessage = {
blockNumber,
currentTime,
oldestTime,
nEvents: events.length,
data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

/**
* Given an event removal notification, post the message to the parent process.
* @param event Log instance.
* @returns void
*/
function removeEvent(event: Log): void {
if (!isDefined(process.send) || stop) {
return;
}

const message: SpokePoolClientMessage = {
event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

/**
* Given a SpokePool contract instance and an event name, scrape all corresponding events and submit them to the
* parent process (if defined).
Expand Down Expand Up @@ -146,7 +95,7 @@ async function scrapeEvents(spokePool: Contract, eventName: string, opts: Scrape

const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]);
const events = await pollEvents(filter, searchConfig);
postEvents(toBlock, currentTime, events);
postEvents(toBlock, oldestTime, currentTime, events);
}

/**
Expand Down Expand Up @@ -178,7 +127,7 @@ async function listen(

// Post an update to the parent. Do this irrespective of whether there were new events or not, since there's
// information in blockNumber and currentTime alone.
postEvents(blockNumber, currentTime, events);
postEvents(blockNumber, oldestTime, currentTime, events);
});

// Add a handler for each new instance of a subscribed event.
Expand Down
9 changes: 9 additions & 0 deletions src/libexec/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export { Log } from "../interfaces";
export { SpokePoolClientMessage } from "../clients";

export type ScraperOpts = {
lookback?: number; // Event lookback (in seconds).
deploymentBlock: number; // SpokePool deployment block
pxrl marked this conversation as resolved.
Show resolved Hide resolved
maxBlockRange?: number; // Maximum block range for paginated getLogs queries.
filterArgs?: { [event: string]: string[] }; // Event-specific filter criteria to apply.
};
46 changes: 46 additions & 0 deletions src/libexec/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { utils as sdkUtils } from "@across-protocol/sdk";
import { isDefined, sortEventsAscending } from "../utils";
import { Log, SpokePoolClientMessage } from "./types";

/**
* Given the inputs for a SpokePoolClient update, consolidate the inputs into a message and submit it to the parent
* process (if defined).
* @param blockNumber Block number up to which the update applies.
* @param currentTime The SpokePool timestamp at blockNumber.
* @param events An array of Log objects to be submitted.
* @returns void
*/
export function postEvents(blockNumber: number, oldestTime: number, currentTime: number, events: Log[]): void {
if (!isDefined(process.send) || stop) {
pxrl marked this conversation as resolved.
Show resolved Hide resolved
return;
}

// Drop the array component of event.args and retain the named k/v pairs,
// otherwise stringification tends to retain only the array.
events = sortEventsAscending(events);

const message: SpokePoolClientMessage = {
blockNumber,
currentTime,
oldestTime,
nEvents: events.length,
data: JSON.stringify(events, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}

/**
* Given an event removal notification, post the message to the parent process.
* @param event Log instance.
* @returns void
*/
export function removeEvent(event: Log): void {
if (!isDefined(process.send) || stop) {
return;
}

const message: SpokePoolClientMessage = {
event: JSON.stringify(event, sdkUtils.jsonReplacerWithBigNumbers),
};
process.send(JSON.stringify(message));
}