Skip to content

Commit

Permalink
minicargo - Use jobserver if -j isn't passed
Browse files Browse the repository at this point in the history
  • Loading branch information
thepowersgang committed Sep 22, 2023
1 parent b1aea89 commit d4f6734
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 59 deletions.
38 changes: 12 additions & 26 deletions tools/common/jobserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,26 @@
#ifdef _WIN32
class JobServer_Client: public JobServer
{
HANDLE m_local_sem_handle;
HANDLE m_sem_handle;
public:
JobServer_Client(size_t max_jobs, std::string path, HANDLE sem_handle)
: m_local_sem_handle(CreateSemaphore(nullptr, max_jobs, max_jobs, nullptr))
, m_sem_handle(sem_handle)
JobServer_Client(std::string path, HANDLE sem_handle)
: m_sem_handle(sem_handle)
{
}
~JobServer_Client()
{
CloseHandle(m_local_sem_handle);
CloseHandle(m_sem_handle);
}

bool take_one(unsigned long timeout_ms) override {
auto t = timeGetTime();
if( WaitForSingleObject(m_local_sem_handle, timeout_ms) != 0 ) {
return false;
}
auto dt_ms = timeGetTime() - t;
auto new_timeout = timeout_ms == INFINITE ? INFINITE :
(timeout_ms < dt_ms ? 0 : timeout_ms - dt_ms)
;
if(WaitForSingleObject(m_sem_handle, new_timeout) == 0) {
if(WaitForSingleObject(m_sem_handle, timeout_ms) == 0) {
return true;
}
else {
ReleaseSemaphore(m_local_sem_handle, 1, NULL);
return false;
}
}
void return_one() override {
ReleaseSemaphore(m_local_sem_handle, 1, NULL);
ReleaseSemaphore(m_sem_handle, 1, NULL);
}
};
Expand Down Expand Up @@ -99,15 +86,13 @@ class JobServer_Server: public JobServer
#else
class JobServer_Client: public JobServer
{
size_t m_jobs;
int m_fd_read;
int m_fd_write;
std::vector<uint8_t> m_held_tokens;
//::std::semaphore m_sem;
public:
JobServer_Client(size_t max_jobs, int fd_read, int fd_write = -1)
: m_jobs(max_jobs)
, m_fd_read(fd_read)
JobServer_Client(int fd_read, int fd_write = -1)
: m_fd_read(fd_read)
, m_fd_write(fd_write)
{
assert(fd_read >= 0);
Expand Down Expand Up @@ -245,11 +230,8 @@ class JobServer_Server: public JobServer
#endif


::std::unique_ptr<JobServer> JobServer::create(size_t max_jobs)
::std::unique_ptr<JobServer> JobServer::create(size_t server_jobs)
{
if( max_jobs < 1 ) {
return nullptr;
}
const auto* makeflags = getenv("MAKEFLAGS");

const char* jobserver_auth = nullptr;
Expand Down Expand Up @@ -277,7 +259,7 @@ ::std::unique_ptr<JobServer> JobServer::create(size_t max_jobs)
// - Windows: named semaphore
auto sem_handle = OpenSemaphoreA(0,FALSE,auth_str.c_str());
if( sem_handle ) {
return ::std::make_unique<JobServer_Client>(max_jobs, auth_str, sem_handle);
return ::std::make_unique<JobServer_Client>(auth_str, sem_handle);
}
#else
// - Named pipe: `fifo:PATH`
Expand Down Expand Up @@ -305,5 +287,9 @@ ::std::unique_ptr<JobServer> JobServer::create(size_t max_jobs)
}
#endif
}
return ::std::make_unique<JobServer_Server>(max_jobs);
// If no `-j` option is passed to this application, then don't create a jobserver
if(server_jobs == 0) {
return nullptr;
}
return ::std::make_unique<JobServer_Server>(server_jobs);
}
7 changes: 6 additions & 1 deletion tools/common/jobserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ class JobServer
{
public:
virtual ~JobServer() {}
static ::std::unique_ptr<JobServer> create(size_t max_jobs);
/// <summary>
/// Create a jobserver instance (client, or server if `server_jobs` is non-zero and there isn't already a server)
/// </summary>
/// <param name="server_jobs">Number of downstream job slots to expose</param>
/// <returns></returns>
static ::std::unique_ptr<JobServer> create(size_t server_jobs);

virtual bool take_one(unsigned long timeout_ms) = 0;
virtual void return_one() = 0;
Expand Down
6 changes: 3 additions & 3 deletions tools/minicargo/build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ BuildList::BuildList(const PackageManifest& manifest, const BuildOptions& opts):
}
}

bool BuildList::build(BuildOptions opts, unsigned num_jobs)
bool BuildList::build(BuildOptions opts, unsigned num_jobs, bool dry_run)
{
bool include_build = !opts.build_script_overrides.is_valid();
Builder builder { opts, m_list.size() };
Expand Down Expand Up @@ -322,7 +322,7 @@ bool BuildList::build(BuildOptions opts, unsigned num_jobs)
}

