
ETL Revamp #55

Merged
merged 1 commit into BuidlGuidl:main on Jul 14, 2024
Conversation

escottalexander (Collaborator)

In this PR I:

  • updated the metric seed data so the correct metrics are enabled/disabled
  • updated the ETL endpoint so it can only be called once every 23 hours and 45 minutes
  • improved error handling in the ETL so it handles repeated calls without issues
  • used temporary collections so that existing data is only removed once the ETL process succeeds (see the sketch below)
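
A rough sketch of that temp-collection promotion step (the model names come from elsewhere in this thread; the import path and the exact swap logic in the PR are assumptions):

import GlobalScore, { TempGlobalScore } from "~~/services/mongodb/models/globalScore";

// All ETL writes go to the Temp* collections first; the live collections are
// only touched once the whole run has succeeded.
const promoteTempData = async () => {
  const tempDocs = await TempGlobalScore.find({}).lean();
  await GlobalScore.deleteMany({}); // safe: we already have a full replacement in hand
  await GlobalScore.insertMany(tempDocs);
  await TempGlobalScore.deleteMany({}); // leave temp storage clean for the next run
};

The same pattern would apply to the ProjectScore and ProjectMetricSummary collections.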


vercel bot commented Jul 12, 2024

@escottalexander is attempting to deploy a commit to the BuidlGuidl Team on Vercel.

A member of the Team first needs to authorize it.


vercel bot commented Jul 13, 2024

The latest updates on your projects.

onchain-impact-dashboard: ✅ Ready (updated Jul 13, 2024 2:14pm UTC)

@Franpastoragusti merged commit 2741657 into BuidlGuidl:main on Jul 14, 2024
2 checks passed
Comment on lines +227 to +230
const isTooEarly = (lastRunDate: Date): boolean => {
const window = parseInt(process.env.ETL_WINDOW_MS || "0") || 85500000; // 23h 45m
return lastRunDate > new Date(new Date().getTime() - window);
};
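
For context, a sketch of how this guard might be wired into the handler (the EtlRun model and its date field are hypothetical; only isTooEarly is from the PR):

// Bail out early if the previous run finished within the 23h 45m window.
const lastRun = await EtlRun.findOne().sort({ date: -1 }).lean(); // hypothetical model
if (lastRun && isTooEarly(lastRun.date)) {
  return res.status(429).json({ error: "ETL already ran within the last 23h 45m" });
}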
Collaborator

nice 👌

Comment on lines -33 to -37
// Clear out the previous data
console.log("Clearing previous data");
await GlobalScore.deleteMany({}, { session });
await ProjectScore.deleteMany({}, { session });
await ProjectMetricSummary.deleteMany({}, { session });
Collaborator

Should we clear the Temp models at the beginning of the process, in case there's an error midway through and it needs to be run again?

Collaborator Author

Not a bad idea.
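
Something like this at the top of the run would cover the re-run case (a sketch; the model names come from elsewhere in this thread):

// Drop leftovers from any previous failed run before processing starts.
await Promise.all([
  TempGlobalScore.deleteMany({}),
  TempProjectScore.deleteMany({}),
  TempProjectMetricSummary.deleteMany({}),
]);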

  const dates = getDatesToProcess();
- const globalScoreData = Object.assign({}, MetricNames) as { [key in keyof Metrics]: number };
+ const globalScoreData = Object.assign({}, metricNamesObj) as { [key in keyof Metrics]: number };
  for (const day of dates) {
swellander (Collaborator) commented Jul 14, 2024

I think parallelizing here would significantly speed things up. I've been messing around with the following:

      await Promise.all(
        dates.map(async day => {
          // Fetch data for the day
          const seriesResponse = await fetch(
            `${process.env.NEXT_PUBLIC_API_URL}/stub/series?date=${day.toISOString().split("T")[0]}`,
          ).then(res => res.json());

          if (!seriesResponse) {
            throw new Error(`No OSO Data for ${day}`);
          }
          const osoData = seriesResponse.data as OnchainMetricsByProject[];
          console.log(`Processing OSO response for each project on ${day.toISOString().split("T")[0]}`);

          const projectScoreOps = [];

          for (const project of osoData) {
            const projectMapping = mapping.find((map: any) => map.oso_name === project.project_name);
            if (!projectMapping) {
              console.error(`No mapping found for ${project.project_name}`);
              continue;
            }
            const projectId = projectMapping.application_id;
            const impact_index = getImpactIndex(project as unknown as Metrics, weightings);

            const projectMetrics = {} as { [key in keyof Metrics]: number };
            for (const metric of metrics) {
              const metricValue = project[metric.name as keyof OnchainMetricsByProject];
              if (!isNaN(metricValue as number)) {
                projectMetrics[metric.name as keyof Metrics] = parseInt(metricValue as string);
              }
            }
            const projectScoreData = Object.assign(projectMetrics, {
              date: day,
              projectId,
              impact_index,
            }) as IProjectScore;
            projectScoreOps.push({
              insertOne: {
                document: projectScoreData,
              },
            });

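            // Note: globalScoreData comes from the enclosing scope, so these
            // increments accumulate across all parallel day tasks rather than per day.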
            for (const metric of Object.keys(metricNamesObj) as (keyof Metrics)[]) {
              if (projectMetrics[metric]) {
                globalScoreData[metric] += projectMetrics[metric];
              }
            }
          }

          globalScoreData.impact_index = getImpactIndex(globalScoreData, weightings);

          // Batch insert project scores
          if (projectScoreOps.length > 0) {
            await TempProjectScore.bulkWrite(projectScoreOps);
          }
          await TempGlobalScore.create({ date: day, ...globalScoreData });
        }),
      );

Collaborator Author

This is great!

@@ -99,11 +105,12 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
const day90 = dates[90];
// Iterate over all projects
for await (const project of Project.find({}).cursor()) {
swellander (Collaborator) commented Jul 14, 2024

We could also build all the metric summary data in parallel to significantly speed things up:

      const projectMetricSummaryOps = [];

      // Fetch all required scores at once
      const [projects, scoresToday, scoresDay7, scoresDay30, scoresDay90] = await Promise.all([
        Project.find({}).lean(),
        TempProjectScore.find({ date: today }).lean(),
        TempProjectScore.find({ date: day7 }).lean(),
        TempProjectScore.find({ date: day30 }).lean(),
        TempProjectScore.find({ date: day90 }).lean(),
      ]);

      // Create a map for quick lookup, keyed by projectId
      const createScoreMap = scores => {
        return scores.reduce((acc, score) => {
          acc[score.projectId] = score;
          return acc;
        }, {});
      };

      const scoreMapToday = createScoreMap(scoresToday);
      const scoreMapDay7 = createScoreMap(scoresDay7);
      const scoreMapDay30 = createScoreMap(scoresDay30);
      const scoreMapDay90 = createScoreMap(scoresDay90);

      projects.forEach(project => {
        console.log(`Processing project ${project.name}`);
        const projectId = project.id;

        for (const metric of Object.keys(metricNamesObj) as (keyof Metrics)[]) {
          const scoreToday = scoreMapToday[projectId]?.[metric] || 0;
          const scoreDay7 = scoreMapDay7[projectId]?.[metric] || 0;
          const scoreDay30 = scoreMapDay30[projectId]?.[metric] || 0;
          const scoreDay90 = scoreMapDay90[projectId]?.[metric] || 0;

          const metricSummary = {
            date: today,
            projectId,
            metricName: metric,
            7: getMovement(scoreToday, scoreDay7),
            30: getMovement(scoreToday, scoreDay30),
            90: getMovement(scoreToday, scoreDay90),
          };

          projectMetricSummaryOps.push({
            insertOne: {
              document: metricSummary,
            },
          });
        }
      });

      // Batch insert project metric summaries
      if (projectMetricSummaryOps.length > 0) {
        await TempProjectMetricSummary.bulkWrite(projectMetricSummaryOps);
      }

import ProjectScore, { IProjectScore, TempProjectScore } from "~~/services/mongodb/models/projectScore";

export const maxDuration = 300;
export const runtime = "nodejs";
swellander (Collaborator) commented Jul 14, 2024

Also I don't think maxDuration or runtime get used anywhere

Collaborator Author

These are used by Vercel. Defining them here tells Vercel that this function uses the Node.js runtime and is allowed to run for the maximum of 300 seconds. I should add a comment, though.
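
With that comment added, the exports might read (the values are the ones already in this PR; only the comments are new):

// Vercel reads these exports to configure the serverless function:
// use the Node.js runtime (not Edge) and allow the maximum 300s execution time.
export const maxDuration = 300;
export const runtime = "nodejs";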

@@ -79,17 +85,17 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
    projectId,
    impact_index,
  }) as IProjectScore;
- await ProjectScore.create([projectScoreData], { session });
+ await TempProjectScore.create(projectScoreData);
Collaborator

I think we'll also need a way to identify and save new projects that aren't already in our db. For example, if OSO sends back data for a project we don't have, we'd create a ProjectScore instance for it but no corresponding Project instance.

Collaborator Author

I will add an issue for this. It is not essential for the MVP, and we are still waiting to learn exactly how new projects will be added between OSO and Agora, and how we will be informed.
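
For reference, a minimal version of that check might look like this (hypothetical; the Project fields are assumptions):

// Sketch: when OSO returns a project with no matching Project document,
// create a minimal placeholder so its ProjectScore rows have a parent.
const existing = await Project.findOne({ name: project.project_name }).lean();
if (!existing) {
  await Project.create({ name: project.project_name }); // minimal assumed shape
}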
