Merge branch 'master' into i6734-shard-filetype
derekbruening authored Mar 27, 2024
2 parents a8c1393 + d20abf4 commit d9eb0c3
Showing 5 changed files with 216 additions and 93 deletions.
106 changes: 69 additions & 37 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -1081,8 +1081,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
tid2input[inputs_[i].tid] = i;
}
std::vector<std::set<uint64_t>> start2stop(inputs_.size());
// We number the outputs according to their order in the file.
// XXX i#5843: Should we support some direction from the user on this? Simulation
// We initially number the outputs according to their order in the file, and then
// sort by the stored cpuid below.
// XXX i#6726: Should we support some direction from the user on this? Simulation
// may want to preserve the NUMA relationships and may need to set up its simulated
// cores at init time, so it would prefer to partition by output stream identifier.
// Maybe we could at least add the proposed memtrace_stream_t query for cpuid and
@@ -1094,9 +1095,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
std::vector<std::vector<schedule_output_tracker_t>> all_sched(outputs_.size());
// Work around i#6107 by tracking counts sorted by timestamp for each input.
std::vector<std::vector<schedule_input_tracker_t>> input_sched(inputs_.size());
// These hold entries added in the on-disk (unsorted) order.
std::vector<output_ordinal_t> disk_ord2index; // Initially [i] holds i.
std::vector<uint64_t> disk_ord2cpuid; // [i] holds cpuid for entry i.
while (options_.replay_as_traced_istream->read(reinterpret_cast<char *>(&entry),
sizeof(entry))) {
if (entry.cpu != cur_cpu) {
// This is a zipfile component boundary: one component per cpu.
if (cur_cpu != std::numeric_limits<uint64_t>::max()) {
++cur_output;
if (cur_output >= static_cast<int>(outputs_.size())) {
@@ -1105,9 +1110,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
}
}
cur_cpu = entry.cpu;
VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", cur_output,
cur_cpu);
outputs_[cur_output].as_traced_cpuid = cur_cpu;
disk_ord2cpuid.push_back(cur_cpu);
disk_ord2index.push_back(cur_output);
}
input_ordinal_t input = tid2input[entry.thread];
// We'll fill in the stop ordinal in our second pass below.
@@ -1135,16 +1139,49 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
res = remove_zero_instruction_segments(input_sched, all_sched);
if (res != sched_type_t::STATUS_SUCCESS)
return res;
for (int output_idx = 0; output_idx < static_cast<output_ordinal_t>(outputs_.size());
++output_idx) {
// Sort by cpuid to get a more natural ordering.
// Probably raw2trace should do this in the first place, but we have many
// schedule files already out there so we still need a sort here.
// If we didn't have cross-indices pointing at all_sched from input_sched, we
// would just sort all_sched; instead we have to construct a separate
// ordering structure.
std::sort(disk_ord2index.begin(), disk_ord2index.end(),
[disk_ord2cpuid](const output_ordinal_t &l, const output_ordinal_t &r) {
return disk_ord2cpuid[l] < disk_ord2cpuid[r];
});
// disk_ord2index[i] used to hold i; after the sort it holds the disk-file
// ordinal of the entry with the ith-smallest cpuid. We need to turn that
// into the output ordinal for the cpu at the ith position in the disk file,
// for which we use a new vector disk_ord2output.
// E.g., if the original file order was disk_ord2cpuid = {6,2,3,7},
// disk_ord2index after sorting would hold {1,2,0,3}, which we want to turn
// into disk_ord2output = {2,0,1,3}.
std::vector<output_ordinal_t> disk_ord2output(disk_ord2index.size());
for (size_t i = 0; i < disk_ord2index.size(); ++i) {
disk_ord2output[disk_ord2index[i]] = static_cast<output_ordinal_t>(i);
}
for (int disk_idx = 0; disk_idx < static_cast<output_ordinal_t>(outputs_.size());
++disk_idx) {
if (disk_idx >= static_cast<int>(disk_ord2index.size())) {
// XXX i#6630: We should auto-set the output count and avoid
// having extra outputs; these complicate idle computations, etc.
VPRINT(this, 1, "Output %d empty: returning eof up front\n", disk_idx);
outputs_[disk_idx].at_eof = true;
set_cur_input(disk_idx, INVALID_INPUT_ORDINAL);
continue;
}
output_ordinal_t output_idx = disk_ord2output[disk_idx];
VPRINT(this, 1, "Read %zu as-traced records for output #%d\n",
all_sched[output_idx].size(), output_idx);
all_sched[disk_idx].size(), output_idx);
outputs_[output_idx].as_traced_cpuid = disk_ord2cpuid[disk_idx];
VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", output_idx,
outputs_[output_idx].as_traced_cpuid);
// Update the stop_instruction field and collapse consecutive entries while
// inserting into the final location.
int start_consec = -1;
for (int sched_idx = 0;
sched_idx < static_cast<int>(all_sched[output_idx].size()); ++sched_idx) {
auto &segment = all_sched[output_idx][sched_idx];
for (int sched_idx = 0; sched_idx < static_cast<int>(all_sched[disk_idx].size());
++sched_idx) {
auto &segment = all_sched[disk_idx][sched_idx];
if (!segment.valid)
continue;
auto find = start2stop[segment.input].find(segment.start_instruction);
@@ -1158,27 +1195,27 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
" time=%" PRId64 "\n",
sched_idx, segment.input, segment.start_instruction,
segment.stop_instruction, segment.timestamp);
if (sched_idx + 1 < static_cast<int>(all_sched[output_idx].size()) &&
segment.input == all_sched[output_idx][sched_idx + 1].input &&
if (sched_idx + 1 < static_cast<int>(all_sched[disk_idx].size()) &&
segment.input == all_sched[disk_idx][sched_idx + 1].input &&
segment.stop_instruction >
all_sched[output_idx][sched_idx + 1].start_instruction) {
all_sched[disk_idx][sched_idx + 1].start_instruction) {
// A second sanity check.
error_string_ = "Invalid decreasing start field in schedule file";
return STATUS_ERROR_INVALID_PARAMETER;
} else if (sched_idx + 1 < static_cast<int>(all_sched[output_idx].size()) &&
segment.input == all_sched[output_idx][sched_idx + 1].input &&
} else if (sched_idx + 1 < static_cast<int>(all_sched[disk_idx].size()) &&
segment.input == all_sched[disk_idx][sched_idx + 1].input &&
segment.stop_instruction ==
all_sched[output_idx][sched_idx + 1].start_instruction) {
all_sched[disk_idx][sched_idx + 1].start_instruction) {
// Collapse into next.
if (start_consec == -1)
start_consec = sched_idx;
} else {
schedule_output_tracker_t &toadd = start_consec >= 0
? all_sched[output_idx][start_consec]
: all_sched[output_idx][sched_idx];
? all_sched[disk_idx][start_consec]
: all_sched[disk_idx][sched_idx];
outputs_[output_idx].record.emplace_back(
schedule_record_t::DEFAULT, toadd.input, toadd.start_instruction,
all_sched[output_idx][sched_idx].stop_instruction, toadd.timestamp);
all_sched[disk_idx][sched_idx].stop_instruction, toadd.timestamp);
start_consec = -1;
VDO(this, 3, {
auto &added = outputs_[output_idx].record.back();
@@ -1193,24 +1230,19 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_traced_schedule()
}
VPRINT(this, 1, "Collapsed duplicates for %zu as-traced records for output #%d\n",
outputs_[output_idx].record.size(), output_idx);
if (!outputs_[output_idx].record.empty()) {
if (outputs_[output_idx].record[0].value.start_instruction != 0) {
VPRINT(this, 1, "Initial input for output #%d is: wait state\n",
output_idx);
set_cur_input(output_idx, INVALID_INPUT_ORDINAL);
outputs_[output_idx].waiting = true;
outputs_[output_idx].record_index = -1;
} else {
VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx,
outputs_[output_idx].record[0].key.input);
set_cur_input(output_idx, outputs_[output_idx].record[0].key.input);
}
} else {
// XXX i#6630: We should auto-set the output count and avoid
// having extra outputs; these complicate idle computations, etc.
VPRINT(this, 1, "Output %d empty: returning eof up front\n", output_idx);
outputs_[output_idx].at_eof = true;
if (outputs_[output_idx].record.empty()) {
error_string_ = "Empty as-traced schedule";
return STATUS_ERROR_INVALID_PARAMETER;
}
if (outputs_[output_idx].record[0].value.start_instruction != 0) {
VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx);
set_cur_input(output_idx, INVALID_INPUT_ORDINAL);
outputs_[output_idx].waiting = true;
outputs_[output_idx].record_index = -1;
} else {
VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx,
outputs_[output_idx].record[0].key.input);
set_cur_input(output_idx, outputs_[output_idx].record[0].key.input);
}
}
return STATUS_SUCCESS;
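For reference, a minimal standalone sketch (not part of this patch; variable names here are illustrative, not the scheduler's) of the sort-then-invert step the new code performs, using the {6,2,3,7} cpuid example from the comment above:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

int
main()
{
    // On-disk (unsorted) cpuids, one per zipfile component.
    std::vector<uint64_t> cpuids = { 6, 2, 3, 7 };
    // sorted_pos[i] starts as i; after the sort it holds the disk ordinal
    // of the i-th smallest cpuid: {1, 2, 0, 3}.
    std::vector<int> sorted_pos(cpuids.size());
    for (size_t i = 0; i < sorted_pos.size(); ++i)
        sorted_pos[i] = static_cast<int>(i);
    std::sort(sorted_pos.begin(), sorted_pos.end(),
              [&cpuids](int l, int r) { return cpuids[l] < cpuids[r]; });
    // Invert the permutation: disk2output[d] is the output ordinal that the
    // d-th disk component maps to: {2, 0, 1, 3}.
    std::vector<int> disk2output(sorted_pos.size());
    for (size_t i = 0; i < sorted_pos.size(); ++i)
        disk2output[sorted_pos[i]] = static_cast<int>(i);
    for (size_t d = 0; d < disk2output.size(); ++d)
        std::cout << "disk component " << d << " (cpuid " << cpuids[d]
                  << ") -> output #" << disk2output[d] << "\n";
    return 0;
}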
98 changes: 94 additions & 4 deletions clients/drcachesim/tests/scheduler_unit_tests.cpp
@@ -3410,6 +3410,94 @@ test_replay_as_traced_dup_start()
#endif
}

static void
test_replay_as_traced_sort()
{
#ifdef HAS_ZIP
// Test that outputs have the cpuids in sorted order.
std::cerr << "\n----------------\nTesting replay as-traced sorting\n";

static constexpr int NUM_INPUTS = 4;
static constexpr int NUM_OUTPUTS = NUM_INPUTS; // Required to be equal.
static constexpr int NUM_INSTRS = 2;
static constexpr memref_tid_t TID_BASE = 100;
static constexpr addr_t PC_BASE = 1000;
// Our unsorted cpuid order in the file.
static const std::vector<int> CPUIDS = { 42, 7, 56, 3 };
// INDICES[i] is the index into CPUIDS of the ith-smallest cpuid.
static const std::vector<int> INDICES = { 3, 1, 0, 2 };
static constexpr uint64_t TIMESTAMP_BASE = 100;

std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int input_idx = 0; input_idx < NUM_INPUTS; input_idx++) {
memref_tid_t tid = TID_BASE + input_idx;
inputs[input_idx].push_back(make_thread(tid));
inputs[input_idx].push_back(make_pid(1));
// These timestamps do not line up with the schedule file, but
// that does not cause problems, and leaving them this way
// simplifies the test data construction.
inputs[input_idx].push_back(make_timestamp(TIMESTAMP_BASE));
inputs[input_idx].push_back(
make_marker(TRACE_MARKER_TYPE_CPU_ID, CPUIDS[input_idx]));
for (int instr_idx = 0; instr_idx < NUM_INSTRS; ++instr_idx) {
inputs[input_idx].push_back(make_instr(PC_BASE + instr_idx));
}
inputs[input_idx].push_back(make_exit(tid));
}

// Synthesize a cpu-schedule file with unsorted entries (see CPUIDS above).
std::string cpu_fname = "tmp_test_cpu_i6721.zip";
{
zipfile_ostream_t outfile(cpu_fname);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
std::vector<schedule_entry_t> sched;
sched.emplace_back(TID_BASE + i, TIMESTAMP_BASE, CPUIDS[i], 0);
std::ostringstream cpu_string;
cpu_string << CPUIDS[i];
std::string err = outfile.open_new_component(cpu_string.str());
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched.data()),
sched.size() * sizeof(sched[0])))
assert(false);
}
}

// Replay the recorded schedule.
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_RECORDED_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/4);
zipfile_istream_t infile(cpu_fname);
sched_ops.replay_as_traced_istream = &infile;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
for (int i = 0; i < NUM_OUTPUTS; ++i) {
auto *stream = scheduler.get_stream(i);
memref_t memref;
scheduler_t::stream_status_t status = stream->next_record(memref);
if (status == scheduler_t::STATUS_OK) {
assert(memref.marker.tid == TID_BASE + INDICES[i]);
if (memref.marker.type == TRACE_TYPE_MARKER &&
memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID) {
assert(static_cast<int>(memref.marker.marker_value) ==
CPUIDS[INDICES[i]]);
}
} else
assert(status == scheduler_t::STATUS_EOF);
}
#endif
}

static void
test_replay_as_traced_from_file(const char *testdir)
{
@@ -3420,14 +3508,15 @@ test_replay_as_traced_from_file(const char *testdir)
std::string(testdir) + "/drmemtrace.threadsig.x64.tracedir/cpu_schedule.bin.zip";
// This checked-in trace has 8 threads on 7 cores. It doesn't have
// much thread migration but our synthetic test above covers that.
// The outputs use the stored cores sorted by cpuid.
static const char *const SCHED_STRING =
"Core #0: 1257598 \nCore #1: 1257603 \nCore #2: 1257601 \n"
"Core #3: 1257599 => 1257604 @ <366987,87875,13331862029895453> "
"Core #0: 1257602 \nCore #1: 1257600 \n"
"Core #2: 1257599 => 1257604 @ <366987,87875,13331862029895453> "
// The ordinal is really 1 ("<1,0,0>") but with the scheduler's readahead
// it becomes 2; easier to just check for that as trying to avoid readahead
// causes other problems with start-idle cores (i#6721).
// causes other problems (i#xxxx).
"(<366986,87875,13331862029895453> => <2,0,0>) \n"
"Core #4: 1257600 \nCore #5: 1257596 \nCore #6: 1257602 \n";
"Core #3: 1257596 \nCore #4: 1257603 \nCore #5: 1257601 \nCore #6: 1257598 \n";
static constexpr int NUM_OUTPUTS = 7; // Matches the actual trace's core footprint.
scheduler_t scheduler;
std::vector<scheduler_t::input_workload_t> sched_inputs;
Expand Down Expand Up @@ -4249,6 +4338,7 @@ test_main(int argc, const char *argv[])
test_replay_as_traced();
test_replay_as_traced_i6107_workaround();
test_replay_as_traced_dup_start();
test_replay_as_traced_sort();
test_inactive();
test_direct_switch();
test_kernel_switch_sequences();
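As a side note on the new test's constants, a short hypothetical sketch (not part of the patch) showing that INDICES = {3, 1, 0, 2} is simply the cpuid-sorted order of CPUIDS = {42, 7, 56, 3}, so output #i is expected to observe tid TID_BASE + INDICES[i] and cpuid CPUIDS[INDICES[i]]:

#include <algorithm>
#include <iostream>
#include <numeric>
#include <vector>

int
main()
{
    const std::vector<int> cpuids = { 42, 7, 56, 3 };
    std::vector<int> indices(cpuids.size());
    // Fill with 0,1,2,3 and sort by the cpuid each position holds.
    std::iota(indices.begin(), indices.end(), 0);
    std::sort(indices.begin(), indices.end(),
              [&cpuids](int l, int r) { return cpuids[l] < cpuids[r]; });
    // Prints "3 (cpuid 3)", "1 (cpuid 7)", "0 (cpuid 42)", "2 (cpuid 56)",
    // matching the expected per-output order checked in the test.
    for (int idx : indices)
        std::cout << idx << " (cpuid " << cpuids[idx] << ")\n";
    return 0;
}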