Skip to content

Commit

Permalink
i#6726 replay cpuid: Sort as-traced outputs by cpuid (#6729)
Browse files Browse the repository at this point in the history
Currently, in as-traced mode the output streams are assigned to the
cores in the as-traced schedule file in file order. But that order is
essentially random, which scrambles key arrangements like which core is
on which socket. Here we sort by the recorded cpuid to recreate the same
cpuid order as before.

Adds a unit test.  Also tested on larger cases with > 100 cores.

Issue: #6726
  • Loading branch information
derekbruening authored Mar 27, 2024
1 parent ea2bf8c commit d20abf4
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 89 deletions.
106 changes: 69 additions & 37 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1081,8 +1081,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
tid2input[inputs_[i].tid] = i;
}
std::vector<std::set<uint64_t>> start2stop(inputs_.size());
// We number the outputs according to their order in the file.
// XXX i#5843: Should we support some direction from the user on this? Simulation
// We initially number the outputs according to their order in the file, and then
// sort by the stored cpuid below.
// XXX i#6726: Should we support some direction from the user on this? Simulation
// may want to preserve the NUMA relationships and may need to set up its simulated
// cores at init time, so it would prefer to partition by output stream identifier.
// Maybe we could at least add the proposed memtrace_stream_t query for cpuid and
Expand All @@ -1094,9 +1095,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
std::vector<std::vector<schedule_output_tracker_t>> all_sched(outputs_.size());
// Work around i#6107 by tracking counts sorted by timestamp for each input.
std::vector<std::vector<schedule_input_tracker_t>> input_sched(inputs_.size());
// These hold entries added in the on-disk (unsorted) order.
std::vector<output_ordinal_t> disk_ord2index; // Initially [i] holds i.
std::vector<uint64_t> disk_ord2cpuid; // [i] holds cpuid for entry i.
while (options_.replay_as_traced_istream->read(reinterpret_cast<char *>(&entry),
sizeof(entry))) {
if (entry.cpu != cur_cpu) {
// This is a zipfile component boundary: one conmponent per cpu.
if (cur_cpu != std::numeric_limits<uint64_t>::max()) {
++cur_output;
if (cur_output >= static_cast<int>(outputs_.size())) {
Expand All @@ -1105,9 +1110,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
}
}
cur_cpu = entry.cpu;
VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", cur_output,
cur_cpu);
outputs_[cur_output].as_traced_cpuid = cur_cpu;
disk_ord2cpuid.push_back(cur_cpu);
disk_ord2index.push_back(cur_output);
}
input_ordinal_t input = tid2input[entry.thread];
// We'll fill in the stop ordinal in our second pass below.
Expand Down Expand Up @@ -1135,16 +1139,49 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
res = remove_zero_instruction_segments(input_sched, all_sched);
if (res != sched_type_t::STATUS_SUCCESS)
return res;
for (int output_idx = 0; output_idx < static_cast<output_ordinal_t>(outputs_.size());
++output_idx) {
// Sort by cpuid to get a more natural ordering.
// Probably raw2trace should do this in the first place, but we have many
// schedule files already out there so we still need a sort here.
// If we didn't have cross-indices pointing at all_sched from input_sched, we
// would just sort all_sched: but instead we have to construct a separate
// ordering structure.
std::sort(disk_ord2index.begin(), disk_ord2index.end(),
[disk_ord2cpuid](const output_ordinal_t &l, const output_ordinal_t &r) {
return disk_ord2cpuid[l] < disk_ord2cpuid[r];
});
// disk_ord2index[i] used to hold i; now after sorting it holds the ordinal in
// the disk file that has the ith largest cpuid. We need to turn that into
// the output_idx ordinal for the cpu at ith ordinal in the disk file, for
// which we use a new vector disk_ord2output.
// E.g., if the original file was in this order disk_ord2cpuid = {6,2,3,7},
// disk_ord2index after sorting would hold {1,2,0,3}, which we want to turn
// into disk_ord2output = {2,0,1,3}.
std::vector<output_ordinal_t> disk_ord2output(disk_ord2index.size());
for (size_t i = 0; i < disk_ord2index.size(); ++i) {
disk_ord2output[disk_ord2index[i]] = static_cast<output_ordinal_t>(i);
}
for (int disk_idx = 0; disk_idx < static_cast<output_ordinal_t>(outputs_.size());
++disk_idx) {
if (disk_idx >= static_cast<int>(disk_ord2index.size())) {
// XXX i#6630: We should auto-set the output count and avoid
// having extra ouputs; these complicate idle computations, etc.
VPRINT(this, 1, "Output %d empty: returning eof up front\n", disk_idx);
outputs_[disk_idx].at_eof = true;
set_cur_input(disk_idx, INVALID_INPUT_ORDINAL);
continue;
}
output_ordinal_t output_idx = disk_ord2output[disk_idx];
VPRINT(this, 1, "Read %zu as-traced records for output #%d\n",
all_sched[output_idx].size(), output_idx);
all_sched[disk_idx].size(), output_idx);
outputs_[output_idx].as_traced_cpuid = disk_ord2cpuid[disk_idx];
VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", output_idx,
outputs_[output_idx].as_traced_cpuid);
// Update the stop_instruction field and collapse consecutive entries while
// inserting into the final location.
int start_consec = -1;
for (int sched_idx = 0;
sched_idx < static_cast<int>(all_sched[output_idx].size()); ++sched_idx) {
auto &segment = all_sched[output_idx][sched_idx];
for (int sched_idx = 0; sched_idx < static_cast<int>(all_sched[disk_idx].size());
++sched_idx) {
auto &segment = all_sched[disk_idx][sched_idx];
if (!segment.valid)
continue;
auto find = start2stop[segment.input].find(segment.start_instruction);
Expand All @@ -1158,27 +1195,27 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
" time=%" PRId64 "\n",
sched_idx, segment.input, segment.start_instruction,
segment.stop_instruction, segment.timestamp);
if (sched_idx + 1 < static_cast<int>(all_sched[output_idx].size()) &&
segment.input == all_sched[output_idx][sched_idx + 1].input &&
if (sched_idx + 1 < static_cast<int>(all_sched[disk_idx].size()) &&
segment.input == all_sched[disk_idx][sched_idx + 1].input &&
segment.stop_instruction >
all_sched[output_idx][sched_idx + 1].start_instruction) {
all_sched[disk_idx][sched_idx + 1].start_instruction) {
// A second sanity check.
error_string_ = "Invalid decreasing start field in schedule file";
return STATUS_ERROR_INVALID_PARAMETER;
} else if (sched_idx + 1 < static_cast<int>(all_sched[output_idx].size()) &&
segment.input == all_sched[output_idx][sched_idx + 1].input &&
} else if (sched_idx + 1 < static_cast<int>(all_sched[disk_idx].size()) &&
segment.input == all_sched[disk_idx][sched_idx + 1].input &&
segment.stop_instruction ==
all_sched[output_idx][sched_idx + 1].start_instruction) {
all_sched[disk_idx][sched_idx + 1].start_instruction) {
// Collapse into next.
if (start_consec == -1)
start_consec = sched_idx;
} else {
schedule_output_tracker_t &toadd = start_consec >= 0
? all_sched[output_idx][start_consec]
: all_sched[output_idx][sched_idx];
? all_sched[disk_idx][start_consec]
: all_sched[disk_idx][sched_idx];
outputs_[output_idx].record.emplace_back(
schedule_record_t::DEFAULT, toadd.input, toadd.start_instruction,
all_sched[output_idx][sched_idx].stop_instruction, toadd.timestamp);
all_sched[disk_idx][sched_idx].stop_instruction, toadd.timestamp);
start_consec = -1;
VDO(this, 3, {
auto &added = outputs_[output_idx].record.back();
Expand All @@ -1193,24 +1230,19 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
}
VPRINT(this, 1, "Collapsed duplicates for %zu as-traced records for output #%d\n",
outputs_[output_idx].record.size(), output_idx);
if (!outputs_[output_idx].record.empty()) {
if (outputs_[output_idx].record[0].value.start_instruction != 0) {
VPRINT(this, 1, "Initial input for output #%d is: wait state\n",
output_idx);
set_cur_input(output_idx, INVALID_INPUT_ORDINAL);
outputs_[output_idx].waiting = true;
outputs_[output_idx].record_index = -1;
} else {
VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx,
outputs_[output_idx].record[0].key.input);
set_cur_input(output_idx, outputs_[output_idx].record[0].key.input);
}
} else {
// XXX i#6630: We should auto-set the output count and avoid
// having extra ouputs; these complicate idle computations, etc.
VPRINT(this, 1, "Output %d empty: returning eof up front\n", output_idx);
outputs_[output_idx].at_eof = true;
if (outputs_[output_idx].record.empty()) {
error_string_ = "Empty as-traced schedule";
return STATUS_ERROR_INVALID_PARAMETER;
}
if (outputs_[output_idx].record[0].value.start_instruction != 0) {
VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx);
set_cur_input(output_idx, INVALID_INPUT_ORDINAL);
outputs_[output_idx].waiting = true;
outputs_[output_idx].record_index = -1;
} else {
VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx,
outputs_[output_idx].record[0].key.input);
set_cur_input(output_idx, outputs_[output_idx].record[0].key.input);
}
}
return STATUS_SUCCESS;
Expand Down
98 changes: 94 additions & 4 deletions clients/drcachesim/tests/scheduler_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3410,6 +3410,94 @@ test_replay_as_traced_dup_start()
#endif
}

