Skip to content

Commit

Permalink
use temp collections
Browse files Browse the repository at this point in the history
  • Loading branch information
escottalexander committed Jul 12, 2024
1 parent 31d5c37 commit 68cec71
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 46 deletions.
2 changes: 2 additions & 0 deletions packages/nextjs/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ OSO_GRAPHQL_ENDPOINT=
OSO_API_KEY=
AGORA_API_ENDPOINT=https://vote.optimism.io/api_v1
AGORA_API_KEY=
# Don't let the ETL be called more than every X milliseconds
ETL_WINDOW_MS=85500000 # 23 hours and 45 minutes
# Days prior to check existance when running ETL
FILL_DAYS=7

Expand Down
117 changes: 80 additions & 37 deletions packages/nextjs/pages/api/etl/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,61 @@ import type { NextApiRequest, NextApiResponse } from "next";
import { OnchainMetricsByProject } from "~~/app/types/OSO";
import dbConnect from "~~/services/mongodb/dbConnect";
import ETLLog from "~~/services/mongodb/models/etlLog";
import GlobalScore from "~~/services/mongodb/models/globalScore";
import Metric, { MetricNames, Metrics } from "~~/services/mongodb/models/metric";
import GlobalScore, { TempGlobalScore } from "~~/services/mongodb/models/globalScore";
import Metric, { Metrics } from "~~/services/mongodb/models/metric";
import Project from "~~/services/mongodb/models/project";
import ProjectMetricSummary from "~~/services/mongodb/models/projectMetricSummary";
import ProjectScore, { IProjectScore } from "~~/services/mongodb/models/projectScore";
import ProjectMetricSummary, { TempProjectMetricSummary } from "~~/services/mongodb/models/projectMetricSummary";
import ProjectScore, { IProjectScore, TempProjectScore } from "~~/services/mongodb/models/projectScore";

