Skip to content

Commit

Permalink
i#6471 sched idle: Add idle time
Browse files Browse the repository at this point in the history
Adds a new STATUS_IDLE return code, and a corresponding
TRACE_MARKER_TYPE_CORE_IDLE record.

Changes the scheduler behavior to no longer return STATUS_EOF for an
output when the ready queue is empty: instead STATUS_IDLE is returned
until every single input is at EOF.  This results in a more realistic
schedule where other cores can pick up work later rather than
disappearing from the system.

Augments the schedule_stats tool to count idle replies and compute a %
cpu usage metric.  Adds a unit test for counting idles.

Augments the scheduler_launcher to also compute %cpu usage.

Updates all the scheduler tests for the new change.

Adding idle time due to blocking syscalls will be done separately.

Issue: #6471
  • Loading branch information
derekbruening committed Nov 22, 2023
1 parent 34fbc25 commit 71f7490
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 68 deletions.
33 changes: 31 additions & 2 deletions clients/drcachesim/analyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ analyzer_t::create_wait_marker()
return record;
}

template <>
memref_t
analyzer_t::create_idle_marker()
{
memref_t record = {}; // Zero the other fields.
record.marker.type = TRACE_TYPE_MARKER;
record.marker.marker_type = TRACE_MARKER_TYPE_CORE_IDLE;
record.marker.tid = INVALID_THREAD_ID;
return record;
}

/******************************************************************************
* Specializations for analyzer_tmpl_t<record_reader_t>, aka record_analyzer_t.
*/
Expand Down Expand Up @@ -182,6 +193,17 @@ record_analyzer_t::create_wait_marker()
return record;
}

template <>
trace_entry_t
record_analyzer_t::create_idle_marker()
{
trace_entry_t record;
record.type = TRACE_TYPE_MARKER;
record.size = TRACE_MARKER_TYPE_CORE_IDLE;
record.addr = 0; // Marker value has no meaning so we zero it.
return record;
}

/********************************************************************
* Other analyzer_tmpl_t routines that do not need to be specialized.
*/
Expand Down Expand Up @@ -537,6 +559,11 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
// We synthesize a record here. If we wanted this to count toward output
// stream ordinals we would need to add a scheduler API to inject it.
record = create_wait_marker();
} else if (status == sched_type_t::STATUS_IDLE) {
// We let tools know about idle time so they can analyze cpu usage.
// We synthesize a record here. If we wanted this to count toward output
// stream ordinals we would need to add a scheduler API to inject it.
record = create_idle_marker();
} else if (status != sched_type_t::STATUS_OK) {
if (status == sched_type_t::STATUS_REGION_INVALID) {
worker->error =
Expand Down Expand Up @@ -596,8 +623,10 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
}
}
if (shard_type_ == SHARD_BY_CORE) {
if (!process_shard_exit(worker, worker->index))
return;
if (worker->shard_data.find(worker->index) != worker->shard_data.end()) {
if (!process_shard_exit(worker, worker->index))
return;
}
}
for (const auto &keyval : worker->shard_data) {
if (!keyval.second.exited) {
Expand Down
3 changes: 3 additions & 0 deletions clients/drcachesim/analyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
RecordType
create_wait_marker();

RecordType
create_idle_marker();

// Invoked when the given interval finishes during serial or parallel
// analysis of the trace. For parallel analysis, the shard_id
// parameter should be set to the shard_id for which the interval
Expand Down
4 changes: 2 additions & 2 deletions clients/drcachesim/common/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,15 +809,15 @@ droption_t<bool> op_core_sharded(
"software threads. This option instead schedules those threads onto virtual cores "
"and analyzes each core in parallel. Thus, each shard consists of pieces from "
"many software threads. How the scheduling is performed is controlled by a set "
"of options with the prefix \"sched_\" along with -num_cores.");
"of options with the prefix \"sched_\" along with -cores.");

droption_t<bool> op_core_serial(
DROPTION_SCOPE_ALL, "core_serial", false, "Analyze per-core in serial.",
"In this mode, scheduling is performed just like for -core_sharded. "
"However, the resulting schedule is acted upon by a single analysis thread"
"which walks the N cores in lockstep in round robin fashion. "
"How the scheduling is performed is controlled by a set "
"of options with the prefix \"sched_\" along with -num_cores.");
"of options with the prefix \"sched_\" along with -cores.");