static void
test_replay_as_traced_sort()
{
#ifdef HAS_ZIP
// Test that outputs have the cpuids in sorted order.
std::cerr << "\n----------------\nTesting replay as-traced sorting\n";

static constexpr int NUM_INPUTS = 4;
static constexpr int NUM_OUTPUTS = NUM_INPUTS; // Required to be equal.
static constexpr int NUM_INSTRS = 2;
static constexpr memref_tid_t TID_BASE = 100;
static constexpr addr_t PC_BASE = 1000;
// Our unsorted cpuid order in the file.
static const std::vector<int> CPUIDS = { 42, 7, 56, 3 };
// Index into CPUIDS if sorted.
static const std::vector<int> INDICES = { 3, 1, 0, 2 };
static constexpr uint64_t TIMESTAMP_BASE = 100;

std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int input_idx = 0; input_idx < NUM_INPUTS; input_idx++) {
memref_tid_t tid = TID_BASE + input_idx;
inputs[input_idx].push_back(make_thread(tid));
inputs[input_idx].push_back(make_pid(1));
// These timestamps do not line up with the schedule file but
// that does not cause problems and leaving it this way
// simplifies the testdata construction.
inputs[input_idx].push_back(make_timestamp(TIMESTAMP_BASE));
inputs[input_idx].push_back(
make_marker(TRACE_MARKER_TYPE_CPU_ID, CPUIDS[input_idx]));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; ++instr_idx) {
inputs[input_idx].push_back(make_instr(PC_BASE + instr_idx));
}
inputs[input_idx].push_back(make_exit(tid));
}

