From 110ca5e1af52901b5c2b17d6995f067e0ba28025 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Wed, 6 Dec 2023 23:32:20 -0500 Subject: [PATCH] i#6471 sched idle: Add idle time from blocking syscalls (#6494) Adds a longer waiting period on blocking syscalls, using either the provided output time or, for instruction-based quanta, a count of queue selections before a blocked input is actually selected. Since the scheduler does not have timer interrupts or regular points of control and relies on its user calling it, blocked inputs are kept on the ready queue and are checked for becoming unblocked when the ready queue is queried. The wait duration is based on a "block time factor": the syscall latency divided by the context switch threshold, multiplied by a user-provided "block_time_scale" option that can be used to scale the durations up or down. The wait duration is erased on a direct switch to an input. Adds a new record type to represent idle time on replay. Augments the unit tests with blocking high-latency syscalls to exercise the new feature in various sub-tests. Issue: #6471 --- clients/drcachesim/analyzer_multi.cpp | 1 + clients/drcachesim/common/options.cpp | 5 + clients/drcachesim/common/options.h | 1 + clients/drcachesim/scheduler/scheduler.cpp | 335 +++++++++++++----- clients/drcachesim/scheduler/scheduler.h | 54 ++- .../drcachesim/tests/scheduler_launcher.cpp | 6 + .../drcachesim/tests/scheduler_unit_tests.cpp | 231 +++++++++--- 7 files changed, 502 insertions(+), 131 deletions(-) diff --git a/clients/drcachesim/analyzer_multi.cpp b/clients/drcachesim/analyzer_multi.cpp index 11ba9ef3b23..383707e86d7 100644 --- a/clients/drcachesim/analyzer_multi.cpp +++ b/clients/drcachesim/analyzer_multi.cpp @@ -259,6 +259,7 @@ analyzer_multi_t::init_dynamic_schedule() sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME; sched_ops.syscall_switch_threshold = op_sched_syscall_switch_us.get_value(); sched_ops.blocking_switch_threshold = op_sched_blocking_switch_us.get_value(); + sched_ops.block_time_scale = op_sched_block_scale.get_value(); #ifdef HAS_ZIP if (!op_record_file.get_value().empty()) { record_schedule_zip_.reset(new zipfile_ostream_t(op_record_file.get_value())); diff --git a/clients/drcachesim/common/options.cpp b/clients/drcachesim/common/options.cpp index 53fceb2c03d..8b89f884d98 100644 --- a/clients/drcachesim/common/options.cpp +++ b/clients/drcachesim/common/options.cpp @@ -851,6 +851,11 @@ droption_t op_sched_blocking_switch_us( "maybe-blocking to incur a context switch. Applies to -core_sharded and " "-core_serial. 
"); +droption_t + op_sched_block_scale(DROPTION_SCOPE_ALL, "sched_block_scale", 1., + "Input block time scale factor", + "A higher value here results in blocking syscalls " + "keeping inputs unscheduled for longer."); #ifdef HAS_ZIP droption_t op_record_file(DROPTION_SCOPE_FRONTEND, "record_file", "", "Path for storing record of schedule", diff --git a/clients/drcachesim/common/options.h b/clients/drcachesim/common/options.h index 8deff88ea74..c4d3f1ec604 100644 --- a/clients/drcachesim/common/options.h +++ b/clients/drcachesim/common/options.h @@ -193,6 +193,7 @@ extern dynamorio::droption::droption_t op_sched_time; extern dynamorio::droption::droption_t op_sched_order_time; extern dynamorio::droption::droption_t op_sched_syscall_switch_us; extern dynamorio::droption::droption_t op_sched_blocking_switch_us; +extern dynamorio::droption::droption_t op_sched_block_scale; #ifdef HAS_ZIP extern dynamorio::droption::droption_t op_record_file; extern dynamorio::droption::droption_t op_replay_file; diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index c757d8f9f7d..ad2bcff115f 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -744,7 +744,12 @@ scheduler_tmpl_t::set_initial_schedule( } for (int i = 0; i < static_cast(outputs_.size()); ++i) { if (i < static_cast(inputs_.size())) { - input_info_t *queue_next = pop_from_ready_queue(i); + input_info_t *queue_next; +#ifdef DEBUG + sched_type_t::stream_status_t status = +#endif + pop_from_ready_queue(i, queue_next); + assert(status == STATUS_OK); // No blocked inputs yet. if (queue_next == nullptr) set_cur_input(i, INVALID_INPUT_ORDINAL); else @@ -860,10 +865,17 @@ scheduler_tmpl_t::read_recorded_schedule() return STATUS_ERROR_INVALID_PARAMETER; } for (int i = 0; i < static_cast(outputs_.size()); ++i) { - if (!outputs_[i].record.empty()) { - set_cur_input(i, outputs_[i].record[0].key.input); - } else + if (outputs_[i].record.empty()) { set_cur_input(i, INVALID_INPUT_ORDINAL); + } else if (outputs_[i].record[0].type == schedule_record_t::IDLE) { + set_cur_input(i, INVALID_INPUT_ORDINAL); + outputs_[i].waiting = true; + outputs_[i].wait_start_time = 0; // Updated on first next_record(). + VPRINT(this, 3, "output %d starting out idle\n", i); + } else { + assert(outputs_[i].record[0].type == schedule_record_t::DEFAULT); + set_cur_input(i, outputs_[i].record[0].key.input); + } } return STATUS_SUCCESS; } @@ -926,9 +938,11 @@ scheduler_tmpl_t::read_traced_schedule() uint64_t timestamp = entry.timestamp; // Some entries have no instructions (there is an entry for each timestamp, and // a signal can come in after a prior timestamp with no intervening instrs). 
+ assert(all_sched[cur_output].empty() || + all_sched[cur_output].back().type == schedule_record_t::DEFAULT); if (!all_sched[cur_output].empty() && input == all_sched[cur_output].back().key.input && - start == all_sched[cur_output].back().start_instruction) { + start == all_sched[cur_output].back().value.start_instruction) { VPRINT(this, 3, "Output #%d: as-read segment #%zu has no instructions: skipping\n", cur_output, all_sched[cur_output].size() - 1); @@ -954,7 +968,8 @@ scheduler_tmpl_t::read_traced_schedule() for (int sched_idx = 0; sched_idx < static_cast(all_sched[output_idx].size()); ++sched_idx) { auto &segment = all_sched[output_idx][sched_idx]; - auto find = start2stop[segment.key.input].find(segment.start_instruction); + auto find = + start2stop[segment.key.input].find(segment.value.start_instruction); ++find; if (find == start2stop[segment.key.input].end()) segment.stop_instruction = std::numeric_limits::max(); @@ -963,12 +978,12 @@ scheduler_tmpl_t::read_traced_schedule() VPRINT(this, 4, "as-read segment #%d: input=%d start=%" PRId64 " stop=%" PRId64 " time=%" PRId64 "\n", - sched_idx, segment.key.input, segment.start_instruction, + sched_idx, segment.key.input, segment.value.start_instruction, segment.stop_instruction, segment.timestamp); if (sched_idx + 1 < static_cast(all_sched[output_idx].size()) && segment.key.input == all_sched[output_idx][sched_idx + 1].key.input && segment.stop_instruction > - all_sched[output_idx][sched_idx + 1].start_instruction) { + all_sched[output_idx][sched_idx + 1].value.start_instruction) { // A second sanity check. error_string_ = "Invalid decreasing start field in schedule file"; return STATUS_ERROR_INVALID_PARAMETER; @@ -976,7 +991,7 @@ scheduler_tmpl_t::read_traced_schedule() segment.key.input == all_sched[output_idx][sched_idx + 1].key.input && segment.stop_instruction == - all_sched[output_idx][sched_idx + 1].start_instruction) { + all_sched[output_idx][sched_idx + 1].value.start_instruction) { // Collapse into next. 
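The .value.start_instruction accesses throughout this hunk reflect the new schedule_record_t layout introduced later in this patch (scheduler.h): the former start_instruction field moves into a union so the same slot can carry an idle duration for the new IDLE record type. A simplified, non-packed sketch of that shape; the field names follow the patch but the struct itself is only illustrative:

    #include <cstdint>

    struct record_sketch_t {
        enum type_t { DEFAULT, IDLE } type = DEFAULT;
        union value_t {
            // For IDLE records: how long the output sat idle, in microseconds.
            uint64_t idle_duration;
            // For all other record types: input ordinal of the starting instruction.
            uint64_t start_instruction = 0;
        } value;
        uint64_t stop_instruction = 0;
        uint64_t timestamp = 0;
    };

    int
    main()
    {
        record_sketch_t rec;
        rec.type = record_sketch_t::IDLE;
        rec.value.idle_duration = 250; // Reuses the storage of start_instruction.
        return rec.value.idle_duration == 250 ? 0 : 1;
    }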
if (start_consec == -1) start_consec = sched_idx; @@ -986,7 +1001,7 @@ scheduler_tmpl_t::read_traced_schedule() : all_sched[output_idx][sched_idx]; outputs_[output_idx].record.emplace_back( static_cast(toadd.type), - +toadd.key.input, +toadd.start_instruction, + +toadd.key.input, +toadd.value.start_instruction, +all_sched[output_idx][sched_idx].stop_instruction, +toadd.timestamp); start_consec = -1; VDO(this, 3, { @@ -995,7 +1010,7 @@ scheduler_tmpl_t::read_traced_schedule() "segment #%zu: input=%d start=%" PRId64 " stop=%" PRId64 " time=%" PRId64 "\n", outputs_[output_idx].record.size() - 1, added.key.input, - added.start_instruction, added.stop_instruction, + added.value.start_instruction, added.stop_instruction, added.timestamp); }); } @@ -1003,7 +1018,7 @@ scheduler_tmpl_t::read_traced_schedule() VPRINT(this, 1, "Collapsed duplicates for %zu as-traced records for output #%d\n", outputs_[output_idx].record.size(), output_idx); if (!outputs_[output_idx].record.empty()) { - if (outputs_[output_idx].record[0].start_instruction != 0) { + if (outputs_[output_idx].record[0].value.start_instruction != 0) { VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx); set_cur_input(output_idx, INVALID_INPUT_ORDINAL); @@ -1056,7 +1071,8 @@ scheduler_tmpl_t::check_and_fix_modulo_problem_in_schedu uint64_t add_to_start = 0; bool in_order = true; for (const schedule_record_t &sched : input_sched[input_idx]) { - if (sched.start_instruction < prev_start) { + assert(sched.type == schedule_record_t::DEFAULT); + if (sched.value.start_instruction < prev_start) { // If within 50% of the end of the chunk we assume it's i#6107. if (prev_start * 2 > DEFAULT_CHUNK_SIZE) { add_to_start += DEFAULT_CHUNK_SIZE; @@ -1078,9 +1094,9 @@ scheduler_tmpl_t::check_and_fix_modulo_problem_in_schedu error_string_ = "Same timestamps not supported for i#6107 workaround"; return STATUS_ERROR_INVALID_PARAMETER; } - prev_start = sched.start_instruction; + prev_start = sched.value.start_instruction; timestamp2adjust[input_idx][sched.timestamp] = - sched.start_instruction + add_to_start; + sched.value.start_instruction + add_to_start; } } if (!found_i6107) @@ -1104,16 +1120,16 @@ scheduler_tmpl_t::check_and_fix_modulo_problem_in_schedu error_string_ = "Failed to find timestamp for i#6107 workaround"; return STATUS_ERROR_INVALID_PARAMETER; } - assert(it->second >= segment.start_instruction); - assert(it->second % DEFAULT_CHUNK_SIZE == segment.start_instruction); - if (it->second != segment.start_instruction) { + assert(it->second >= segment.value.start_instruction); + assert(it->second % DEFAULT_CHUNK_SIZE == segment.value.start_instruction); + if (it->second != segment.value.start_instruction) { VPRINT(this, 2, "Updating all_sched[%d][%d] input %d from %" PRId64 " to %" PRId64 "\n", output_idx, sched_idx, segment.key.input, - segment.start_instruction, it->second); + segment.value.start_instruction, it->second); } - segment.start_instruction = it->second; + segment.value.start_instruction = it->second; } } return STATUS_SUCCESS; @@ -1447,12 +1463,9 @@ scheduler_tmpl_t::skip_instructions(output_ordinal_t out } template -typename scheduler_tmpl_t::stream_status_t -scheduler_tmpl_t::record_schedule_segment( - output_ordinal_t output, typename schedule_record_t::record_type_t type, - input_ordinal_t input, uint64_t start_instruction, uint64_t stop_instruction) +uint64_t +scheduler_tmpl_t::get_time_micros() { - uint64_t timestamp; // XXX i#5843: Should we just use dr_get_microseconds() and avoid split-OS support // 
inside here? We will be pulling in drdecode at least for identifying blocking // syscalls so maybe full DR isn't much more since we're often linked with raw2trace @@ -1461,16 +1474,36 @@ scheduler_tmpl_t::record_schedule_segment( struct timeval time; if (gettimeofday(&time, nullptr) != 0) return sched_type_t::STATUS_RECORD_FAILED; - timestamp = time.tv_sec * 1000000 + time.tv_usec; + return time.tv_sec * 1000000 + time.tv_usec; #else SYSTEMTIME sys_time; GetSystemTime(&sys_time); FILETIME file_time; if (!SystemTimeToFileTime(&sys_time, &file_time)) return sched_type_t::STATUS_RECORD_FAILED; - timestamp = - file_time.dwLowDateTime + (static_cast(file_time.dwHighDateTime) << 32); + return file_time.dwLowDateTime + + (static_cast(file_time.dwHighDateTime) << 32); #endif +} + +template +uint64_t +scheduler_tmpl_t::get_output_time(output_ordinal_t output) +{ + // If the user is giving us times take the most recent of those. + if (outputs_[output].cur_time > 0) + return outputs_[output].cur_time; + // Otherwise, use wall-clock time. + return get_time_micros(); +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_tmpl_t::record_schedule_segment( + output_ordinal_t output, typename schedule_record_t::record_type_t type, + input_ordinal_t input, uint64_t start_instruction, uint64_t stop_instruction) +{ + uint64_t timestamp = get_output_time(output); outputs_[output].record.emplace_back(type, input, start_instruction, stop_instruction, timestamp); // The stop is typically updated later in close_schedule_segment(). @@ -1491,6 +1524,18 @@ scheduler_tmpl_t::close_schedule_segment(output_ordinal_ // Skips already have a final stop value. return sched_type_t::STATUS_OK; } + if (outputs_[output].record.back().type == schedule_record_t::IDLE) { + uint64_t end = get_output_time(output); + assert(end >= outputs_[output].record.back().timestamp); + outputs_[output].record.back().value.idle_duration = + end - outputs_[output].record.back().timestamp; + VPRINT(this, 3, + "close_schedule_segment: idle duration %" PRIu64 " = %" PRIu64 + " - %" PRIu64 "\n", + outputs_[output].record.back().value.idle_duration, end, + outputs_[output].record.back().timestamp); + return sched_type_t::STATUS_OK; + } uint64_t instr_ord = input.reader->get_instruction_ordinal(); if (input.at_eof || *input.reader == *input.reader_end) { // The end is exclusive, so use the max int value. @@ -1506,12 +1551,12 @@ scheduler_tmpl_t::close_schedule_segment(output_ordinal_ input.index); ++instr_ord; } - VPRINT(this, 3, - "close_schedule_segment: input=%d start=%" PRId64 " stop=%" PRId64 "\n", - input.index, outputs_[output].record.back().start_instruction, instr_ord); + VPRINT( + this, 3, "close_schedule_segment: input=%d start=%" PRId64 " stop=%" PRId64 "\n", + input.index, outputs_[output].record.back().value.start_instruction, instr_ord); // Check for empty default entries, except the starter 0,0 ones. 
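The idle-record bookkeeping in close_schedule_segment() above boils down to stamping the record with the output's time when the idle period starts and storing the elapsed delta when it is closed, with get_output_time() preferring the caller-supplied time and falling back to wall-clock microseconds. A compressed standalone sketch of that flow (POSIX-only, illustrative names rather than the scheduler's API):

    #include <cstdint>
    #include <sys/time.h>

    static uint64_t
    wall_clock_micros()
    {
        struct timeval tv;
        gettimeofday(&tv, nullptr);
        return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
    }

    // Prefer the caller-provided per-output time; otherwise use the wall clock.
    static uint64_t
    output_time(uint64_t user_time)
    {
        return user_time > 0 ? user_time : wall_clock_micros();
    }

    struct idle_segment_t {
        uint64_t start_time = 0;    // Stamped when the idle record is opened.
        uint64_t idle_duration = 0; // Filled in when the record is closed.
    };

    int
    main()
    {
        idle_segment_t seg;
        seg.start_time = output_time(/*user_time=*/1000);
        // ... the output stays idle for a while ...
        uint64_t end = output_time(/*user_time=*/1400);
        seg.idle_duration = end - seg.start_time; // 400 microseconds of recorded idling.
        return seg.idle_duration == 400 ? 0 : 1;
    }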
assert(outputs_[output].record.back().type != schedule_record_t::DEFAULT || - outputs_[output].record.back().start_instruction < instr_ord || + outputs_[output].record.back().value.start_instruction < instr_ord || instr_ord == 0); outputs_[output].record.back().stop_instruction = instr_ord; return sched_type_t::STATUS_OK; @@ -1531,62 +1576,115 @@ scheduler_tmpl_t::add_to_ready_queue(input_info_t *input VPRINT( this, 4, "add_to_ready_queue (pre-size %zu): input %d priority %d timestamp delta %" PRIu64 - "\n", + " block factor %3.2f time %" PRIu64 "\n", ready_priority_.size(), input->index, input->priority, - input->reader->get_last_timestamp() - input->base_timestamp); + input->reader->get_last_timestamp() - input->base_timestamp, + input->block_time_factor, input->blocked_start_time); input->queue_counter = ++ready_counter_; ready_priority_.push(input); } template -typename scheduler_tmpl_t::input_info_t * +typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::pop_from_ready_queue( - output_ordinal_t for_output) + output_ordinal_t for_output, input_info_t *&new_input) { std::set skipped; + std::set blocked; input_info_t *res = nullptr; - do { + sched_type_t::stream_status_t status = STATUS_OK; + while (!ready_priority_.empty()) { res = ready_priority_.top(); ready_priority_.pop(); - if (res->binding.empty() || res->binding.find(for_output) != res->binding.end()) - break; - // We keep searching for a suitable input. - skipped.insert(res); + if (res->binding.empty() || res->binding.find(for_output) != res->binding.end()) { + // For blocked inputs, as we don't have interrupts or other regular + // control points we only check for being unblocked when an input + // would be chosen to run. We thus keep blocked inputs in the ready queue. + if (options_.quantum_unit == QUANTUM_TIME) { + if (res->block_time_factor > 0 && + outputs_[for_output].cur_time - res->blocked_start_time < + options_.block_time_scale * res->block_time_factor) { + VPRINT( + this, 4, + "pop queue: %d still blocked for %4.1f (%3.2f * %3.2f - (%" PRIu64 + " - %" PRIu64 ")\n", + res->index, + options_.block_time_scale * res->block_time_factor - + (outputs_[for_output].cur_time - res->blocked_start_time), + options_.block_time_scale, res->block_time_factor, + outputs_[for_output].cur_time, res->blocked_start_time); + // We keep searching for a suitable input. + blocked.insert(res); + } else + break; + } else { + if (res->block_time_factor > 0) { + VPRINT(this, 4, "pop queue: %d still blocked for %4.1f\n", res->index, + res->block_time_factor); + --res->block_time_factor; + // We keep searching for a suitable input. + blocked.insert(res); + } else + break; + } + } else { + // We keep searching for a suitable input. + skipped.insert(res); + } res = nullptr; - } while (!ready_priority_.empty()); + } + if (res == nullptr && !blocked.empty()) { + // Do not hand out EOF thinking we're done: we still have inputs blocked + // on i/o, so just wait and retry. + status = STATUS_IDLE; + } // Re-add the ones we skipped, but without changing their counters so we preserve // the prior FIFO order. for (input_info_t *save : skipped) ready_priority_.push(save); + // Re-add the blocked ones to the back. 
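Because there is no timer interrupt, the unblock check above happens only when the ready queue is popped: still-blocked inputs are set aside, re-queued at the back, and if nothing runnable remains the caller is told the output is idle rather than at EOF. A small self-contained model of that loop for the instruction-quanta flavor, where every rejected selection decrements the remaining block count; the types and names are illustrative, not the scheduler's:

    #include <deque>
    #include <optional>
    #include <string>
    #include <vector>

    struct fake_input_t {
        std::string name;
        double block_count = 0.; // Rejected selections left before it may run again.
    };

    enum class pop_result_t { OK, IDLE };

    // Pops the first runnable input; still-blocked inputs go to the back of the queue.
    static pop_result_t
    pop_runnable(std::deque<fake_input_t> &queue, std::optional<fake_input_t> &chosen)
    {
        std::vector<fake_input_t> blocked;
        chosen.reset();
        while (!queue.empty()) {
            fake_input_t candidate = queue.front();
            queue.pop_front();
            if (candidate.block_count > 0) {
                --candidate.block_count; // One more rejected selection.
                blocked.push_back(candidate);
            } else {
                chosen = candidate;
                break;
            }
        }
        for (const fake_input_t &b : blocked)
            queue.push_back(b); // Blocked inputs wait at the back for the next try.
        return chosen ? pop_result_t::OK : pop_result_t::IDLE;
    }

    int
    main()
    {
        std::deque<fake_input_t> queue = { { "A", /*block_count=*/2. }, { "B" }, { "C" } };
        std::optional<fake_input_t> chosen;
        pop_runnable(queue, chosen);                  // Rejects A (1 left), picks B.
        pop_runnable(queue, chosen);                  // Picks C.
        pop_result_t r = pop_runnable(queue, chosen); // Rejects A (0 left): IDLE.
        pop_runnable(queue, chosen);                  // A is finally runnable.
        return r == pop_result_t::IDLE && chosen && chosen->name == "A" ? 0 : 1;
    }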
+ for (input_info_t *save : blocked) + add_to_ready_queue(save); if (res != nullptr) { VPRINT(this, 4, "pop_from_ready_queue[%d] (post-size %zu): input %d priority %d timestamp " "delta %" PRIu64 "\n", for_output, ready_priority_.size(), res->index, res->priority, res->reader->get_last_timestamp() - res->base_timestamp); + res->block_time_factor = 0.; } - return res; + new_input = res; + return status; } template bool -scheduler_tmpl_t::syscall_incurs_switch(input_info_t *input) +scheduler_tmpl_t::syscall_incurs_switch(input_info_t *input, + double &block_time_factor) { uint64_t post_time = input->reader->get_last_timestamp(); assert(input->processing_syscall || input->processing_maybe_blocking_syscall); if (input->reader->get_version() < TRACE_ENTRY_VERSION_FREQUENT_TIMESTAMPS) { // This is a legacy trace that does not have timestamps bracketing syscalls. - // We switch on every maybe-blocking syscall in this case. + // We switch on every maybe-blocking syscall in this case and have a simplified + // blocking model. + block_time_factor = 1.; return input->processing_maybe_blocking_syscall; } assert(input->pre_syscall_timestamp > 0); assert(input->pre_syscall_timestamp <= post_time); uint64_t latency = post_time - input->pre_syscall_timestamp; - VPRINT(this, 3, "input %d %ssyscall latency: %" PRIu64 "\n", input->index, - input->processing_maybe_blocking_syscall ? "maybe-blocking " : "", latency); - return (input->processing_maybe_blocking_syscall && - latency >= options_.blocking_switch_threshold) || - latency >= options_.syscall_switch_threshold; + uint64_t threshold = input->processing_maybe_blocking_syscall + ? options_.blocking_switch_threshold + : options_.syscall_switch_threshold; + block_time_factor = static_cast(latency) / threshold; + if (options_.quantum_unit == QUANTUM_INSTRUCTIONS) + block_time_factor *= options_.block_time_scale; + VPRINT(this, 3, "input %d %ssyscall latency %" PRIu64 " => factor %4.1f\n", + input->index, + input->processing_maybe_blocking_syscall ? 
"maybe-blocking " : "", latency, + block_time_factor); + return latency >= threshold; } template @@ -1602,7 +1700,8 @@ scheduler_tmpl_t::set_cur_input(output_ordinal_t output, assert(input < static_cast(inputs_.size())); int prev_input = outputs_[output].cur_input; if (prev_input >= 0) { - if (options_.mapping == MAP_TO_ANY_OUTPUT && prev_input != input) + if (options_.mapping == MAP_TO_ANY_OUTPUT && prev_input != input && + !inputs_[prev_input].at_eof) add_to_ready_queue(&inputs_[prev_input]); if (prev_input != input && options_.schedule_record_ostream != nullptr) { input_info_t &prev_info = inputs_[prev_input]; @@ -1612,6 +1711,12 @@ scheduler_tmpl_t::set_cur_input(output_ordinal_t output, if (status != sched_type_t::STATUS_OK) return status; } + } else if (options_.schedule_record_ostream != nullptr && + outputs_[output].record.back().type == schedule_record_t::IDLE) { + input_info_t unused; + sched_type_t::stream_status_t status = close_schedule_segment(output, unused); + if (status != sched_type_t::STATUS_OK) + return status; } outputs_[output].cur_input = input; if (input < 0) @@ -1657,26 +1762,32 @@ scheduler_tmpl_t::pick_next_input_as_previously( } const schedule_record_t &segment = outputs_[output].record[outputs_[output].record_index + 1]; + if (segment.type == schedule_record_t::IDLE) { + outputs_[output].waiting = true; + outputs_[output].wait_start_time = get_output_time(output); + ++outputs_[output].record_index; + return sched_type_t::STATUS_IDLE; + } index = segment.key.input; VPRINT(this, 5, "pick_next_input_as_previously[%d]: next replay segment in=%d (@%" PRId64 ") type=%d start=%" PRId64 " end=%" PRId64 "\n", output, index, inputs_[index].reader->get_instruction_ordinal(), segment.type, - segment.start_instruction, segment.stop_instruction); + segment.value.start_instruction, segment.stop_instruction); { std::lock_guard lock(*inputs_[index].lock); if (inputs_[index].reader->get_instruction_ordinal() > - segment.start_instruction) { + segment.value.start_instruction) { VPRINT(this, 1, "WARNING: next_record[%d]: input %d wants instr #%" PRId64 " but it is already at #%" PRId64 "\n", - output, index, segment.start_instruction, + output, index, segment.value.start_instruction, inputs_[index].reader->get_instruction_ordinal()); } if (inputs_[index].reader->get_instruction_ordinal() < - segment.start_instruction && + segment.value.start_instruction && // Don't wait for an ROI that starts at the beginning. - segment.start_instruction > 1 && + segment.value.start_instruction > 1 && // The output may have begun in the wait state. (outputs_[output].record_index == -1 || // When we skip our separator+timestamp markers are at the @@ -1696,7 +1807,7 @@ scheduler_tmpl_t::pick_next_input_as_previously( // scheduled on a core. If we could identify those, we should return // STATUS_IDLE rather than STATUS_WAIT. VPRINT(this, 3, "next_record[%d]: waiting for input %d instr #%" PRId64 "\n", - output, index, segment.start_instruction); + output, index, segment.value.start_instruction); // Give up this input and go into a wait state. // We'll come back here on the next next_record() call. 
set_cur_input(output, INVALID_INPUT_ORDINAL); @@ -1758,7 +1869,7 @@ scheduler_tmpl_t::pick_next_input_as_previously( return sched_type_t::STATUS_SKIPPED; } else { VPRINT(this, 2, "next_record[%d]: advancing to input %d instr #%" PRId64 "\n", - output, index, segment.start_instruction); + output, index, segment.value.start_instruction); } ++outputs_[output].record_index; return sched_type_t::STATUS_OK; @@ -1767,7 +1878,7 @@ scheduler_tmpl_t::pick_next_input_as_previously( template typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::pick_next_input(output_ordinal_t output, - bool in_wait_state) + double block_time_factor) { sched_type_t::stream_status_t res = sched_type_t::STATUS_OK; bool need_lock = @@ -1776,7 +1887,9 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu : std::unique_lock(); input_ordinal_t prev_index = outputs_[output].cur_input; input_ordinal_t index = INVALID_INPUT_ORDINAL; + int iters = 0; while (true) { + ++iters; if (index < 0) { if (options_.mapping == MAP_AS_PREVIOUSLY) { res = pick_next_input_as_previously(output, index); @@ -1796,7 +1909,7 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu ") type=%d start=%" PRId64 " end=%" PRId64 "\n", output, input, inputs_[input].reader->get_instruction_ordinal(), - segment.type, segment.start_instruction, + segment.type, segment.value.start_instruction, segment.stop_instruction); } }); @@ -1805,14 +1918,23 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu if (res != sched_type_t::STATUS_OK) return res; } else if (options_.mapping == MAP_TO_ANY_OUTPUT) { + if (block_time_factor > 0. && prev_index != INVALID_INPUT_ORDINAL) { + std::lock_guard lock(*inputs_[prev_index].lock); + if (inputs_[prev_index].block_time_factor == 0.) { + VPRINT(this, 2, "next_record[%d]: block time factor %3.2f\n", + output, block_time_factor); + inputs_[prev_index].block_time_factor = block_time_factor; + inputs_[prev_index].blocked_start_time = + outputs_[output].cur_time; + } else { + // If we looped we could have the same prev_index. + assert(iters > 1); + } + } if (prev_index != INVALID_INPUT_ORDINAL && inputs_[prev_index].switch_to_input != INVALID_INPUT_ORDINAL) { input_info_t *target = &inputs_[inputs_[prev_index].switch_to_input]; inputs_[prev_index].switch_to_input = INVALID_INPUT_ORDINAL; - // TODO i#5843: Once we add i/o wait times, we should also check - // the sleeping queue and wake up the target. - // We should probably implement the "Merge tracking..." proposal - // from the comment at the top of set_cur_input() too. // XXX i#5843: Add an invariant check that the next timestamp of the // target is later than the pre-switch-syscall timestamp? if (ready_priority_.find(target)) { @@ -1820,6 +1942,15 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu output, target->index); ready_priority_.erase(target); index = target->index; + // Erase any remaining wait time for the target. + if (target->block_time_factor != 0.) { + VPRINT(this, 3, + "next_record[%d]: direct switch erasing block time " + "factor " + "%4.1f for input %d\n", + output, target->block_time_factor, target->index); + target->block_time_factor = 0.; + } } else { // TODO i#5843: If the target is running on another output, we // need to do a forced migration by setting a flag to force a @@ -1835,7 +1966,7 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu } if (index != INVALID_INPUT_ORDINAL) { // We found a direct switch target above. 
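The erase above gives direct switches wake-up semantics: a target that is still serving out its block time becomes runnable immediately when another input explicitly switches to it. A tiny standalone illustration; the names are not the scheduler's:

    struct fake_input_t {
        int index = 0;
        double remaining_block = 0.; // Pending wait from a prior blocking syscall.
    };

    // A direct-switch target is made runnable right away: leftover wait is erased.
    static void
    direct_switch_to(fake_input_t &target)
    {
        if (target.remaining_block != 0.)
            target.remaining_block = 0.;
    }

    int
    main()
    {
        fake_input_t target = { /*index=*/3, /*remaining_block=*/4.5 };
        direct_switch_to(target);
        return target.remaining_block == 0. ? 0 : 1;
    }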
- } else if (ready_queue_empty()) { + } else if (ready_queue_empty() && block_time_factor == 0.) { if (prev_index == INVALID_INPUT_ORDINAL) return eof_or_idle(output); std::lock_guard lock(*inputs_[prev_index].lock); @@ -1844,17 +1975,33 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu else index = prev_index; // Go back to prior. } else { - if (!in_wait_state) { - // Give up the input before we go to the queue so we can add - // ourselves to the queue. If we're the highest priority we - // shouldn't switch. The queue preserves FIFO for same-priority - // cases so we will switch if someone of equal priority is - // waiting. - set_cur_input(output, INVALID_INPUT_ORDINAL); + // Give up the input before we go to the queue so we can add + // ourselves to the queue. If we're the highest priority we + // shouldn't switch. The queue preserves FIFO for same-priority + // cases so we will switch if someone of equal priority is + // waiting. + set_cur_input(output, INVALID_INPUT_ORDINAL); + input_info_t *queue_next = nullptr; + sched_type_t::stream_status_t status = + pop_from_ready_queue(output, queue_next); + if (status != STATUS_OK) { + if (status == STATUS_IDLE) { + outputs_[output].waiting = true; + if (options_.schedule_record_ostream != nullptr) { + sched_type_t::stream_status_t record_status = + record_schedule_segment( + output, schedule_record_t::IDLE, 0, 0, 0); + if (record_status != sched_type_t::STATUS_OK) + return record_status; + } + } + return status; } - input_info_t *queue_next = pop_from_ready_queue(output); - if (queue_next == nullptr) + if (queue_next == nullptr) { + assert(block_time_factor == 0. || + prev_index == INVALID_INPUT_ORDINAL); return eof_or_idle(output); + } index = queue_next->index; } } else if (options_.deps == DEPENDENCY_TIMESTAMPS) { @@ -1928,8 +2075,22 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, if (!outputs_[output].active) return sched_type_t::STATUS_IDLE; if (outputs_[output].waiting) { + if (options_.mapping == MAP_AS_PREVIOUSLY && + outputs_[output].wait_start_time > 0) { + uint64_t duration = outputs_[output] + .record[outputs_[output].record_index] + .value.idle_duration; + uint64_t now = get_output_time(output); + if (now - outputs_[output].wait_start_time < duration) { + VPRINT(this, 4, + "next_record[%d]: elapsed %" PRIu64 " < duration %" PRIu64 "\n", + output, now - outputs_[output].wait_start_time, duration); + return sched_type_t::STATUS_WAIT; + } else + outputs_[output].wait_start_time = 0; + } VPRINT(this, 5, "next_record[%d]: need new input (cur=waiting)\n", output); - sched_type_t::stream_status_t res = pick_next_input(output, true); + sched_type_t::stream_status_t res = pick_next_input(output, 0.); if (res != sched_type_t::STATUS_OK && res != sched_type_t::STATUS_SKIPPED) return res; outputs_[output].waiting = false; @@ -2009,7 +2170,7 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, input->index, input->reader->get_instruction_ordinal()); VDO(this, 5, print_record(record);); bool need_new_input = false; - bool in_wait_state = false; + double block_time_factor = 0.; if (options_.mapping == MAP_AS_PREVIOUSLY) { assert(outputs_[output].record_index >= 0); if (outputs_[output].record_index >= @@ -2020,10 +2181,14 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, schedule_record_t::SKIP) { VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output); need_new_input = true; + } else if (outputs_[output].record[outputs_[output].record_index].type == + schedule_record_t::SYNTHETIC_END) { + 
VPRINT(this, 5, "next_record[%d]: at synthetic end\n", output); } else { const schedule_record_t &segment = outputs_[output].record[outputs_[output].record_index]; - uint64_t start = segment.start_instruction; + assert(segment.type == schedule_record_t::DEFAULT); + uint64_t start = segment.value.start_instruction; uint64_t stop = segment.stop_instruction; // The stop is exclusive. 0 does mean to do nothing (easiest // to have an empty record to share the next-entry for a start skip @@ -2069,13 +2234,19 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, input->switch_to_input = it->second; } } else if (record_type_is_instr(record)) { - if (syscall_incurs_switch(input)) { + if (syscall_incurs_switch(input, block_time_factor)) { // Model as blocking and should switch to a different input. need_new_input = true; - in_wait_state = true; VPRINT(this, 3, "next_record[%d]: hit blocking syscall in input %d\n", output, input->index); + } else if (input->switch_to_input != INVALID_INPUT_ORDINAL) { + // The switch request overrides any latency threshold. + need_new_input = true; + VPRINT(this, 3, + "next_record[%d]: direct switch on low-latency syscall in " + "input %d\n", + output, input->index); } input->processing_syscall = false; input->processing_maybe_blocking_syscall = false; @@ -2101,6 +2272,9 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, // We again prefer to switch to another input even if the current // input has the oldest timestamp, prioritizing context switches // over timestamp ordering. + VPRINT(this, 4, + "next_record[%d]: input %d hit end of instr quantum\n", output, + input->index); need_new_input = true; } } else if (options_.quantum_unit == QUANTUM_TIME) { @@ -2144,7 +2318,8 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, VPRINT(this, 5, "next_record[%d]: queuing candidate record\n", output); input->queue.push_back(record); lock.unlock(); - sched_type_t::stream_status_t res = pick_next_input(output, in_wait_state); + sched_type_t::stream_status_t res = + pick_next_input(output, block_time_factor); if (res != sched_type_t::STATUS_OK && res != sched_type_t::STATUS_WAIT && res != sched_type_t::STATUS_SKIPPED) return res; diff --git a/clients/drcachesim/scheduler/scheduler.h b/clients/drcachesim/scheduler/scheduler.h index edbb1ab998c..490afeecc6b 100644 --- a/clients/drcachesim/scheduler/scheduler.h +++ b/clients/drcachesim/scheduler/scheduler.h @@ -522,6 +522,18 @@ template class scheduler_tmpl_t { * blocking and trigger a context switch. */ uint64_t blocking_switch_threshold = 100; + /** + * Controls the amount of time inputs are considered blocked at a syscall whose + * latency exceeds #syscall_switch_threshold or #blocking_switch_threshold. A + * "block time factor" is computed from the syscall latency divided by either + * #syscall_switch_threshold or #blocking_switch_threshold. This factor is + * multiplied by this field #block_time_scale to produce a final value. For + * #QUANTUM_TIME, that final value's amount of time, as reported by the time + * parameter to next_record(), must pass before the input is no longer considered + * blocked. For instruction quanta, that final value's count of scheduler + * selections must occur before the input is actually selected. + */ + double block_time_scale = 1.; }; /** @@ -956,6 +968,8 @@ template class scheduler_tmpl_t { // While the scheduler only hands an input to one output at a time, during // scheduling decisions one thread may need to access another's fields. 
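The block_time_scale documentation above describes the model in words; numerically it is just the syscall latency over the applicable switch threshold, scaled. A minimal standalone sketch of the QUANTUM_TIME case, where the scaled value gates how much caller-reported time must pass before the input is runnable again; the helper names are illustrative and are not the scheduler's API:

    #include <cstdint>
    #include <iostream>

    // Dimensionless factor: how many "threshold units" of latency the syscall had.
    static double
    block_time_factor(uint64_t latency_us, uint64_t switch_threshold_us)
    {
        return static_cast<double>(latency_us) / switch_threshold_us;
    }

    // For time-based quanta, an input stays blocked until scale * factor units of
    // the caller-provided time have passed since it blocked.
    static bool
    still_blocked(uint64_t now, uint64_t blocked_start, double factor, double scale)
    {
        return now - blocked_start < scale * factor;
    }

    int
    main()
    {
        // Example: a 250us-latency maybe-blocking syscall against a 100us blocking
        // threshold gives a factor of 2.5; a scale of 2 stretches the wait to 5 units.
        double factor = block_time_factor(250, 100);
        std::cout << "factor=" << factor << " wait=" << 2. * factor << "\n";
        std::cout << std::boolalpha
                  << still_blocked(/*now=*/1004, /*blocked_start=*/1000, factor,
                                   /*scale=*/2.)
                  << "\n"; // true: only 4 of the 5 units have elapsed.
        return 0;
    }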
// We use a unique_ptr to make this moveable for vector storage. + // For inputs not actively assigned to a core but sitting in the ready_queue, + // sched_lock_ suffices to synchronize access. std::unique_ptr lock; // A tid can be duplicated across workloads so we need the pair of // workload index + tid to identify the original input. @@ -1002,6 +1016,9 @@ template class scheduler_tmpl_t { bool switching_pre_instruction = false; // Used for time-based quanta. uint64_t start_time_in_quantum = 0; + // These fields model waiting at a blocking syscall. + double block_time_factor = 0.; + uint64_t blocked_start_time = 0; // For QUANTUM_TIME only. }; // Format for recording a schedule to disk. A separate sequence of these records @@ -1019,6 +1036,9 @@ template class scheduler_tmpl_t { FOOTER, // The final entry in the component. Other fields are ignored. SKIP, // Skip ahead to the next region of interest. SYNTHETIC_END, // A synthetic thread exit record must be supplied. + // Indicates that the output is idle. The value.idle_duration field holds + // a duration in microseconds. + IDLE, }; static constexpr int VERSION_CURRENT = 0; schedule_record_t() = default; @@ -1026,7 +1046,7 @@ template class scheduler_tmpl_t { uint64_t stop, uint64_t time) : type(type) , key(input) - , start_instruction(start) + , value(start) , stop_instruction(stop) , timestamp(time) { @@ -1045,8 +1065,18 @@ template class scheduler_tmpl_t { input_ordinal_t input = -1; int version; // For record_type_t::VERSION. } END_PACKED_STRUCTURE key; - // Input stream ordinal of starting point. - uint64_t start_instruction = 0; + START_PACKED_STRUCTURE + union value { + value() = default; + value(uint64_t start) + : start_instruction(start) + { + } + // For record_type_t::IDLE, the duration in microseconds of the idling. + uint64_t idle_duration; + // Input stream ordinal of starting point, for non-IDLE types. + uint64_t start_instruction = 0; + } END_PACKED_STRUCTURE value; // Input stream ordinal, exclusive. Max numeric value means continue until EOF. uint64_t stop_instruction = 0; // Timestamp in microseconds to keep context switches ordered. @@ -1096,6 +1126,8 @@ template class scheduler_tmpl_t { int64_t as_traced_cpuid = -1; // Used for MAP_AS_PREVIOUSLY with live_replay_output_count_. bool at_eof = false; + // Used for replaying wait periods. + uint64_t wait_start_time = 0; }; // Called just once at initialization time to set the initial input-to-output @@ -1168,6 +1200,12 @@ template class scheduler_tmpl_t { scheduler_status_t read_recorded_schedule(); + uint64_t + get_time_micros(); + + uint64_t + get_output_time(output_ordinal_t output); + // The caller must hold the lock for the input. stream_status_t record_schedule_segment( @@ -1193,7 +1231,7 @@ template class scheduler_tmpl_t { // Finds the next input stream for the 'output_ordinal'-th output stream. // No input_info_t lock can be held on entry. stream_status_t - pick_next_input(output_ordinal_t output, bool in_wait_state); + pick_next_input(output_ordinal_t output, double block_time_factor); // Helper for pick_next_input() for MAP_AS_PREVIOUSLY. // No input_info_t lock can be held on entry. @@ -1313,14 +1351,16 @@ template class scheduler_tmpl_t { add_to_ready_queue(input_info_t *input); // The input's lock must be held by the caller. + // Returns a multiplier for how long the input should be considered blocked. 
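On replay, the new wait_start_time field above pairs with a record's idle_duration: the output notes when it began replaying an IDLE record and keeps reporting a wait from next_record() until the recorded duration has elapsed. A minimal standalone sketch of that gate, with illustrative names:

    #include <cstdint>
    #include <iostream>

    struct replay_wait_t {
        uint64_t wait_start_time = 0; // When this output began replaying the idle record.
        uint64_t idle_duration = 0;   // Duration recorded at trace time, in microseconds.
    };

    // True while the replayed idle period is still in progress.
    static bool
    still_replaying_idle(const replay_wait_t &wait, uint64_t now)
    {
        return now - wait.wait_start_time < wait.idle_duration;
    }

    int
    main()
    {
        replay_wait_t wait = { /*wait_start_time=*/5000, /*idle_duration=*/300 };
        std::cout << std::boolalpha << still_replaying_idle(wait, 5100) << "\n"; // true
        std::cout << std::boolalpha << still_replaying_idle(wait, 5400) << "\n"; // false
        return 0;
    }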
bool - syscall_incurs_switch(input_info_t *input); + syscall_incurs_switch(input_info_t *input, double &block_time_factor); // sched_lock_ must be held by the caller. // "for_output" is which output stream is looking for a new input; only an // input which is able to run on that output will be selected. - input_info_t * - pop_from_ready_queue(output_ordinal_t for_output); + stream_status_t + pop_from_ready_queue(output_ordinal_t for_output, input_info_t *&new_input); + /// /////////////////////////////////////////////////////////////////////////// diff --git a/clients/drcachesim/tests/scheduler_launcher.cpp b/clients/drcachesim/tests/scheduler_launcher.cpp index b0cac30df6e..966f814d7ae 100644 --- a/clients/drcachesim/tests/scheduler_launcher.cpp +++ b/clients/drcachesim/tests/scheduler_launcher.cpp @@ -101,6 +101,11 @@ droption_t op_honor_stamps(DROPTION_SCOPE_ALL, "honor_stamps", true, "Whether to honor recorded timestamps for ordering", "Whether to honor recorded timestamps for ordering"); +droption_t op_block_time_scale(DROPTION_SCOPE_ALL, "block_time_scale", 1., + "Input block time scale factor", + "A higher value here results in blocking syscalls " + "keeping inputs unscheduled for longer."); + #ifdef HAS_ZIP droption_t op_record_file(DROPTION_SCOPE_FRONTEND, "record_file", "", "Path for storing record of schedule", @@ -319,6 +324,7 @@ _tmain(int argc, const TCHAR *targv[]) sched_ops.quantum_duration = op_sched_quantum.get_value(); if (op_sched_time.get_value()) sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME; + sched_ops.block_time_scale = op_block_time_scale.get_value(); #ifdef HAS_ZIP std::unique_ptr record_zip; std::unique_ptr replay_zip; diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index fbf431a72e1..445785772a1 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -33,6 +33,7 @@ #define NOMINMAX // Avoid windows.h messing up std::min. #undef NDEBUG #include +#include #include #include #include @@ -806,7 +807,7 @@ test_real_file_queries_and_filters(const char *testdir) // Assumes the input threads are all tid_base plus an offset < 26. static std::vector run_lockstep_simulation(scheduler_t &scheduler, int num_outputs, memref_tid_t tid_base, - bool send_time = false) + bool send_time = false, bool print_markers = true) { // Walk the outputs in lockstep for crude but deterministic concurrency. std::vector outputs(num_outputs, nullptr); @@ -816,6 +817,10 @@ run_lockstep_simulation(scheduler_t &scheduler, int num_outputs, memref_tid_t ti int num_eof = 0; // Record the threads, one char each. 
std::vector sched_as_string(num_outputs); + static constexpr char THREAD_LETTER_START = 'A'; + static constexpr char WAIT_SYMBOL = '-'; + static constexpr char IDLE_SYMBOL = '_'; + static constexpr char NON_INSTR_SYMBOL = '.'; while (num_eof < num_outputs) { for (int i = 0; i < num_outputs; i++) { if (eof[i]) @@ -839,22 +844,22 @@ run_lockstep_simulation(scheduler_t &scheduler, int num_outputs, memref_tid_t ti continue; } if (status == scheduler_t::STATUS_WAIT) { - sched_as_string[i] += '-'; + sched_as_string[i] += WAIT_SYMBOL; continue; } if (status == scheduler_t::STATUS_IDLE) { - sched_as_string[i] += '_'; + sched_as_string[i] += IDLE_SYMBOL; continue; } assert(status == scheduler_t::STATUS_OK); if (type_is_instr(memref.instr.type)) { sched_as_string[i] += - 'A' + static_cast(memref.instr.tid - tid_base); + THREAD_LETTER_START + static_cast(memref.instr.tid - tid_base); } else { // While this makes the string longer, it is just too confusing // with the same letter seemingly on 2 cores at once without these // fillers to line everything up in time. - sched_as_string[i] += '.'; + sched_as_string[i] += NON_INSTR_SYMBOL; } } } @@ -873,6 +878,15 @@ run_lockstep_simulation(scheduler_t &scheduler, int num_outputs, memref_tid_t ti inputs.insert(sched_as_string[out][step]); } } + if (!print_markers) { + // We kept the dots internally for our same-timestep check above. + for (int i = 0; i < num_outputs; ++i) { + sched_as_string[i].erase(std::remove(sched_as_string[i].begin(), + sched_as_string[i].end(), + NON_INSTR_SYMBOL), + sched_as_string[i].end()); + } + } return sched_as_string; } @@ -976,6 +990,12 @@ test_synthetic_time_quanta() refs[i].push_back(make_timestamp(10)); refs[i].push_back(make_instr(10)); refs[i].push_back(make_instr(30)); + if (i == 0) { + refs[i].push_back(make_timestamp(20)); + refs[i].push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL, 42)); + refs[i].push_back(make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0)); + refs[i].push_back(make_timestamp(220)); + } refs[i].push_back(make_instr(50)); refs[i].push_back(make_exit(TID_BASE + i)); } @@ -997,6 +1017,7 @@ test_synthetic_time_quanta() /*verbosity=*/4); sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME; sched_ops.quantum_duration = 3; + sched_ops.block_time_scale = 5.; // Ensure it waits a while. zipfile_ostream_t outfile(record_fname); sched_ops.schedule_record_ostream = &outfile; if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) != @@ -1010,7 +1031,7 @@ test_synthetic_time_quanta() scheduler_t::stream_status_t status = stream->next_record(memref, time); if (status != expect_status) { std::cerr << "Expected status " << expect_status << " != " << status - << "\n"; + << " at time " << time << "\n"; assert(false); } if (status == scheduler_t::STATUS_OK) { @@ -1043,18 +1064,30 @@ test_synthetic_time_quanta() check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_INSTR); // Advance cpu1 which is now at its quantum end at time 6 and should switch. 
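The remaining checks in this walkthrough hinge on A's block time: its syscall is bracketed by timestamps 20 and 220, giving a 200us latency, twice the default 100us blocking threshold, and with block_time_scale = 5 under QUANTUM_TIME input A stays unschedulable until 10 of the test's time units have passed. The same arithmetic in standalone form (the threshold is the scheduler default; the other constants come from the test above):

    #include <cstdint>

    int
    main()
    {
        const uint64_t pre_syscall_timestamp = 20;      // From the test input above.
        const uint64_t post_syscall_timestamp = 220;    // From the test input above.
        const uint64_t blocking_switch_threshold = 100; // Scheduler default.
        const double block_time_scale = 5.;             // Set by the test.

        const uint64_t latency = post_syscall_timestamp - pre_syscall_timestamp; // 200us
        const double factor =
            static_cast<double>(latency) / blocking_switch_threshold;        // 2.0
        const double wait_units = block_time_scale * factor;                 // 10 units.
        const bool treated_as_blocking = latency >= blocking_switch_threshold; // true

        return treated_as_blocking && wait_units == 10. ? 0 : 1;
    }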
check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR); - check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR); - check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_THREAD_EXIT); + check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER); + check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER); + check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER); + check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER); + // We just hit a blocking syscall in A so we swap to B. check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR); - // This is another quantum end at 9 but the queue is empty. + // This is another quantum end at 9 but no other input is available. check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR); - // Finish off the inputs. + check_next(cpu1, time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_THREAD_EXIT); + check_next(cpu1, ++time, scheduler_t::STATUS_IDLE); + // Finish off C on cpu 0. check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_INSTR); check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_INSTR); check_next(cpu0, time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_THREAD_EXIT); - check_next(cpu0, time, scheduler_t::STATUS_IDLE); - check_next(cpu1, time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_THREAD_EXIT); - check_next(cpu1, time, scheduler_t::STATUS_EOF); + // Both cpus wait until A is unblocked. + check_next(cpu1, ++time, scheduler_t::STATUS_IDLE); + check_next(cpu0, ++time, scheduler_t::STATUS_IDLE); + check_next(cpu1, ++time, scheduler_t::STATUS_IDLE); + check_next(cpu0, ++time, scheduler_t::STATUS_IDLE); + check_next(cpu1, ++time, scheduler_t::STATUS_IDLE); + check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR); + check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_THREAD_EXIT); + check_next(cpu1, ++time, scheduler_t::STATUS_EOF); + check_next(cpu0, ++time, scheduler_t::STATUS_EOF); if (scheduler.write_recorded_schedule() != scheduler_t::STATUS_SUCCESS) assert(false); } @@ -1083,8 +1116,8 @@ test_synthetic_time_quanta() for (int i = 0; i < NUM_OUTPUTS; i++) { std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; } - assert(sched_as_string[0] == "..A..CCC._"); - assert(sched_as_string[1] == "..BAA.BB."); + assert(sched_as_string[0] == "..A..CCC._________"); + assert(sched_as_string[1] == "..BA....BB.____A."); } #endif } @@ -1412,21 +1445,36 @@ test_synthetic_with_syscalls_multiple() std::vector inputs; inputs.push_back(make_thread(tid)); inputs.push_back(make_pid(1)); + inputs.push_back(make_version(TRACE_ENTRY_VERSION)); + uint64_t stamp = + 10000 * workload_idx + 1000 * (NUM_INPUTS_PER_WORKLOAD - input_idx); for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) { - // Sprinkle timestamps every other instruction. We use the - // same formula as test_synthetic_with_priorities(). - if (instr_idx % 2 == 0) { - inputs.push_back(make_timestamp( - 1000 * workload_idx + - 100 * (NUM_INPUTS_PER_WORKLOAD - input_idx) + 10 * instr_idx)); + // Sprinkle timestamps every other instruction. We use a similar + // priority scheme as test_synthetic_with_priorities() but we leave + // room for blocking syscall timestamp gaps. 
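The "timestamp gaps" mentioned above are what make a synthesized syscall look blocking: the tests bracket the syscall markers with a pair of timestamps whose difference comfortably exceeds the 100us blocking threshold. Shown in isolation below; this fragment assumes the make_* helpers and trace_entry_t from this test file plus an existing inputs vector and stamp value, so it is not a complete program:

    // A pre-syscall timestamp, the syscall markers, then a post-syscall timestamp.
    inputs.push_back(make_timestamp(stamp + 10));
    inputs.push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL, 42));
    inputs.push_back(make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0));
    // The 540us gap is 5.4x the 100us blocking threshold, so the scheduler treats
    // the syscall as blocking with a block time factor of roughly 5.
    inputs.push_back(make_timestamp(stamp + 550));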
+ if (instr_idx % 2 == 0 && + (inputs.back().type != TRACE_TYPE_MARKER || + inputs.back().size != TRACE_MARKER_TYPE_TIMESTAMP)) { + inputs.push_back(make_timestamp(stamp)); } inputs.push_back(make_instr(42 + instr_idx * 4)); // Insert some blocking syscalls in the high-priority (see below) // middle threads. if (input_idx == 1 && instr_idx % (workload_idx + 1) == workload_idx) { + inputs.push_back(make_timestamp(stamp + 10)); + inputs.push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL, 42)); inputs.push_back( make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0)); + // Blocked for 5 100-threshold blocking-syscall units. + inputs.push_back(make_timestamp(stamp + 550)); + } else { + // Insert meta records to keep the locksteps lined up. + inputs.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0)); + inputs.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0)); + inputs.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0)); + inputs.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0)); } + stamp += 10; } inputs.push_back(make_exit(tid)); readers.emplace_back( @@ -1465,23 +1513,26 @@ test_synthetic_with_syscalls_multiple() if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) != scheduler_t::STATUS_SUCCESS) assert(false); - std::vector sched_as_string = - run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE); + // We omit the "." marker chars to keep the strings short enough to be readable. + std::vector sched_as_string = run_lockstep_simulation( + scheduler, NUM_OUTPUTS, TID_BASE, /*send_time=*/false, /*print_markers=*/false); for (int i = 0; i < NUM_OUTPUTS; i++) { std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; } // See the test_synthetic_with_priorities() test which has our base sequence. // But now B hits a syscall every instr, and E every other instr, so neither // reaches its 3-instr quantum. (H's syscalls are every 3rd instr coinciding with its - // quantum.) Furthermore, B isn't finished by the time E and H are done and we see - // the lower-priority C and F getting scheduled while B is in a wait state for its - // blocking syscall. + // quantum.) Furthermore, B, E, and H are blocked long enough that we see + // the lower-priority C and F getting scheduled. We end up with idle cores + // while we wait for B. + // We've omitted the "." marker records so these are not precisely simultaneous, + // so the view here may show 2 on the same core at once: but we check for that + // with the "." in run_lockstep_simulation(). The omitted "." markers also + // explains why the two strings are different lengths. assert(sched_as_string[0] == - ".B..HH.H.B.H.HH..B.HH.H..B.E.B...II.I.JJ.JJ.JJ.JJ.J.CC.C.II.I..DD.DA.AA.G.GG." - "DD.D.___"); + "BHHHFFFJJJJJJJBEEHHHIIIFFFAAAHHHBAAAGGGAAABGGG__B___B___B"); assert(sched_as_string[1] == - ".EE..B..EE..B..EE..B..EE...CC.C.FF.FB..C.CC.F.FF.I.II.FF.F..AA.A.GG.GD.DD.AA." - "A.GG.G."); + "EECCCIIICCCJJFFFCCCBIIIEEDDDGGGDDDEEDDD____EB__________________________"); } static void @@ -1507,20 +1558,36 @@ test_synthetic_with_syscalls_single() std::vector inputs; inputs.push_back(make_thread(tid)); inputs.push_back(make_pid(1)); + inputs.push_back(make_version(TRACE_ENTRY_VERSION)); + uint64_t stamp = + 10000 * workload_idx + 1000 * (NUM_INPUTS_PER_WORKLOAD - input_idx); for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) { - // Sprinkle timestamps every other instruction. We use the - // same formula as test_synthetic_with_priorities(). 
- if (instr_idx % 2 == 0) { - inputs.push_back(make_timestamp( - 1000 * workload_idx + - 100 * (NUM_INPUTS_PER_WORKLOAD - input_idx) + 10 * instr_idx)); + // Sprinkle timestamps every other instruction. We use a similar + // priority scheme as test_synthetic_with_priorities() but we leave + // room for blocking syscall timestamp gaps. + if (instr_idx % 2 == 0 && + (inputs.back().type != TRACE_TYPE_MARKER || + inputs.back().size != TRACE_MARKER_TYPE_TIMESTAMP)) { + inputs.push_back(make_timestamp(stamp)); } inputs.push_back(make_instr(42 + instr_idx * 4)); - // Insert some blocking syscalls. + // Insert some blocking syscalls in the high-priority (see below) + // middle threads. if (instr_idx % 3 == 1) { + inputs.push_back(make_timestamp(stamp + 10)); + inputs.push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL, 42)); inputs.push_back( make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0)); + // Blocked for 3 100-threshold blocking-syscall units. + inputs.push_back(make_timestamp(stamp + 350)); + } else { + // Insert meta records to keep the locksteps lined up. + inputs.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0)); + inputs.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0)); + inputs.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0)); + inputs.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0)); } + stamp += 10; } inputs.push_back(make_exit(tid)); readers.emplace_back( @@ -1543,8 +1610,11 @@ test_synthetic_with_syscalls_single() for (int i = 0; i < NUM_OUTPUTS; i++) { std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; } - assert(sched_as_string[0] == ".AA..AA.A.A.AA..A."); - assert(sched_as_string[1] == "__________________"); + // We expect an idle CPU every 3 instrs but starting at the 2nd (1-based % 3). + assert(sched_as_string[0] == + "..A....A....__A....A.....A....__A.....A....A....__A....."); + assert(sched_as_string[1] == + "________________________________________________________"); } static bool @@ -1626,7 +1696,7 @@ test_synthetic_with_syscalls_precise() std::vector refs; for (scheduler_t::stream_status_t status = stream->next_record(memref); status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) { - if (status == scheduler_t::STATUS_WAIT) + if (status == scheduler_t::STATUS_WAIT || status == scheduler_t::STATUS_IDLE) continue; assert(status == scheduler_t::STATUS_OK); refs.push_back(memref); @@ -1768,6 +1838,73 @@ test_synthetic_with_syscalls_latencies() assert(res); } +static void +test_synthetic_with_syscalls_idle() +{ + std::cerr << "\n----------------\nTesting syscall idle time duration\n"; + // We test that a blocked input is put to the back of the queue on each retry. + static constexpr int NUM_INPUTS = 4; + static constexpr int NUM_OUTPUTS = 1; + static constexpr int NUM_INSTRS = 12; + static constexpr memref_tid_t TID_BASE = 100; + std::vector sched_inputs; + std::vector readers; + for (int input_idx = 0; input_idx < NUM_INPUTS; input_idx++) { + memref_tid_t tid = TID_BASE + input_idx; + std::vector inputs; + inputs.push_back(make_thread(tid)); + inputs.push_back(make_pid(1)); + inputs.push_back(make_version(TRACE_ENTRY_VERSION)); + uint64_t stamp = 10000 * NUM_INPUTS; + inputs.push_back(make_timestamp(stamp)); + for (int instr_idx = 0; instr_idx < NUM_INSTRS; instr_idx++) { + inputs.push_back(make_instr(42 + instr_idx * 4)); + if (instr_idx == 1) { + // Insert a blocking syscall in one input. 
+ if (input_idx == 0) { + inputs.push_back(make_timestamp(stamp + 10)); + inputs.push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL, 42)); + inputs.push_back( + make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0)); + // Blocked for 2 100-threshold blocking-syscall units, but + // after each queue rejection it should go to the back of + // the queue and all the other inputs should be selected + // before another retry. + inputs.push_back(make_timestamp(stamp + 210)); + } else { + // Insert a timestamp to match the blocked input so the inputs + // are all at equal priority in the queue. + inputs.push_back(make_timestamp(stamp + 210)); + } + } + } + inputs.push_back(make_exit(tid)); + readers.emplace_back(std::unique_ptr(new mock_reader_t(inputs)), + std::unique_ptr(new mock_reader_t()), tid); + } + sched_inputs.emplace_back(std::move(readers)); + scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT, + scheduler_t::DEPENDENCY_TIMESTAMPS, + scheduler_t::SCHEDULER_DEFAULTS, + /*verbosity=*/3); + sched_ops.quantum_duration = 3; + scheduler_t scheduler; + if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) != + scheduler_t::STATUS_SUCCESS) + assert(false); + std::vector sched_as_string = + run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE); + for (int i = 0; i < NUM_OUTPUTS; i++) { + std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; + } + // The timestamps provide the ABCD ordering, but A's blocking syscall after its + // 2nd instr makes it delayed for 3 full queue cycles of BCD BCD: A's duration + // of 2 is decremented after the 1st (to 1) and 2nd (to 0) and A is finally + // schedulable after the 3rd. + assert(sched_as_string[0] == + "..AA......BB.B..CC.C..DD.DBBBCCCDDDBBBCCCDDDAAABBB.CCC.DDD.AAAAAAA."); +} + static void test_synthetic_with_syscalls() { @@ -1775,6 +1912,7 @@ test_synthetic_with_syscalls() test_synthetic_with_syscalls_single(); test_synthetic_with_syscalls_precise(); test_synthetic_with_syscalls_latencies(); + test_synthetic_with_syscalls_idle(); } #if (defined(X86_64) || defined(ARM_64)) && defined(HAS_ZIP) @@ -3161,7 +3299,6 @@ test_inactive() check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_THREAD_EXIT); check_next(stream1, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_THREAD_EXIT); check_next(stream1, scheduler_t::STATUS_EOF); - if (scheduler.write_recorded_schedule() != scheduler_t::STATUS_SUCCESS) assert(false); } @@ -3209,6 +3346,7 @@ test_direct_switch() std::vector refs_A = { make_thread(TID_A), make_pid(1), + make_version(TRACE_ENTRY_VERSION), // A has the earliest timestamp and starts. make_timestamp(1001), make_marker(TRACE_MARKER_TYPE_CPU_ID, 0), @@ -3226,6 +3364,7 @@ test_direct_switch() std::vector refs_B = { make_thread(TID_B), make_pid(1), + make_version(TRACE_ENTRY_VERSION), // B would go next by timestamp, so this is a good test of direct switches. make_timestamp(2001), make_marker(TRACE_MARKER_TYPE_CPU_ID, 0), @@ -3238,6 +3377,7 @@ test_direct_switch() std::vector refs_C = { make_thread(TID_C), make_pid(1), + make_version(TRACE_ENTRY_VERSION), make_timestamp(3001), make_marker(TRACE_MARKER_TYPE_CPU_ID, 0), make_instr(/*pc=*/301), @@ -3263,9 +3403,11 @@ test_direct_switch() std::unique_ptr(new mock_reader_t()), TID_C); // The string constructor writes "." for markers. // We expect A's first switch to be to C even though B has an earlier timestamp. - // TODO i#5843: Once we have i/o wait times we can make this more interesting - // by having C be unschedule-able at switch time. 
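The countdown described in test_synthetic_with_syscalls_idle's comment above can be checked in isolation: the 200us latency over the 100us threshold starts the blocked duration at 2, each rejected queue selection decrements it, and the input becomes schedulable the third time it is considered. A standalone sketch of just that countdown (illustrative, not the scheduler's code):

    #include <iostream>

    int
    main()
    {
        const double latency = 200., threshold = 100.; // The idle test's syscall gap.
        double remaining = latency / threshold;        // 2 selections to wait out.
        int considered = 0;
        while (true) {
            ++considered;
            if (remaining > 0) {
                --remaining; // Rejected: the input goes to the back of the ready queue.
                continue;
            }
            break; // Schedulable.
        }
        std::cout << "selected on consideration #" << considered << "\n"; // Prints 3.
        return 0;
    }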
- static const char *const CORE0_SCHED_STRING = "..AA........CC......A...BBBB.C..."; + // We expect C's direct switch to A to proceed immediately even though A still + // has significant blocked time left. But then after B is scheduled and finishes, + // we still have to wait for C's block time so we see idle underscores: + static const char *const CORE0_SCHED_STRING = + "...AA.........CC......A....BBBB.____________________C..."; std::vector sched_inputs; sched_inputs.emplace_back(std::move(readers)); @@ -3294,6 +3436,7 @@ test_main(int argc, const char *argv[]) // Avoid races with lazy drdecode init (b/279350357). dr_standalone_init(); + test_direct_switch(); test_serial(); test_parallel(); test_param_checks();
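For completeness, wiring the new knob into user code is just one extra option field; the fragment below mirrors the calls already visible in this patch (analyzer_multi.cpp and the unit tests). It is a sketch rather than a complete program and assumes the drmemtrace scheduler headers plus an already-populated sched_inputs vector of scheduler_t::input_workload_t:

    scheduler_t scheduler;
    scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
                                               scheduler_t::DEPENDENCY_TIMESTAMPS,
                                               scheduler_t::SCHEDULER_DEFAULTS,
                                               /*verbosity=*/0);
    sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
    sched_ops.block_time_scale = 5.; // Keep blocked inputs unscheduled 5x longer.
    if (scheduler.init(sched_inputs, /*output_count=*/2, sched_ops) !=
        scheduler_t::STATUS_SUCCESS)
        assert(false);
    // Passing a time to next_record() drives both quantum expiry and unblocking.
    auto *stream = scheduler.get_stream(/*ordinal=*/0);
    memref_t memref;
    uint64_t time = 1;
    scheduler_t::stream_status_t status = stream->next_record(memref, ++time);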