diff --git a/packages/supagraph/src/sync/toolkit.ts b/packages/supagraph/src/sync/toolkit.ts index 63ee8319..195366c1 100644 --- a/packages/supagraph/src/sync/toolkit.ts +++ b/packages/supagraph/src/sync/toolkit.ts @@ -791,12 +791,14 @@ export const sync = async ({ Entity<{ id: string; locked: boolean; + lockedAt: number; latestBlock: number; _block_num: number; _block_ts: number; }> & { id: string; locked: boolean; + lockedAt: number; latestBlock: number; _block_num: number; _block_ts: number; @@ -815,10 +817,15 @@ export const sync = async ({ }: { tx: TransactionReceipt; block: Block; logIndex: number } ) => void | Promise > = {}; + // collect each events abi iface const eventIfaces: Record = {}; const startBlocks: Record = {}; const startTimes: Record = {}; + + // boolean to allow the lock to pass + let clearLock = false; + // record locks by chainId const locked: Record = {}; const blocked: Record = {}; @@ -866,80 +873,83 @@ export const sync = async ({ // record the chainId chainIds.add(chainId); - // record the eventNames callback to execute on final sorted list - callbacks[`${getAddress(address)}-${eventName}`] = - callbacks[`${getAddress(address)}-${eventName}`] || onEvent; - - // record the event interface so we can reconstruct args to feed to callback - eventIfaces[`${getAddress(address)}-${eventName}`] = - eventIfaces[`${getAddress(address)}-${eventName}`] || - new ethers.utils.Interface(eventAbi); - - // check locked state - if (!Object.hasOwnProperty.call(locked, chainId)) { - // check for lock state by directly pulling the entity (we can't go through store because newDb might be set) - const latest = await engine.db?.get(`__meta__.${chainId}`); + // check locked state - we only want to check this lock once per chainId + if (!Object.hasOwnProperty.call(latestEntity, chainId)) { + // fetch the meta entity from the store (we'll update this once we've committed all changes) + latestEntity[chainId] = await Store.get<{ + id: string; + latestBlock: number; + locked: boolean; + lockedAt: number; + _block_num: number; + _block_ts: number; + }>("__meta__", `${chainId}`); // if this is null but we have rows in the collection, we could select the most recent and check the lastBlock // hydrate the locked bool - locked[chainId] = latest?.locked; - blocked[chainId] = latest?.latestBlock || 0; - } + locked[chainId] = latestEntity[chainId]?.locked; + // check when the lock was instated + blocked[chainId] = + latestEntity[chainId]?.lockedAt || + latestEntity[chainId]?.latestBlock || + 0; + + // toBlock is always "latest" from when we collected the events + latestBlock[chainId] = await provider.getBlock("latest"); + + // when the db is locked check if it should be released before ending all operations + if ( + !newDb && + locked[chainId] && + // this offset should be chain dependent + +blocked[chainId] + 10 > (latestBlock[chainId].number || 0) && + // if the lock hasnt been cleared by another prov + !clearLock + ) { + // mark the operation in the log + console.log("--\n\nSync error:", chainId, " - DB is locked", "\n"); + + // record when we finished the sync operation + const endTime = performance.now(); + + // print time in console + console.log( + `\n===\n\nTotal execution time: ${( + Number(endTime - startTime) / 1000 + ).toPrecision(4)}s\n\n` + ); - // to block is always latest when we collected the events - latestBlock[chainId] = - // if we've already discovered the latest block from the provider then return it - latestBlock[chainId] || - // otherwise fetch it from the provider (we need each latest block for each provider) - (await provider.getBlock("latest")); + // return error state to caller + return { + error: "DB is locked", + }; + } - // the db is locked (check if this lock should be released) - if ( - !newDb && - locked[chainId] && - // this offset should be chain dependent - 10 blocks on mantle is 1hr but is ~1min on mantle testnet - +(blocked[chainId]) + 10 < (latestBlock[chainId].number || 0) - ) { - // mark the operation in the log - console.log("--\n\nSync error:", chainId, " - DB is locked", "\n"); + // if the blocked frame has passed on this provider we can clear locks for other providers + if (+blocked[chainId] + 10 <= (latestBlock[chainId].number || 0)) { + clearLock = true; + } - // record when we finished the sync operation - const endTime = performance.now(); + // set the chainId into the engine + Store.setChainId(chainId); + // clear the current block state so we don't disrupt sync timestamps on the entity + Store.clearBlock(); - // print time in console - console.log( - `\n===\n\nTotal execution time: ${( - Number(endTime - startTime) / 1000 - ).toPrecision(4)}s\n\n` - ); + // set the lock (this will be released on successful update) + latestEntity[chainId].set("locked", true); + // move the lockedAt to now to prevent adjacent runs unlocking at the same time + latestEntity[chainId].set("lockedAt", latestBlock[chainId].number); - // return error state to caller - return { - error: "DB is locked", - }; + // save the new locked state on the chainId (this will save immediately - we're not inside a checkpoint) + await latestEntity[chainId].save(); } - // get the entity once - latestEntity[chainId] = - // if we've already discovered the latest block entry for this provider then return it - latestEntity[chainId] || - // otherwise fetch it from the store (we'll update this once we've committed all changes) - (await Store.get<{ - id: string; - latestBlock: number; - locked: boolean; - _block_num: number; - _block_ts: number; - }>("__meta__", `${chainId}`)); // if this is null but we have rows in the collection, we could select the most recent and check the lastBlock - - // set the chainId into the engine - Store.setChainId(chainId); - // clear the current block state so we don't disrupt timestamps on the entity - Store.clearBlock(); - - // set the lock (this will be released on successful update) - latestEntity[chainId].set("locked", true); + // record the eventNames callback to execute on final sorted list + callbacks[`${getAddress(address)}-${eventName}`] = + callbacks[`${getAddress(address)}-${eventName}`] || onEvent; - // save the new locked state on the chainId - await latestEntity[chainId].save(); + // record the event interface so we can reconstruct args to feed to callback + eventIfaces[`${getAddress(address)}-${eventName}`] = + eventIfaces[`${getAddress(address)}-${eventName}`] || + new ethers.utils.Interface(eventAbi); // check if we should be pulling events in this sync or using the tmp cache if ( @@ -963,7 +973,7 @@ export const sync = async ({ newDb = engine.newDb = true; } - // record the startBlock + // record the startBlock and time startBlocks[chainId] = fromBlock; startTimes[chainId] = fromTime;