i#6726 replay cpuid: Sort as-traced outputs by cpuid #6729

Merged
merged 7 commits on Mar 27, 2024
Changes from 5 commits
102 changes: 65 additions & 37 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -809,10 +809,6 @@ scheduler_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
// The filetype, if present, is before the first timestamp. If we only need the
// filetype we avoid going as far as the timestamp.
bool gather_filetype = options_.read_inputs_in_init;
// Avoid reading ahead for replay as it makes the input ords not match in tests.
if (options_.mapping == MAP_TO_RECORDED_OUTPUT &&
options_.replay_as_traced_istream != nullptr)
gather_filetype = false;
if (gather_filetype || gather_timestamps) {
sched_type_t::scheduler_status_t res =
get_initial_input_content(gather_timestamps);
@@ -1043,7 +1039,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_recorded_schedule()
}
for (int i = 0; i < static_cast<output_ordinal_t>(outputs_.size()); ++i) {
if (outputs_[i].record.empty()) {
// XXX i#6630: We should auto-set the output count and avoid
// having extra outputs; these complicate idle computations, etc.
VPRINT(this, 1, "output %d empty: returning eof up front\n", i);
set_cur_input(i, INVALID_INPUT_ORDINAL);
outputs_[i].at_eof = true;
} else if (outputs_[i].record[0].type == schedule_record_t::IDLE) {
set_cur_input(i, INVALID_INPUT_ORDINAL);
outputs_[i].waiting = true;
@@ -1081,8 +1081,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
tid2input[inputs_[i].tid] = i;
}
std::vector<std::set<uint64_t>> start2stop(inputs_.size());
// We number the outputs according to their order in the file.
// XXX i#5843: Should we support some direction from the user on this? Simulation
// We initially number the outputs according to their order in the file, and then
// sort by the stored cpuid below.
// XXX i#6726: Should we support some direction from the user on this? Simulation
// may want to preserve the NUMA relationships and may need to set up its simulated
// cores at init time, so it would prefer to partition by output stream identifier.
// Maybe we could at least add the proposed memtrace_stream_t query for cpuid and
@@ -1094,6 +1095,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
std::vector<std::vector<schedule_output_tracker_t>> all_sched(outputs_.size());
// Work around i#6107 by tracking counts sorted by timestamp for each input.
std::vector<std::vector<schedule_input_tracker_t>> input_sched(inputs_.size());
std::vector<output_ordinal_t> disk2index;
std::vector<uint64_t> disk2cpuid;
while (options_.replay_as_traced_istream->read(reinterpret_cast<char *>(&entry),
sizeof(entry))) {
if (entry.cpu != cur_cpu) {
@@ -1105,9 +1108,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
}
}
cur_cpu = entry.cpu;
VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", cur_output,
cur_cpu);
outputs_[cur_output].as_traced_cpuid = cur_cpu;
disk2cpuid.push_back(cur_cpu);
disk2index.push_back(cur_output);
}
input_ordinal_t input = tid2input[entry.thread];
// We'll fill in the stop ordinal in our second pass below.
@@ -1135,16 +1137,42 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
res = remove_zero_instruction_segments(input_sched, all_sched);
if (res != sched_type_t::STATUS_SUCCESS)
return res;
for (int output_idx = 0; output_idx < static_cast<output_ordinal_t>(outputs_.size());
++output_idx) {
// Sort by cpuid to get a more natural ordering.
// Probably raw2trace should do this in the first place, but we have many
// schedule files already out there so we still need a sort here.
std::sort(disk2index.begin(), disk2index.end(),
[disk2cpuid](const output_ordinal_t &l, const output_ordinal_t &r) {
return disk2cpuid[l] < disk2cpuid[r];
});
// disk2index now holds the sorted index, but we need another array to
// store that in the disk order. E.g., if the original file held 6,2,3,7,
// disk2index would then hold 1,2,0,3, but we want 2,0,1,3.
std::vector<output_ordinal_t> disk2output(disk2index.size());
for (size_t i = 0; i < disk2index.size(); ++i) {
disk2output[disk2index[i]] = static_cast<output_ordinal_t>(i);
}
for (int disk_idx = 0; disk_idx < static_cast<output_ordinal_t>(outputs_.size());
++disk_idx) {
if (disk_idx >= static_cast<int>(disk2index.size())) {
// XXX i#6630: We should auto-set the output count and avoid
// having extra outputs; these complicate idle computations, etc.
VPRINT(this, 1, "Output %d empty: returning eof up front\n", disk_idx);
outputs_[disk_idx].at_eof = true;
set_cur_input(disk_idx, INVALID_INPUT_ORDINAL);
continue;
}
output_ordinal_t output_idx = disk2output[disk_idx];
VPRINT(this, 1, "Read %zu as-traced records for output #%d\n",
all_sched[output_idx].size(), output_idx);
all_sched[disk_idx].size(), output_idx);
outputs_[output_idx].as_traced_cpuid = disk2cpuid[disk_idx];
VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", output_idx,
outputs_[output_idx].as_traced_cpuid);
// Update the stop_instruction field and collapse consecutive entries while
// inserting into the final location.
int start_consec = -1;
for (int sched_idx = 0;
sched_idx < static_cast<int>(all_sched[output_idx].size()); ++sched_idx) {
auto &segment = all_sched[output_idx][sched_idx];
for (int sched_idx = 0; sched_idx < static_cast<int>(all_sched[disk_idx].size());
++sched_idx) {
auto &segment = all_sched[disk_idx][sched_idx];
if (!segment.valid)
continue;
auto find = start2stop[segment.input].find(segment.start_instruction);
@@ -1158,27 +1186,27 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
" time=%" PRId64 "\n",
sched_idx, segment.input, segment.start_instruction,
segment.stop_instruction, segment.timestamp);
if (sched_idx + 1 < static_cast<int>(all_sched[output_idx].size()) &&
segment.input == all_sched[output_idx][sched_idx + 1].input &&
if (sched_idx + 1 < static_cast<int>(all_sched[disk_idx].size()) &&
segment.input == all_sched[disk_idx][sched_idx + 1].input &&
segment.stop_instruction >
all_sched[output_idx][sched_idx + 1].start_instruction) {
all_sched[disk_idx][sched_idx + 1].start_instruction) {
// A second sanity check.
error_string_ = "Invalid decreasing start field in schedule file";
return STATUS_ERROR_INVALID_PARAMETER;
} else if (sched_idx + 1 < static_cast<int>(all_sched[output_idx].size()) &&
segment.input == all_sched[output_idx][sched_idx + 1].input &&
} else if (sched_idx + 1 < static_cast<int>(all_sched[disk_idx].size()) &&
segment.input == all_sched[disk_idx][sched_idx + 1].input &&
segment.stop_instruction ==
all_sched[output_idx][sched_idx + 1].start_instruction) {
all_sched[disk_idx][sched_idx + 1].start_instruction) {
// Collapse into next.
if (start_consec == -1)
start_consec = sched_idx;
} else {
schedule_output_tracker_t &toadd = start_consec >= 0
? all_sched[output_idx][start_consec]
: all_sched[output_idx][sched_idx];
? all_sched[disk_idx][start_consec]
: all_sched[disk_idx][sched_idx];
outputs_[output_idx].record.emplace_back(
schedule_record_t::DEFAULT, toadd.input, toadd.start_instruction,
all_sched[output_idx][sched_idx].stop_instruction, toadd.timestamp);
all_sched[disk_idx][sched_idx].stop_instruction, toadd.timestamp);
start_consec = -1;
VDO(this, 3, {
auto &added = outputs_[output_idx].record.back();
@@ -1193,20 +1221,20 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
}
VPRINT(this, 1, "Collapsed duplicates for %zu as-traced records for output #%d\n",
outputs_[output_idx].record.size(), output_idx);
if (!outputs_[output_idx].record.empty()) {
if (outputs_[output_idx].record[0].value.start_instruction != 0) {
VPRINT(this, 1, "Initial input for output #%d is: wait state\n",
output_idx);
set_cur_input(output_idx, INVALID_INPUT_ORDINAL);
outputs_[output_idx].waiting = true;
outputs_[output_idx].record_index = -1;
} else {
VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx,
outputs_[output_idx].record[0].key.input);
set_cur_input(output_idx, outputs_[output_idx].record[0].key.input);
}
} else
if (outputs_[output_idx].record.empty()) {
error_string_ = "Empty as-traced schedule";
return STATUS_ERROR_INVALID_PARAMETER;
}
if (outputs_[output_idx].record[0].value.start_instruction != 0) {
VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx);
set_cur_input(output_idx, INVALID_INPUT_ORDINAL);
outputs_[output_idx].waiting = true;
outputs_[output_idx].record_index = -1;
} else {
VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx,
outputs_[output_idx].record[0].key.input);
set_cur_input(output_idx, outputs_[output_idx].record[0].key.input);
}
}
return STATUS_SUCCESS;
}
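The sort-then-invert step in read_traced_schedule() above can be hard to follow, so here is a minimal standalone sketch (illustrative only, not part of this PR; the local names and the main() harness are just for demonstration) reproducing the worked example from the comment: a schedule file whose components hold cpuids 6,2,3,7 yields disk2index = 1,2,0,3 after the sort, and inverting that permutation gives disk2output = 2,0,1,3, i.e. each disk slot's position in sorted-cpuid order.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

int
main()
{
    // Cpuids in the order the schedule file stores them (disk order).
    std::vector<uint64_t> disk2cpuid = { 6, 2, 3, 7 };
    // Start with the identity permutation over disk slots.
    std::vector<int> disk2index(disk2cpuid.size());
    for (size_t i = 0; i < disk2index.size(); ++i)
        disk2index[i] = static_cast<int>(i);
    // Sort the disk slots by their cpuid, as read_traced_schedule() does.
    std::sort(disk2index.begin(), disk2index.end(),
              [&disk2cpuid](int l, int r) { return disk2cpuid[l] < disk2cpuid[r]; });
    assert((disk2index == std::vector<int>{ 1, 2, 0, 3 }));
    // Invert the permutation so it is indexed by disk slot: disk slot i
    // feeds output disk2output[i].
    std::vector<int> disk2output(disk2index.size());
    for (size_t i = 0; i < disk2index.size(); ++i)
        disk2output[disk2index[i]] = static_cast<int>(i);
    assert((disk2output == std::vector<int>{ 2, 0, 1, 3 }));
    return 0;
}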
7 changes: 7 additions & 0 deletions clients/drcachesim/tests/record_filter_start_idle.templatex
@@ -0,0 +1,7 @@
Output .* entries from .* entries.
Schedule stats tool results:
.*
Core #0 schedule: .*
Core #1 schedule: .*
Core #2 schedule: .*
Core #3 schedule: .*
100 changes: 96 additions & 4 deletions clients/drcachesim/tests/scheduler_unit_tests.cpp
@@ -3410,6 +3410,93 @@ test_replay_as_traced_dup_start()
#endif
}

static void
test_replay_as_traced_sort()
{
#ifdef HAS_ZIP
// Test that outputs have the cpuids in sorted order.
std::cerr << "\n----------------\nTesting replay as-traced sorting\n";

static constexpr int NUM_INPUTS = 4;
static constexpr int NUM_OUTPUTS = NUM_INPUTS; // Required to be equal.
static constexpr int NUM_INSTRS = 2;
static constexpr memref_tid_t TID_BASE = 100;
static constexpr addr_t PC_BASE = 1000;
static const std::vector<int> CPUIDS = { 42, 7, 56, 3 };
// Index into CPUIDS if sorted.
static const std::vector<int> INDICES = { 3, 1, 0, 2 };
static constexpr uint64_t TIMESTAMP_BASE = 100;

std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int input_idx = 0; input_idx < NUM_INPUTS; input_idx++) {
memref_tid_t tid = TID_BASE + input_idx;
inputs[input_idx].push_back(make_thread(tid));
inputs[input_idx].push_back(make_pid(1));
// These timestamps do not line up with the schedule file but
// that does not cause problems and leaving it this way
// simplifies the testdata construction.
inputs[input_idx].push_back(make_timestamp(TIMESTAMP_BASE));
inputs[input_idx].push_back(
make_marker(TRACE_MARKER_TYPE_CPU_ID, CPUIDS[input_idx]));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; ++instr_idx) {
inputs[input_idx].push_back(make_instr(PC_BASE + instr_idx));
}
inputs[input_idx].push_back(make_exit(tid));
}

// Synthesize a cpu-schedule file.
std::string cpu_fname = "tmp_test_cpu_i6721.zip";
{
zipfile_ostream_t outfile(cpu_fname);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
std::vector<schedule_entry_t> sched;
sched.emplace_back(TID_BASE + i, TIMESTAMP_BASE, CPUIDS[i], 0);
std::ostringstream cpu_string;
cpu_string << CPUIDS[i];
std::string err = outfile.open_new_component(cpu_string.str());
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched.data()),
sched.size() * sizeof(sched[0])))
assert(false);
}
}

// Replay the recorded schedule.
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_RECORDED_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/4);
zipfile_istream_t infile(cpu_fname);
sched_ops.replay_as_traced_istream = &infile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
auto *stream = scheduler.get_stream(i);
memref_t memref;
scheduler_t::stream_status_t status = stream->next_record(memref);
if (status == scheduler_t::STATUS_OK) {
assert(memref.marker.tid == TID_BASE + INDICES[i]);
if (memref.marker.type == TRACE_TYPE_MARKER &&
memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID) {
assert(static_cast<int>(memref.marker.marker_value) ==
CPUIDS[INDICES[i]]);
}
} else
assert(status == scheduler_t::STATUS_EOF);
}
#endif
}

static void
test_replay_as_traced_from_file(const char *testdir)
{
@@ -3420,11 +3507,15 @@ test_replay_as_traced_from_file(const char *testdir)
std::string(testdir) + "/drmemtrace.threadsig.x64.tracedir/cpu_schedule.bin.zip";
// This checked-in trace has 8 threads on 7 cores. It doesn't have
// much thread migration but our synthetic test above covers that.
// The outputs use the stored cores sorted by cpuid.
static const char *const SCHED_STRING =
"Core #0: 1257598 \nCore #1: 1257603 \nCore #2: 1257601 \n"
"Core #3: 1257599 => 1257604 @ <366987,87875,13331862029895453> "
"(<366986,87875,13331862029895453> => <1,0,0>) \n"
"Core #4: 1257600 \nCore #5: 1257596 \nCore #6: 1257602 \n";
"Core #0: 1257602 \nCore #1: 1257600 \n"
"Core #2: 1257599 => 1257604 @ <366987,87875,13331862029895453> "
// The ordinal is really 1 ("<1,0,0>") but with the scheduler's readahead
// it becomes 2; easier to just check for that as trying to avoid readahead
// causes other problems (i#xxxx).
"(<366986,87875,13331862029895453> => <2,0,0>) \n"
"Core #3: 1257596 \nCore #4: 1257603 \nCore #5: 1257601 \nCore #6: 1257598 \n";
static constexpr int NUM_OUTPUTS = 7; // Matches the actual trace's core footprint.
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
@@ -4246,6 +4337,7 @@ test_main(int argc, const char *argv[])
test_replay_as_traced();
test_replay_as_traced_i6107_workaround();
test_replay_as_traced_dup_start();
test_replay_as_traced_sort();
test_inactive();
test_direct_switch();
test_kernel_switch_sequences();
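As a quick sanity check on the new test's expectation table, the sketch below (illustrative only, not part of the PR) derives INDICES = {3, 1, 0, 2} from CPUIDS = {42, 7, 56, 3} using the same sort the scheduler now applies, so output #i should see the input that recorded the i-th smallest cpuid.

#include <algorithm>
#include <cassert>
#include <vector>

int
main()
{
    const std::vector<int> cpuids = { 42, 7, 56, 3 };
    // Sort the indices 0..3 by their cpuid value.
    std::vector<int> indices = { 0, 1, 2, 3 };
    std::sort(indices.begin(), indices.end(),
              [&cpuids](int l, int r) { return cpuids[l] < cpuids[r]; });
    // Sorted cpuids are 3, 7, 42, 56, i.e. original positions 3, 1, 0, 2.
    assert((indices == std::vector<int>{ 3, 1, 0, 2 }));
    return 0;
}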