From f818d2a13486cd41b64ff5dc348e532d9192f141 Mon Sep 17 00:00:00 2001 From: etkmao Date: Mon, 28 Oct 2024 15:43:39 +0800 Subject: [PATCH 1/3] fix(core): fix task not run bug when no right notify_one --- .../footstone/include/footstone/cv_driver.h | 2 + modules/footstone/include/footstone/driver.h | 4 +- .../footstone/include/footstone/task_runner.h | 2 + modules/footstone/include/footstone/worker.h | 3 + modules/footstone/src/cv_driver.cc | 13 ++++ modules/footstone/src/task_runner.cc | 48 +++++++++++++ modules/footstone/src/worker.cc | 70 ++++++++++++++++++- 7 files changed, 140 insertions(+), 2 deletions(-) diff --git a/modules/footstone/include/footstone/cv_driver.h b/modules/footstone/include/footstone/cv_driver.h index e79621c421b..9b53231d5ff 100644 --- a/modules/footstone/include/footstone/cv_driver.h +++ b/modules/footstone/include/footstone/cv_driver.h @@ -36,6 +36,8 @@ class CVDriver: public Driver { virtual void Notify() override; virtual void WaitFor(const TimeDelta& delta) override; + virtual std::mutex& Mutex() override; + virtual void WaitFor(const TimeDelta& delta, std::unique_lock& lock) override; virtual void Start() override; virtual void Terminate() override; diff --git a/modules/footstone/include/footstone/driver.h b/modules/footstone/include/footstone/driver.h index 81417dbe2c9..eab90e14bf7 100644 --- a/modules/footstone/include/footstone/driver.h +++ b/modules/footstone/include/footstone/driver.h @@ -23,7 +23,7 @@ #pragma once #include - +#include #include "footstone/time_delta.h" namespace footstone { @@ -36,6 +36,8 @@ class Driver { virtual void Notify() = 0; virtual void WaitFor(const TimeDelta& delta) = 0; + virtual std::mutex& Mutex() = 0; + virtual void WaitFor(const TimeDelta& delta, std::unique_lock& lock) = 0; virtual void Start() = 0; virtual void Terminate() = 0; diff --git a/modules/footstone/include/footstone/task_runner.h b/modules/footstone/include/footstone/task_runner.h index 356d87b36d2..b83a124ecff 100644 --- a/modules/footstone/include/footstone/task_runner.h +++ b/modules/footstone/include/footstone/task_runner.h @@ -122,6 +122,8 @@ class TaskRunner { std::unique_ptr PopIdleTask(); std::unique_ptr GetTopDelayTask(); std::unique_ptr GetNext(); + bool HasTask(); + bool HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now); std::queue> task_queue_; std::mutex queue_mutex_; diff --git a/modules/footstone/include/footstone/worker.h b/modules/footstone/include/footstone/worker.h index 31134f8b3b9..a4df43c9da0 100644 --- a/modules/footstone/include/footstone/worker.h +++ b/modules/footstone/include/footstone/worker.h @@ -102,6 +102,9 @@ class Worker { void BalanceNoLock(); void SortNoLock(); + bool HasTask(); + bool HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now); + int32_t WorkerKeyCreate(uint32_t task_runner_id, const std::function& destruct); bool WorkerKeyDelete(uint32_t task_runner_id, int32_t key); bool WorkerSetSpecific(uint32_t task_runner_id, int32_t key, void *p); diff --git a/modules/footstone/src/cv_driver.cc b/modules/footstone/src/cv_driver.cc index 7356c419aa7..724720d73b9 100644 --- a/modules/footstone/src/cv_driver.cc +++ b/modules/footstone/src/cv_driver.cc @@ -24,6 +24,7 @@ namespace footstone { inline namespace runner { void CVDriver::Notify() { + std::unique_lock lock(mutex_); cv_.notify_one(); } @@ -37,6 +38,18 @@ void CVDriver::WaitFor(const TimeDelta& delta) { } } +std::mutex& CVDriver::Mutex() { + return mutex_; +} + +void CVDriver::WaitFor(const TimeDelta& delta, std::unique_lock& lock) { + if (delta != TimeDelta::Max() && delta >= TimeDelta::Zero()) { + cv_.wait_for(lock, std::chrono::nanoseconds(delta.ToNanoseconds())); + } else { + cv_.wait(lock); + } +} + void CVDriver::Start() { while (!is_terminated_) { unit_(); diff --git a/modules/footstone/src/task_runner.cc b/modules/footstone/src/task_runner.cc index b8a60590a26..108ca7a6340 100644 --- a/modules/footstone/src/task_runner.cc +++ b/modules/footstone/src/task_runner.cc @@ -266,5 +266,53 @@ void TaskRunner::RunnerDestroySpecifics() { return worker->WorkerDestroySpecific(task_runner_id); } +bool TaskRunner::HasTask() { + { + std::lock_guard lock(queue_mutex_); + if (!task_queue_.empty()) { + return true; + } + } + { + std::lock_guard lock(delay_mutex_); + if (!delayed_task_queue_.empty()) { + return true; + } + } + { + std::lock_guard lock(idle_mutex_); + if (!idle_task_queue_.empty()) { + return true; + } + } + return false; +} + +bool TaskRunner::HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now) { + { + std::lock_guard lock(queue_mutex_); + if (!task_queue_.empty()) { + return true; + } + } + { + std::lock_guard lock(idle_mutex_); + if (!idle_task_queue_.empty()) { + return true; + } + } + { + std::lock_guard lock(delay_mutex_); + if (!delayed_task_queue_.empty()) { + const DelayedEntry& delayed_task = delayed_task_queue_.top(); + if(delayed_task.first - now < min_wait_time) { + return true; + } + } + } + + return false; +} + } // namespace runner } // namespace footstone diff --git a/modules/footstone/src/worker.cc b/modules/footstone/src/worker.cc index b6b8e5fd9e3..ce9faadd110 100644 --- a/modules/footstone/src/worker.cc +++ b/modules/footstone/src/worker.cc @@ -405,7 +405,17 @@ std::unique_ptr Worker::GetNextTask() { if (driver_->IsTerminated()) { return nullptr; } - driver_->WaitFor(min_wait_time_); + std::unique_lock lock(driver_->Mutex()); + if (min_wait_time_ == TimeDelta::Max()) { + if (HasTask()) { + return nullptr; + } + } else { + if (HasMoreUrgentTask(min_wait_time_, now)) { + return nullptr; + } + } + driver_->WaitFor(min_wait_time_, lock); return nullptr; } @@ -532,5 +542,63 @@ void Worker::UpdateSpecific(uint32_t task_runner_id, specific_map_[task_runner_id] = array; // insert or update } +bool Worker::HasTask() { + bool has_found = false; + for (auto &group : running_group_list_) { + for (auto &runner : group) { + if (runner->HasTask()) { + has_found = true; + break; + } + } + if (has_found) { + break; + } + } + if (!has_found) { + for (auto &group : pending_group_list_) { + for (auto &runner : group) { + if (runner->HasTask()) { + has_found = true; + break; + } + } + if (has_found) { + break; + } + } + } + return has_found; +} + +bool Worker::HasMoreUrgentTask(TimeDelta min_wait_time, TimePoint now) { + bool has_found = false; + for (auto &group : running_group_list_) { + for (auto &runner : group) { + if (runner->HasMoreUrgentTask(min_wait_time, now)) { + has_found = true; + break; + } + } + if (has_found) { + break; + } + } + if (!has_found) { + for (auto &group : pending_group_list_) { + for (auto &runner : group) { + if (runner->HasMoreUrgentTask(min_wait_time, now)) { + has_found = true; + break; + } + } + if (has_found) { + break; + } + } + } + return has_found; +} + } // namespace runner } // namespace footstone From e71f15f550b719a359ff15d94edc278392bf9b55 Mon Sep 17 00:00:00 2001 From: etkmao Date: Mon, 28 Oct 2024 15:52:58 +0800 Subject: [PATCH 2/3] fix(core): fix wait forever when terminate --- modules/footstone/src/worker.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/footstone/src/worker.cc b/modules/footstone/src/worker.cc index ce9faadd110..d82b036db61 100644 --- a/modules/footstone/src/worker.cc +++ b/modules/footstone/src/worker.cc @@ -402,10 +402,10 @@ std::unique_ptr Worker::GetNextTask() { })); return wrapper_idle_task; } + std::unique_lock lock(driver_->Mutex()); if (driver_->IsTerminated()) { return nullptr; } - std::unique_lock lock(driver_->Mutex()); if (min_wait_time_ == TimeDelta::Max()) { if (HasTask()) { return nullptr; From 5bae5b0c530a861c218dffbaaee7765fb8f1498b Mon Sep 17 00:00:00 2001 From: etkmao Date: Mon, 28 Oct 2024 16:50:51 +0800 Subject: [PATCH 3/3] fix(core): fix looper_driver for android and ios --- modules/footstone/include/footstone/cv_driver.h | 3 --- modules/footstone/include/footstone/driver.h | 5 +++-- .../include/footstone/platform/adr/looper_driver.h | 2 +- .../include/footstone/platform/ios/looper_driver.h | 4 ++-- modules/footstone/src/cv_driver.cc | 14 -------------- .../footstone/src/platform/adr/looper_driver.cc | 2 +- .../footstone/src/platform/ios/looper_driver.cc | 2 +- 7 files changed, 8 insertions(+), 24 deletions(-) diff --git a/modules/footstone/include/footstone/cv_driver.h b/modules/footstone/include/footstone/cv_driver.h index 9b53231d5ff..de6a1698290 100644 --- a/modules/footstone/include/footstone/cv_driver.h +++ b/modules/footstone/include/footstone/cv_driver.h @@ -35,15 +35,12 @@ class CVDriver: public Driver { virtual ~CVDriver() = default; virtual void Notify() override; - virtual void WaitFor(const TimeDelta& delta) override; - virtual std::mutex& Mutex() override; virtual void WaitFor(const TimeDelta& delta, std::unique_lock& lock) override; virtual void Start() override; virtual void Terminate() override; private: std::condition_variable cv_; - std::mutex mutex_; }; } diff --git a/modules/footstone/include/footstone/driver.h b/modules/footstone/include/footstone/driver.h index eab90e14bf7..c18869e6741 100644 --- a/modules/footstone/include/footstone/driver.h +++ b/modules/footstone/include/footstone/driver.h @@ -35,8 +35,7 @@ class Driver { virtual ~Driver() = default; virtual void Notify() = 0; - virtual void WaitFor(const TimeDelta& delta) = 0; - virtual std::mutex& Mutex() = 0; + std::mutex& Mutex() { return mutex_; } virtual void WaitFor(const TimeDelta& delta, std::unique_lock& lock) = 0; virtual void Start() = 0; virtual void Terminate() = 0; @@ -62,6 +61,8 @@ class Driver { * */ bool is_exit_immediately_; + + std::mutex mutex_; }; } diff --git a/modules/footstone/include/footstone/platform/adr/looper_driver.h b/modules/footstone/include/footstone/platform/adr/looper_driver.h index 213dd9c3042..e6f4e5053dc 100644 --- a/modules/footstone/include/footstone/platform/adr/looper_driver.h +++ b/modules/footstone/include/footstone/platform/adr/looper_driver.h @@ -36,7 +36,7 @@ class LooperDriver: public Driver { virtual ~LooperDriver(); virtual void Notify() override; - virtual void WaitFor(const TimeDelta& delta) override; + virtual void WaitFor(const TimeDelta& delta, std::unique_lock& lock) override; virtual void Start() override; virtual void Terminate() override; diff --git a/modules/footstone/include/footstone/platform/ios/looper_driver.h b/modules/footstone/include/footstone/platform/ios/looper_driver.h index 1ec849a044f..1e07e2c90e8 100644 --- a/modules/footstone/include/footstone/platform/ios/looper_driver.h +++ b/modules/footstone/include/footstone/platform/ios/looper_driver.h @@ -35,10 +35,10 @@ class LooperDriver: public Driver { virtual ~LooperDriver(); virtual void Notify() override; - virtual void WaitFor(const TimeDelta& delta) override; + virtual void WaitFor(const TimeDelta& delta, std::unique_lock& lock) override; virtual void Start() override; virtual void Terminate() override; - + void OnTimerFire(CFRunLoopTimerRef timer); private: diff --git a/modules/footstone/src/cv_driver.cc b/modules/footstone/src/cv_driver.cc index 724720d73b9..ff180f7100e 100644 --- a/modules/footstone/src/cv_driver.cc +++ b/modules/footstone/src/cv_driver.cc @@ -28,20 +28,6 @@ void CVDriver::Notify() { cv_.notify_one(); } -void CVDriver::WaitFor(const TimeDelta& delta) { - std::unique_lock lock(mutex_); - - if (delta != TimeDelta::Max() && delta >= TimeDelta::Zero()) { - cv_.wait_for(lock, std::chrono::nanoseconds(delta.ToNanoseconds())); - } else { - cv_.wait(lock); - } -} - -std::mutex& CVDriver::Mutex() { - return mutex_; -} - void CVDriver::WaitFor(const TimeDelta& delta, std::unique_lock& lock) { if (delta != TimeDelta::Max() && delta >= TimeDelta::Zero()) { cv_.wait_for(lock, std::chrono::nanoseconds(delta.ToNanoseconds())); diff --git a/modules/footstone/src/platform/adr/looper_driver.cc b/modules/footstone/src/platform/adr/looper_driver.cc index 22ea02c9daa..96a3b371122 100644 --- a/modules/footstone/src/platform/adr/looper_driver.cc +++ b/modules/footstone/src/platform/adr/looper_driver.cc @@ -57,7 +57,7 @@ void LooperDriver::Notify() { timerfd_settime(fd_, TFD_TIMER_ABSTIME, &spec, nullptr); } -void LooperDriver::WaitFor(const TimeDelta& delta) { +void LooperDriver::WaitFor(const TimeDelta& delta, std::unique_lock& lock) { auto nano_secs = delta.ToNanoseconds(); if (nano_secs < 1) { nano_secs = 1; diff --git a/modules/footstone/src/platform/ios/looper_driver.cc b/modules/footstone/src/platform/ios/looper_driver.cc index b817da01eac..eac5e0d0bcb 100644 --- a/modules/footstone/src/platform/ios/looper_driver.cc +++ b/modules/footstone/src/platform/ios/looper_driver.cc @@ -56,7 +56,7 @@ void LooperDriver::Notify() { CFAbsoluteTimeGetCurrent()); } -void LooperDriver::WaitFor(const TimeDelta& delta) { +void LooperDriver::WaitFor(const TimeDelta& delta, std::unique_lock& lock) { CFRunLoopTimerSetNextFireDate( delayed_wake_timer_, CFAbsoluteTimeGetCurrent() + delta.ToSecondsF());