export const maxDuration = 300;
export const runtime = "nodejs";

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
if (req.method !== "GET") {
return res.status(405).json({ message: "Method not allowed." });
}
const weightings = weightingsJSON as { [key in keyof Metrics]: number };
try {
await dbConnect();
const mongooseConnection = await dbConnect();
// Log the ETL process
const existingLog = await ETLLog.findOne({ status: "pending" });
const existingLog = await ETLLog.findOne({}, {}, { sort: { _id: -1 } });
if (existingLog) {
return res.status(400).json({ message: `ETL Process already running as of ${existingLog.date}` });
if (existingLog.status === "pending") {
return res.status(400).json({ message: `ETL Process already running as of ${existingLog.date}` });
}
if (existingLog.status === "success" && isTooEarly(existingLog.date)) {
return res.status(400).json({ message: `Too soon to run ETL Process. Last run: ${existingLog.date}` });
}
}

const log = await ETLLog.create({ date: new Date(), status: "pending" });
res.status(200).json({ result: "ETL Process Started" });
// Start a session
const session = await startSession();

// Start a transaction
// session.startTransaction();
// Get weights from JSON file
const weightings = weightingsJSON as { [key in keyof Metrics]: number };

try {
// Clear out the previous data
console.log("Clearing previous data");
await GlobalScore.deleteMany({}, { session });
await ProjectScore.deleteMany({}, { session });
await ProjectMetricSummary.deleteMany({}, { session });
// Update new data
// Get all the mapping data
const { mapping } = await fetch("http://localhost:3000/api/stub/mapping").then(res => res.json());
const { mapping } = await fetch(`${process.env.NEXT_PUBLIC_API_URL}/stub/mapping`).then(res => res.json());
// Get metrics that are activated
const metrics = await Metric.findAllActivated();
if (!metrics) {
return res.status(404).json({ message: "No metrics found" });
throw new Error("No metrics found");
}
const metricNamesObj: Partial<Metrics> = metrics.reduce((acc: Partial<Metrics>, metric) => {
acc[metric.name] = 0;
return acc;
}, {} as Partial<Metrics>);
const dates = getDatesToProcess();
const globalScoreData = Object.assign({}, MetricNames) as { [key in keyof Metrics]: number };
const globalScoreData = Object.assign({}, metricNamesObj) as { [key in keyof Metrics]: number };
for (const day of dates) {
// Get data from the stubbed API (Should be replaced with call to OSO)
const seriesResponse = await fetch(
`http://localhost:3000/api/stub/series?date=${day.toISOString().split("T")[0]}`,
`${process.env.NEXT_PUBLIC_API_URL}/stub/series?date=${day.toISOString().split("T")[0]}`,
).then(res => res.json());

if (!seriesResponse) {
return res.status(400).json(seriesResponse);
throw new Error("No OSO Data", seriesResponse);
}
const osoData = seriesResponse.data as OnchainMetricsByProject[];
console.log(`Processing OSO response for each project on ${day.toISOString().split("T")[0]}`);
Expand All @@ -79,17 +85,17 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
projectId,
impact_index,
}) as IProjectScore;
await ProjectScore.create([projectScoreData], { session });
await TempProjectScore.create(projectScoreData);

// Add the project metrics to the global score
for (const metric of Object.keys(MetricNames) as (keyof Metrics)[]) {
for (const metric of Object.keys(metricNamesObj) as (keyof Metrics)[]) {
if (projectMetrics[metric]) {
globalScoreData[metric] += projectMetrics[metric];
}
}
}
globalScoreData.impact_index = getImpactIndex(globalScoreData, weightings);
await GlobalScore.create([{ date: day, ...globalScoreData }], { session });
await TempGlobalScore.create({ date: day, ...globalScoreData });
}
// Build the project metric summary data
console.log("Building project metric summary data");
Expand All @@ -99,11 +105,12 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
const day90 = dates[90];
// Iterate over all projects
for await (const project of Project.find({}).cursor()) {
const projectScoreToday = await ProjectScore.find({ date: today, projectId: project.id }).session(session);
const projectScoreDay7 = await ProjectScore.find({ date: day7, projectId: project.id }).session(session);
const projectScoreDay30 = await ProjectScore.find({ date: day30, projectId: project.id }).session(session);
const projectScoreDay90 = await ProjectScore.find({ date: day90, projectId: project.id }).session(session);
for (const metric of Object.keys(MetricNames) as (keyof Metrics)[]) {
console.log(`Processing project ${project.name}`);
const projectScoreToday = await TempProjectScore.find({ date: today, projectId: project.id });
const projectScoreDay7 = await TempProjectScore.find({ date: day7, projectId: project.id });
const projectScoreDay30 = await TempProjectScore.find({ date: day30, projectId: project.id });
const projectScoreDay90 = await TempProjectScore.find({ date: day90, projectId: project.id });
for (const metric of Object.keys(metricNamesObj) as (keyof Metrics)[]) {
const scoreToday = projectScoreToday[0][metric as keyof IProjectScore] as number;
const scoreDay7 = projectScoreDay7[0] ? (projectScoreDay7[0][metric as keyof IProjectScore] as number) : 0;
const scoreDay30 = projectScoreDay30[0] ? (projectScoreDay30[0][metric as keyof IProjectScore] as number) : 0;
Expand All @@ -116,25 +123,56 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
30: getMovement(scoreToday, scoreDay30),
90: getMovement(scoreToday, scoreDay90),
};
await ProjectMetricSummary.create([metricSummary], { session });
await TempProjectMetricSummary.create(metricSummary);
}
}
// Change collection names
console.log("Swapping collection names");
// Start a session
const session = await startSession();
try {
// Start a transaction
session.startTransaction();
const collections = [GlobalScore, ProjectScore, ProjectMetricSummary];
for (const model of collections) {
const name = model.collection.collectionName;
// Rename current collection to old_collection
await mongooseConnection.connection.db.collection(name).rename(`old_${name}`);
// Rename temp_collection to collection
await mongooseConnection.connection.db.collection(`temp_${name}`).rename(name);
// Drop old_collection
await mongooseConnection.connection.db.collection(`old_${name}`).drop();
}

// Commit the transaction
await session.commitTransaction();
} catch (err) {
console.log("Aborting transaction");
await session.abortTransaction();
session.endSession();
throw err;
}
// await session.commitTransaction();

