Skip to content

Commit

Permalink
i#5843 scheduler: Add direct thread switch support (#6424)
Browse files Browse the repository at this point in the history
Adds support for the TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH marker, when
it appears after TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL. The scheduler
directly switches to the target thread if it is on the ready queue.
Performing a forced migration if the target is running on another output
is not yet implemented. Once i/o wait states are added, waking up a
target thread will be added, but that is future work as well.

Adds a DEPENDENCY_DIRECT_SWITCH_BITFIELD and renames
DEPENDENCY_TIMESTAMPS
to DEPENDENCY_TIMESTAMP_BITFIELD so we can combine them, and makes a new
enum entry DEPENDENCY_TIMESTAMPS which combines the two bitfields, which is
what nearly every use case should want while still giving us control and
without really breaking compatibility (and by providing bits and
combinations the enum type is all that's needed still).

Adds a unit test where the schedule would clearly be different without
the switch target.

Issue: #5843
  • Loading branch information
derekbruening authored Nov 12, 2023
1 parent a633603 commit 568aef5
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 12 deletions.
4 changes: 4 additions & 0 deletions api/docs/release.dox
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ changes:
calls changed to contain the actual return value, rather than just whether
successful. A new marker #dynamorio::drmemtrace::TRACE_MARKER_TYPE_SYSCALL_FAILED
was added to indicate failure.
- Changed the enum value of #dynamorio::drmemtrace::scheduler_t::DEPENDENCY_TIMESTAMPS
to include direct switch dependencies. This is not a binary compatibility change
as the old value still refers purely to timestamps, but on a recompile it
refers to timestamps and direct switches, which is what most users should want.

Further non-compatibility-affecting changes include:
- Fixed a bug in the AArch64 codec with the way that SVE scalar+immediate predicated
Expand Down
12 changes: 8 additions & 4 deletions clients/drcachesim/common/trace_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,14 @@ typedef enum {
TRACE_MARKER_TYPE_SYSCALL_FAILED,

/**
* This marker is emitted prior to a system call that causes an immediate switch to
* another thread on the same core (with the current thread entering an unscheduled
* state), bypassing the kernel scheduler's normal dynamic switch code based on run
* queues. The marker value holds the thread id of the target thread.
* This marker is emitted prior to a system call (but after the system call's
* #TRACE_MARKER_TYPE_SYSCALL and #TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL markers)
* that causes an immediate switch to another thread on the same core (with the
* current thread entering an unscheduled state), bypassing the kernel scheduler's
* normal dynamic switch code based on run queues. The marker value holds the
* thread id of the target thread. This should generally always be after a
* #TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL marker as such a switch always
* has a chance of blocking if the target needs to be migrated.
*/
TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH,

Expand Down
53 changes: 49 additions & 4 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
input.reader_end = std::move(reader.end);
input.needs_init = true;
workload_tids[input.tid] = input.index;
tid2input_[workload_tid_t(workload_idx, input.tid)] = index;
}
} else {
if (!workload.readers.empty())
Expand All @@ -556,6 +557,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
for (const auto &it : workload_tids) {
inputs_[it.second].workload = workload_idx;
workload2inputs[workload_idx].push_back(it.second);
tid2input_[workload_tid_t(workload_idx, it.first)] = it.second;
}
}
for (const auto &modifiers : workload.thread_modifiers) {
Expand Down Expand Up @@ -1567,9 +1569,9 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::set_cur_input(output_ordinal_t output,
input_ordinal_t input)
{
// XXX i#5843: Merge tracking of current inputs with ready_queue_ to better manage
// XXX i#5843: Merge tracking of current inputs with ready_priority_ to better manage
// the possible 3 states of each input (a live cur_input for an output stream, in
// the ready_queue_, or at EOF).
// the ready_queue_, or at EOF) (4 states once we add i/o wait times).
assert(output >= 0 && output < static_cast<output_ordinal_t>(outputs_.size()));
// 'input' might be INVALID_INPUT_ORDINAL.
assert(input < static_cast<input_ordinal_t>(inputs_.size()));
Expand Down Expand Up @@ -1764,7 +1766,37 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
if (res != sched_type_t::STATUS_OK)
return res;
} else if (options_.mapping == MAP_TO_ANY_OUTPUT) {
if (ready_queue_empty()) {
if (prev_index != INVALID_INPUT_ORDINAL &&
inputs_[prev_index].switch_to_input != INVALID_INPUT_ORDINAL) {
input_info_t *target = &inputs_[inputs_[prev_index].switch_to_input];
inputs_[prev_index].switch_to_input = INVALID_INPUT_ORDINAL;
// TODO i#5843: Once we add i/o wait times, we should also check
// the sleeping queue and wake up the target.
// We should probably implement the "Merge tracking..." proposal
// from the comment at the top of set_cur_input() too.
// XXX i#5843: Add an invariant check that the next timestamp of the
// target is later than the pre-switch-syscall timestamp?
if (ready_priority_.find(target)) {
VPRINT(this, 2, "next_record[%d]: direct switch to input %d\n",
output, target->index);
ready_priority_.erase(target);
index = target->index;
} else {
// TODO i#5843: If the target is running on another output, we
// need to do a forced migration by setting a flag to force a
// preempt and presumably waiting (STATUS_WAIT or STATUS_IDLE?)
// here until the input is available.
// For now we print a message so we can notice when this
// happens, but we ignore the direct switch request.
VPRINT(this, 1,
"Direct switch target input #%d is running elsewhere and "
"forced migration is NYI\n",
target->index);
}
}
if (index != INVALID_INPUT_ORDINAL) {
// We found a direct switch target above.
} else if (ready_queue_empty()) {
if (prev_index == INVALID_INPUT_ORDINAL)
return sched_type_t::STATUS_EOF;
std::lock_guard<std::mutex> lock(*inputs_[prev_index].lock);
Expand Down Expand Up @@ -1979,7 +2011,20 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// XXX: We may prefer to stop before the return value marker for futex,
// or a kernel xfer marker, but our recorded format is on instr
// boundaries so we live with those being before the switch.
if (record_type_is_instr(record)) {
if (record_type_is_marker(record, marker_type, marker_value) &&
marker_type == TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH) {
memref_tid_t target_tid = marker_value;
auto it =
tid2input_.find(workload_tid_t(input->workload, target_tid));
if (it == tid2input_.end()) {
VPRINT(this, 1,
"Failed to find input for target switch thread %" PRId64
"\n",
target_tid);
} else {
input->switch_to_input = it->second;
}
} else if (record_type_is_instr(record)) {
// Assume it will block and we should switch to a different input.
need_new_input = true;
in_wait_state = true;
Expand Down
54 changes: 50 additions & 4 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,14 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
MAP_AS_PREVIOUSLY,
};

/** Flags specifying how inter-input-stream dependencies are handled. */
/**
* Flags specifying how inter-input-stream dependencies are handled. The _BITFIELD
* values can be combined. Typical combinations are provided so the enum type can be
* used directly.
*/
enum inter_input_dependency_t {
/** Ignores all inter-input dependencies. */
DEPENDENCY_IGNORE,
DEPENDENCY_IGNORE = 0x00,
/**
* Ensures timestamps in the inputs arrive at the outputs in timestamp order.
* For #MAP_TO_ANY_OUTPUT, enforcing asked-for context switch rates is more
Expand All @@ -367,7 +371,23 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
* points for picking the next input, but timestamps will not preempt an input.
* To precisely follow the recorded timestamps, use #MAP_TO_RECORDED_OUTPUT.
*/
DEPENDENCY_TIMESTAMPS,
DEPENDENCY_TIMESTAMPS_BITFIELD = 0x01,
/**
* Ensures timestamps in the inputs arrive at the outputs in timestamp order.
* For #MAP_TO_ANY_OUTPUT, enforcing asked-for context switch rates is more
* important that honoring precise trace-buffer-based timestamp inter-input
* dependencies: thus, timestamp ordering will be followed at context switch
* points for picking the next input, but timestamps will not preempt an input.
* To precisely follow the recorded timestamps, use #MAP_TO_RECORDED_OUTPUT.
*/
DEPENDENCY_DIRECT_SWITCH_BITFIELD = 0x02,
/**
* Combines #DEPENDENCY_TIMESTAMPS_BITFIELD and
* #DEPENDENCY_DIRECT_SWITCH_BITFIELD. This is the recommended setting for most
* schedules.
*/
DEPENDENCY_TIMESTAMPS =
(DEPENDENCY_TIMESTAMPS_BITFIELD | DEPENDENCY_DIRECT_SWITCH_BITFIELD),
// TODO i#5843: Add inferred data dependencies.
};

Expand Down Expand Up @@ -945,8 +965,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
bool order_by_timestamp = false;
// Global ready queue counter used to provide FIFO for same-priority inputs.
uint64_t queue_counter = 0;
// Used to switch on the insruction *after* a blocking syscall.
// Used to switch on the instruction *after* a blocking syscall.
bool processing_blocking_syscall = false;
// Use for special kernel features where one thread specifies a target
// thread to replace it.
input_ordinal_t switch_to_input = INVALID_INPUT_ORDINAL;
// Used to switch before we've read the next instruction.
bool switching_pre_instruction = false;
// Used for time-based quanta.
Expand Down Expand Up @@ -1282,6 +1305,29 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
flexible_queue_t<input_info_t *, InputTimestampComparator> ready_priority_;
// Global ready queue counter used to provide FIFO for same-priority inputs.
uint64_t ready_counter_ = 0;
// Map from workload,tid pair to input.
struct workload_tid_t {
workload_tid_t(int wl, memref_tid_t tid)
: workload(wl)
, tid(tid)
{
}
bool
operator==(const workload_tid_t &rhs) const
{
return workload == rhs.workload && tid == rhs.tid;
}
int workload;
memref_tid_t tid;
};
struct workload_tid_hash_t {
std::size_t
operator()(const workload_tid_t &wt) const
{
return std::hash<int>()(wt.workload) ^ std::hash<memref_tid_t>()(wt.tid);
}
};
std::unordered_map<workload_tid_t, input_ordinal_t, workload_tid_hash_t> tid2input_;
};

/** See #dynamorio::drmemtrace::scheduler_tmpl_t. */
Expand Down
92 changes: 92 additions & 0 deletions clients/drcachesim/tests/scheduler_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2964,6 +2964,97 @@ test_inactive()
#endif // HAS_ZIP
}

static void
test_direct_switch()
{
std::cerr << "\n----------------\nTesting direct switches\n";
// We have just 1 output to better control the order and avoid flakiness.
static constexpr int NUM_OUTPUTS = 1;
static constexpr int QUANTUM_DURATION = 100; // Never reached.
static constexpr memref_tid_t TID_BASE = 100;
static constexpr memref_tid_t TID_A = TID_BASE + 0;
static constexpr memref_tid_t TID_B = TID_BASE + 1;
static constexpr memref_tid_t TID_C = TID_BASE + 2;
std::vector<trace_entry_t> refs_A = {
make_thread(TID_A),
make_pid(1),
// A has the earliest timestamp and starts.
make_timestamp(1001),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_instr(/*pc=*/101),
make_instr(/*pc=*/102),
make_timestamp(1002),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_marker(TRACE_MARKER_TYPE_SYSCALL, 999),
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0),
make_marker(TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH, TID_C),
make_timestamp(4001),
make_instr(/*pc=*/401),
make_exit(TID_A),
};
std::vector<trace_entry_t> refs_B = {
make_thread(TID_B),
make_pid(1),
// B would go next by timestamp, so this is a good test of direct switches.
make_timestamp(2001),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_instr(/*pc=*/201),
make_instr(/*pc=*/202),
make_instr(/*pc=*/203),
make_instr(/*pc=*/204),
make_exit(TID_B),
};
std::vector<trace_entry_t> refs_C = {
make_thread(TID_C),
make_pid(1),
make_timestamp(3001),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_instr(/*pc=*/301),
make_instr(/*pc=*/302),
make_timestamp(3002),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_marker(TRACE_MARKER_TYPE_SYSCALL, 999),
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0),
make_marker(TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH, TID_A),
make_timestamp(5001),
make_instr(/*pc=*/501),
// Test a non-existent target: should be ignored, but not crash.
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0),
make_marker(TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH, TID_BASE + 3),
make_exit(TID_C),
};
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_C)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_C);
// The string constructor writes "." for markers.
// We expect A's first switch to be to C even though B has an earlier timestamp.
// TODO i#5843: Once we have i/o wait times we can make this more interesting
// by having C be unschedule-able at switch time.
static const char *const CORE0_SCHED_STRING = "..AA........CC......A...BBBB.C...";

std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/3);
sched_ops.quantum_duration = QUANTUM_DURATION;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
}

int
test_main(int argc, const char *argv[])
{
Expand Down Expand Up @@ -2996,6 +3087,7 @@ test_main(int argc, const char *argv[])
test_replay_as_traced();
test_replay_as_traced_i6107_workaround();
test_inactive();
test_direct_switch();

dr_standalone_exit();
return 0;
Expand Down

0 comments on commit 568aef5

Please sign in to comment.