Skip to content

Commit

Permalink
Utilize nix::Machine more fully
Browse files Browse the repository at this point in the history
With NixOS/nix#9839, the `storeUri` field is
much better structured, so we can use it while still opening the SSH
connection ourselves.
  • Loading branch information
Ericson2314 committed May 23, 2024
1 parent 346badc commit d55bea2
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 56 deletions.
60 changes: 32 additions & 28 deletions src/hydra-queue-runner/build-remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,23 @@

using namespace nix;

namespace nix::build_remote {

static Strings extraStoreArgs(std::string & machine)
bool ::Machine::isLocalhost() const
{
Strings result;
try {
auto parsed = parseURL(machine);
if (parsed.scheme != "ssh") {
throw SysError("Currently, only (legacy-)ssh stores are supported!");
}
machine = parsed.authority.value_or("");
auto remoteStore = parsed.query.find("remote-store");
if (remoteStore != parsed.query.end()) {
result = {"--store", shellEscape(remoteStore->second)};
}
} catch (BadURL &) {
// We just try to continue with `machine->sshName` here for backwards compat.
}

return result;
return storeUri.params.empty() && std::visit(overloaded {
[](const StoreReference::Auto &) {
return true;
},
[](const StoreReference::Specified & s) {
return
(s.scheme == "local" || s.scheme == "unix") ||
((s.scheme == "ssh" || s.scheme == "ssh-ng") &&
s.authority == "localhost");
},
}, storeUri.variant);
}

namespace nix::build_remote {

static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{
Expand All @@ -51,7 +46,11 @@ static std::unique_ptr<SSHMaster::Connection> openConnection(
command.push_back("--builders");
command.push_back("");
} else {
command.splice(command.end(), extraStoreArgs(machine->sshName));
auto remoteStore = machine->storeUri.params.find("remote-store");
if (remoteStore != machine->storeUri.params.end()) {
command.push_back("--store");
command.push_back(shellEscape(remoteStore->second));
}
}

return master.startCommand(std::move(command), {
Expand Down Expand Up @@ -187,7 +186,7 @@ static BasicDerivation sendInputs(
MaintainCount<counter> mc2(nrStepsCopyingTo);

printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’",
localStore.printStorePath(step.drvPath), conn.machine->sshName);
localStore.printStorePath(step.drvPath), conn.machine->storeUri.render());

auto now1 = std::chrono::steady_clock::now();

Expand Down Expand Up @@ -393,8 +392,13 @@ void State::buildRemote(ref<Store> destStore,

updateStep(ssConnecting);

auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}

SSHMaster master {
machine->sshName,
pSpecified->authority,
machine->sshKey,
machine->sshPublicHostKey,
false, // no SSH master yet
Expand Down Expand Up @@ -445,11 +449,11 @@ void State::buildRemote(ref<Store> destStore,
conn.to,
conn.from,
our_version,
machine->sshName);
machine->storeUri.render());
} catch (EndOfFile & e) {
child->sshPid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s);
throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s);
}

{
Expand Down Expand Up @@ -480,7 +484,7 @@ void State::buildRemote(ref<Store> destStore,
/* Do the build. */
printMsg(lvlDebug, "building ‘%s’ on ‘%s’",
localStore->printStorePath(step->drvPath),
machine->sshName);
machine->storeUri.render());

updateStep(ssBuilding);

Expand All @@ -503,7 +507,7 @@ void State::buildRemote(ref<Store> destStore,
get a build log. */
if (result.isCached) {
printMsg(lvlInfo, "outputs of ‘%s’ substituted or already valid on ‘%s’",
localStore->printStorePath(step->drvPath), machine->sshName);
localStore->printStorePath(step->drvPath), machine->storeUri.render());
unlink(result.logFile.c_str());
result.logFile = "";
}
Expand Down Expand Up @@ -532,7 +536,7 @@ void State::buildRemote(ref<Store> destStore,

/* Copy each path. */
printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)",
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize);
localStore->printStorePath(step->drvPath), machine->storeUri.render(), totalNarSize);

build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos);
auto now2 = std::chrono::steady_clock::now();
Expand Down Expand Up @@ -571,7 +575,7 @@ void State::buildRemote(ref<Store> destStore,
info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4);
info->lastFailure = now;
int delta = retryInterval * std::pow(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30);
printMsg(lvlInfo, "will disable machine ‘%1%’ for %2%s", machine->sshName, delta);
printMsg(lvlInfo, "will disable machine ‘%1%’ for %2%s", machine->storeUri.render(), delta);
info->disabledUntil = now + std::chrono::seconds(delta);
}
throw;
Expand Down
12 changes: 6 additions & 6 deletions src/hydra-queue-runner/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void State::builder(MachineReservation::ptr reservation)
} catch (std::exception & e) {
printMsg(lvlError, "uncaught exception building ‘%s’ on ‘%s’: %s",
localStore->printStorePath(reservation->step->drvPath),
reservation->machine->sshName,
reservation->machine->storeUri.render(),
e.what());
}
}
Expand Down Expand Up @@ -150,7 +150,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
buildOptions.buildTimeout = build->buildTimeout;

printInfo("performing step ‘%s’ %d times on ‘%s’ (needed by build %d and %d others)",
localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->sshName, buildId, (dependents.size() - 1));
localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->storeUri.render(), buildId, (dependents.size() - 1));
}