session.endSession();

// Delete temp models
console.log("Removing Temp Models");
await mongooseConnection.deleteModel("temp_GlobalScore");
await mongooseConnection.deleteModel("temp_ProjectScore");
await mongooseConnection.deleteModel("temp_ProjectMetricSummary");
console.log("ETL Process Completed");
} catch (err) {
console.error(err);
// console.log("aborting transaction");
// await session.abortTransaction();
session.endSession();
await ETLLog.updateOne({ _id: log._id }, { status: "error", message: err });
throw new Error(err as string);
}
// Log the ETL process
await ETLLog.updateOne({ _id: log._id }, { status: "completed" });
await ETLLog.updateOne({ _id: log._id }, { status: "success" });
} catch (error) {
console.error(error);
res.status(500).json({ message: "Internal Server Error" });
if (!res.headersSent) {
res.status(500).json({ message: "Internal Server Error" });
}
}
}

Expand Down Expand Up @@ -185,3 +223,8 @@ const getDatesToProcess = (): Date[] => {
}
return dates;
};

const isTooEarly = (lastRunDate: Date): boolean => {
const window = parseInt(process.env.ETL_WINDOW_MS || "0") || 85500000; // 23h 45m
return lastRunDate > new Date(new Date().getTime() - window);
};
4 changes: 4 additions & 0 deletions packages/nextjs/services/mongodb/models/globalScore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ GlobalScoreSchema.statics.findBetweenDates = function (
);
};

export const TempGlobalScore =
(mongoose.models.temp_GlobalScore as IGlobalScoreModel) ||
mongoose.model<IGlobalScore, IGlobalScoreModel>("temp_GlobalScore", GlobalScoreSchema);

const GlobalScore =
(mongoose.models.GlobalScore as IGlobalScoreModel) ||
mongoose.model<IGlobalScore, IGlobalScoreModel>("GlobalScore", GlobalScoreSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ ProjectMetricSummarySchema.statics.findBetweenDates = function (
);
};

export const TempProjectMetricSummary =
(mongoose.models.temp_ProjectMetricSummary as IProjectMetricSummaryModel) ||
mongoose.model<IProjectMetricSummary, IProjectMetricSummaryModel>(
"temp_ProjectMetricSummary",
ProjectMetricSummarySchema,
);

const ProjectMetricSummary =
(mongoose.models.ProjectMetricSummary as IProjectMetricSummaryModel) ||
mongoose.model<IProjectMetricSummary, IProjectMetricSummaryModel>("ProjectMetricSummary", ProjectMetricSummarySchema);
Expand Down
4 changes: 4 additions & 0 deletions packages/nextjs/services/mongodb/models/projectScore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ ProjectScoreSchema.methods.getScore = function (metric: string) {
return record ? record.value : "0";
};

export const TempProjectScore =
(mongoose.models.temp_ProjectScore as IProjectScoreModel) ||
mongoose.model<IProjectScore, IProjectScoreModel>("temp_ProjectScore", ProjectScoreSchema);

const ProjectScore =
(mongoose.models.ProjectScore as IProjectScoreModel) ||
mongoose.model<IProjectScore, IProjectScoreModel>("ProjectScore", ProjectScoreSchema);
Expand Down
18 changes: 9 additions & 9 deletions packages/nextjs/services/mongodb/seed.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -23820,25 +23820,25 @@
"name": "active_contract_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "address_count",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "address_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "days_since_first_transaction",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "gas_fees_sum",
Expand All @@ -23850,7 +23850,7 @@
"name": "gas_fees_sum_6_months",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "high_activity_address_count_90_days",
Expand All @@ -23862,19 +23862,19 @@
"name": "low_activity_address_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "medium_activity_address_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "multi_project_address_count_90_days",
"description": "string",
"weight": 1,
"activated": true
"activated": false
},
{
"name": "new_address_count_90_days",
Expand All @@ -23898,7 +23898,7 @@
"name": "transaction_count_6_months",
"description": "string",
"weight": 1,
"activated": true
"activated": false
}
]
}

0 comments on commit 68cec71

Please sign in to comment.