Skip to content

Commit

Permalink
fix(supagraph): update lock logic to clear on good run
Browse files Browse the repository at this point in the history
  • Loading branch information
grezle committed Aug 2, 2023
1 parent c70a316 commit 060af0f
Showing 1 changed file with 76 additions and 66 deletions.
142 changes: 76 additions & 66 deletions packages/supagraph/src/sync/toolkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,12 +791,14 @@ export const sync = async ({
Entity<{
id: string;
locked: boolean;
lockedAt: number;
latestBlock: number;
_block_num: number;
_block_ts: number;
}> & {
id: string;
locked: boolean;
lockedAt: number;
latestBlock: number;
_block_num: number;
_block_ts: number;
Expand All @@ -815,10 +817,15 @@ export const sync = async ({
}: { tx: TransactionReceipt; block: Block; logIndex: number }
) => void | Promise<void>
> = {};

// collect each events abi iface
const eventIfaces: Record<string, ethers.utils.Interface> = {};
const startBlocks: Record<string, number> = {};
const startTimes: Record<string, number> = {};

// boolean to allow the lock to pass
let clearLock = false;

// record locks by chainId
const locked: Record<number, boolean> = {};
const blocked: Record<number, number> = {};
Expand Down Expand Up @@ -866,80 +873,83 @@ export const sync = async ({
// record the chainId
chainIds.add(chainId);

// record the eventNames callback to execute on final sorted list
callbacks[`${getAddress(address)}-${eventName}`] =
callbacks[`${getAddress(address)}-${eventName}`] || onEvent;

// record the event interface so we can reconstruct args to feed to callback
eventIfaces[`${getAddress(address)}-${eventName}`] =
eventIfaces[`${getAddress(address)}-${eventName}`] ||
new ethers.utils.Interface(eventAbi);

// check locked state
if (!Object.hasOwnProperty.call(locked, chainId)) {
// check for lock state by directly pulling the entity (we can't go through store because newDb might be set)
const latest = await engine.db?.get(`__meta__.${chainId}`);
// check locked state - we only want to check this lock once per chainId
if (!Object.hasOwnProperty.call(latestEntity, chainId)) {
// fetch the meta entity from the store (we'll update this once we've committed all changes)
latestEntity[chainId] = await Store.get<{
id: string;
latestBlock: number;
locked: boolean;
lockedAt: number;
_block_num: number;
_block_ts: number;
}>("__meta__", `${chainId}`); // if this is null but we have rows in the collection, we could select the most recent and check the lastBlock
// hydrate the locked bool
locked[chainId] = latest?.locked;
blocked[chainId] = latest?.latestBlock || 0;
}
locked[chainId] = latestEntity[chainId]?.locked;
// check when the lock was instated
blocked[chainId] =
latestEntity[chainId]?.lockedAt ||
latestEntity[chainId]?.latestBlock ||
0;

// toBlock is always "latest" from when we collected the events
latestBlock[chainId] = await provider.getBlock("latest");

// when the db is locked check if it should be released before ending all operations
if (
!newDb &&
locked[chainId] &&
// this offset should be chain dependent
+blocked[chainId] + 10 > (latestBlock[chainId].number || 0) &&
// if the lock hasnt been cleared by another prov
!clearLock
) {
// mark the operation in the log
console.log("--\n\nSync error:", chainId, " - DB is locked", "\n");

// record when we finished the sync operation
const endTime = performance.now();

// print time in console
console.log(
`\n===\n\nTotal execution time: ${(
Number(endTime - startTime) / 1000
).toPrecision(4)}s\n\n`
);

// to block is always latest when we collected the events
latestBlock[chainId] =
// if we've already discovered the latest block from the provider then return it
latestBlock[chainId] ||
// otherwise fetch it from the provider (we need each latest block for each provider)
(await provider.getBlock("latest"));
// return error state to caller
return {
error: "DB is locked",
};
}

// the db is locked (check if this lock should be released)
if (
!newDb &&
locked[chainId] &&
// this offset should be chain dependent - 10 blocks on mantle is 1hr but is ~1min on mantle testnet
+(blocked[chainId]) + 10 < (latestBlock[chainId].number || 0)
) {
// mark the operation in the log
console.log("--\n\nSync error:", chainId, " - DB is locked", "\n");
// if the blocked frame has passed on this provider we can clear locks for other providers
if (+blocked[chainId] + 10 <= (latestBlock[chainId].number || 0)) {
clearLock = true;
}

// record when we finished the sync operation
const endTime = performance.now();
// set the chainId into the engine
Store.setChainId(chainId);
// clear the current block state so we don't disrupt sync timestamps on the entity
Store.clearBlock();

// print time in console
console.log(
`\n===\n\nTotal execution time: ${(
Number(endTime - startTime) / 1000
).toPrecision(4)}s\n\n`
);
// set the lock (this will be released on successful update)
latestEntity[chainId].set("locked", true);
// move the lockedAt to now to prevent adjacent runs unlocking at the same time
latestEntity[chainId].set("lockedAt", latestBlock[chainId].number);

// return error state to caller
return {
error: "DB is locked",
};
// save the new locked state on the chainId (this will save immediately - we're not inside a checkpoint)
await latestEntity[chainId].save();
}

// get the entity once
latestEntity[chainId] =
// if we've already discovered the latest block entry for this provider then return it
latestEntity[chainId] ||
// otherwise fetch it from the store (we'll update this once we've committed all changes)
(await Store.get<{
id: string;
latestBlock: number;
locked: boolean;
_block_num: number;
_block_ts: number;
}>("__meta__", `${chainId}`)); // if this is null but we have rows in the collection, we could select the most recent and check the lastBlock

// set the chainId into the engine
Store.setChainId(chainId);
// clear the current block state so we don't disrupt timestamps on the entity
Store.clearBlock();

// set the lock (this will be released on successful update)
latestEntity[chainId].set("locked", true);
// record the eventNames callback to execute on final sorted list
callbacks[`${getAddress(address)}-${eventName}`] =
callbacks[`${getAddress(address)}-${eventName}`] || onEvent;

// save the new locked state on the chainId
await latestEntity[chainId].save();
// record the event interface so we can reconstruct args to feed to callback
eventIfaces[`${getAddress(address)}-${eventName}`] =
eventIfaces[`${getAddress(address)}-${eventName}`] ||
new ethers.utils.Interface(eventAbi);

// check if we should be pulling events in this sync or using the tmp cache
if (
Expand All @@ -963,7 +973,7 @@ export const sync = async ({
newDb = engine.newDb = true;
}

// record the startBlock
// record the startBlock and time
startBlocks[chainId] = fromBlock;
startTimes[chainId] = fromTime;

Expand Down

0 comments on commit 060af0f

Please sign in to comment.