Skip to content

Commit

Permalink
hydra-eval-jobset: incrementally ingest eval results
Browse files Browse the repository at this point in the history
nix-eval-jobs streams output, unlike hydra-eval-jobs. Now that we've
migrated, we can use this to:

1. Use less RAM by avoiding buffering a whole eval's worth of metadata
   into a Perl string and an array of JSON objects.
2. Make evals latency a bit lower by allowing the queue runner to start
   ingesting builds faster.

(cherry picked from commit b0e9b4b)
  • Loading branch information
delroth authored and Ericson2314 committed Dec 10, 2024
1 parent b284d58 commit 1888a64
Showing 1 changed file with 51 additions and 46 deletions.
97 changes: 51 additions & 46 deletions src/script/hydra-eval-jobset
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use Hydra::Helper::Nix;
use Hydra::Model::DB;
use Hydra::Plugin;
use Hydra::Schema;
use IPC::Run;
use JSON::MaybeXS;
use Net::Statsd;
use Nix::Store;
Expand Down Expand Up @@ -383,23 +384,33 @@ sub evalJobs {
print STDERR "evaluator: @escaped\n";
}

(my $res, my $jobsJSONLines, my $stderr) = captureStdoutStderr(21600, @cmd);
die "nix-eval-jobs returned " . ($res & 127 ? "signal $res" : "exit code " . ($res >> 8))
. ":\n" . ($stderr ? decode("utf-8", $stderr) : "(no output)\n")
if $res;

print STDERR "$stderr";
my $evalProc = IPC::Run::start \@cmd,
'>', IPC::Run::new_chunker, \my $out,
'2>', \my $err;

return sub {
while (1) {
$evalProc->pump;
if (!defined $out && !defined $err) {
$evalProc->finish;
if ($?) {
die "nix-eval-jobs returned " . ($? & 127 ? "signal $?" : "exit code " . ($? >> 8)) . "\n";
}
return;
}

# XXX: take advantage of nix-eval-jobs's streaming instead of parsing everything in one block at
# the end.
my @jobs;
foreach my $line (split(/\n/, $jobsJSONLines)) {
last if $line eq "";
if (defined $err) {
print STDERR "$err";
undef $err;
}

push(@jobs, decode_json($line));
if (defined $out && $out ne '') {
my $job = decode_json($out);
undef $out;
return $job;
}
}
};

return @jobs;
}


Expand Down Expand Up @@ -716,17 +727,11 @@ sub checkJobsetWrapped {

# Evaluate the job expression.
my $evalStart = clock_gettime(CLOCK_MONOTONIC);
my @jobs = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);
my $evalStop = clock_gettime(CLOCK_MONOTONIC);

if ($jobsetsJobset) {
die "The .jobsets jobset must only have a single job named 'jobsets'"
unless (scalar @jobs) == 1 && $jobs[0]->{attr} eq "jobsets";
}
Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
my $evalStop;
my $jobsIter = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);

if ($dryRun) {
foreach my $job (@jobs) {
while (defined(my $job = $jobsIter->())) {
my $name = $job->{attr};
if (defined $job->{drvPath}) {
print STDERR "good job $name: $job->{drvPath}\n";
Expand All @@ -737,31 +742,20 @@ sub checkJobsetWrapped {
return;
}

my $jobOutPathMap = {};
my $jobsetChanged = 0;
my $dbStart = clock_gettime(CLOCK_MONOTONIC);


# Store the error messages for jobs that failed to evaluate.
my $evaluationErrorTime = time;
my $evaluationErrorMsg = "";
foreach my $job (@jobs) {
next unless defined $job->{error};
$evaluationErrorMsg .=
($job->{attr} ne "" ? "in job ‘$job->{attr}’" : "at top-level") .
":\n" . $job->{error} . "\n\n";
}
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);

my $evaluationErrorRecord = $db->resultset('EvaluationErrors')->create(
{ errormsg => $evaluationErrorMsg
, errortime => $evaluationErrorTime
}
);

my $jobOutPathMap = {};
my $jobsetChanged = 0;
my %buildMap;
$db->txn_do(sub {

$db->txn_do(sub {
my $prevEval = getPrevJobsetEval($db, $jobset, 1);

# Clear the "current" flag on all builds. Since we're in a
Expand All @@ -774,19 +768,26 @@ sub checkJobsetWrapped {
, evaluationerror => $evaluationErrorRecord
, timestamp => time
, checkouttime => abs(int($checkoutStop - $checkoutStart))
, evaltime => abs(int($evalStop - $evalStart))
, evaltime => 0
, hasnewbuilds => 0
, nrbuilds => 0
, flake => $flakeRef
, nixexprinput => $jobset->nixexprinput
, nixexprpath => $jobset->nixexprpath
});

# Schedule each successfully evaluated job.
foreach my $job (permute(@jobs)) {
next if defined $job->{error};
#print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n";
checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins);
while (defined(my $job = $jobsIter->())) {
if ($jobsetsJobset) {
die "The .jobsets jobset must only have a single job named 'jobsets'"
unless $job->{attr} eq "jobsets";
}

$evaluationErrorMsg .=
($job->{attr} ne "" ? "in job ‘$job->{attr}’" : "at top-level") .
":\n" . $job->{error} . "\n\n" if defined $job->{error};

checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins)
unless defined $job->{error};
}

# Have any builds been added or removed since last time?
Expand Down Expand Up @@ -881,11 +882,15 @@ sub checkJobsetWrapped {
$jobset->update({ enabled => 0 }) if $jobset->enabled == 2;

$jobset->update({ lastcheckedtime => time, forceeval => undef });
});

my $dbStop = clock_gettime(CLOCK_MONOTONIC);
$evaluationErrorRecord->update({ errormsg => $evaluationErrorMsg });
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);

Net::Statsd::timing("hydra.evaluator.db_time", int(($dbStop - $dbStart) * 1000));
$evalStop = clock_gettime(CLOCK_MONOTONIC);
$ev->update({ evaltime => abs(int($evalStop - $evalStart)) });
});

Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
Net::Statsd::increment("hydra.evaluator.evals");
Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged;
}
Expand Down

0 comments on commit 1888a64

Please sign in to comment.