Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/fix cv driver #4093

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions modules/footstone/include/footstone/cv_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@ class CVDriver: public Driver {
virtual ~CVDriver() = default;

virtual void Notify() override;
virtual void WaitFor(const TimeDelta& delta) override;
virtual void WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) override;
virtual void Start() override;
virtual void Terminate() override;

private:
std::condition_variable cv_;
std::mutex mutex_;
};

}
Expand Down
7 changes: 5 additions & 2 deletions modules/footstone/include/footstone/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#pragma once

#include <functional>

#include <mutex>
#include "footstone/time_delta.h"

namespace footstone {
Expand All @@ -35,7 +35,8 @@ class Driver {
virtual ~Driver() = default;

virtual void Notify() = 0;
virtual void WaitFor(const TimeDelta& delta) = 0;
std::mutex& Mutex() { return mutex_; }
virtual void WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) = 0;
virtual void Start() = 0;
virtual void Terminate() = 0;

Expand All @@ -60,6 +61,8 @@ class Driver {
*
*/
bool is_exit_immediately_;

std::mutex mutex_;
};

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class LooperDriver: public Driver {
virtual ~LooperDriver();

virtual void Notify() override;
virtual void WaitFor(const TimeDelta& delta) override;
virtual void WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) override;
virtual void Start() override;
virtual void Terminate() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ class LooperDriver: public Driver {
virtual ~LooperDriver();

virtual void Notify() override;
virtual void WaitFor(const TimeDelta& delta) override;
virtual void WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) override;
virtual void Start() override;
virtual void Terminate() override;

void OnTimerFire(CFRunLoopTimerRef timer);

private:
Expand Down
2 changes: 2 additions & 0 deletions modules/footstone/include/footstone/task_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ class TaskRunner {
std::unique_ptr<IdleTask> PopIdleTask();
std::unique_ptr<Task> GetTopDelayTask();
std::unique_ptr<Task> GetNext();
bool HasTask();
bool HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now);

std::queue<std::unique_ptr<Task>> task_queue_;
std::mutex queue_mutex_;
Expand Down
3 changes: 3 additions & 0 deletions modules/footstone/include/footstone/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class Worker {
void BalanceNoLock();
void SortNoLock();

bool HasTask();
bool HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now);

int32_t WorkerKeyCreate(uint32_t task_runner_id, const std::function<void(void *)>& destruct);
bool WorkerKeyDelete(uint32_t task_runner_id, int32_t key);
bool WorkerSetSpecific(uint32_t task_runner_id, int32_t key, void *p);
Expand Down
5 changes: 2 additions & 3 deletions modules/footstone/src/cv_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ namespace footstone {
inline namespace runner {

void CVDriver::Notify() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.notify_one();
}

void CVDriver::WaitFor(const TimeDelta& delta) {
std::unique_lock<std::mutex> lock(mutex_);

void CVDriver::WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) {
if (delta != TimeDelta::Max() && delta >= TimeDelta::Zero()) {
cv_.wait_for(lock, std::chrono::nanoseconds(delta.ToNanoseconds()));
} else {
Expand Down
2 changes: 1 addition & 1 deletion modules/footstone/src/platform/adr/looper_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void LooperDriver::Notify() {
timerfd_settime(fd_, TFD_TIMER_ABSTIME, &spec, nullptr);
}

void LooperDriver::WaitFor(const TimeDelta& delta) {
void LooperDriver::WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) {
auto nano_secs = delta.ToNanoseconds();
if (nano_secs < 1) {
nano_secs = 1;
Expand Down
2 changes: 1 addition & 1 deletion modules/footstone/src/platform/ios/looper_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void LooperDriver::Notify() {
CFAbsoluteTimeGetCurrent());
}

void LooperDriver::WaitFor(const TimeDelta& delta) {
void LooperDriver::WaitFor(const TimeDelta& delta, std::unique_lock<std::mutex>& lock) {
CFRunLoopTimerSetNextFireDate(
delayed_wake_timer_,
CFAbsoluteTimeGetCurrent() + delta.ToSecondsF());
Expand Down
48 changes: 48 additions & 0 deletions modules/footstone/src/task_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,5 +266,53 @@ void TaskRunner::RunnerDestroySpecifics() {
return worker->WorkerDestroySpecific(task_runner_id);
}

bool TaskRunner::HasTask() {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
if (!task_queue_.empty()) {
return true;
}
}
{
std::lock_guard<std::mutex> lock(delay_mutex_);
if (!delayed_task_queue_.empty()) {
return true;
}
}
{
std::lock_guard<std::mutex> lock(idle_mutex_);
if (!idle_task_queue_.empty()) {
return true;
}
}
return false;
}

bool TaskRunner::HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now) {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
if (!task_queue_.empty()) {
return true;
}
}
{
std::lock_guard<std::mutex> lock(idle_mutex_);
if (!idle_task_queue_.empty()) {
return true;
}
}
{
std::lock_guard<std::mutex> lock(delay_mutex_);
if (!delayed_task_queue_.empty()) {
const DelayedEntry& delayed_task = delayed_task_queue_.top();
if(delayed_task.first - now < min_wait_time) {
return true;
}
}
}

return false;
}

} // namespace runner
} // namespace footstone
70 changes: 69 additions & 1 deletion modules/footstone/src/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,20 @@ std::unique_ptr<Task> Worker::GetNextTask() {
}));
return wrapper_idle_task;
}
std::unique_lock<std::mutex> lock(driver_->Mutex());
if (driver_->IsTerminated()) {
return nullptr;
}
driver_->WaitFor(min_wait_time_);
if (min_wait_time_ == TimeDelta::Max()) {
if (HasTask()) {
return nullptr;
}
} else {
if (HasMoreUrgentTask(min_wait_time_, now)) {
return nullptr;
}
}
driver_->WaitFor(min_wait_time_, lock);
return nullptr;
}

Expand Down Expand Up @@ -532,5 +542,63 @@ void Worker::UpdateSpecific(uint32_t task_runner_id,
specific_map_[task_runner_id] = array; // insert or update
}

bool Worker::HasTask() {
bool has_found = false;
for (auto &group : running_group_list_) {
for (auto &runner : group) {
if (runner->HasTask()) {
has_found = true;
break;
}
}
if (has_found) {
break;
}
}
if (!has_found) {
for (auto &group : pending_group_list_) {
for (auto &runner : group) {
if (runner->HasTask()) {
has_found = true;
break;
}
}
if (has_found) {
break;
}
}
}
return has_found;
}

bool Worker::HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now) {
bool has_found = false;
for (auto &group : running_group_list_) {
for (auto &runner : group) {
if (runner->HasMoreUrgentTask(min_wait_time, now)) {
has_found = true;
break;
}
}
if (has_found) {
break;
}
}
if (!has_found) {
for (auto &group : pending_group_list_) {
for (auto &runner : group) {
if (runner->HasMoreUrgentTask(min_wait_time, now)) {
has_found = true;
break;
}
}
if (has_found) {
break;
}
}
}
return has_found;
}

} // namespace runner
} // namespace footstone
Loading