droption_t<int64_t>
op_sched_quantum(DROPTION_SCOPE_ALL, "sched_quantum", 1 * 1000 * 1000,
Expand Down
11 changes: 11 additions & 0 deletions clients/drcachesim/common/trace_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,17 @@ typedef enum {
*/
TRACE_MARKER_TYPE_CORE_WAIT,

/**
* This marker is used for core-sharded analyses to indicate that the current
* core has no available inputs to run (all inputs are on other cores or are
* blocked waiting for kernel resources). A new marker is emitted each
* time the tool analysis framework requests a new record from the scheduler and
* is given a wait status. There are no units of time here but each repetition
* is roughly the time where a regular record could have been read and passed
* along.
*/
TRACE_MARKER_TYPE_CORE_IDLE,

// ...
// These values are reserved for future built-in marker types.
// ...
Expand Down
3 changes: 2 additions & 1 deletion clients/drcachesim/reader/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ class reader_t : public std::iterator<std::input_iterator_tag, memref_t>,
is_record_synthetic() const override
{
if (cur_ref_.marker.type == TRACE_TYPE_MARKER &&
cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT) {
(cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT ||
cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_IDLE)) {
// These are synthetic records not part of the input and not
// counting toward ordinals.
return true;
Expand Down
61 changes: 46 additions & 15 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
}
}
}
VPRINT(this, 1, "%zu inputs\n", inputs_.size());
live_input_count_.store(inputs_.size(), std::memory_order_release);
return set_initial_schedule(workload2inputs);
}

