Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an option to get 48h of live data history #269

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/fetcher/src/app/trackers/flymaster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type { protos, TrackerNames } from '@flyxc/common';
import {
fetchResponse,
formatReqError,
LIVE_MINIMAL_INTERVAL_SEC,
LiveDataIntervalSec,
removeBeforeFromLiveTrack,
simplifyLiveTrack,
validateFlymasterAccount,
Expand Down Expand Up @@ -79,7 +79,7 @@ export class FlymasterFetcher extends TrackerFetcher {
const points = parse(flight);
let track = makeLiveTrack(points);
track = removeBeforeFromLiveTrack(track, fetchSecond - 300);
simplifyLiveTrack(track, LIVE_MINIMAL_INTERVAL_SEC);
simplifyLiveTrack(track, LiveDataIntervalSec.Recent);
updates.trackerDeltas.set(dsId, track);
});

Expand Down
4 changes: 2 additions & 2 deletions apps/fetcher/src/app/trackers/inreach.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
fetchResponse,
formatReqError,
Keys,
LIVE_MINIMAL_INTERVAL_SEC,
LiveDataIntervalSec,
parallelTasksWithTimeout,
parseRetryAfterS,
protos,
Expand Down Expand Up @@ -91,7 +91,7 @@ export class InreachFetcher extends TrackerFetcher {
try {
const points = parse(await response.text());
const track = makeLiveTrack(points);
simplifyLiveTrack(track, LIVE_MINIMAL_INTERVAL_SEC);
simplifyLiveTrack(track, LiveDataIntervalSec.Recent);
vicb marked this conversation as resolved.
Show resolved Hide resolved
if (track.timeSec.length > 0) {
updates.trackerDeltas.set(id, track);
}
Expand Down
70 changes: 47 additions & 23 deletions apps/fetcher/src/app/trackers/refresh.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import type { protos } from '@flyxc/common';
import {
Keys,
LIVE_AGE_OLD_SEC,
LIVE_FETCH_TIMEOUT_SEC,
LIVE_MINIMAL_INTERVAL_SEC,
LIVE_OLD_INTERVAL_SEC,
LIVE_TRACKER_RETENTION_SEC,
LiveDataIntervalSec,
LiveDataRetentionSec,
mergeLiveTracks,
removeBeforeFromLiveTrack,
simplifyLiveTrack,
Expand Down Expand Up @@ -34,6 +32,19 @@ export function disconnectOgnClient() {
ognClient.disconnect();
}

/**
* Refreshes all the trackers.
*
* Process:
* - fetch live data
* - update tracks (remove outdated point, decimates point according to their age),
* - add elevation information.
*
* @param pipeline - The ChainableCommander instance for executing commands.
* @param state - The FetcherState object containing current state information.
* @param redis - The Redis client for caching data.
* @param datastore - The Datastore instance for storing data.
*/
export async function resfreshTrackers(
pipeline: ChainableCommander,
state: protos.FetcherState,
Expand All @@ -51,38 +62,52 @@ export async function resfreshTrackers(
new XcontestFetcher(state, pipeline),
];

const updatePromises = await Promise.allSettled(fetchers.map((f) => f.refresh(LIVE_FETCH_TIMEOUT_SEC)));
const fetchResults = await Promise.allSettled(fetchers.map((f) => f.refresh(LIVE_FETCH_TIMEOUT_SEC)));

const trackerUpdates: TrackerUpdates[] = [];

for (const p of updatePromises) {
if (p.status === 'fulfilled') {
const updates = p.value;
for (const result of fetchResults) {
if (result.status === 'fulfilled') {
const updates = result.value;
trackerUpdates.push(updates);
addTrackerLogs(pipeline, updates, state);
} else {
console.error(`Tracker update error: ${p.reason}`);
console.error(`Tracker update error: ${result.reason}`);
}
}

// Drop points older than the max retention.
const nowSec = Math.round(Date.now() / 1000);
const fullStartSec = nowSec - LIVE_TRACKER_RETENTION_SEC;
const dropBeforeSec = nowSec - LiveDataRetentionSec.Max;
vicb marked this conversation as resolved.
Show resolved Hide resolved

// Apply the updates.
for (const [idStr, pilot] of Object.entries(state.pilots)) {
const id = Number(idStr);
// Merge updates
for (const updates of trackerUpdates) {
if (updates.trackerDeltas.has(id)) {
pilot.track = mergeLiveTracks(pilot.track!, updates.trackerDeltas.get(id)!);
pilot.track = mergeLiveTracks(pilot.track, updates.trackerDeltas.get(id));
}
}

// Trim and simplify
pilot.track = removeBeforeFromLiveTrack(pilot.track!, fullStartSec);
simplifyLiveTrack(pilot.track, LIVE_MINIMAL_INTERVAL_SEC);
// Reduce precision for old point.
simplifyLiveTrack(pilot.track, LIVE_OLD_INTERVAL_SEC, { toSec: nowSec - LIVE_AGE_OLD_SEC });
// Remove outdated points
pilot.track = removeBeforeFromLiveTrack(pilot.track, dropBeforeSec);

// Decimates points according to their age.
simplifyLiveTrack(pilot.track, LiveDataIntervalSec.AfterH24, {
toSec: nowSec - 24 * 3600,
});
simplifyLiveTrack(pilot.track, LiveDataIntervalSec.H12ToH24, {
fromSec: nowSec - 24 * 3600,
toSec: nowSec - 12 * 3600,
});
simplifyLiveTrack(pilot.track, LiveDataIntervalSec.H6ToH12, {
fromSec: nowSec - 12 * 3600,
toSec: nowSec - 6 * 3600,
vicb marked this conversation as resolved.
Show resolved Hide resolved
});
simplifyLiveTrack(pilot.track, LiveDataIntervalSec.Recent, {
fromSec: nowSec - 6 * 3600,
});
}

// Add the elevation for the last fix of every tracks when not present.
Expand All @@ -96,19 +121,18 @@ export function addTrackerLogs(
updates: TrackerUpdates,
state: protos.FetcherState,
): void {
const name = updates.name;
const time = updates.startFetchSec;
const { name, startFetchSec } = updates;

pushListCap(
pipeline,
Keys.trackerErrorsByType.replace('{name}', name),
updates.errors.map((e) => `[${time}] ${e}`),
updates.errors.map((e) => `[${startFetchSec}] ${e}`),
20,
);
pushListCap(
pipeline,
Keys.trackerErrorsById.replace('{name}', name),
[...updates.trackerErrors.entries()].map(([id, e]) => `[${time}] id=${id} ${e}`),
[...updates.trackerErrors.entries()].map(([id, e]) => `[${startFetchSec}] id=${id} ${e}`),
20,
);
pushListCap(pipeline, Keys.trackerNumFetches.replace('{name}', name), [updates.fetchedTracker.size], 20);
Expand All @@ -127,16 +151,16 @@ export function addTrackerLogs(
pushListCap(
pipeline,
Keys.trackerConsecutiveErrorsById.replace('{name}', name),
[`[${time}] id=${id} ${error}`],
[`[${startFetchSec}] id=${id} ${error}`],
20,
);
}
const numErrors = state.pilots[id][name].numErrors;
const { numErrors } = state.pilots[id][name];
if (numErrors > 300) {
pushListCap(
pipeline,
Keys.trackerManyErrorsById.replace('{name}', name),
[`[${time}] id=${id} ${numErrors} errors`],
[`[${startFetchSec}] id=${id} ${numErrors} errors`],
20,
);
}
Expand Down
4 changes: 2 additions & 2 deletions apps/fetcher/src/app/trackers/skylines.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
decodeDeltas,
fetchResponse,
formatReqError,
LIVE_MINIMAL_INTERVAL_SEC,
LiveDataIntervalSec,
removeBeforeFromLiveTrack,
simplifyLiveTrack,
validateSkylinesAccount,
Expand Down Expand Up @@ -61,7 +61,7 @@ export class SkylinesFetcher extends TrackerFetcher {
const points = parse(flight);
let track = makeLiveTrack(points);
track = removeBeforeFromLiveTrack(track, keepFromSec);
simplifyLiveTrack(track, LIVE_MINIMAL_INTERVAL_SEC);
simplifyLiveTrack(track, LiveDataIntervalSec.Recent);
vicb marked this conversation as resolved.
Show resolved Hide resolved
updates.trackerDeltas.set(dsId, track);
});
[...sklIdToDsId.values()].forEach((id) => updates.fetchedTracker.add(id));
Expand Down
4 changes: 2 additions & 2 deletions apps/fetcher/src/app/trackers/spot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type { protos, TrackerNames } from '@flyxc/common';
import {
fetchResponse,
formatReqError,
LIVE_MINIMAL_INTERVAL_SEC,
LiveDataIntervalSec,
simplifyLiveTrack,
validateSpotAccount,
} from '@flyxc/common';
Expand Down Expand Up @@ -43,7 +43,7 @@ export class SpotFetcher extends TrackerFetcher {
try {
const points = parse(await response.text());
const track = makeLiveTrack(points);
simplifyLiveTrack(track, LIVE_MINIMAL_INTERVAL_SEC);
simplifyLiveTrack(track, LiveDataIntervalSec.Recent);
vicb marked this conversation as resolved.
Show resolved Hide resolved
if (track.timeSec.length > 0) {
updates.trackerDeltas.set(id, track);
}
Expand Down
4 changes: 2 additions & 2 deletions apps/fetcher/src/app/trackers/tracker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Base class for fetching tracker updates.

import type { protos, TrackerNames } from '@flyxc/common';
import { LIVE_REFRESH_SEC, LIVE_TRACKER_RETENTION_SEC } from '@flyxc/common';
import { LIVE_REFRESH_SEC, TRACKERS_MAX_FETCH_DURATION_SEC } from '@flyxc/common';
import type { ChainableCommander } from 'ioredis';

// Updates for a tick of a tracker type (InReach, Spot, ...).
Expand Down Expand Up @@ -123,7 +123,7 @@ export class TrackerFetcher {
const tracker = this.getTracker(id);
const nowSec = Math.round(Date.now() / 1000);
const lastFetchSec = tracker ? tracker.lastFetchSec : nowSec;
return Math.max(startSec - LIVE_TRACKER_RETENTION_SEC, lastFetchSec - paddingSec);
return Math.max(startSec - TRACKERS_MAX_FETCH_DURATION_SEC, lastFetchSec - paddingSec);
}

// Counts the number of requests and errors.
Expand Down
4 changes: 2 additions & 2 deletions apps/fetcher/src/app/trackers/xcontest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import type { protos, TrackerNames } from '@flyxc/common';
import {
fetchResponse,
formatReqError,
LIVE_TRACKER_RETENTION_SEC,
parallelTasksWithTimeout,
TRACKERS_MAX_FETCH_DURATION_SEC,
validateXContestAccount,
} from '@flyxc/common';
import { Secrets } from '@flyxc/secrets';
Expand Down Expand Up @@ -41,7 +41,7 @@ export class XcontestFetcher extends TrackerFetcher {
const xcontestIdToLastFlight = new Map<string, XContestFlight>();

// Get all the users for the retention period
const openTimeMs = new Date().getTime() - LIVE_TRACKER_RETENTION_SEC * 1000;
const openTimeMs = new Date().getTime() - TRACKERS_MAX_FETCH_DURATION_SEC * 1000;

const liveUserUrl =
`https://api.xcontest.org/livedata/users?entity=group:flyxc&source=live&opentime={openTimeISO}`.replace(
Expand Down
16 changes: 8 additions & 8 deletions apps/fetcher/src/app/ufos/refresh.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import type { protos } from '@flyxc/common';
import {
Keys,
LIVE_FETCH_TIMEOUT_SEC,
LIVE_MINIMAL_INTERVAL_SEC,
LIVE_UFO_RETENTION_SEC,
LiveDataIntervalSec,
LiveDataRetentionSec,
mergeLiveTracks,
removeBeforeFromLiveTrack,
simplifyLiveTrack,
Expand Down Expand Up @@ -32,23 +32,23 @@ export async function resfreshUfoFleets(pipeline: ChainableCommander, state: pro
}

const nowSec = Math.round(Date.now() / 1000);
const ufoStartSec = nowSec - LIVE_UFO_RETENTION_SEC;
const ufoStartSec = nowSec - LiveDataRetentionSec.Ufo;

for (const fleetUpdate of fleetUpdates) {
const fleetName = fleetUpdate.fleetName;
const { fleetName } = fleetUpdate;
const ufoTracks = state.ufoFleets[fleetName].ufos ?? {};

for (const [id, livetrack] of fleetUpdate.deltas.entries()) {
for (const [id, track] of fleetUpdate.deltas.entries()) {
if (ufoTracks[id] != null) {
ufoTracks[id] = mergeLiveTracks(ufoTracks[id], livetrack);
ufoTracks[id] = mergeLiveTracks(ufoTracks[id], track);
} else {
ufoTracks[id] = livetrack;
ufoTracks[id] = track;
}
}

// eslint-disable-next-line prefer-const
for (let [id, track] of Object.entries(ufoTracks)) {
simplifyLiveTrack(track, LIVE_MINIMAL_INTERVAL_SEC);
simplifyLiveTrack(track, LiveDataIntervalSec.Recent);
vicb marked this conversation as resolved.
Show resolved Hide resolved
track = removeBeforeFromLiveTrack(track, ufoStartSec);
if (track.timeSec.length == 0) {
delete ufoTracks[id];
Expand Down
Loading
Loading