// Actually do the build
if( num_jobs > 1 )
if( !dry_run && num_jobs > 1 )
{
#ifndef DISABLE_MULTITHREAD
class Semaphore
Expand Down Expand Up @@ -480,7 +480,7 @@ bool BuildList::build(BuildOptions opts, unsigned num_jobs)
}
#endif
}
else if( num_jobs == 1 )
else if( !dry_run )
{
while( !state.build_queue.empty() )
{
Expand Down
2 changes: 1 addition & 1 deletion tools/minicargo/build.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ class BuildList
::std::vector<Entry> m_list;
public:
BuildList(const PackageManifest& manifest, const BuildOptions& opts);
bool build(BuildOptions opts, unsigned num_jobs); // 0 = 1 job
bool build(BuildOptions opts, unsigned num_jobs, bool dry_run); // 0 = either 1 job, or as many as we can take from the jobserver
};
6 changes: 3 additions & 3 deletions tools/minicargo/build2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,12 @@ BuildList::BuildList(const PackageManifest& manifest, const BuildOptions& opts):
m_list.push_back({ e.package, e.native, {} });
}
}
bool BuildList::build(BuildOptions opts, unsigned num_jobs)
bool BuildList::build(BuildOptions opts, unsigned num_jobs, bool dry_run)
{
bool cross_compiling = (opts.target_name != nullptr);

RunState run_state { opts, cross_compiling };
JobList runner(num_jobs);
JobList runner;

struct ConvertState {
JobList& joblist;
Expand Down Expand Up @@ -480,7 +480,7 @@ bool BuildList::build(BuildOptions opts, unsigned num_jobs)
//case BuildOptions::Mode::Examples:
}

return runner.run_all();
return runner.run_all(num_jobs, dry_run);
}

namespace {
Expand Down
56 changes: 39 additions & 17 deletions tools/minicargo/jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,30 @@ void JobList::add_job(::std::unique_ptr<Job> job)
waiting_jobs.push_back(std::move(job));
}

