i#6471 sched idle: Add idle time #6472

Merged: 5 commits, Nov 28, 2023
34 changes: 32 additions & 2 deletions clients/drcachesim/analyzer.cpp
@@ -133,6 +133,17 @@ analyzer_t::create_wait_marker()
return record;
}

template <>
memref_t
analyzer_t::create_idle_marker()
{
memref_t record = {}; // Zero the other fields.
record.marker.type = TRACE_TYPE_MARKER;
record.marker.marker_type = TRACE_MARKER_TYPE_CORE_IDLE;
record.marker.tid = INVALID_THREAD_ID;
return record;
}

/******************************************************************************
* Specializations for analyzer_tmpl_t<record_reader_t>, aka record_analyzer_t.
*/
@@ -182,6 +193,17 @@ record_analyzer_t::create_wait_marker()
return record;
}

template <>
trace_entry_t
record_analyzer_t::create_idle_marker()
{
trace_entry_t record;
record.type = TRACE_TYPE_MARKER;
record.size = TRACE_MARKER_TYPE_CORE_IDLE;
record.addr = 0; // Marker value has no meaning so we zero it.
return record;
}

/********************************************************************
* Other analyzer_tmpl_t routines that do not need to be specialized.
*/
@@ -537,6 +559,12 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
// We synthesize a record here. If we wanted this to count toward output
// stream ordinals we would need to add a scheduler API to inject it.
record = create_wait_marker();
} else if (status == sched_type_t::STATUS_IDLE) {
assert(shard_type_ == SHARD_BY_CORE);
// We let tools know about idle time so they can analyze cpu usage.
// We synthesize a record here. If we wanted this to count toward output
// stream ordinals we would need to add a scheduler API to inject it.
record = create_idle_marker();
} else if (status != sched_type_t::STATUS_OK) {
if (status == sched_type_t::STATUS_REGION_INVALID) {
worker->error =
@@ -596,8 +624,10 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
}
}
if (shard_type_ == SHARD_BY_CORE) {
if (!process_shard_exit(worker, worker->index))
return;
if (worker->shard_data.find(worker->index) != worker->shard_data.end()) {
if (!process_shard_exit(worker, worker->index))
return;
}
}
for (const auto &keyval : worker->shard_data) {
if (!keyval.second.exited) {
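From a tool's perspective, the synthesized wait and idle records arrive through the ordinary shard callbacks. A minimal sketch of how a core-sharded tool might separate them (the `my_tool_t` and `per_shard_t` names are illustrative, not part of this change):

```cpp
// Hypothetical tool fragment: count idle vs. wait records in each core shard.
bool
my_tool_t::parallel_shard_memref(void *shard_data, const memref_t &memref)
{
    per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
    if (memref.marker.type == TRACE_TYPE_MARKER) {
        if (memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_IDLE) {
            ++shard->idle_count; // The core had nothing runnable.
            return true;
        }
        if (memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT) {
            ++shard->wait_count; // Artificial pause from the imposed schedule.
            return true;
        }
    }
    ++shard->record_count;
    return true;
}
```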
3 changes: 3 additions & 0 deletions clients/drcachesim/analyzer.h
@@ -252,6 +252,9 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
RecordType
create_wait_marker();

RecordType
create_idle_marker();

// Invoked when the given interval finishes during serial or parallel
// analysis of the trace. For parallel analysis, the shard_id
// parameter should be set to the shard_id for which the interval
4 changes: 2 additions & 2 deletions clients/drcachesim/common/options.cpp
@@ -809,15 +809,15 @@ droption_t<bool> op_core_sharded(
"software threads. This option instead schedules those threads onto virtual cores "
"and analyzes each core in parallel. Thus, each shard consists of pieces from "
"many software threads. How the scheduling is performed is controlled by a set "
"of options with the prefix \"sched_\" along with -num_cores.");
"of options with the prefix \"sched_\" along with -cores.");

droption_t<bool> op_core_serial(
DROPTION_SCOPE_ALL, "core_serial", false, "Analyze per-core in serial.",
"In this mode, scheduling is performed just like for -core_sharded. "
"However, the resulting schedule is acted upon by a single analysis thread"
"which walks the N cores in lockstep in round robin fashion. "
"How the scheduling is performed is controlled by a set "
"of options with the prefix \"sched_\" along with -num_cores.");
"of options with the prefix \"sched_\" along with -cores.");

droption_t<int64_t>
op_sched_quantum(DROPTION_SCOPE_ALL, "sched_quantum", 1 * 1000 * 1000,
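For context, a hypothetical invocation of the core-sharded modes these option descriptions refer to (the trace directory and tool choice are placeholders):

```
bin64/drrun -t drcachesim -indir drmemtrace.myapp.dir -core_serial -cores 4 -simulator_type basic_counts
```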
13 changes: 13 additions & 0 deletions clients/drcachesim/common/trace_entry.h
@@ -583,6 +583,19 @@ typedef enum {
*/
TRACE_MARKER_TYPE_CORE_WAIT,

/**
* This marker is used for core-sharded analyses to indicate that the current
* core has no available inputs to run (all inputs are on other cores or are
* blocked waiting for kernel resources). A new marker is emitted each
* time the tool analysis framework requests a new record from the scheduler and
* is given an idle status. There are no units of time here but each repetition
* is roughly the time where a regular record could have been read and passed
* along. This idle marker indicates that a core actually had no work to do,
* as opposed to #TRACE_MARKER_TYPE_CORE_WAIT which is an artifact of an
* imposed re-created schedule.
*/
TRACE_MARKER_TYPE_CORE_IDLE,

// ...
// These values are reserved for future built-in marker types.
// ...
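Since each idle marker stands in for roughly one record's worth of time, a tool can turn per-core counts like those above into a coarse idle estimate; a sketch under that assumption:

```cpp
// Coarse estimate only: treats idle markers and regular records as
// equal-duration slots, per the TRACE_MARKER_TYPE_CORE_IDLE documentation.
static double
estimate_idle_fraction(uint64_t idle_markers, uint64_t regular_records)
{
    uint64_t total = idle_markers + regular_records;
    return total == 0 ? 0.0
                      : static_cast<double>(idle_markers) / static_cast<double>(total);
}
```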
3 changes: 2 additions & 1 deletion clients/drcachesim/reader/reader.h
@@ -193,7 +193,8 @@ class reader_t : public std::iterator<std::input_iterator_tag, memref_t>,
is_record_synthetic() const override
{
if (cur_ref_.marker.type == TRACE_TYPE_MARKER &&
cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT) {
(cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT ||
cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_IDLE)) {
// These are synthetic records not part of the input and not
// counting toward ordinals.
return true;
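Because both marker types now report as synthetic, a tool that relies on stream ordinals can filter them out through the memtrace_stream_t interface; a hedged fragment, assuming `shard->stream` was saved in parallel_shard_init_stream():

```cpp
// Inside a per-record callback: skip synthetic wait/idle records before any
// ordinal-based bookkeeping, since they do not count toward ordinals.
if (shard->stream->is_record_synthetic())
    return true; // Nothing to attribute to the input trace.
uint64_t record_ord = shard->stream->get_record_ordinal();
```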
67 changes: 52 additions & 15 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -631,6 +631,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
}
}
}
VPRINT(this, 1, "%zu inputs\n", inputs_.size());
live_input_count_.store(static_cast<int>(inputs_.size()), std::memory_order_release);
return set_initial_schedule(workload2inputs);
}

@@ -1313,7 +1315,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
input.cur_region);
if (input.cur_region >= static_cast<int>(input.regions_of_interest.size())) {
if (input.at_eof)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
else {
// We let the user know we're done.
if (options_.schedule_record_ostream != nullptr) {
Expand All @@ -1329,7 +1331,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
return status;
}
input.queue.push_back(create_thread_exit(input.tid));
input.at_eof = true;
mark_input_eof(input);
return sched_type_t::STATUS_SKIPPED;
}
}
@@ -1408,7 +1410,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(output_ordinal_t out
if (*input.reader == *input.reader_end) {
// Raise error because the input region is out of bounds.
VPRINT(this, 2, "skip_instructions: input=%d skip out of bounds\n", input.index);
input.at_eof = true;
mark_input_eof(input);
return sched_type_t::STATUS_REGION_INVALID;
}
input.in_cur_region = true;
@@ -1645,7 +1647,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
{
if (outputs_[output].record_index + 1 >=
static_cast<int>(outputs_[output].record.size()))
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index + 1];
index = segment.key.input;
@@ -1681,6 +1683,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
// XXX i#5843: We may want to provide a kernel-mediated wait
// feature so a multi-threaded simulator doesn't have to do a
// spinning poll loop.
// XXX i#5843: For replaying a schedule as it was traced with
// MAP_TO_RECORDED_OUTPUT there may have been true idle periods during
// tracing where some other process than the traced workload was
// scheduled on a core. If we could identify those, we should return
// STATUS_IDLE rather than STATUS_WAIT.
VPRINT(this, 3, "next_record[%d]: waiting for input %d instr #%" PRId64 "\n",
output, index, segment.start_instruction);
// Give up this input and go into a wait state.
@@ -1719,7 +1726,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
// queued candidate record, if any.
clear_input_queue(inputs_[index]);
inputs_[index].queue.push_back(create_thread_exit(inputs_[index].tid));
inputs_[index].at_eof = true;
mark_input_eof(inputs_[index]);
VPRINT(this, 2, "early end for input %d\n", index);
// We're done with this entry but we need the queued record to be read,
// so we do not move past the entry.
@@ -1773,7 +1780,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index];
int input = segment.key.input;
VPRINT(this, res == sched_type_t::STATUS_WAIT ? 3 : 2,
VPRINT(this,
(res == sched_type_t::STATUS_IDLE ||
res == sched_type_t::STATUS_WAIT)
? 3
: 2,
"next_record[%d]: replay segment in=%d (@%" PRId64
") type=%d start=%" PRId64 " end=%" PRId64 "\n",
output, input,
@@ -1819,10 +1830,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
// We found a direct switch target above.
} else if (ready_queue_empty()) {
if (prev_index == INVALID_INPUT_ORDINAL)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
std::lock_guard<std::mutex> lock(*inputs_[prev_index].lock);
if (inputs_[prev_index].at_eof)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
else
index = prev_index; // Go back to prior.
} else {
@@ -1836,7 +1847,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
}
input_info_t *queue_next = pop_from_ready_queue(output);
if (queue_next == nullptr)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
index = queue_next->index;
}
} else if (options_.deps == DEPENDENCY_TIMESTAMPS) {
@@ -1850,7 +1861,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
}
}
if (index < 0)
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
VPRINT(this, 2,
"next_record[%d]: advancing to timestamp %" PRIu64
" == input #%d\n",
@@ -1883,14 +1894,15 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
std::lock_guard<std::mutex> lock(*inputs_[index].lock);
if (inputs_[index].at_eof ||
*inputs_[index].reader == *inputs_[index].reader_end) {
VPRINT(this, 2, "next_record[%d]: local index %d == input #%d at eof\n",
output, outputs_[output].input_indices_index, index);
VPRINT(this, 2, "next_record[%d]: input #%d at eof\n", output, index);
if (options_.schedule_record_ostream != nullptr &&
prev_index != INVALID_INPUT_ORDINAL)
close_schedule_segment(output, inputs_[prev_index]);
inputs_[index].at_eof = true;
if (!inputs_[index].at_eof)
mark_input_eof(inputs_[index]);
index = INVALID_INPUT_ORDINAL;
// Loop and pick next thread.
prev_index = INVALID_INPUT_ORDINAL;
continue;
}
break;
@@ -1911,7 +1923,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// check for quantum end.
outputs_[output].cur_time = cur_time; // Invalid values are checked below.
if (!outputs_[output].active)
return sched_type_t::STATUS_WAIT;
return sched_type_t::STATUS_IDLE;
if (outputs_[output].waiting) {
VPRINT(this, 5, "next_record[%d]: need new input (cur=waiting)\n", output);
sched_type_t::stream_status_t res = pick_next_input(output, true);
@@ -1922,7 +1934,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (outputs_[output].cur_input < 0) {
// This happens with more outputs than inputs. For non-empty outputs we
// require cur_input to be set to >=0 during init().
return sched_type_t::STATUS_EOF;
return eof_or_idle(output);
}
input = &inputs_[outputs_[output].cur_input];
auto lock = std::unique_lock<std::mutex>(*input->lock);
@@ -1970,6 +1982,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
input->needs_advance = true;
}
if (input->at_eof || *input->reader == *input->reader_end) {
if (!input->at_eof)
mark_input_eof(*input);
lock.unlock();
VPRINT(this, 5, "next_record[%d]: need new input (cur=%d eof)\n", output,
input->index);
@@ -1998,6 +2012,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (outputs_[output].record_index >=
static_cast<int>(outputs_[output].record.size())) {
// We're on the last record.
VPRINT(this, 4, "next_record[%d]: on last record\n", output);
} else if (outputs_[output].record[outputs_[output].record_index].type ==
schedule_record_t::SKIP) {
VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output);
@@ -2257,6 +2272,28 @@ scheduler_tmpl_t<RecordType, ReaderType>::stop_speculation(output_ordinal_t outp
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
void
scheduler_tmpl_t<RecordType, ReaderType>::mark_input_eof(input_info_t &input)
{
input.at_eof = true;
assert(live_input_count_.load(std::memory_order_acquire) > 0);
live_input_count_.fetch_add(-1, std::memory_order_release);
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::eof_or_idle(output_ordinal_t output)
{
if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT ||
live_input_count_.load(std::memory_order_acquire) == 0) {
return sched_type_t::STATUS_EOF;
} else {
outputs_[output].waiting = true;
return sched_type_t::STATUS_IDLE;
}
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::set_output_active(output_ordinal_t output,
33 changes: 27 additions & 6 deletions clients/drcachesim/scheduler/scheduler.h
@@ -45,6 +45,7 @@
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <deque>
#include <limits>
#include <memory>
@@ -109,17 +110,28 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
* For dynamic scheduling with cross-stream dependencies, the scheduler may pause
* a stream if it gets ahead of another stream it should have a dependence on.
* This value is also used for schedules following the recorded timestamps
* (#DEPENDENCY_TIMESTAMPS) to avoid one stream getting ahead of another. For
* replaying a schedule as it was traced with #MAP_TO_RECORDED_OUTPUT this can
* indicate an idle period on a core where the traced workload was not currently
* scheduled.
* (#DEPENDENCY_TIMESTAMPS) to avoid one stream getting ahead of another.
* #STATUS_WAIT should be treated as artificial, an artifact of enforcing a
* recorded schedule on concurrent differently-timed output streams.
* Simulators are suggested to not advance simulated time for #STATUS_WAIT while
* they should advance time for #STATUS_IDLE as the latter indicates a true
* lack of work.
*/
STATUS_WAIT,
STATUS_INVALID, /**< Error condition. */
STATUS_REGION_INVALID, /**< Input region is out of bounds. */
STATUS_NOT_IMPLEMENTED, /**< Feature not implemented. */
STATUS_SKIPPED, /**< Used for internal scheduler purposes. */
STATUS_RECORD_FAILED, /**< Failed to record schedule for future replay. */
/**
* This code indicates that all inputs are blocked waiting for kernel resources
* (such as i/o). This is similar to #STATUS_WAIT, but #STATUS_WAIT indicates an
* artificial pause due to imposing the original ordering while #STATUS_IDLE
* indicates actual idle time in the application. Simulators are suggested
* to not advance simulated time for #STATUS_WAIT while they should advance
* time for #STATUS_IDLE.
*/
STATUS_IDLE,
};

/** Identifies an input stream by its index. */
@@ -629,7 +641,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
/**
* Disables or re-enables this output stream. If "active" is false, this
* stream becomes inactive and its currently assigned input is moved to the
* ready queue to be scheduled on other outputs. The #STATUS_WAIT code is
* ready queue to be scheduled on other outputs. The #STATUS_IDLE code is
* returned to next_record() for inactive streams. If "active" is true,
* this stream becomes active again.
* This is only supported for #MAP_TO_ANY_OUTPUT.
@@ -1076,7 +1088,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
// sched_lock_.
std::vector<schedule_record_t> record;
int record_index = 0;
bool waiting = false;
bool waiting = false; // Waiting or idling.
bool active = true;
// Used for time-based quanta.
uint64_t cur_time = 0;
@@ -1259,6 +1271,13 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
stream_status_t
set_output_active(output_ordinal_t output, bool active);

// Caller must hold the input's lock.
void
mark_input_eof(input_info_t &input);

stream_status_t
eof_or_idle(output_ordinal_t output);

///////////////////////////////////////////////////////////////////////////
// Support for ready queues for who to schedule next:

@@ -1325,6 +1344,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
flexible_queue_t<input_info_t *, InputTimestampComparator> ready_priority_;
// Global ready queue counter used to provide FIFO for same-priority inputs.
uint64_t ready_counter_ = 0;
// Count of inputs not yet at eof.
std::atomic<int> live_input_count_;
// Map from workload,tid pair to input.
struct workload_tid_t {
workload_tid_t(int wl, memref_tid_t tid)
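Taken together, a consumer driving one output stream per simulated core might handle the status codes along these lines (a sketch only: `simulated_core_t`, `report_error()`, and the cycle accounting are illustrative, not part of this PR):

```cpp
// Hypothetical per-core drive loop over a scheduler_t output stream.
void
run_core(scheduler_t::stream_t *stream, simulated_core_t *core)
{
    memref_t record;
    while (true) {
        scheduler_t::stream_status_t status = stream->next_record(record);
        if (status == scheduler_t::STATUS_EOF)
            break; // All inputs are finished.
        if (status == scheduler_t::STATUS_WAIT)
            continue; // Artificial pause: do not advance simulated time.
        if (status == scheduler_t::STATUS_IDLE) {
            ++core->idle_cycles; // True idle period: advance simulated time.
            continue;
        }
        if (status != scheduler_t::STATUS_OK) {
            report_error(status);
            break;
        }
        core->simulate(record);
    }
}
```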