Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Nix's Machine type in a minimal way #1341

Merged
merged 2 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/hydra-queue-runner/build-remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ static Strings extraStoreArgs(std::string & machine)
return result;
}

static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, SSHMaster::Connection & child)
static void openConnection(::Machine::ptr machine, Path tmpDir, int stderrFD, SSHMaster::Connection & child)
{
std::string pgmName;
Pipe to, from;
Expand Down Expand Up @@ -104,7 +104,7 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, SSHM


static void copyClosureTo(
Machine::Connection & conn,
::Machine::Connection & conn,
Store & destStore,
const StorePathSet & paths,
SubstituteFlag useSubstitutes = NoSubstitute)
Expand Down Expand Up @@ -195,7 +195,7 @@ static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, cons
* Therefore, no `ServeProto::Serialize` functions can be used until
* that field is set.
*/
static void handshake(Machine::Connection & conn, unsigned int repeats)
static void handshake(::Machine::Connection & conn, unsigned int repeats)
{
conn.to << SERVE_MAGIC_1 << 0x206;
conn.to.flush();
Expand All @@ -216,7 +216,7 @@ static BasicDerivation sendInputs(
Step & step,
Store & localStore,
Store & destStore,
Machine::Connection & conn,
::Machine::Connection & conn,
unsigned int & overhead,
counter & nrStepsWaiting,
counter & nrStepsCopyingTo
Expand Down Expand Up @@ -272,7 +272,7 @@ static BasicDerivation sendInputs(
}

static BuildResult performBuild(
Machine::Connection & conn,
::Machine::Connection & conn,
Store & localStore,
StorePath drvPath,
const BasicDerivation & drv,
Expand Down Expand Up @@ -317,7 +317,7 @@ static BuildResult performBuild(
}

static std::map<StorePath, ValidPathInfo> queryPathInfos(
Machine::Connection & conn,
::Machine::Connection & conn,
Store & localStore,
StorePathSet & outputs,
size_t & totalNarSize
Expand Down Expand Up @@ -355,7 +355,7 @@ static std::map<StorePath, ValidPathInfo> queryPathInfos(
}

static void copyPathFromRemote(
Machine::Connection & conn,
::Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
Expand Down Expand Up @@ -385,7 +385,7 @@ static void copyPathFromRemote(
}

static void copyPathsFromRemote(
Machine::Connection & conn,
::Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
Expand Down Expand Up @@ -462,7 +462,7 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)


void State::buildRemote(ref<Store> destStore,
Machine::ptr machine, Step::ptr step,
::Machine::ptr machine, Step::ptr step,
const BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep,
Expand Down Expand Up @@ -503,7 +503,7 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */
});

Machine::Connection conn {
::Machine::Connection conn {
.from = child.out.get(),
.to = child.in.get(),
.machine = machine,
Expand Down
2 changes: 1 addition & 1 deletion src/hydra-queue-runner/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ void State::failStep(
Step::ptr step,
BuildID buildId,
const RemoteResult & result,
Machine::ptr machine,
::Machine::ptr machine,
bool & stepFinished)
{
/* Register failure in the database for all Build objects that
Expand Down
10 changes: 5 additions & 5 deletions src/hydra-queue-runner/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ system_time State::doDispatch()
filter out temporarily disabled machines. */
struct MachineInfo
{
Machine::ptr machine;
::Machine::ptr machine;
unsigned long currentJobs;
};
std::vector<MachineInfo> machinesSorted;
Expand Down Expand Up @@ -231,11 +231,11 @@ system_time State::doDispatch()
sort(machinesSorted.begin(), machinesSorted.end(),
[](const MachineInfo & a, const MachineInfo & b) -> bool
{
float ta = std::round(a.currentJobs / a.machine->speedFactor);
float tb = std::round(b.currentJobs / b.machine->speedFactor);
float ta = std::round(a.currentJobs / a.machine->speedFactorFloat);
float tb = std::round(b.currentJobs / b.machine->speedFactorFloat);
return
ta != tb ? ta < tb :
a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
a.machine->speedFactorFloat != b.machine->speedFactorFloat ? a.machine->speedFactorFloat > b.machine->speedFactorFloat :
a.currentJobs > b.currentJobs;
});

Expand Down Expand Up @@ -435,7 +435,7 @@ void Jobset::pruneSteps()
}


State::MachineReservation::MachineReservation(State & state, Step::ptr step, Machine::ptr machine)
State::MachineReservation::MachineReservation(State & state, Step::ptr step, ::Machine::ptr machine)
: state(state), step(step), machine(machine)
{
machine->state->currentJobs++;
Expand Down
59 changes: 40 additions & 19 deletions src/hydra-queue-runner/hydra-queue-runner.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <iostream>
#include <thread>
#include <optional>
#include <type_traits>

#include <sys/types.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -140,23 +141,43 @@ void State::parseMachines(const std::string & contents)
if (tokens.size() < 3) continue;
tokens.resize(8);

auto machine = std::make_shared<Machine>();
machine->sshName = tokens[0];
machine->systemTypes = tokenizeString<StringSet>(tokens[1], ",");
machine->sshKey = tokens[2] == "-" ? std::string("") : tokens[2];
if (tokens[3] != "")
machine->maxJobs = string2Int<decltype(machine->maxJobs)>(tokens[3]).value();
else
machine->maxJobs = 1;
machine->speedFactor = atof(tokens[4].c_str());
if (tokens[5] == "-") tokens[5] = "";
machine->supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
auto supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");

if (tokens[6] == "-") tokens[6] = "";
machine->mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");
for (auto & f : machine->mandatoryFeatures)
machine->supportedFeatures.insert(f);
if (tokens[7] != "" && tokens[7] != "-")
machine->sshPublicHostKey = base64Decode(tokens[7]);
auto mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");

for (auto & f : mandatoryFeatures)
supportedFeatures.insert(f);

using MaxJobs = std::remove_const<decltype(nix::Machine::maxJobs)>::type;

auto machine = std::make_shared<::Machine>(nix::Machine {
// `storeUri`, not yet used
"",
// `systemTypes`, not yet used
{},
// `sshKey`
tokens[2] == "-" ? "" : tokens[2],
// `maxJobs`
tokens[3] != ""
? string2Int<MaxJobs>(tokens[3]).value()
: 1,
// `speedFactor`, not yet used
1,
// `supportedFeatures`
std::move(supportedFeatures),
// `mandatoryFeatures`
std::move(mandatoryFeatures),
// `sshPublicHostKey`
tokens[7] != "" && tokens[7] != "-"
? base64Decode(tokens[7])
: "",
});

machine->sshName = tokens[0];
machine->systemTypesSet = tokenizeString<StringSet>(tokens[1], ",");
machine->speedFactorFloat = atof(tokens[4].c_str());

/* Re-use the State object of the previous machine with the
same name. */
Expand All @@ -166,7 +187,7 @@ void State::parseMachines(const std::string & contents)
else
printMsg(lvlChatty, "updating machine ‘%1%’", machine->sshName);
machine->state = i == oldMachines.end()
? std::make_shared<Machine::State>()
? std::make_shared<::Machine::State>()
: i->second->state;
newMachines[machine->sshName] = machine;
}
Expand All @@ -175,9 +196,9 @@ void State::parseMachines(const std::string & contents)
if (newMachines.find(m.first) == newMachines.end()) {
if (m.second->enabled)
printInfo("removing machine ‘%1%’", m.first);
/* Add a disabled Machine object to make sure stats are
/* Add a disabled ::Machine object to make sure stats are
maintained. */
auto machine = std::make_shared<Machine>(*(m.second));
auto machine = std::make_shared<::Machine>(*(m.second));
machine->enabled = false;
newMachines[m.first] = machine;
}
Expand Down Expand Up @@ -596,7 +617,7 @@ void State::dumpStatus(Connection & conn)

json machine = {
{"enabled", m->enabled},
{"systemTypes", m->systemTypes},
{"systemTypes", m->systemTypesSet},
{"supportedFeatures", m->supportedFeatures},
{"mandatoryFeatures", m->mandatoryFeatures},
{"nrStepsDone", s->nrStepsDone.load()},
Expand Down
21 changes: 13 additions & 8 deletions src/hydra-queue-runner/state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "sync.hh"
#include "nar-extractor.hh"
#include "serve-protocol.hh"
#include "machines.hh"


typedef unsigned int BuildID;
Expand Down Expand Up @@ -234,17 +235,21 @@ void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step:
void visitDependencies(std::function<void(Step::ptr)> visitor, Step::ptr step);


struct Machine
struct Machine : nix::Machine
{
typedef std::shared_ptr<Machine> ptr;

bool enabled{true};
/* TODO Get rid of: `nix::Machine::storeUri` is normalized in a way
we are not yet used to, but once we are, we don't need this. */
std::string sshName;

std::string sshName, sshKey;
std::set<std::string> systemTypes, supportedFeatures, mandatoryFeatures;
unsigned int maxJobs = 1;
float speedFactor = 1.0;
std::string sshPublicHostKey;
/* TODO Get rid once `nix::Machine::systemTypes` is a set not
vector. */
std::set<std::string> systemTypesSet;

/* TODO Get rid once `nix::Machine::systemTypes` is a `float` not
an `int`. */
float speedFactorFloat = 1.0;

struct State {
typedef std::shared_ptr<State> ptr;
Expand Down Expand Up @@ -272,7 +277,7 @@ struct Machine
{
/* Check that this machine is of the type required by the
step. */
if (!systemTypes.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform))
if (!systemTypesSet.count(step->drv->platform == "builtin" ? nix::settings.thisSystem : step->drv->platform))
return false;

/* Check that the step requires all mandatory features of this
Expand Down
Loading