// Synthesize a cpu-schedule file with unsorted entries (see CPUIDS above).
std::string cpu_fname = "tmp_test_cpu_i6721.zip";
{
zipfile_ostream_t outfile(cpu_fname);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
std::vector<schedule_entry_t> sched;
sched.emplace_back(TID_BASE + i, TIMESTAMP_BASE, CPUIDS[i], 0);
std::ostringstream cpu_string;
cpu_string << CPUIDS[i];
std::string err = outfile.open_new_component(cpu_string.str());
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched.data()),
sched.size() * sizeof(sched[0])))
assert(false);
}
}

// Replay the recorded schedule.
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_RECORDED_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/4);
zipfile_istream_t infile(cpu_fname);
sched_ops.replay_as_traced_istream = &infile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
auto *stream = scheduler.get_stream(i);
memref_t memref;
scheduler_t::stream_status_t status = stream->next_record(memref);
if (status == scheduler_t::STATUS_OK) {
assert(memref.marker.tid == TID_BASE + INDICES[i]);
if (memref.marker.type == TRACE_TYPE_MARKER &&
memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID) {
assert(static_cast<int>(memref.marker.marker_value) ==
CPUIDS[INDICES[i]]);
}
} else
assert(status == scheduler_t::STATUS_EOF);
}
#endif
}

static void
test_replay_as_traced_from_file(const char *testdir)
{
Expand All @@ -3420,14 +3508,15 @@ test_replay_as_traced_from_file(const char *testdir)
std::string(testdir) + "/drmemtrace.threadsig.x64.tracedir/cpu_schedule.bin.zip";
// This checked-in trace has 8 threads on 7 cores. It doesn't have
// much thread migration but our synthetic test above covers that.
// The outputs use the stored cores sorted by cpuid.
static const char *const SCHED_STRING =
"Core #0: 1257598 \nCore #1: 1257603 \nCore #2: 1257601 \n"
"Core #3: 1257599 => 1257604 @ <366987,87875,13331862029895453> "
"Core #0: 1257602 \nCore #1: 1257600 \n"
"Core #2: 1257599 => 1257604 @ <366987,87875,13331862029895453> "
// The ordinal is really 1 ("<1,0,0>") but with the scheduler's readahead
// it becomes 2; easier to just check for that as trying to avoid readahead
// causes other problems with start-idle cores (i#6721).
// causes other problems (i#xxxx).
"(<366986,87875,13331862029895453> => <2,0,0>) \n"
"Core #4: 1257600 \nCore #5: 1257596 \nCore #6: 1257602 \n";
"Core #3: 1257596 \nCore #4: 1257603 \nCore #5: 1257601 \nCore #6: 1257598 \n";
static constexpr int NUM_OUTPUTS = 7; // Matches the actual trace's core footprint.
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
Expand Down Expand Up @@ -4249,6 +4338,7 @@ test_main(int argc, const char *argv[])
test_replay_as_traced();
test_replay_as_traced_i6107_workaround();
test_replay_as_traced_dup_start();
test_replay_as_traced_sort();
test_inactive();
test_direct_switch();
test_kernel_switch_sequences();
Expand Down
Loading

0 comments on commit d20abf4

Please sign in to comment.