Expand Down Expand Up @@ -1313,7 +1315,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
input.cur_region);
if (input.cur_region >= static_cast<int>(input.regions_of_interest.size())) {
if (input.at_eof)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
else {
// We let the user know we're done.
if (options_.schedule_record_ostream != nullptr) {
Expand All @@ -1329,7 +1331,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
return status;
}
input.queue.push_back(create_thread_exit(input.tid));
input.at_eof = true;
mark_input_eof(input);
return sched_type_t::STATUS_SKIPPED;
}
}
Expand Down Expand Up @@ -1408,7 +1410,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(output_ordinal_t out
if (*input.reader == *input.reader_end) {
// Raise error because the input region is out of bounds.
VPRINT(this, 2, "skip_instructions: input=%d skip out of bounds\n", input.index);
input.at_eof = true;
mark_input_eof(input);
return sched_type_t::STATUS_REGION_INVALID;
}
input.in_cur_region = true;
Expand Down Expand Up @@ -1645,7 +1647,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
{
if (outputs_[output].record_index + 1 >=
static_cast<int>(outputs_[output].record.size()))
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index + 1];
index = segment.key.input;
Expand Down Expand Up @@ -1719,7 +1721,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
// queued candidate record, if any.
clear_input_queue(inputs_[index]);
inputs_[index].queue.push_back(create_thread_exit(inputs_[index].tid));
inputs_[index].at_eof = true;
mark_input_eof(inputs_[index]);
VPRINT(this, 2, "early end for input %d\n", index);
// We're done with this entry but we need the queued record to be read,
// so we do not move past the entry.
Expand Down Expand Up @@ -1773,7 +1775,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index];
int input = segment.key.input;
VPRINT(this, res == sched_type_t::STATUS_WAIT ? 3 : 2,
VPRINT(this,
(res == sched_type_t::STATUS_IDLE ||
res == sched_type_t::STATUS_WAIT)
? 3
: 2,
"next_record[%d]: replay segment in=%d (@%" PRId64
") type=%d start=%" PRId64 " end=%" PRId64 "\n",
output, input,
Expand Down Expand Up @@ -1819,10 +1825,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
// We found a direct switch target above.
} else if (ready_queue_empty()) {
if (prev_index == INVALID_INPUT_ORDINAL)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
std::lock_guard<std::mutex> lock(*inputs_[prev_index].lock);
if (inputs_[prev_index].at_eof)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
else
index = prev_index; // Go back to prior.
} else {
Expand All @@ -1836,7 +1842,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
}
input_info_t *queue_next = pop_from_ready_queue(output);
if (queue_next == nullptr)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
index = queue_next->index;
}
} else if (options_.deps == DEPENDENCY_TIMESTAMPS) {
Expand All @@ -1850,7 +1856,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
}
}
if (index < 0)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
VPRINT(this, 2,
"next_record[%d]: advancing to timestamp %" PRIu64
" == input #%d\n",
Expand Down Expand Up @@ -1883,14 +1889,15 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
std::lock_guard<std::mutex> lock(*inputs_[index].lock);
if (inputs_[index].at_eof ||
*inputs_[index].reader == *inputs_[index].reader_end) {
VPRINT(this, 2, "next_record[%d]: local index %d == input #%d at eof\n",
output, outputs_[output].input_indices_index, index);
VPRINT(this, 2, "next_record[%d]: input #%d at eof\n", output, index);
if (options_.schedule_record_ostream != nullptr &&
prev_index != INVALID_INPUT_ORDINAL)
close_schedule_segment(output, inputs_[prev_index]);
inputs_[index].at_eof = true;
if (!inputs_[index].at_eof)
mark_input_eof(inputs_[index]);
index = INVALID_INPUT_ORDINAL;
// Loop and pick next thread.
prev_index = INVALID_INPUT_ORDINAL;
continue;
}
break;
Expand All @@ -1911,7 +1918,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// check for quantum end.
outputs_[output].cur_time = cur_time; // Invalid values are checked below.
if (!outputs_[output].active)
return sched_type_t::STATUS_WAIT;
return sched_type_t::STATUS_IDLE;
if (outputs_[output].waiting) {
VPRINT(this, 5, "next_record[%d]: need new input (cur=waiting)\n", output);
sched_type_t::stream_status_t res = pick_next_input(output, true);
Expand All @@ -1922,7 +1929,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (outputs_[output].cur_input < 0) {
// This happens with more outputs than inputs. For non-empty outputs we
// require cur_input to be set to >=0 during init().
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
}
input = &inputs_[outputs_[output].cur_input];
auto lock = std::unique_lock<std::mutex>(*input->lock);
Expand Down Expand Up @@ -1970,6 +1977,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
input->needs_advance = true;
}
if (input->at_eof || *input->reader == *input->reader_end) {
if (!input->at_eof)
mark_input_eof(*input);
lock.unlock();
VPRINT(this, 5, "next_record[%d]: need new input (cur=%d eof)\n", output,
input->index);
Expand Down Expand Up @@ -1998,6 +2007,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (outputs_[output].record_index >=
static_cast<int>(outputs_[output].record.size())) {
// We're on the last record.
VPRINT(this, 4, "next_record[%d]: on last record\n", output);
} else if (outputs_[output].record[outputs_[output].record_index].type ==
schedule_record_t::SKIP) {
VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output);
Expand Down Expand Up @@ -2257,6 +2267,27 @@ scheduler_tmpl_t<RecordType, ReaderType>::stop_speculation(output_ordinal_t outp
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
void
scheduler_tmpl_t<RecordType, ReaderType>::mark_input_eof(input_info_t &input)
{
input.at_eof = true;
assert(live_input_count_.load(std::memory_order_acquire) > 0);
live_input_count_.fetch_add(-1, std::memory_order_release);
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::eof_or_idle(output_ordinal_t output)
{
if (live_input_count_.load(std::memory_order_acquire) == 0) {
return sched_type_t::STATUS_EOF;
} else {
outputs_[output].waiting = true;
return sched_type_t::STATUS_IDLE;
}
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::set_output_active(output_ordinal_t output,
Expand Down
27 changes: 24 additions & 3 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <deque>
#include <limits>
#include <memory>
Expand Down Expand Up @@ -112,14 +113,25 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
* (#DEPENDENCY_TIMESTAMPS) to avoid one stream getting ahead of another. For
* replaying a schedule as it was traced with #MAP_TO_RECORDED_OUTPUT this can
* indicate an idle period on a core where the traced workload was not currently
* scheduled.
* scheduled, but generally #STATUS_WAIT should be treated as artificial.
* Simulators are suggested to not advance simulated time for #STATUS_WAIT while
* they should advance time for #STATUS_IDLE.
*/
STATUS_WAIT,
STATUS_INVALID, /**< Error condition. */
STATUS_REGION_INVALID, /**< Input region is out of bounds. */
STATUS_NOT_IMPLEMENTED, /**< Feature not implemented. */
STATUS_SKIPPED, /**< Used for internal scheduler purposes. */
STATUS_RECORD_FAILED, /**< Failed to record schedule for future replay. */
/**
* This code indicates that all inputs are blocked waiting for kernel resources
* (such as i/o). This is similar to #STATUS_WAIT, but #STATUS_WAIT indicates an
* artificial pause due to imposing the original ordering while #STATUS_IDLE
* indicates actual idle time in the application. Simulators are suggested
* to not advance simulated time for #STATUS_WAIT while they should advance
* time for #STATUS_IDLE.
*/
STATUS_IDLE,
};

/** Identifies an input stream by its index. */
Expand Down Expand Up @@ -629,7 +641,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
/**
* Disables or re-enables this output stream. If "active" is false, this
* stream becomes inactive and its currently assigned input is moved to the
* ready queue to be scheduled on other outputs. The #STATUS_WAIT code is
* ready queue to be scheduled on other outputs. The #STATUS_IDLE code is
* returned to next_record() for inactive streams. If "active" is true,
* this stream becomes active again.
* This is only supported for #MAP_TO_ANY_OUTPUT.
Expand Down Expand Up @@ -1076,7 +1088,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
// sched_lock_.
std::vector<schedule_record_t> record;
int record_index = 0;
bool waiting = false;
bool waiting = false; // Waiting or idling.
bool active = true;
// Used for time-based quanta.
uint64_t cur_time = 0;
Expand Down Expand Up @@ -1259,6 +1271,13 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
stream_status_t
set_output_active(output_ordinal_t output, bool active);

// Caller must hold the input's lock.
void
mark_input_eof(input_info_t &input);

stream_status_t
eof_or_idle(output_ordinal_t output);

///////////////////////////////////////////////////////////////////////////
// Support for ready queues for who to schedule next:

Expand Down Expand Up @@ -1325,6 +1344,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
flexible_queue_t<input_info_t *, InputTimestampComparator> ready_priority_;
// Global ready queue counter used to provide FIFO for same-priority inputs.
uint64_t ready_counter_ = 0;
// Count of inputs not yet at eof.
std::atomic<int> live_input_count_;
// Map from workload,tid pair to input.
struct workload_tid_t {
workload_tid_t(int wl, memref_tid_t tid)
Expand Down
Loading

0 comments on commit 71f7490

Please sign in to comment.