Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ETL Revamp #55

Merged
merged 1 commit into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/nextjs/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ OSO_GRAPHQL_ENDPOINT=
OSO_API_KEY=
AGORA_API_ENDPOINT=https://vote.optimism.io/api_v1
AGORA_API_KEY=
# Don't let the ETL be called more than every X milliseconds
ETL_WINDOW_MS=85500000 # 23 hours and 45 minutes
# Days prior to check existance when running ETL
FILL_DAYS=7

Expand Down
117 changes: 80 additions & 37 deletions packages/nextjs/pages/api/etl/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,61 @@ import type { NextApiRequest, NextApiResponse } from "next";
import { OnchainMetricsByProject } from "~~/app/types/OSO";
import dbConnect from "~~/services/mongodb/dbConnect";
import ETLLog from "~~/services/mongodb/models/etlLog";
import GlobalScore from "~~/services/mongodb/models/globalScore";
import Metric, { MetricNames, Metrics } from "~~/services/mongodb/models/metric";
import GlobalScore, { TempGlobalScore } from "~~/services/mongodb/models/globalScore";
import Metric, { Metrics } from "~~/services/mongodb/models/metric";
import Project from "~~/services/mongodb/models/project";
import ProjectMetricSummary from "~~/services/mongodb/models/projectMetricSummary";
import ProjectScore, { IProjectScore } from "~~/services/mongodb/models/projectScore";
import ProjectMetricSummary, { TempProjectMetricSummary } from "~~/services/mongodb/models/projectMetricSummary";
import ProjectScore, { IProjectScore, TempProjectScore } from "~~/services/mongodb/models/projectScore";

export const maxDuration = 300;
export const runtime = "nodejs";
Copy link
Collaborator

@swellander swellander Jul 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I don't think maxDuration or runtime get used anywhere

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are used by Vercel. By defining them here it knows that this function uses nodejs runtime and is allowed to run the maximum 300 seconds. I should add a comment though.