if (!buildOneDone)
Expand Down Expand Up @@ -196,7 +196,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
{
auto mc = startDbUpdate();
pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->sshName, bsBusy);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->storeUri.render(), bsBusy);
txn.commit();
}

Expand Down Expand Up @@ -253,15 +253,15 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
/* Finish the step in the database. */
if (stepNr) {
pqxx::work txn(*conn);
finishBuildStep(txn, result, buildId, stepNr, machine->sshName);
finishBuildStep(txn, result, buildId, stepNr, machine->storeUri.render());
txn.commit();
}

/* The step had a hopefully temporary failure (e.g. network
issue). Retry a number of times. */
if (result.canRetry) {
printMsg(lvlError, "possibly transient failure building ‘%s’ on ‘%s’: %s",
localStore->printStorePath(step->drvPath), machine->sshName, result.errorMsg);
localStore->printStorePath(step->drvPath), machine->storeUri.render(), result.errorMsg);
assert(stepNr);
bool retry;
{
Expand Down Expand Up @@ -452,7 +452,7 @@ void State::failStep(
build->finishedInDB)
continue;
createBuildStep(txn,
0, build->id, step, machine ? machine->sshName : "",
0, build->id, step, machine ? machine->storeUri.render() : "",
result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId);
}

Expand Down
2 changes: 1 addition & 1 deletion src/hydra-queue-runner/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ system_time State::doDispatch()
/* Can this machine do this step? */
if (!mi.machine->supportsStep(step)) {
debug("machine '%s' does not support step '%s' (system type '%s')",
mi.machine->sshName, localStore->printStorePath(step->drvPath), step->drv->platform);
mi.machine->storeUri.render(), localStore->printStorePath(step->drvPath), step->drv->platform);
continue;
}

Expand Down
18 changes: 8 additions & 10 deletions src/hydra-queue-runner/hydra-queue-runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ void State::parseMachines(const std::string & contents)
using MaxJobs = std::remove_const<decltype(nix::Machine::maxJobs)>::type;

auto machine = std::make_shared<::Machine>(nix::Machine {
// `storeUri`, not yet used
"",
// `storeUri`
tokens[0],
// `systemTypes`
tokenizeString<StringSet>(tokens[1], ","),
// `sshKey`
Expand All @@ -175,25 +175,23 @@ void State::parseMachines(const std::string & contents)
: "",
});

machine->sshName = tokens[0];

/* Re-use the State object of the previous machine with the
same name. */
auto i = oldMachines.find(machine->sshName);
auto i = oldMachines.find(machine->storeUri.variant);
if (i == oldMachines.end())
printMsg(lvlChatty, "adding new machine ‘%1%’", machine->sshName);
printMsg(lvlChatty, "adding new machine ‘%1%’", machine->storeUri.render());
else
printMsg(lvlChatty, "updating machine ‘%1%’", machine->sshName);
printMsg(lvlChatty, "updating machine ‘%1%’", machine->storeUri.render());
machine->state = i == oldMachines.end()
? std::make_shared<::Machine::State>()
: i->second->state;
newMachines[machine->sshName] = machine;
newMachines[machine->storeUri.variant] = machine;
}

for (auto & m : oldMachines)
if (newMachines.find(m.first) == newMachines.end()) {
if (m.second->enabled)
printInfo("removing machine ‘%1%’", m.first);
printInfo("removing machine ‘%1%’", m.second->storeUri.render());
/* Add a disabled ::Machine object to make sure stats are
maintained. */
auto machine = std::make_shared<::Machine>(*(m.second));
Expand Down Expand Up @@ -657,7 +655,7 @@ void State::dumpStatus(Connection & conn)
machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone;
machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone;
}
statusJson["machines"][m->sshName] = machine;
statusJson["machines"][m->storeUri.render()] = machine;
}
}

Expand Down
13 changes: 2 additions & 11 deletions src/hydra-queue-runner/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <map>
#include <memory>
#include <queue>
#include <regex>

#include <prometheus/counter.h>
#include <prometheus/gauge.h>
Expand Down Expand Up @@ -240,10 +239,6 @@ struct Machine : nix::Machine
{
typedef std::shared_ptr<Machine> ptr;

/* TODO Get rid of: `nix::Machine::storeUri` is normalized in a way
we are not yet used to, but once we are, we don't need this. */
std::string sshName;

struct State {
typedef std::shared_ptr<State> ptr;
counter currentJobs{0};
Expand Down Expand Up @@ -293,11 +288,7 @@ struct Machine : nix::Machine
return true;
}

bool isLocalhost()
{
std::regex r("^(ssh://|ssh-ng://)?localhost$");
return std::regex_search(sshName, r);
}
bool isLocalhost() const;

// A connection to a machine
struct Connection : nix::ServeProto::BasicClientConnection {
Expand Down Expand Up @@ -357,7 +348,7 @@ private:

/* The build machines. */
std::mutex machinesReadyLock;
typedef std::map<std::string, Machine::ptr> Machines;
typedef std::map<nix::StoreReference::Variant, Machine::ptr> Machines;
nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr

/* Various stats. */
Expand Down

0 comments on commit d55bea2

Please sign in to comment.