diff --git a/src/clients/BundleDataClient.ts b/src/clients/BundleDataClient.ts index 0c028f149..7348318f6 100644 --- a/src/clients/BundleDataClient.ts +++ b/src/clients/BundleDataClient.ts @@ -1,3 +1,4 @@ +import assert from "assert"; import * as _ from "lodash"; import { ProposedRootBundle, @@ -16,7 +17,6 @@ import { getRefundInformationFromFill, queryHistoricalDepositForFill, assign, - assert, fixedPointAdjustment, isDefined, toBN, @@ -45,6 +45,15 @@ import { import { BundleDataSS } from "../utils/SuperstructUtils"; import { PoolRebalanceRoot } from "../dataworker/Dataworker"; +// type DataCacheValue = { +// unfilledDeposits: UnfilledDeposit[]; +// fillsToRefund: FillsToRefund; +// allValidFills: FillWithBlock[]; +// allInvalidFills: InvalidFill[]; +// deposits: DepositWithBlock[]; +// earlyDeposits: typechain.FundsDepositedEvent[]; +// }; +// type DataCache = Record; type DataCache = Record>; // V3 dictionary helper functions diff --git a/src/dataworker/Dataworker.ts b/src/dataworker/Dataworker.ts index 4bae77879..8a3aca33a 100644 --- a/src/dataworker/Dataworker.ts +++ b/src/dataworker/Dataworker.ts @@ -15,6 +15,8 @@ import { ZERO_ADDRESS, } from "../utils"; import { + DepositWithBlock, + InvalidFill, ProposedRootBundle, RootBundleRelayWithBlock, SpokePoolClientsByChain, @@ -338,7 +340,9 @@ export class Dataworker { return; } - return blockRangesForProposal; + // Narrow the block range depending on potential data incoherencies. + const safeBlockRanges = await this.narrowProposalBlockRanges(blockRangesForProposal, spokePoolClients); + return safeBlockRanges; } getNextHubChainBundleStartBlock(chainIdList = this.chainIdListForBundleEvaluationBlockNumbers): number { @@ -467,6 +471,104 @@ export class Dataworker { return rootBundleData.dataToPersistToDALayer; } + async narrowProposalBlockRanges( + blockRanges: number[][], + spokePoolClients: SpokePoolClientsByChain + ): Promise { + const chainIds = this.chainIdListForBundleEvaluationBlockNumbers; + const updatedBlockRanges = Object.fromEntries(chainIds.map((chainId, idx) => [chainId, [...blockRanges[idx]]])); + + const { bundleInvalidFillsV3: invalidFills } = await this.clients.bundleDataClient.loadData( + blockRanges, + spokePoolClients + ); + + // Helper to update a chain's end block correctly, accounting for soft-pausing if needed. + const updateEndBlock = (chainId: number, endBlock?: number): void => { + const [currentStartBlock, currentEndBlock] = updatedBlockRanges[chainId]; + const previousEndBlock = this.clients.hubPoolClient.getLatestBundleEndBlockForChain( + chainIds, + this.clients.hubPoolClient.latestBlockSearched, + chainId + ); + + endBlock ??= previousEndBlock; + assert( + endBlock < currentEndBlock, + `Invalid block range update for chain ${chainId}: block ${endBlock} >= ${currentEndBlock}` + ); + + // If endBlock is equal to or before the currentEndBlock then the chain is "soft-paused" by setting the bundle + // start and end block equal to the previous end block. This tells the dataworker to not progress on that chain. + updatedBlockRanges[chainId] = + endBlock > currentStartBlock ? [currentStartBlock, endBlock] : [previousEndBlock, previousEndBlock]; + }; + + // If invalid fills were detected and they appear to be due to gaps in FundsDeposited events: + // - Narrow the origin block range to exclude the missing deposit, AND + // - Narrow the destination block range to exclude the invalid fill. + invalidFills + .filter(({ code }) => code === InvalidFill.DepositIdNotFound) + .forEach(({ fill: { depositId, originChainId, destinationChainId, blockNumber } }) => { + const [originChain, destinationChain] = [getNetworkName(originChainId), getNetworkName(destinationChainId)]; + + // Exclude the missing deposit on the origin chain. + const originSpokePoolClient = spokePoolClients[originChainId]; + let [startBlock, endBlock] = updatedBlockRanges[originChainId]; + + // Find the previous known deposit. This may resolve a deposit before the immediately preceding depositId. + const previousDeposit = originSpokePoolClient + .getDepositsForDestinationChain(destinationChainId) + .filter((deposit: DepositWithBlock) => deposit.blockNumber < blockNumber) + .at(-1); + + updateEndBlock(originChainId, previousDeposit?.blockNumber); + this.logger.debug({ + at: "Dataworker::narrowBlockRanges", + message: `Narrowed proposal block range on ${originChain} due to missing deposit.`, + depositId, + previousBlockRange: [startBlock, endBlock], + newBlockRange: updatedBlockRanges[originChainId], + }); + + // Update the endBlock on the destination chain to exclude the invalid fill. + const destSpokePoolClient = spokePoolClients[destinationChainId]; + [startBlock, endBlock] = updatedBlockRanges[destinationChainId]; + + // The blockNumber is iteratively narrowed in this loop so this fill might already be excluded. + if (blockNumber <= endBlock) { + // Find the fill immediately preceding this invalid fill. + const previousFill = destSpokePoolClient + .getFillsWithBlockInRange(startBlock, Math.max(blockNumber - 1, startBlock)) + .at(-1); + + // Wind back to the bundle end block number to that of the previous fill. + updateEndBlock(destinationChainId, previousFill?.blockNumber); + this.logger.debug({ + at: "Dataworker::narrowBlockRanges", + message: `Narrowed proposal block range on ${destinationChain} due to missing deposit on ${originChain}.`, + depositId, + previousBlockRange: [startBlock, endBlock], + newBlockRange: updatedBlockRanges[destinationChainId], + }); + } + }); + + // Quick sanity check - make sure that the block ranges are coherent. A chain may be soft-paused if it has ongoing + // RPC issues (block ranges are frozen at the previous proposal endBlock), so ensure that this is also handled. + const finalBlockRanges = chainIds.map((chainId) => updatedBlockRanges[chainId]); + const coherentBlockRanges = finalBlockRanges.every(([startBlock, endBlock], idx) => { + const [originalStartBlock] = blockRanges[idx]; + return ( + (endBlock > startBlock && startBlock === originalStartBlock) || + (startBlock === endBlock && startBlock === originalStartBlock - 1) // soft-pause + ); + }); + assert(coherentBlockRanges, "Updated proposal block ranges are incoherent"); + + return finalBlockRanges; + } + async _proposeRootBundle( blockRangesForProposal: number[][], spokePoolClients: SpokePoolClientsByChain, @@ -475,12 +577,12 @@ export class Dataworker { logData = false ): Promise { const timerStart = Date.now(); + const { bundleDepositsV3, bundleFillsV3, bundleSlowFillsV3, unexecutableSlowFills, expiredDepositsToRefundV3 } = await this.clients.bundleDataClient.loadData(blockRangesForProposal, spokePoolClients, loadDataFromArweave); - // Prepare information about what we need to store to - // Arweave for the bundle. We will be doing this at a - // later point so that we can confirm that this data is - // worth storing. + + // Prepare information about what we need to store to Arweave for the bundle. + // We will be doing this at a later point so that we can confirm that this data is worth storing. const dataToPersistToDALayer = { bundleBlockRanges: blockRangesForProposal, bundleDepositsV3, diff --git a/src/interfaces/SpokePool.ts b/src/interfaces/SpokePool.ts index e1e0f3bfe..1e5fdf88b 100644 --- a/src/interfaces/SpokePool.ts +++ b/src/interfaces/SpokePool.ts @@ -1,5 +1,14 @@ -import { SpokePoolClient } from "../clients"; +import { FillWithBlock, SpokePoolClient } from "../clients"; +import * as utils from "../utils/SDKUtils"; export interface SpokePoolClientsByChain { [chainId: number]: SpokePoolClient; } + +export const { InvalidFill } = utils; + +export type InvalidFill = { + fill: FillWithBlock; + code: utils.InvalidFillEnum; + reason: string; +}; diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index 731e0b93f..f3a1713c8 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -51,6 +51,7 @@ export type PendingRootBundle = interfaces.PendingRootBundle; // SpokePool interfaces export type FundsDepositedEvent = interfaces.FundsDepositedEvent; +export type RelayData = interfaces.RelayData; export type Deposit = interfaces.Deposit; export type DepositWithBlock = interfaces.DepositWithBlock; export type Fill = interfaces.Fill; diff --git a/src/utils/SDKUtils.ts b/src/utils/SDKUtils.ts index 5fd725ed0..9208c9598 100644 --- a/src/utils/SDKUtils.ts +++ b/src/utils/SDKUtils.ts @@ -6,6 +6,8 @@ export type BlockFinderHints = sdk.utils.BlockFinderHints; export class PriceClient extends sdk.priceClient.PriceClient {} export const { acrossApi, coingecko, defiLlama } = sdk.priceClient.adapters; +export type InvalidFillEnum = sdk.utils.InvalidFill; + export const { bnZero, bnOne, @@ -23,6 +25,7 @@ export const { toGWei, toBNWei, formatFeePct, + InvalidFill, shortenHexStrings, convertFromWei, max, diff --git a/test/Dataworker.loadData.ts b/test/Dataworker.loadData.ts index 818dc83d3..938e79774 100644 --- a/test/Dataworker.loadData.ts +++ b/test/Dataworker.loadData.ts @@ -90,6 +90,7 @@ describe("Dataworker: Load data used in all functions", async function () { bundleDepositsV3: {}, expiredDepositsToRefundV3: {}, bundleFillsV3: {}, + bundleInvalidFillsV3: [], unexecutableSlowFills: {}, bundleSlowFillsV3: {}, }); diff --git a/test/Dataworker.proposeRootBundle.ts b/test/Dataworker.proposeRootBundle.ts index 08078c545..d092f8f17 100644 --- a/test/Dataworker.proposeRootBundle.ts +++ b/test/Dataworker.proposeRootBundle.ts @@ -1,6 +1,10 @@ +import assert from "assert"; +import { utils as sdkUtils } from "@across-protocol/sdk"; +import { Dataworker } from "../src/dataworker/Dataworker"; import { HubPoolClient, MultiCallerClient, SpokePoolClient } from "../src/clients"; +import { Deposit } from "../src/interfaces"; import { MAX_UINT_VAL } from "../src/utils"; -import { CHAIN_ID_TEST_LIST, amountToDeposit, destinationChainId } from "./constants"; +import { CHAIN_ID_TEST_LIST, amountToDeposit, destinationChainId, originChainId } from "./constants"; import { setupFastDataworker } from "./fixtures/Dataworker.Fixture"; import { Contract, @@ -11,20 +15,36 @@ import { fillV3, lastSpyLogIncludes, lastSpyLogLevel, + setupTokensForWallet, requestSlowFill, sinon, toBNWei, utf8ToHex, } from "./utils"; +import { MockConfigStoreClient } from "./mocks"; // Tested -// Tested -import { Dataworker } from "../src/dataworker/Dataworker"; -import { MockConfigStoreClient } from "./mocks"; +class TestSpokePoolClient extends SpokePoolClient { + deleteDeposit(depositId: number): void { + const depositHash = this.getDepositHash({ depositId, originChainId: this.chainId }); + delete this.depositHashes[depositHash]; + } + + // Delete the first fill matching the pair of originChainId and depositId. + deleteFill(originChainId: number, depositId: number): void { + const fills = this.fills[originChainId]; + const depositIdx = fills + .map((fill) => `${fill.originChainId}-${fill.depositId}`) + .indexOf(`${originChainId}-${depositId}`); + assert(depositIdx !== -1); + + this.fills[originChainId].splice(depositIdx, 1); + } +} let spy: sinon.SinonSpy; let spokePool_1: Contract, erc20_1: Contract, spokePool_2: Contract, erc20_2: Contract; let l1Token_1: Contract, hubPool: Contract, configStore: Contract; -let depositor: SignerWithAddress; +let depositor: SignerWithAddress, relayer: SignerWithAddress; let hubPoolClient: HubPoolClient, configStoreClient: MockConfigStoreClient; let dataworkerInstance: Dataworker, multiCallerClient: MultiCallerClient; @@ -35,6 +55,7 @@ let updateAllClients: () => Promise; describe("Dataworker: Propose root bundle", async function () { beforeEach(async function () { ({ + relayer, hubPool, spokePool_1, erc20_1, @@ -200,4 +221,202 @@ describe("Dataworker: Propose root bundle", async function () { expect(lastSpyLogIncludes(spy, "Skipping proposal because missing updated ConfigStore version")).to.be.true; expect(lastSpyLogLevel(spy)).to.equal("warn"); }); + + describe("Dataworker: Proposal block range narrowing", async function () { + const nDeposits = 50; + const chainIds = [originChainId, destinationChainId]; + let originSpoke: SpokePoolClient, destinationSpoke: SpokePoolClient; + + beforeEach(async function () { + // Construct special SpokePoolClient instances with the ability to delete events. + [originSpoke, destinationSpoke] = chainIds.map((chainId) => { + const { deploymentBlock, logger, spokePool } = spokePoolClients[chainId]; + const spokePoolClient = new TestSpokePoolClient(logger, spokePool, hubPoolClient, chainId, deploymentBlock); + spokePoolClients[chainId] = spokePoolClient; + return spokePoolClient; + }); + await Promise.all(Object.values(spokePoolClients).map((spokePoolClient) => spokePoolClient.update())); + + const weth = undefined; + await setupTokensForWallet(originSpoke.spokePool, depositor, [erc20_1], weth, nDeposits + 1); + await setupTokensForWallet(destinationSpoke.spokePool, relayer, [erc20_2], weth, nDeposits + 1); + }); + + it("Narrows block ranges for fills where no depositId is found (origin chain deposit gaps)", async function () { + // Prime the origin spoke with a single deposit. Some utility functions involved in searching for deposits + // rely on the lowest deposit ID in order to bound the search by. A missing initial deposit ID would always + // be caught during pre-launch testing, so that's not a concern here. + let deposit = await depositV3( + originSpoke.spokePool, + destinationChainId, + depositor, + erc20_1.address, + amountToDeposit, + erc20_2.address, + amountToDeposit + ); + + // Since the SpokePoolClient relies on SpokePoolClient.latestDepositIdQueried, we can't currently detect + // the _final_ deposit going missing. Some additional refactoring is needed to add this capability. + const missingDepositIdx = Math.floor(Math.random() * (nDeposits - 2)); + let missingDepositId = -1; + + for (let idx = 0; idx < nDeposits; ++idx) { + deposit = await depositV3( + originSpoke.spokePool, + destinationChainId, + depositor, + erc20_1.address, + amountToDeposit, + erc20_2.address, + amountToDeposit + ); + + if (idx === missingDepositIdx) { + missingDepositId = deposit.depositId; + } + + await fillV3Relay(destinationSpoke.spokePool, deposit, relayer); + } + await hubPool.setCurrentTime(deposit.quoteTimestamp + 1); + await hubPoolClient.update(); + + await sdkUtils.mapAsync([originSpoke, destinationSpoke], async (spokePoolClient) => { + await spokePoolClient.spokePool.setCurrentTime(deposit.quoteTimestamp + 1); + await spokePoolClient.update(); + }); + + // Flush the "missing" deposit, which was made but should not be visible to the SpokePoolClient. + originSpoke.deleteDeposit(missingDepositId); + const deposits = originSpoke.getDeposits(); + expect(deposits.length).equals(nDeposits); // 1 (initial) + nDeposits - 1 (missing). + expect(deposits.map(({ depositId }) => depositId).includes(missingDepositId)).is.false; + + // There must be 1 more fill than deposits. + const fills = destinationSpoke.getFills(); + expect(fills.length).equals(nDeposits); + + // Estimate the starting proposal block range for each chain, based on the blocks it has data for. + const proposalChainIds = configStoreClient.getChainIdIndicesForBlock(); + const blockRanges = proposalChainIds.map((chainId) => { + const { deploymentBlock, latestBlockSearched } = spokePoolClients[chainId]; + return [deploymentBlock, latestBlockSearched]; + }); + + // Read in the valid and invalid fills for the specified block ranges. This should contain an invalid fill. + let { allValidFills, allInvalidFills } = await dataworkerInstance.clients.bundleDataClient.loadData( + blockRanges, + spokePoolClients + ); + expect(allValidFills.length).equals(nDeposits - 1); + expect(allInvalidFills.length).equals(1); + expect(allInvalidFills[0].fill.depositId).equals(missingDepositId); + + // Compute the updated end block on the origin chain (excluding the missing deposit). + const safeDepositIdx = deposits.map(({ depositId }) => depositId).indexOf(allInvalidFills[0]?.fill.depositId - 1); + const { blockNumber: safeOriginEndBlock } = deposits[safeDepositIdx]; + + // Compute the updated end block on the destination chain (excluding the "invalid" fill). + const { blockNumber: safeDestinationEndBlock } = allValidFills.reduce( + (acc, curr) => + curr.blockNumber < allInvalidFills[0]?.fill.blockNumber && curr.blockNumber > acc.blockNumber ? curr : acc, + allValidFills[0] + ); + + // Update the expected proposal block ranges. + const safeBlockRanges = blockRanges.map((blockRange) => [...blockRange]); + const [originChainIdx, destinationChainIdx] = [ + proposalChainIds.indexOf(originChainId), + proposalChainIds.indexOf(destinationChainId), + ]; + safeBlockRanges[originChainIdx][1] = safeOriginEndBlock; + safeBlockRanges[destinationChainIdx][1] = safeDestinationEndBlock; + + const narrowedBlockRanges = await dataworkerInstance.narrowProposalBlockRanges(blockRanges, spokePoolClients); + ({ allValidFills, allInvalidFills } = await dataworkerInstance.clients.bundleDataClient.loadData( + narrowedBlockRanges, + spokePoolClients + )); + expect(allInvalidFills.length).equals(0); + expect(narrowedBlockRanges).to.deep.equal(safeBlockRanges); + }); + + it.skip("Narrows proposal block ranges for missing fills (destination chain fill gaps)", async function () { + // Since the SpokePoolClient relies on SpokePoolClient.latestDepositIdQueried, we can't currently detect + // the _final_ deposit going missing. Some additional refactoring is needed to add this detection. + const missingDepositIdx = Math.floor(Math.random() * (nDeposits - 2)); + let missingDepositId = -1; + let fillBlock = -1; + let deposit: Deposit; + for (let idx = 0; idx < nDeposits; ++idx) { + deposit = await depositV3( + hubPoolClient, + originSpoke.spokePool, + erc20_1, + l1Token_1, + depositor, + destinationChainId, + amountToDeposit + ); + + await buildFill(destinationSpoke.spokePool, erc20_2, depositor, relayer, deposit, 1); + if (idx === missingDepositIdx) { + missingDepositId = deposit.depositId; + fillBlock = await destinationSpoke.spokePool.provider.getBlockNumber(); + } + } + assert(fillBlock !== -1); + + await hubPool.setCurrentTime(deposit.quoteTimestamp + 1); + await hubPoolClient.update(); + + await sdkUtils.mapAsync([originSpoke, destinationSpoke], async (spokePoolClient) => { + await spokePoolClient.spokePool.setCurrentTime(deposit.quoteTimestamp + 1); + await spokePoolClient.update(); + }); + + // There must be 1 more deposits than fills. + expect(originSpoke.getDeposits().length).equals(nDeposits); + + // Flush the "missing" fill, which was made but should not be visible to the SpokePoolClient. + destinationSpoke.deleteFill(originChainId, missingDepositId); + const fills = destinationSpoke.getFillsForOriginChain(originChainId); + expect(fills.length).equals(nDeposits - 1); // nDeposits - 1 (missing). + expect(fills.map(({ depositId }) => depositId).includes(missingDepositId)).is.false; + + // Estimate the starting proposal block range for each chain, based on the blocks it has data for. + const proposalChainIds = configStoreClient.getChainIdIndicesForBlock(); + const blockRanges = proposalChainIds.map((chainId) => { + const { deploymentBlock, latestBlockSearched } = spokePoolClients[chainId]; + return [deploymentBlock, latestBlockSearched]; + }); + + // Read in the valid and invalid fills for the specified block ranges. This should contain an invalid fill. + let { allValidFills, allInvalidFills } = await dataworkerInstance.clients.bundleDataClient.loadData( + blockRanges, + spokePoolClients + ); + expect(allValidFills.length).equals(nDeposits - 1); + expect(allInvalidFills.length).equals(0); // One valid fill is _missing_. + + // Compute the updated end block on the destination chain (excluding the "invalid" fill). + const { blockNumber: safeDestinationEndBlock } = allValidFills.reduce( + (acc, curr) => (curr.blockNumber < fillBlock && curr.blockNumber > acc.blockNumber ? curr : acc), + allValidFills[0] + ); + + // Update the expected proposal block ranges. + const safeBlockRanges = blockRanges.map((blockRange) => [...blockRange]); + const destinationChainIdx = proposalChainIds.indexOf(destinationChainId); + safeBlockRanges[destinationChainIdx][1] = safeDestinationEndBlock; + + const narrowedBlockRanges = await dataworkerInstance.narrowProposalBlockRanges(blockRanges, spokePoolClients); + ({ allValidFills, allInvalidFills } = await dataworkerInstance.clients.bundleDataClient.loadData( + narrowedBlockRanges, + spokePoolClients + )); + expect(allInvalidFills.length).equals(0); + expect(narrowedBlockRanges).to.deep.equal(safeBlockRanges); + }); + }); });