export default async function handler(req: NextApiRequest, res: NextApiResponse) {
if (req.method !== "GET") {
return res.status(405).json({ message: "Method not allowed." });
}
const weightings = weightingsJSON as { [key in keyof Metrics]: number };
try {
await dbConnect();
const mongooseConnection = await dbConnect();
// Log the ETL process
const existingLog = await ETLLog.findOne({ status: "pending" });
const existingLog = await ETLLog.findOne({}, {}, { sort: { _id: -1 } });
if (existingLog) {
return res.status(400).json({ message: `ETL Process already running as of ${existingLog.date}` });
if (existingLog.status === "pending") {
return res.status(400).json({ message: `ETL Process already running as of ${existingLog.date}` });
}
if (existingLog.status === "success" && isTooEarly(existingLog.date)) {
return res.status(400).json({ message: `Too soon to run ETL Process. Last run: ${existingLog.date}` });
}
}

const log = await ETLLog.create({ date: new Date(), status: "pending" });
res.status(200).json({ result: "ETL Process Started" });
// Start a session
const session = await startSession();

// Start a transaction
// session.startTransaction();
// Get weights from JSON file
const weightings = weightingsJSON as { [key in keyof Metrics]: number };

try {
// Clear out the previous data
console.log("Clearing previous data");
await GlobalScore.deleteMany({}, { session });
await ProjectScore.deleteMany({}, { session });
await ProjectMetricSummary.deleteMany({}, { session });
Comment on lines -33 to -37
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we clear Temp models at the beginning of the process? In case there's an error mid-way through and it needs to be run again?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a bad idea.

// Update new data
// Get all the mapping data
const { mapping } = await fetch("http://localhost:3000/api/stub/mapping").then(res => res.json());
const { mapping } = await fetch(`${process.env.NEXT_PUBLIC_API_URL}/stub/mapping`).then(res => res.json());
// Get metrics that are activated
const metrics = await Metric.findAllActivated();
if (!metrics) {
return res.status(404).json({ message: "No metrics found" });
throw new Error("No metrics found");
}
const metricNamesObj: Partial<Metrics> = metrics.reduce((acc: Partial<Metrics>, metric) => {
acc[metric.name] = 0;
return acc;
}, {} as Partial<Metrics>);
const dates = getDatesToProcess();
const globalScoreData = Object.assign({}, MetricNames) as { [key in keyof Metrics]: number };
const globalScoreData = Object.assign({}, metricNamesObj) as { [key in keyof Metrics]: number };
for (const day of dates) {
Copy link
Collaborator

@swellander swellander Jul 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think parallelizing here would significantly speed things up. I've been messing around with the following

      await Promise.all(
        dates.map(async day => {
          // Fetch data for the day
          const seriesResponse = await fetch(
            `${process.env.NEXT_PUBLIC_API_URL}/stub/series?date=${day.toISOString().split("T")[0]}`,
          ).then(res => res.json());

          if (!seriesResponse) {
            throw new Error(`No OSO Data for ${day}`);
          }
          const osoData = seriesResponse.data as OnchainMetricsByProject[];
          console.log(`Processing OSO response for each project on ${day.toISOString().split("T")[0]}`);

          const projectScoreOps = [];

          for (const project of osoData) {
            const projectMapping = mapping.find((map: any) => map.oso_name === project.project_name);
            if (!projectMapping) {
              console.error(`No mapping found for ${project.project_name}`);
              continue;
            }
            const projectId = projectMapping.application_id;
            const impact_index = getImpactIndex(project as unknown as Metrics, weightings);

            const projectMetrics = {} as { [key in keyof Metrics]: number };
            for (const metric of metrics) {
              const metricValue = project[metric.name as keyof OnchainMetricsByProject];
              if (!isNaN(metricValue as number)) {
                projectMetrics[metric.name as keyof Metrics] = parseInt(metricValue as string);
              }
            }
            const projectScoreData = Object.assign(projectMetrics, {
              date: day,
              projectId,
              impact_index,
            }) as IProjectScore;
            projectScoreOps.push({
              insertOne: {
                document: projectScoreData,
              },
            });

            for (const metric of Object.keys(metricNamesObj) as (keyof Metrics)[]) {
              if (projectMetrics[metric]) {
                globalScoreData[metric] += projectMetrics[metric];
              }
            }
          }

          globalScoreData.impact_index = getImpactIndex(globalScoreData, weightings);

          // Batch insert project scores
          if (projectScoreOps.length > 0) {
            await TempProjectScore.bulkWrite(projectScoreOps);
          }
          await TempGlobalScore.create({ date: day, ...globalScoreData });
        }),
      );

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great!

// Get data from the stubbed API (Should be replaced with call to OSO)
const seriesResponse = await fetch(
`http://localhost:3000/api/stub/series?date=${day.toISOString().split("T")[0]}`,
`${process.env.NEXT_PUBLIC_API_URL}/stub/series?date=${day.toISOString().split("T")[0]}`,
).then(res => res.json());

if (!seriesResponse) {
return res.status(400).json(seriesResponse);
throw new Error("No OSO Data", seriesResponse);
}
const osoData = seriesResponse.data as OnchainMetricsByProject[];
console.log(`Processing OSO response for each project on ${day.toISOString().split("T")[0]}`);
Expand All @@ -79,17 +85,17 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
projectId,
impact_index,
}) as IProjectScore;
await ProjectScore.create([projectScoreData], { session });
await TempProjectScore.create(projectScoreData);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll also need a way to identify and save new Projects that we don't already have in our db. For example, if OSO sends back data for a project that we don't already have in our db, we'd create a ProjectScore instance for it, but not have a corresponding Project instance

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add an issue for this. It is not essential for the MVP and we are waiting to know exactly how new projects are going to be added between OSO and Agora and then us being informed.


// Add the project metrics to the global score
for (const metric of Object.keys(MetricNames) as (keyof Metrics)[]) {
for (const metric of Object.keys(metricNamesObj) as (keyof Metrics)[]) {
if (projectMetrics[metric]) {
globalScoreData[metric] += projectMetrics[metric];
}
}
}
globalScoreData.impact_index = getImpactIndex(globalScoreData, weightings);
await GlobalScore.create([{ date: day, ...globalScoreData }], { session });
await TempGlobalScore.create({ date: day, ...globalScoreData });
}
// Build the project metric summary data
console.log("Building project metric summary data");
Expand All @@ -99,11 +105,12 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
const day90 = dates[90];
// Iterate over all projects
for await (const project of Project.find({}).cursor()) {
Copy link
Collaborator

@swellander swellander Jul 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also build all the metric summary data in parallel to significantly speed things up

      const projectMetricSummaryOps = [];

      const [projects, scoresToday, scoresDay7, scoresDay30, scoresDay90] = await Promise.all([
        Project.find({}).lean(),
        TempProjectScore.find({ date: today }).lean(),
        TempProjectScore.find({ date: day7 }).lean(),
        TempProjectScore.find({ date: day30 }).lean(),
        TempProjectScore.find({ date: day90 }).lean(),
      ]);
      // Fetch all required scores at once

      // Create a map for quick lookup
      const createScoreMap = scores => {
        return scores.reduce((acc, score) => {
          if (!acc[score.projectId]) {
            acc[score.projectId] = {};
          }
          acc[score.projectId] = score;
          return acc;
        }, {});
      };

      const scoreMapToday = createScoreMap(scoresToday);
      const scoreMapDay7 = createScoreMap(scoresDay7);
      const scoreMapDay30 = createScoreMap(scoresDay30);
      const scoreMapDay90 = createScoreMap(scoresDay90);

      projects.forEach(project => {
        console.log(`Processing project ${project.name}`);
        const projectId = project.id;

        for (const metric of Object.keys(metricNamesObj) as (keyof Metrics)[]) {
          const scoreToday = scoreMapToday[projectId]?.[metric] || 0;
          const scoreDay7 = scoreMapDay7[projectId]?.[metric] || 0;
          const scoreDay30 = scoreMapDay30[projectId]?.[metric] || 0;
          const scoreDay90 = scoreMapDay90[projectId]?.[metric] || 0;

          const metricSummary = {
            date: today,
            projectId,
            metricName: metric,
            7: getMovement(scoreToday, scoreDay7),
            30: getMovement(scoreToday, scoreDay30),
            90: getMovement(scoreToday, scoreDay90),
          };

          projectMetricSummaryOps.push({
            insertOne: {
              document: metricSummary,
            },
          });
        }
      });

      // Batch insert project metric summaries
      if (projectMetricSummaryOps.length > 0) {
        await TempProjectMetricSummary.bulkWrite(projectMetricSummaryOps);
      }

const projectScoreToday = await ProjectScore.find({ date: today, projectId: project.id }).session(session);
const projectScoreDay7 = await ProjectScore.find({ date: day7, projectId: project.id }).session(session);
const projectScoreDay30 = await ProjectScore.find({ date: day30, projectId: project.id }).session(session);
const projectScoreDay90 = await ProjectScore.find({ date: day90, projectId: project.id }).session(session);
for (const metric of Object.keys(MetricNames) as (keyof Metrics)[]) {
console.log(`Processing project ${project.name}`);
const projectScoreToday = await TempProjectScore.find({ date: today, projectId: project.id });
const projectScoreDay7 = await TempProjectScore.find({ date: day7, projectId: project.id });
const projectScoreDay30 = await TempProjectScore.find({ date: day30, projectId: project.id });
const projectScoreDay90 = await TempProjectScore.find({ date: day90, projectId: project.id });
for (const metric of Object.keys(metricNamesObj) as (keyof Metrics)[]) {
const scoreToday = projectScoreToday[0][metric as keyof IProjectScore] as number;
const scoreDay7 = projectScoreDay7[0] ? (projectScoreDay7[0][metric as keyof IProjectScore] as number) : 0;
const scoreDay30 = projectScoreDay30[0] ? (projectScoreDay30[0][metric as keyof IProjectScore] as number) : 0;
Expand All @@ -116,25 +123,56 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
30: getMovement(scoreToday, scoreDay30),
90: getMovement(scoreToday, scoreDay90),
};
await ProjectMetricSummary.create([metricSummary], { session });
await TempProjectMetricSummary.create(metricSummary);
}
}
// Change collection names
console.log("Swapping collection names");
// Start a session
const session = await startSession();
try {
// Start a transaction
session.startTransaction();
const collections = [GlobalScore, ProjectScore, ProjectMetricSummary];
for (const model of collections) {
const name = model.collection.collectionName;
// Rename current collection to old_collection
await mongooseConnection.connection.db.collection(name).rename(`old_${name}`);
// Rename temp_collection to collection
await mongooseConnection.connection.db.collection(`temp_${name}`).rename(name);
// Drop old_collection
await mongooseConnection.connection.db.collection(`old_${name}`).drop();
}

// Commit the transaction
await session.commitTransaction();
} catch (err) {
console.log("Aborting transaction");
await session.abortTransaction();
session.endSession();
throw err;
}
// await session.commitTransaction();

session.endSession();

// Delete temp models
console.log("Removing Temp Models");
await mongooseConnection.deleteModel("temp_GlobalScore");
await mongooseConnection.deleteModel("temp_ProjectScore");
await mongooseConnection.deleteModel("temp_ProjectMetricSummary");
console.log("ETL Process Completed");
} catch (err) {
console.error(err);
// console.log("aborting transaction");
// await session.abortTransaction();
session.endSession();
await ETLLog.updateOne({ _id: log._id }, { status: "error", message: err });
throw new Error(err as string);
}
// Log the ETL process
await ETLLog.updateOne({ _id: log._id }, { status: "completed" });
await ETLLog.updateOne({ _id: log._id }, { status: "success" });
} catch (error) {
console.error(error);
res.status(500).json({ message: "Internal Server Error" });
if (!res.headersSent) {
res.status(500).json({ message: "Internal Server Error" });
}
}
}

