Skip to content

Commit

Permalink
Merge branch 'master' into i7084-change-parameter-name-to-rseq-aborted
Browse files Browse the repository at this point in the history
  • Loading branch information
ivankyluk authored Nov 14, 2024
2 parents d9ef38a + 9388e31 commit 12ae86c
Show file tree
Hide file tree
Showing 7 changed files with 831 additions and 783 deletions.
5 changes: 3 additions & 2 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
scheduler_impl_deleter_t>(
new scheduler_replay_tmpl_t<RecordType, ReaderType>);
} else {
// Non-dynamic and non-replay fixed modes such as analyzer serial and
// parallel modes.
// Non-dynamic and non-replay fixed modes such as analyzer
// parallel mode with a static mapping of inputs to outputs and analyzer
// serial mode with a simple time interleaving of all inputs onto one output.
impl_ = std::unique_ptr<scheduler_impl_tmpl_t<RecordType, ReaderType>,
scheduler_impl_deleter_t>(
new scheduler_fixed_tmpl_t<RecordType, ReaderType>);
Expand Down
356 changes: 303 additions & 53 deletions clients/drcachesim/scheduler/scheduler_dynamic.cpp

Large diffs are not rendered by default.

72 changes: 57 additions & 15 deletions clients/drcachesim/scheduler/scheduler_fixed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@
#include "scheduler.h"
#include "scheduler_impl.h"

#include <atomic>
#include <cinttypes>
#include <cstdint>
#include <mutex>
#include <thread>

#include "memref.h"
#include "mutex_dbg_owned.h"
Expand All @@ -50,45 +48,89 @@
namespace dynamorio {
namespace drmemtrace {

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
std::unordered_map<int, std::vector<int>> &workload2inputs)
{
if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
// Assign the inputs up front to avoid locks once we're in parallel mode.
// We use a simple round-robin static assignment for now.
for (int i = 0; i < static_cast<input_ordinal_t>(inputs_.size()); ++i) {
size_t index = i % outputs_.size();
if (outputs_[index].input_indices.empty())
set_cur_input(static_cast<input_ordinal_t>(index), i);
outputs_[index].input_indices.push_back(i);
VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index);
}
} else if (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT) {
if (options_.replay_as_traced_istream != nullptr) {
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
} else if (outputs_.size() > 1) {
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
} else if (inputs_.size() == 1) {
set_cur_input(0, 0);
} else {
// The old file_reader_t interleaving would output the top headers for every
// thread first and then pick the oldest timestamp once it reached a
// timestamp. We instead queue those headers so we can start directly with the
// oldest timestamp's thread.
uint64_t min_time = std::numeric_limits<uint64_t>::max();
input_ordinal_t min_input = -1;
for (int i = 0; i < static_cast<input_ordinal_t>(inputs_.size()); ++i) {
if (inputs_[i].next_timestamp < min_time) {
min_time = inputs_[i].next_timestamp;
min_input = i;
}
}
if (min_input < 0)
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
set_cur_input(0, static_cast<input_ordinal_t>(min_input));
}
} else {
return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
}
return sched_type_t::STATUS_SUCCESS;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index,
input_ordinal_t &index)
{
if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) {
if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) {
uint64_t min_time = std::numeric_limits<uint64_t>::max();
for (size_t i = 0; i < this->inputs_.size(); ++i) {
std::lock_guard<mutex_dbg_owned> lock(*this->inputs_[i].lock);
if (!this->inputs_[i].at_eof && this->inputs_[i].next_timestamp > 0 &&
this->inputs_[i].next_timestamp < min_time) {
min_time = this->inputs_[i].next_timestamp;
for (size_t i = 0; i < inputs_.size(); ++i) {
std::lock_guard<mutex_dbg_owned> lock(*inputs_[i].lock);
if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 &&
inputs_[i].next_timestamp < min_time) {
min_time = inputs_[i].next_timestamp;
index = static_cast<int>(i);
}
}
if (index < 0) {
stream_status_t status = this->eof_or_idle(output, prev_index);
if (status != sched_type_t::STATUS_STOLE)
return status;
index = this->outputs_[output].cur_input;
index = outputs_[output].cur_input;
return sched_type_t::STATUS_OK;
}
VPRINT(this, 2,
"next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n",
output, min_time, index);
} else if (this->options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
} else if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
// We're done with the prior thread; take the next one that was
// pre-allocated to this output (pre-allocated to avoid locks). Invariant:
// the same output will not be accessed by two different threads
// simultaneously in this mode, allowing us to support a lock-free
// parallel-friendly increment here.
int indices_index = ++this->outputs_[output].input_indices_index;
if (indices_index >=
static_cast<int>(this->outputs_[output].input_indices.size())) {
int indices_index = ++outputs_[output].input_indices_index;
if (indices_index >= static_cast<int>(outputs_[output].input_indices.size())) {
VPRINT(this, 2, "next_record[%d]: all at eof\n", output);
return sched_type_t::STATUS_EOF;
}
index = this->outputs_[output].input_indices[indices_index];
index = outputs_[output].input_indices[indices_index];
VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n",
output, indices_index, index);
} else
Expand All @@ -103,7 +145,7 @@ scheduler_fixed_tmpl_t<RecordType, ReaderType>::check_for_input_switch(
output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time,
bool &need_new_input, bool &preempt, uint64_t &blocked_time)
{
if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS &&
if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS &&
this->record_type_is_timestamp(record, input->next_timestamp))
need_new_input = true;
return sched_type_t::STATUS_OK;
Expand Down
Loading

0 comments on commit 12ae86c

Please sign in to comment.