bool JobList::run_all()
bool JobList::run_all(size_t num_jobs, bool dry_run)
{
auto jobserver = this->num_jobs > 1 ? JobServer::create(this->num_jobs-1) : ::std::unique_ptr<JobServer>();
if( this->num_jobs > 1 ) {
auto jobserver = (dry_run || num_jobs == 1) ? ::std::unique_ptr<JobServer>() // Dry run or being forced to a single build task? force no jobserver
: num_jobs == 0 ? JobServer::create(0) // If no `-j` option is passed, try and get a jobserver client but don't create a server
: JobServer::create(num_jobs-1) // `-j` was passed with a non-1 count, allow `JobServer` to create a server (keeping one job for ourselves)
;
if( !dry_run && num_jobs > 1 ) {
assert(jobserver);
}
auto total_job_count = this->waiting_jobs.size();
// If there is no jobserver client, and `-j` wasn't passed - force the job count to 1
// - This simplifies the logic lower down.
if(!jobserver && num_jobs == 0) {
num_jobs = 1;
}
const auto total_job_count = this->waiting_jobs.size();

bool failed = false;
/// Number of jobserver tokens taken
size_t n_tokens = 0;
// Force the main loop to wait until a task completes. Set when there is nothing waiting to run.
bool force_wait = false;

auto dump_state = [&]() {
auto num_complete = total_job_count - this->running_jobs.size() - this->running_jobs.size() - this->waiting_jobs.size();
auto num_complete = total_job_count - this->runnable_jobs.size() - this->running_jobs.size() - this->waiting_jobs.size();
::std::cerr
<< " ("
<< std::fixed << std::setprecision(1) << (100 * static_cast<double>(num_complete) / total_job_count) << "% "
Expand All @@ -66,8 +76,13 @@ bool JobList::run_all()
while( !this->waiting_jobs.empty() || !this->runnable_jobs.empty() || !this->running_jobs.empty() )
{
// Wait until a running job stops
if( this->num_jobs > 0 ) {
while( (force_wait || this->running_jobs.size() >= this->num_jobs) && !this->running_jobs.empty() )
if( !dry_run ) {
// Keep looping while:
// - There is at least one running job
// - AND any of:
// - A wait is being forced (due to an empty runnable queue)
// - We're being limited by our own internal limits (either via `-j` or because there's no jobserver)
while( this->running_jobs.size() > 0 && (force_wait || (num_jobs > 0 && this->running_jobs.size() >= num_jobs)) )
{
if( !wait_one() ) {
dump_state();
Expand All @@ -81,9 +96,8 @@ bool JobList::run_all()
break;
}
// Release jobserver tokens
if( this->num_jobs > 1 ) {
// - if none running, then release `n_tokens` entirely
// - if one running, then release none?
if( jobserver ) {
// Has to handle the case where there are now no jobs running, and NOT release the implicit token we already own
while( n_tokens > 0 && 1+n_tokens > this->running_jobs.size()) {
n_tokens -= 1;
jobserver->return_one();
Expand Down Expand Up @@ -122,12 +136,20 @@ bool JobList::run_all()

if( this->running_jobs.empty() )
{
// Nothing running, so we don't need to acquire a token (we're using our own token)
}
else if( this->num_jobs > 1)
else if( jobserver )
{
assert(this->num_jobs > 1);
if( !jobserver->take_one(500) ) {
wait_one(false);
// JobServer is present, need to request a token
assert(num_jobs == 0 || num_jobs > 1);
// - Wait 100ms, short enough to not slow things down too much but not too short to be busy waiting all the time
if( !jobserver->take_one(100) ) {
// Check if one of our own tasks is complete.
if( !wait_one(false) ) {
dump_state();
failed = true;
break;
}
continue;
}
n_tokens += 1;
Expand All @@ -136,7 +158,7 @@ bool JobList::run_all()
auto job = ::std::move(this->runnable_jobs.front());
this->runnable_jobs.pop_front();
auto rjob = job->start();
if( this->num_jobs == 0 )
if( dry_run )
{
this->completed_jobs.insert(job->name());
::std::cout << "> " << rjob.exe_name;
Expand All @@ -151,7 +173,7 @@ bool JobList::run_all()
os_support::set_console_colour(::std::cout, os_support::TerminalColour::Green);
::std::cout << job->verb() << " " << job->name();
os_support::set_console_colour(::std::cout, os_support::TerminalColour::Default);
auto num_complete = total_job_count - this->running_jobs.size() - this->running_jobs.size() - this->waiting_jobs.size();
auto num_complete = total_job_count - this->runnable_jobs.size() - this->running_jobs.size() - this->waiting_jobs.size();
::std::cout << " ("
<< std::fixed << std::setprecision(1) << (100 * static_cast<double>(num_complete) / total_job_count) << "% "
<< this->running_jobs.size()+1 << "r," << this->runnable_jobs.size() << "w," << this->waiting_jobs.size() << "b/" << total_job_count << "t)";
Expand Down Expand Up @@ -199,7 +221,7 @@ bool JobList::wait_one(bool block)
handles.resize(MAXIMUM_WAIT_OBJECTS);
::std::cerr << "WARNING! Win32's WaitForMultipleObjects only supports up to " << MAXIMUM_WAIT_OBJECTS << " handles, but have " << handles.size() << ::std::endl;
}
auto wait_rv = WaitForMultipleObjects(static_cast<DWORD>(handles.size()), handles.data(), FALSE, block ? 0 : INFINITE);
auto wait_rv = WaitForMultipleObjects(static_cast<DWORD>(handles.size()), handles.data(), FALSE, block ? INFINITE : 0);
if( !(WAIT_OBJECT_0 <= wait_rv && wait_rv < WAIT_OBJECT_0 + handles.size()) ) {
if( !block )
return true;
Expand Down
7 changes: 2 additions & 5 deletions tools/minicargo/jobs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,14 @@ class JobList
RunningJob& operator=(RunningJob&& ) = default;
};

size_t num_jobs;
::std::vector<job_t> waiting_jobs;
::std::deque<job_t> runnable_jobs;
::std::vector<RunningJob> running_jobs;
::std::unordered_set<std::string> completed_jobs;
public:
JobList(size_t num_jobs)
: num_jobs(num_jobs)
{}
JobList() {}
void add_job(::std::unique_ptr<Job> job);
bool run_all();
bool run_all(size_t num_jobs, bool dry_run);

private:
os_support::Process spawn(const RunnableJob& j);
Expand Down
8 changes: 5 additions & 3 deletions tools/minicargo/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ struct ProgramOptions
::std::vector<const char*> lib_search_dirs;

// Number of build jobs to run at a time
unsigned build_jobs = 1;
unsigned build_jobs = 0;
// Don't run build tasks, just print
bool dry_run = false;

// Pause for user input before quitting (useful for MSVC debugging)
bool pause_before_quit = false;
Expand Down Expand Up @@ -236,7 +238,7 @@ int main(int argc, const char* argv[])
Debug_SetPhase("Enumerate Build");
auto build_list = BuildList(m, build_opts);
Debug_SetPhase("Run Build");
if( !build_list.build(::std::move(build_opts), opts.build_jobs) )
if( !build_list.build(::std::move(build_opts), opts.build_jobs, opts.dry_run) )
{
::std::cerr << "BUILD FAILED" << ::std::endl;
if(opts.pause_before_quit) {
Expand Down Expand Up @@ -324,7 +326,7 @@ int ProgramOptions::parse(int argc, const char* argv[])
}
break;
case 'n':
this->build_jobs = 0;
this->dry_run = true;
break;
case 'g':
this->enable_debug = true;
Expand Down

0 comments on commit d4f6734

Please sign in to comment.