Expand Down Expand Up @@ -185,3 +223,8 @@ const getDatesToProcess = (): Date[] => {
}
return dates;
};

const isTooEarly = (lastRunDate: Date): boolean => {
const window = parseInt(process.env.ETL_WINDOW_MS || "0") || 85500000; // 23h 45m
return lastRunDate > new Date(new Date().getTime() - window);
};
Comment on lines +227 to +230
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 👌

4 changes: 4 additions & 0 deletions packages/nextjs/services/mongodb/models/globalScore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ GlobalScoreSchema.statics.findBetweenDates = function (
);
};

export const TempGlobalScore =
(mongoose.models.temp_GlobalScore as IGlobalScoreModel) ||
mongoose.model<IGlobalScore, IGlobalScoreModel>("temp_GlobalScore", GlobalScoreSchema);

const GlobalScore =
(mongoose.models.GlobalScore as IGlobalScoreModel) ||
mongoose.model<IGlobalScore, IGlobalScoreModel>("GlobalScore", GlobalScoreSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ ProjectMetricSummarySchema.statics.findBetweenDates = function (
);
};

export const TempProjectMetricSummary =
(mongoose.models.temp_ProjectMetricSummary as IProjectMetricSummaryModel) ||
mongoose.model<IProjectMetricSummary, IProjectMetricSummaryModel>(
"temp_ProjectMetricSummary",
ProjectMetricSummarySchema,
);

