i#6471 idle: Add better idle time modeling
Changes instruction-quantum idle handling to use wall-clock time instead of a
counter decremented on queue pops. The counter was skewed unfairly by the rate
of queue queries. This results in all quanta using a unified time-based
approach for blocked time.
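
As a rough illustration of the unified check, an input stays unscheduled while
its blocked window has not yet elapsed. A minimal sketch with simplified
standalone names (the real logic is inside pop_from_ready_queue() in the diff
below):

    #include <cstdint>

    // Returns true if the input should remain unscheduled: the time elapsed
    // since it blocked is still less than its computed blocked time.
    inline bool
    still_blocked(uint64_t cur_time, uint64_t blocked_start_time, uint64_t blocked_time)
    {
        return blocked_time > 0 && cur_time - blocked_start_time < blocked_time;
    }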

Changes block_time_scale to remove the division by the latency threshold: the
scale is now multiplied directly by the syscall latency to produce the blocked
time. The default scale is set to 1000, which matches the wall-clock time the
schedule_stats tool takes to process an instruction (about one instruction per
microsecond) and which should be a rough match for many simulators passing a
one-nanosecond cycle per instruction as the time unit.

Adds a new scheduler option block_time_max, with a corresponding front-end
option -sched_block_max_us, which caps the blocked time for a syscall (default
25 seconds) to avoid outlier latencies being scaled to extreme amounts of time.
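
To make the arithmetic concrete, here is a minimal sketch of the scale-and-cap
computation, assuming a standalone helper with simplified parameter names (the
real code lives in syscall_incurs_switch() in the diff below):

    #include <algorithm>
    #include <cstdint>

    // Converts a syscall's microsecond latency into blocked time by applying
    // the scale and clamping to the configured maximum.
    inline uint64_t
    compute_blocked_time(uint64_t latency_us, double block_time_scale,
                         uint64_t block_time_max)
    {
        uint64_t scaled = static_cast<uint64_t>(latency_us * block_time_scale);
        return std::min(scaled, block_time_max);
    }

With the defaults (scale 1000, max 25000000), a 100-microsecond syscall latency
becomes 100000 units of blocked time, while a multi-second outlier is clamped
to 25000000 (25 seconds) rather than scaling up to tens of minutes.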

Adds a heartbeat diagnostic for queue lookups, currently only in debug builds,
to help understand behavior over time.

Sets the input ordinal to invalid while idle.  This also causes
schedule_stats to consider a transition from idle to a valid input to
be a context switch, which matches how the Linux kernel counts
switches.

Augments schedule_stats with a wall-clock measure of cpu and idle time, giving
a much fairer %cpu metric than the previous one based purely on record counts,
since schedule_stats processes an instruction much more quickly than an idle
record. Adds two %cpu metrics: one that excludes idle time past the final
instruction (to handle skewed finish times across cores) and one that includes
idle time until all cpus are done.
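
A sketch of how the two metrics could be computed from per-core wall-clock
accumulators (the type, field, and function names here are illustrative
assumptions, not the actual schedule_stats counters):

    #include <cstdint>

    struct core_wall_clock_t {
        uint64_t cpu_us = 0;       // Wall-clock time spent on instruction records.
        uint64_t idle_us = 0;      // All wall-clock idle time on this core.
        uint64_t idle_tail_us = 0; // Idle time after this core's final instruction.
    };

    // %cpu excluding idle time past the final instruction (skewed finishes).
    inline double
    cpu_pct_excluding_tail(const core_wall_clock_t &t)
    {
        uint64_t denom = t.cpu_us + t.idle_us - t.idle_tail_us;
        return denom == 0 ? 0. : 100. * t.cpu_us / static_cast<double>(denom);
    }

    // %cpu including idle time until all cpus are done.
    inline double
    cpu_pct_including_tail(const core_wall_clock_t &t)
    {
        uint64_t denom = t.cpu_us + t.idle_us;
        return denom == 0 ? 0. : 100. * t.cpu_us / static_cast<double>(denom);
    }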

Increases the default -schedule_stats_print_every to 500K to avoid
extremely long strings for larger workloads.

Updates the scheduler unit tests for the timing changes, switching the ones
that test idle time to a deterministic mock time for the quantum to avoid
wall-clock flakiness.
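
The deterministic-time idea, in a hypothetical test fragment (the stream
pointer, record variable, and loop bound are illustrative; the sketch assumes
the stream's next_record() variant that takes a caller-provided time, as the
internal next_record(output, cur_time) in this diff suggests):

    // scheduler_t::stream_t *stream = scheduler.get_stream(/*ordinal=*/0);
    uint64_t mock_time = 0;
    memref_t record;
    for (int i = 0; i < 100; ++i) {
        ++mock_time; // Advance deterministic "wall-clock" time.
        stream->next_record(record, mock_time);
    }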

Tested on large traces on dozens of cores with known idle characteristics,
where by tweaking the parameters I was able to get reasonably representative
idle times.

Issue: #6471
derekbruening committed Dec 13, 2023
1 parent c40095e commit e6507a1
Showing 10 changed files with 313 additions and 135 deletions.
1 change: 1 addition & 0 deletions clients/drcachesim/analyzer_multi.cpp
@@ -262,6 +262,7 @@ analyzer_multi_t::init_dynamic_schedule()
sched_ops.syscall_switch_threshold = op_sched_syscall_switch_us.get_value();
sched_ops.blocking_switch_threshold = op_sched_blocking_switch_us.get_value();
sched_ops.block_time_scale = op_sched_block_scale.get_value();
sched_ops.block_time_max = op_sched_block_max_us.get_value();
#ifdef HAS_ZIP
if (!op_record_file.get_value().empty()) {
record_schedule_zip_.reset(new zipfile_ostream_t(op_record_file.get_value()));
21 changes: 14 additions & 7 deletions clients/drcachesim/common/options.cpp
@@ -851,11 +851,18 @@ droption_t<uint64_t> op_sched_blocking_switch_us(
"maybe-blocking to incur a context switch. Applies to -core_sharded and "
"-core_serial. ");

droption_t<double>
op_sched_block_scale(DROPTION_SCOPE_ALL, "sched_block_scale", 1.,
"Input block time scale factor",
"A higher value here results in blocking syscalls "
"keeping inputs unscheduled for longer.");
droption_t<double> op_sched_block_scale(
DROPTION_SCOPE_ALL, "sched_block_scale", 1000., "Input block time scale factor",
"The scale applied to the microsecond latency of blocking system calls. A higher "
"value here results in blocking syscalls keeping inputs unscheduled for longer. "
"This should roughly equal the slowdown of instruction record processing versus the "
"original (untraced) application execution.");

droption_t<uint64_t> op_sched_block_max_us(DROPTION_SCOPE_ALL, "sched_block_max_us",
25000000,
"Maximum blocked input time, in microseconds",
"The maximum blocked time, after scaling with "
"-sched_block_scale.");
#ifdef HAS_ZIP
droption_t<std::string> op_record_file(DROPTION_SCOPE_FRONTEND, "record_file", "",
"Path for storing record of schedule",
@@ -875,8 +882,8 @@ droption_t<std::string>

// Schedule_stats options.
droption_t<uint64_t>
op_schedule_stats_print_every(DROPTION_SCOPE_ALL, "schedule_stats_print_every", 5000,
"A letter is printed every N instrs",
op_schedule_stats_print_every(DROPTION_SCOPE_ALL, "schedule_stats_print_every",
500000, "A letter is printed every N instrs",
"A letter is printed every N instrs or N waits");

droption_t<std::string> op_syscall_template_file(
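
For reference, a hypothetical analysis invocation exercising the new options
(only -sched_block_scale, -sched_block_max_us, and -schedule_stats_print_every
come from this diff; the remaining flag spellings and values are assumptions):

    bin64/drrun -t drcachesim -indir drmemtrace.myapp.dir -core_serial \
        -simulator_type schedule_stats -sched_block_scale 1000 \
        -sched_block_max_us 25000000 -schedule_stats_print_every 500000
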
1 change: 1 addition & 0 deletions clients/drcachesim/common/options.h
@@ -194,6 +194,7 @@ extern dynamorio::droption::droption_t<bool> op_sched_order_time;
extern dynamorio::droption::droption_t<uint64_t> op_sched_syscall_switch_us;
extern dynamorio::droption::droption_t<uint64_t> op_sched_blocking_switch_us;
extern dynamorio::droption::droption_t<double> op_sched_block_scale;
extern dynamorio::droption::droption_t<uint64_t> op_sched_block_max_us;
#ifdef HAS_ZIP
extern dynamorio::droption::droption_t<std::string> op_record_file;
extern dynamorio::droption::droption_t<std::string> op_replay_file;
136 changes: 74 additions & 62 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -1576,10 +1576,12 @@ scheduler_tmpl_t<RecordType, ReaderType>::add_to_ready_queue(input_info_t *input
VPRINT(
this, 4,
"add_to_ready_queue (pre-size %zu): input %d priority %d timestamp delta %" PRIu64
" block factor %3.2f time %" PRIu64 "\n",
" block time %" PRIu64 " start time %" PRIu64 "\n",
ready_priority_.size(), input->index, input->priority,
input->reader->get_last_timestamp() - input->base_timestamp,
input->block_time_factor, input->blocked_start_time);
input->reader->get_last_timestamp() - input->base_timestamp, input->blocked_time,
input->blocked_start_time);
if (input->blocked_time > 0)
++num_blocked_;
input->queue_counter = ++ready_counter_;
ready_priority_.push(input);
}
@@ -1593,40 +1595,27 @@ scheduler_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue(
std::set<input_info_t *> blocked;
input_info_t *res = nullptr;
sched_type_t::stream_status_t status = STATUS_OK;
uint64_t cur_time = (num_blocked_ > 0) ? get_output_time(for_output) : 0;
while (!ready_priority_.empty()) {
res = ready_priority_.top();
ready_priority_.pop();
if (res->binding.empty() || res->binding.find(for_output) != res->binding.end()) {
// For blocked inputs, as we don't have interrupts or other regular
// control points we only check for being unblocked when an input
// would be chosen to run. We thus keep blocked inputs in the ready queue.
if (options_.quantum_unit == QUANTUM_TIME) {
if (res->block_time_factor > 0 &&
outputs_[for_output].cur_time - res->blocked_start_time <
options_.block_time_scale * res->block_time_factor) {
VPRINT(
this, 4,
"pop queue: %d still blocked for %4.1f (%3.2f * %3.2f - (%" PRIu64
" - %" PRIu64 ")\n",
res->index,
options_.block_time_scale * res->block_time_factor -
(outputs_[for_output].cur_time - res->blocked_start_time),
options_.block_time_scale, res->block_time_factor,
outputs_[for_output].cur_time, res->blocked_start_time);
// We keep searching for a suitable input.
blocked.insert(res);
} else
break;
} else {
if (res->block_time_factor > 0) {
VPRINT(this, 4, "pop queue: %d still blocked for %4.1f\n", res->index,
res->block_time_factor);
--res->block_time_factor;
// We keep searching for a suitable input.
blocked.insert(res);
} else
break;
if (res->blocked_time > 0) {
assert(cur_time > 0);
--num_blocked_;
}
if (res->blocked_time > 0 &&
cur_time - res->blocked_start_time < res->blocked_time) {
VPRINT(this, 4, "pop queue: %d still blocked for %" PRIu64 "\n",
res->index,
res->blocked_time - (cur_time - res->blocked_start_time));
// We keep searching for a suitable input.
blocked.insert(res);
} else
break;
} else {
// We keep searching for a suitable input.
skipped.insert(res);
@@ -1645,13 +1634,21 @@ scheduler_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue(
// Re-add the blocked ones to the back.
for (input_info_t *save : blocked)
add_to_ready_queue(save);
VDO(this, 1, {
static int heartbeat;
if (++heartbeat % 500 == 0) {
VPRINT(this, 1, "heartbeat[%d] %zd in queue; %d blocked => %d %d\n",
for_output, ready_priority_.size(), num_blocked_,
res == nullptr ? -1 : res->index, status);
}
});
if (res != nullptr) {
VPRINT(this, 4,
"pop_from_ready_queue[%d] (post-size %zu): input %d priority %d timestamp "
"delta %" PRIu64 "\n",
for_output, ready_priority_.size(), res->index, res->priority,
res->reader->get_last_timestamp() - res->base_timestamp);
res->block_time_factor = 0.;
res->blocked_time = 0;
}
new_input = res;
return status;
@@ -1660,15 +1657,15 @@ scheduler_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue(
template <typename RecordType, typename ReaderType>
bool
scheduler_tmpl_t<RecordType, ReaderType>::syscall_incurs_switch(input_info_t *input,
double &block_time_factor)
uint64_t &blocked_time)
{
uint64_t post_time = input->reader->get_last_timestamp();
assert(input->processing_syscall || input->processing_maybe_blocking_syscall);
if (input->reader->get_version() < TRACE_ENTRY_VERSION_FREQUENT_TIMESTAMPS) {
// This is a legacy trace that does not have timestamps bracketing syscalls.
// We switch on every maybe-blocking syscall in this case and have a simplified
// blocking model.
block_time_factor = 1.;
blocked_time = options_.blocking_switch_threshold;
return input->processing_maybe_blocking_syscall;
}
assert(input->pre_syscall_timestamp > 0);
@@ -1677,13 +1674,22 @@ scheduler_tmpl_t<RecordType, ReaderType>::syscall_incurs_switch(input_info_t *in
uint64_t threshold = input->processing_maybe_blocking_syscall
? options_.blocking_switch_threshold
: options_.syscall_switch_threshold;
block_time_factor = static_cast<double>(latency) / threshold;
if (options_.quantum_unit == QUANTUM_INSTRUCTIONS)
block_time_factor *= options_.block_time_scale;
VPRINT(this, 3, "input %d %ssyscall latency %" PRIu64 " => factor %4.1f\n",
blocked_time =
static_cast<uint64_t>(static_cast<double>(latency) * options_.block_time_scale);
if (blocked_time > options_.block_time_max) {
// We have a max to avoid outlier latencies that are already a second or
// more from scaling up to tens of minutes. We assume a cap is representative
// as the outliers likely were not part of key dependence chains. Without a
// cap the other threads all finish and the simulation waits for tens of
// minutes further for a couple of outliers.
blocked_time = options_.block_time_max;
}
VPRINT(this, 3,
"input %d %ssyscall latency %" PRIu64 " * scale %5.1f => blocked time %" PRIu64
"\n",
input->index,
input->processing_maybe_blocking_syscall ? "maybe-blocking " : "", latency,
block_time_factor);
options_.block_time_scale, blocked_time);
return latency >= threshold;
}

@@ -1877,7 +1883,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t output,
double block_time_factor)
uint64_t blocked_time)
{
sched_type_t::stream_status_t res = sched_type_t::STATUS_OK;
bool need_lock =
@@ -1917,14 +1923,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
if (res != sched_type_t::STATUS_OK)
return res;
} else if (options_.mapping == MAP_TO_ANY_OUTPUT) {
if (block_time_factor > 0. && prev_index != INVALID_INPUT_ORDINAL) {
if (blocked_time > 0 && prev_index != INVALID_INPUT_ORDINAL) {
std::lock_guard<std::mutex> lock(*inputs_[prev_index].lock);
if (inputs_[prev_index].block_time_factor == 0.) {
VPRINT(this, 2, "next_record[%d]: block time factor %3.2f\n",
output, block_time_factor);
inputs_[prev_index].block_time_factor = block_time_factor;
inputs_[prev_index].blocked_start_time =
outputs_[output].cur_time;
if (inputs_[prev_index].blocked_time == 0) {
VPRINT(this, 2, "next_record[%d]: blocked time %" PRIu64 "\n",
output, blocked_time);
inputs_[prev_index].blocked_time = blocked_time;
inputs_[prev_index].blocked_start_time = get_output_time(output);
} else {
// If we looped we could have the same prev_index.
assert(iters > 1);
@@ -1942,13 +1947,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
ready_priority_.erase(target);
index = target->index;
// Erase any remaining wait time for the target.
if (target->block_time_factor != 0.) {
if (target->blocked_time > 0) {
VPRINT(this, 3,
"next_record[%d]: direct switch erasing block time "
"factor "
"%4.1f for input %d\n",
output, target->block_time_factor, target->index);
target->block_time_factor = 0.;
"next_record[%d]: direct switch erasing blocked time "
"for input %d\n",
output, target->index);
--num_blocked_;
target->blocked_time = 0;
}
} else {
// TODO i#5843: If the target is running on another output, we
@@ -1965,13 +1970,14 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
}
if (index != INVALID_INPUT_ORDINAL) {
// We found a direct switch target above.
} else if (ready_queue_empty() && block_time_factor == 0.) {
} else if (ready_queue_empty() && blocked_time == 0) {
if (prev_index == INVALID_INPUT_ORDINAL)
return eof_or_idle(output);
std::lock_guard<std::mutex> lock(*inputs_[prev_index].lock);
if (inputs_[prev_index].at_eof)
auto lock = std::unique_lock<std::mutex>(*inputs_[prev_index].lock);
if (inputs_[prev_index].at_eof) {
lock.unlock();
return eof_or_idle(output);
else
} else
index = prev_index; // Go back to prior.
} else {
// Give up the input before we go to the queue so we can add
@@ -1997,8 +2003,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
return status;
}
if (queue_next == nullptr) {
assert(block_time_factor == 0. ||
prev_index == INVALID_INPUT_ORDINAL);
assert(blocked_time == 0 || prev_index == INVALID_INPUT_ORDINAL);
return eof_or_idle(output);
}
index = queue_next->index;
@@ -2070,6 +2075,12 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// We do not enforce a globally increasing time to avoid the synchronization cost; we
// do return an error on a time smaller than an input's current start time when we
// check for quantum end.
if (cur_time == 0) {
// It's more efficient for QUANTUM_TIME to get the time here instead of
// in get_output_time(). This also makes the two more similarly behaved
// with respect to blocking system calls.
cur_time = get_time_micros();
}
outputs_[output].cur_time = cur_time; // Invalid values are checked below.
if (!outputs_[output].active)
return sched_type_t::STATUS_IDLE;
@@ -2170,7 +2181,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
VDO(this, 5, print_record(record););
bool need_new_input = false;
bool preempt = false;
double block_time_factor = 0.;
uint64_t blocked_time = 0;
uint64_t prev_time_in_quantum = 0;
if (options_.mapping == MAP_AS_PREVIOUSLY) {
assert(outputs_[output].record_index >= 0);
@@ -2235,7 +2246,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
input->switch_to_input = it->second;
}
} else if (record_type_is_instr(record)) {
if (syscall_incurs_switch(input, block_time_factor)) {
if (syscall_incurs_switch(input, blocked_time)) {
// Model as blocking and should switch to a different input.
need_new_input = true;
VPRINT(this, 3,
@@ -2323,8 +2334,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
VPRINT(this, 5, "next_record[%d]: queuing candidate record\n", output);
input->queue.push_back(record);
lock.unlock();
sched_type_t::stream_status_t res =
pick_next_input(output, block_time_factor);
sched_type_t::stream_status_t res = pick_next_input(output, blocked_time);
if (res != sched_type_t::STATUS_OK && res != sched_type_t::STATUS_WAIT &&
res != sched_type_t::STATUS_SKIPPED)
return res;
@@ -2385,7 +2395,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
}
break;
}
VPRINT(this, 4, "next_record[%d]: from %d: ", output, input->index);
VPRINT(this, 4, "next_record[%d]: from %d @%" PRId64 ": ", output, input->index,
cur_time);
VDO(this, 4, print_record(record););

outputs_[output].last_record = record;
@@ -2493,6 +2504,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::eof_or_idle(output_ordinal_t output)
assert(options_.mapping != MAP_AS_PREVIOUSLY || outputs_[output].at_eof);
return sched_type_t::STATUS_EOF;
} else {
set_cur_input(output, INVALID_INPUT_ORDINAL);
outputs_[output].waiting = true;
return sched_type_t::STATUS_IDLE;
}