i#5694 core-sharded: Add TRACE_MARKER_TYPE_CORE_WAIT (#6415)
Adds a new TRACE_MARKER_TYPE_CORE_WAIT marker: a synthetic record
inserted in core-sharded drmemtrace analysis tool mode whenever the
scheduler returns STATUS_WAIT. It is meant for tools that analyze the
schedule itself.

Adds a unit test.

Issue: #5694
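As a rough sketch of how a tool can recognize the new marker (this mirrors the unit test and the view.cpp change in this commit; the helper function name and the exact include paths are illustrative assumptions, not part of the change):

#include <string>

#include "memref.h"       // memref_t (drcachesim common header).
#include "trace_entry.h"  // TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_CORE_WAIT.

using namespace dynamorio::drmemtrace;

// Appends '-' to a per-core schedule string when the record is the synthetic
// core-wait marker; returns whether the record was such a marker.
static bool
append_if_core_wait(const memref_t &memref, std::string &schedule)
{
    if (memref.marker.type == TRACE_TYPE_MARKER &&
        memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT) {
        schedule += '-'; // This core spent roughly one record's worth of time waiting.
        return true;
    }
    return false;
}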
derekbruening authored Nov 8, 2023
1 parent fe8eec3 commit c0cf363
Showing 7 changed files with 245 additions and 11 deletions.
31 changes: 26 additions & 5 deletions clients/drcachesim/analyzer.cpp
@@ -122,6 +122,17 @@ analyzer_t::record_is_timestamp(const memref_t &record)
record.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP;
}

template <>
memref_t
analyzer_t::create_wait_marker()
{
memref_t record = {}; // Zero the other fields.
record.marker.type = TRACE_TYPE_MARKER;
record.marker.marker_type = TRACE_MARKER_TYPE_CORE_WAIT;
record.marker.tid = INVALID_THREAD_ID;
return record;
}

/******************************************************************************
* Specializations for analyzer_tmpl_t<record_reader_t>, aka record_analyzer_t.
*/
@@ -160,6 +171,17 @@ record_analyzer_t::record_is_timestamp(const trace_entry_t &record)
return record.type == TRACE_TYPE_MARKER && record.size == TRACE_MARKER_TYPE_TIMESTAMP;
}

template <>
trace_entry_t
record_analyzer_t::create_wait_marker()
{
trace_entry_t record;
record.type = TRACE_TYPE_MARKER;
record.size = TRACE_MARKER_TYPE_CORE_WAIT;
record.addr = 0; // Marker value has no meaning so we zero it.
return record;
}

/********************************************************************
* Other analyzer_tmpl_t routines that do not need to be specialized.
*/
@@ -510,11 +532,10 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
if (sched_by_time_)
cur_micros = get_current_microseconds();
if (status == sched_type_t::STATUS_WAIT) {
// TODO i#5694: We'd like the forthcoming schedule_stats tool to know about
// waits and idle periods (to record "-" in its string): should the analyzer
// insert a new marker type that doesn't count toward ordinals (or else it
// needs a scheduler API to inject it)?
continue;
// We let tools know about waits so they can analyze the schedule.
// We synthesize a record here. If we wanted this to count toward output
// stream ordinals we would need to add a scheduler API to inject it.
record = create_wait_marker();
} else if (status != sched_type_t::STATUS_OK) {
if (status == sched_type_t::STATUS_REGION_INVALID) {
worker->error =
3 changes: 3 additions & 0 deletions clients/drcachesim/analyzer.h
@@ -247,6 +247,9 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
bool
record_is_timestamp(const RecordType &record);

RecordType
create_wait_marker();

// Invoked when the given interval finishes during serial or parallel
// analysis of the trace. For parallel analysis, the shard_id
// parameter should be set to the shard_id for which the interval
11 changes: 11 additions & 0 deletions clients/drcachesim/common/trace_entry.h
@@ -568,6 +568,17 @@ typedef enum {
*/
TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH,

/**
* This marker is used for core-sharded analyses to indicate that the current
* core is waiting on another core. It is primarily useful for analyses that
* study the scheduling of threads onto cores. A new marker is emitted each
* time the tool analysis framework requests a new record from the scheduler
* and is given a wait status. There are no units of time here, but each
* repetition is roughly the time in which a regular record could have been
* read and passed along.
*/
TRACE_MARKER_TYPE_CORE_WAIT,

// ...
// These values are reserved for future built-in marker types.
// ...
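Since each repetition of the marker stands in for roughly one regular record, a record-level (trace_entry_t) tool can approximate how many of its core's slots were spent waiting. A minimal sketch, assuming hypothetical counter and function names (the field test matches record_analyzer_t::create_wait_marker() above):

#include <cstdint>

#include "trace_entry.h" // trace_entry_t, TRACE_TYPE_MARKER, marker types.

using namespace dynamorio::drmemtrace;

// Hypothetical per-core tallies: wait "slots" versus all other records.
struct core_slot_counts_t {
    int64_t wait_slots = 0;
    int64_t other_records = 0;
};

static void
count_slot(const trace_entry_t &entry, core_slot_counts_t &counts)
{
    // For markers, trace_entry_t stores the marker type in its size field.
    if (entry.type == TRACE_TYPE_MARKER && entry.size == TRACE_MARKER_TYPE_CORE_WAIT)
        ++counts.wait_slots;
    else
        ++counts.other_records;
}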
6 changes: 6 additions & 0 deletions clients/drcachesim/reader/reader.h
@@ -190,6 +190,12 @@ class reader_t : public std::iterator<std::input_iterator_tag, memref_t>,
bool
is_record_synthetic() const override
{
if (cur_ref_.marker.type == TRACE_TYPE_MARKER &&
cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT) {
// These are synthetic records that are not part of the input and do not
// count toward ordinals.
return true;
}
return suppress_ref_count_ >= 0;
}
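Rather than testing for TRACE_MARKER_TYPE_CORE_WAIT explicitly, a tool can ask the stream it was handed in parallel_shard_init_stream() whether the current record is synthetic. A minimal sketch, assuming the illustrative helper name below and the usual drcachesim include path:

#include "memtrace_stream.h" // memtrace_stream_t.

using namespace dynamorio::drmemtrace;

// True for records the reader synthesized rather than read from the input;
// with this change that includes core-wait markers, which do not count
// toward the stream's record or instruction ordinals.
static bool
current_record_is_synthetic(memtrace_stream_t *stream)
{
    return stream->is_record_synthetic();
}

A core-sharded tool would typically call this at the top of its parallel_shard_memref() callback and return early, much as basic_counts.cpp below skips counting for the wait marker.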

195 changes: 190 additions & 5 deletions clients/drcachesim/tests/analysis_unit_tests.cpp
@@ -32,15 +32,21 @@

/* Unit tests for trace analysis APIs. */

#include "analyzer.h"
#include "mock_reader.h"
#include "scheduler.h"

#include <assert.h>

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

#include "analyzer.h"
#include "mock_reader.h"
#include "scheduler.h"
#ifdef HAS_ZIP
# include "zipfile_istream.h"
# include "zipfile_ostream.h"
#endif

namespace dynamorio {
namespace drmemtrace {

@@ -177,10 +183,189 @@ test_queries()
return true;
}

bool
test_wait_records()
{
#ifdef HAS_ZIP
std::cerr << "\n----------------\nTesting wait records\n";

static constexpr int NUM_INPUTS = 5;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
static constexpr int CPU0 = 6;
static constexpr int CPU1 = 9;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
// The last input will be earlier than all others. It will execute
// 3 instrs on each core. This is to test the case when an output
// begins in the wait state.
for (int j = 0; j < (i == NUM_INPUTS - 1 ? 6 : NUM_INSTRS); j++)
inputs[i].push_back(make_instr(42 + j * 4));
inputs[i].push_back(make_exit(tid));
}

// Synthesize a cpu-schedule file with some waits in it, if run in lockstep.
// In pure lockstep it looks like this with a - for a wait and . for a
// non-instruction record, to help understand the file entries below:
// core0: "EEE-AAA-CCCAAACCCBBB.DDD."
// core1: "---EEE.BBBDDDBBBDDDAAA.CCC."
std::string cpu_fname = "tmp_test_wait_records.zip";
{
// Instr counts are 1-based, but the first lists 0 (really starts at 1).
std::vector<schedule_entry_t> sched0;
sched0.emplace_back(TID_BASE + 4, 10, CPU0, 0);
sched0.emplace_back(TID_BASE, 101, CPU0, 0);
sched0.emplace_back(TID_BASE + 2, 103, CPU0, 0);
sched0.emplace_back(TID_BASE, 105, CPU0, 4);
sched0.emplace_back(TID_BASE + 2, 107, CPU0, 4);
sched0.emplace_back(TID_BASE + 1, 109, CPU0, 7);
sched0.emplace_back(TID_BASE + 3, 111, CPU0, 7);
std::vector<schedule_entry_t> sched1;
sched1.emplace_back(TID_BASE + 4, 20, CPU1, 4);
sched1.emplace_back(TID_BASE + 1, 102, CPU1, 0);
sched1.emplace_back(TID_BASE + 3, 104, CPU1, 0);
sched1.emplace_back(TID_BASE + 1, 106, CPU1, 4);
sched1.emplace_back(TID_BASE + 3, 108, CPU1, 4);
sched1.emplace_back(TID_BASE, 110, CPU1, 7);
sched1.emplace_back(TID_BASE + 2, 112, CPU1, 7);
std::ostringstream cpu0_string;
cpu0_string << CPU0;
std::ostringstream cpu1_string;
cpu1_string << CPU1;
zipfile_ostream_t outfile(cpu_fname);
std::string err = outfile.open_new_component(cpu0_string.str());
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched0.data()),
sched0.size() * sizeof(sched0[0])))
assert(false);
err = outfile.open_new_component(cpu1_string.str());
assert(err.empty());
if (!outfile.write(reinterpret_cast<char *>(sched1.data()),
sched1.size() * sizeof(sched1[0])))
assert(false);
}

// Replay the recorded schedule.
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_RECORDED_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/1);
zipfile_istream_t infile(cpu_fname);
sched_ops.replay_as_traced_istream = &infile;

class test_tool_t : public analysis_tool_t {
public:
// Caller must pre-allocate the vector with a slot per output stream.
test_tool_t(std::vector<std::string> *schedule_strings)
: schedule_strings_(schedule_strings)
{
}
bool
process_memref(const memref_t &memref) override
{
assert(false); // Only expect parallel mode.
return false;
}
bool
print_results() override
{
return true;
}
bool
parallel_shard_supported() override
{
return true;
}
void *
parallel_shard_init_stream(int shard_index, void *worker_data,
memtrace_stream_t *stream) override
{
auto per_shard = new per_shard_t;
per_shard->index = shard_index;
per_shard->stream = stream;
return reinterpret_cast<void *>(per_shard);
}
bool
parallel_shard_exit(void *shard_data) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
(*schedule_strings_)[shard->index] = shard->schedule;
delete shard;
return true;
}
bool
parallel_shard_memref(void *shard_data, const memref_t &memref) override
{
per_shard_t *shard = reinterpret_cast<per_shard_t *>(shard_data);
// We run in *rough* lockstep to avoid a flaky test: we just need to
// avoid the 2nd output making it through several initial records
// before the 1st output runs and sees a STATUS_WAIT.
static constexpr int MAX_WAITS = 100000;
int waits = 0;
while (global_records_ < 3 * shard->records / 2) {
std::this_thread::yield();
// Avoid a hang. It shouldn't happen with these inputs though.
if (++waits > MAX_WAITS)
break;
}
++shard->records;
++global_records_;
if (memref.marker.type == TRACE_TYPE_MARKER &&
memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT) {
shard->schedule += '-';
return true;
}
int64_t input = shard->stream->get_input_id();
shard->schedule += 'A' + static_cast<char>(input % 26);
return true;
}

private:
struct per_shard_t {
int index;
memtrace_stream_t *stream;
std::string schedule;
int64_t records = 0;
};
std::atomic<int64_t> global_records_ { 0 };
std::vector<std::string> *schedule_strings_;
};

std::vector<std::string> schedule_strings(NUM_OUTPUTS);
std::vector<analysis_tool_t *> tools;
auto test_tool = std::unique_ptr<test_tool_t>(new test_tool_t(&schedule_strings));
tools.push_back(test_tool.get());
mock_analyzer_t analyzer(sched_inputs, &tools[0], (int)tools.size(),
/*parallel=*/true, NUM_OUTPUTS, &sched_ops);
assert(!!analyzer);
bool res = analyzer.run();
assert(res);
for (const auto &sched : schedule_strings) {
std::cerr << "Schedule: " << sched << "\n";
}
// Due to non-determinism we can't put too many restrictions here so we
// just ensure we saw at least one wait at the start.
assert(schedule_strings[1][0] == '-');
#endif
return true;
}

int
test_main(int argc, const char *argv[])
{
if (!test_queries())
if (!test_queries() || !test_wait_records())
return 1;
std::cerr << "All done!\n";
return 0;
5 changes: 4 additions & 1 deletion clients/drcachesim/tools/basic_counts.cpp
@@ -129,7 +129,8 @@ basic_counts_t::parallel_shard_memref(void *shard_data, const memref_t &memref)
{
per_shard_t *per_shard = reinterpret_cast<per_shard_t *>(shard_data);
counters_t *counters = &per_shard->counters[per_shard->counters.size() - 1];
if (memref.instr.tid != per_shard->last_tid_) {
if (memref.instr.tid != INVALID_THREAD_ID &&
memref.instr.tid != per_shard->last_tid_) {
counters->unique_threads.insert(memref.instr.tid);
per_shard->last_tid_ = memref.instr.tid;
}
@@ -168,6 +169,8 @@ basic_counts_t::parallel_shard_memref(void *shard_data, const memref_t &memref)
} else if (memref.marker.marker_type == TRACE_MARKER_TYPE_KERNEL_EVENT ||
memref.marker.marker_type == TRACE_MARKER_TYPE_KERNEL_XFER) {
++counters->xfer_markers;
} else if (memref.marker.marker_type == TRACE_MARKER_TYPE_CORE_WAIT) {
// This is a synthetic record so do not increment any counts.
} else {
if (memref.marker.marker_type == TRACE_MARKER_TYPE_WINDOW_ID &&
static_cast<intptr_t>(memref.marker.marker_value) !=
5 changes: 5 additions & 0 deletions clients/drcachesim/tools/view.cpp
@@ -424,9 +424,14 @@ view_t::parallel_shard_memref(void *shard_data, const memref_t &memref)
std::cerr << "<marker: system call trace end>\n";
break;
case TRACE_MARKER_TYPE_BRANCH_TARGET:
// These are not expected to be visible (since the reader adds them
// to memref.instr.indirect_branch_target) but we handle them nonetheless.
std::cerr << "<marker: indirect branch target 0x" << std::hex
<< memref.marker.marker_value << std::dec << ">\n";
break;
case TRACE_MARKER_TYPE_CORE_WAIT:
std::cerr << "<marker: wait for another core>\n";
break;
default:
std::cerr << "<marker: type " << memref.marker.marker_type << "; value "
<< memref.marker.marker_value << ">\n";