const ProjectMetricSummary =
(mongoose.models.ProjectMetricSummary as IProjectMetricSummaryModel) ||
mongoose.model<IProjectMetricSummary, IProjectMetricSummaryModel>("ProjectMetricSummary", ProjectMetricSummarySchema);
Expand Down
4 changes: 4 additions & 0 deletions packages/nextjs/services/mongodb/models/projectScore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ ProjectScoreSchema.methods.getScore = function (metric: string) {
return record ? record.value : "0";
};

export const TempProjectScore =
(mongoose.models.temp_ProjectScore as IProjectScoreModel) ||
mongoose.model<IProjectScore, IProjectScoreModel>("temp_ProjectScore", ProjectScoreSchema);

const ProjectScore =
(mongoose.models.ProjectScore as IProjectScoreModel) ||
mongoose.model<IProjectScore, IProjectScoreModel>("ProjectScore", ProjectScoreSchema);
Expand Down
18 changes: 9 additions & 9 deletions packages/nextjs/services/mongodb/seed.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -23820,25 +23820,25 @@
"name": "active_contract_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "address_count",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "address_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "days_since_first_transaction",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "gas_fees_sum",
Expand All @@ -23850,7 +23850,7 @@
"name": "gas_fees_sum_6_months",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "high_activity_address_count_90_days",
Expand All @@ -23862,19 +23862,19 @@
"name": "low_activity_address_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "medium_activity_address_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "multi_project_address_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "new_address_count_90_days",
Expand All @@ -23898,7 +23898,7 @@
"name": "transaction_count_6_months",
"description": "string",
"weight": 1,
"activated": true
"activated": false
}
]
}
Loading