Skip to content

Commit

Permalink
scheduler and timer should be separated (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
kotbegemot committed May 2, 2022
1 parent a4d058c commit 9597d6c
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 93 deletions.
49 changes: 42 additions & 7 deletions examples/timer/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,54 @@
#include <vector>

#include <actor-zeta.hpp>
#include <actor-zeta/clock/clock_thread_safe.hpp>
#include <actor-zeta/detail/memory_resource.hpp>

auto thread_pool_deleter = [](actor_zeta::scheduler_abstract_t* ptr) {
ptr->stop();
delete ptr;
};

constexpr static uint64_t command_alarm = 0;

static std::atomic<uint64_t> alarm_counter{0};

using actor_zeta::detail::pmr::memory_resource;

template<class Policy>
class advanced_scheduler_t final : public actor_zeta::scheduler_t<Policy> {
public:
using super = actor_zeta::scheduler_t<Policy>;

advanced_scheduler_t(size_t num_worker_threads, size_t max_throughput_param)
: super(num_worker_threads, max_throughput_param) {}

void start() override {
super::start();
clock_.start_dispatch_loop();
}

void stop() override {
super::stop();
clock_.stop_dispatch_loop();
}

actor_zeta::clock::thread_safe_clock_t& clock() {
return clock_;
}

private:
actor_zeta::clock::thread_safe_clock_t clock_;
};

using shared_work = advanced_scheduler_t<actor_zeta::work_sharing>;

auto thread_pool_deleter = [](shared_work* ptr) {
ptr->stop();
delete ptr;
};

/// non thread safe
class supervisor_lite final : public actor_zeta::cooperative_supervisor<supervisor_lite> {
public:
explicit supervisor_lite(memory_resource* ptr)
: cooperative_supervisor(ptr, "network")
, e_(new actor_zeta::scheduler_t<actor_zeta::work_sharing>(
, e_(new shared_work(
1,
100),
thread_pool_deleter) {
Expand All @@ -39,6 +69,11 @@ class supervisor_lite final : public actor_zeta::cooperative_supervisor<supervis
alarm_counter += 1;
}

actor_zeta::clock::thread_safe_clock_t& clock() {
return e_->clock();
}


protected:
auto scheduler_impl() noexcept -> actor_zeta::scheduler_abstract_t* final { return e_.get(); }
auto enqueue_impl(actor_zeta::message_ptr msg, actor_zeta::execution_unit*) -> void final {
Expand All @@ -49,7 +84,7 @@ class supervisor_lite final : public actor_zeta::cooperative_supervisor<supervis
}

private:
std::unique_ptr<actor_zeta::scheduler_abstract_t, decltype(thread_pool_deleter)> e_;
std::unique_ptr<shared_work, decltype(thread_pool_deleter)> e_;
std::vector<actor_zeta::actor> actors_;
};

Expand Down
2 changes: 0 additions & 2 deletions header/actor-zeta/base/cooperative_actor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include "forwards.hpp"
#include <actor-zeta/base/actor_abstract.hpp>
#include <actor-zeta/clock/clock.hpp>
#include <actor-zeta/detail/single_reader_queue.hpp>
#include <actor-zeta/scheduler/resumable.hpp>
#include <actor-zeta/base/behavior.hpp>
Expand Down Expand Up @@ -72,7 +71,6 @@ namespace actor_zeta { namespace base {

// ----------------------------------------------------- message processing

auto clock() noexcept -> clock::clock_t&;
supervisor_abstract* supervisor_;
scheduler::execution_unit* executor_;
mailbox::message* current_message_;
Expand Down
2 changes: 0 additions & 2 deletions header/actor-zeta/base/supervisor_abstract.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <actor-zeta/base/communication_module.hpp>
#include <actor-zeta/clock/clock.hpp>
#include <actor-zeta/detail/memory_resource.hpp>
#include <actor-zeta/scheduler/scheduler_abstract.hpp>

Expand All @@ -22,7 +21,6 @@ namespace actor_zeta { namespace base {
auto scheduler() noexcept -> scheduler::scheduler_abstract_t*;
auto resource() const -> detail::pmr::memory_resource*;
auto address() noexcept -> address_t;
auto clock() noexcept -> clock::clock_t&;

protected:
virtual auto scheduler_impl() noexcept -> scheduler::scheduler_abstract_t* = 0;
Expand Down
3 changes: 2 additions & 1 deletion header/actor-zeta/clock/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

#include <chrono>
#include <string>

#include <actor-zeta/base/forwards.hpp>
#include <actor-zeta/mailbox/message.hpp>
#include <actor-zeta/base/address.hpp>
#include <actor-zeta/detail/unique_function.hpp>

namespace actor_zeta { namespace clock {
Expand Down
3 changes: 0 additions & 3 deletions header/actor-zeta/impl/actor/cooperative_actor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,5 @@ namespace actor_zeta { namespace base {
auto cooperative_actor::supervisor() -> supervisor_abstract* {
return supervisor_;
}
auto cooperative_actor::clock() noexcept -> clock::clock_t& {
return supervisor()->clock();
}

}} // namespace actor_zeta::base
4 changes: 0 additions & 4 deletions header/actor-zeta/impl/actor/supervisor_abstract.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,4 @@ namespace actor_zeta { namespace base {
return address_t(this);
}

auto supervisor_abstract::clock() noexcept -> clock::clock_t& {
return scheduler_impl()->clock();
}

}} // namespace actor_zeta::base
4 changes: 3 additions & 1 deletion header/actor-zeta/impl/clock/clock.ipp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#pragma once
#include <actor-zeta/clock/clock.hpp>
#include <type_traits>

#include <actor-zeta/clock/clock.hpp>
#include <actor-zeta/base/address.hpp>

namespace actor_zeta { namespace clock {

clock_t::~clock_t() {
Expand Down
1 change: 1 addition & 0 deletions header/actor-zeta/scheduler/resumable.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <type_traits>
#include <cstddef>

#include "forwards.hpp"

Expand Down
9 changes: 0 additions & 9 deletions header/actor-zeta/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <thread>
#include <vector>

#include <actor-zeta/clock/clock_thread_safe.hpp>
#include <actor-zeta/detail/ref_counted.hpp>
#include <actor-zeta/scheduler/scheduler_abstract.hpp>
#include <actor-zeta/scheduler/worker.hpp>
Expand Down Expand Up @@ -48,8 +47,6 @@ namespace actor_zeta { namespace scheduler {
for (auto& w : workers_) {
w->start();
}

clock_.start_dispatch_loop();
}

void stop() override {
Expand Down Expand Up @@ -108,19 +105,13 @@ namespace actor_zeta { namespace scheduler {
policy_.foreach_resumable(w.get(), f);
}
policy_.foreach_central_resumable(this, f);
clock_.stop_dispatch_loop();
}

void enqueue(resumable* ptr) override {
policy_.central_enqueue(this, ptr);
}

clock::clock_t& clock() noexcept override {
return clock_;
}

private:
clock::thread_safe_clock_t clock_;
std::vector<std::unique_ptr<worker_type>> workers_;
policy_data data_;
Policy policy_;
Expand Down
2 changes: 0 additions & 2 deletions header/actor-zeta/scheduler/scheduler_abstract.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <chrono>
#include <cstddef>

#include <actor-zeta/clock/clock.hpp>
#include "forwards.hpp"

namespace actor_zeta { namespace scheduler {
Expand All @@ -28,7 +27,6 @@ namespace actor_zeta { namespace scheduler {

virtual void start() = 0;
virtual void stop() = 0;
virtual clock::clock_t& clock() noexcept = 0;

protected:
std::atomic<size_t> next_worker_;
Expand Down
49 changes: 49 additions & 0 deletions test/timer/classes.hpp
Original file line number Diff line number Diff line change
@@ -1 +1,50 @@
#pragma once

#include <cassert>

#include <chrono>
#include <memory>
#include <vector>

#include <actor-zeta.hpp>
#include <actor-zeta/detail/memory_resource.hpp>
#include "tooltestsuites/scheduler_test.hpp"
#include "tooltestsuites/clock_test.hpp"

static std::atomic<uint64_t> alarm_counter{0};

using actor_zeta::detail::pmr::memory_resource;
/// non thread safe
constexpr static auto alarm_id = actor_zeta::make_message_id(0);
class supervisor_lite final : public actor_zeta::cooperative_supervisor<supervisor_lite> {
public:
explicit supervisor_lite(memory_resource* ptr)
: cooperative_supervisor(ptr, "network")
, executor_(new actor_zeta::test::scheduler_test_t(1, 1)) {
add_handler(alarm_id, &supervisor_lite::alarm);
scheduler()->start();
}

auto clock() noexcept -> actor_zeta::test::clock_test& {
return executor_->clock();
}

~supervisor_lite() override = default;

void alarm() {
alarm_counter += 1;
}

protected:
auto scheduler_impl() noexcept -> actor_zeta::scheduler_abstract_t* final { return executor_.get(); }
auto enqueue_impl(actor_zeta::message_ptr msg, actor_zeta::execution_unit*) -> void final {
{
set_current_message(std::move(msg));
execute(this, current_message());
}
}

private:
std::unique_ptr<actor_zeta::test::scheduler_test_t> executor_;
std::vector<actor_zeta::actor> actors_;
};
55 changes: 4 additions & 51 deletions test/timer/main.cpp
Original file line number Diff line number Diff line change
@@ -1,63 +1,16 @@
#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file
#include <catch2/catch.hpp>

#include <cassert>

#include <chrono>
#include <memory>
#include <vector>

#include <actor-zeta.hpp>
#include <actor-zeta/detail/memory_resource.hpp>

#include "classes.hpp"
#include "tooltestsuites/scheduler_test.hpp"

static std::atomic<uint64_t> alarm_counter{0};

using actor_zeta::detail::pmr::memory_resource;
/// non thread safe
constexpr static auto alarm_id = actor_zeta::make_message_id(0);
class supervisor_lite final : public actor_zeta::cooperative_supervisor<supervisor_lite> {
public:
explicit supervisor_lite(memory_resource* ptr)
: cooperative_supervisor(ptr, "network")
, executor_(new actor_zeta::test::scheduler_test_t(1, 1)) {
add_handler(alarm_id, &supervisor_lite::alarm);
scheduler()->start();
}

auto scheduler_test() noexcept -> actor_zeta::test::scheduler_test_t* {
return executor_.get();
}

~supervisor_lite() override = default;

void alarm() {
alarm_counter += 1;
}

protected:
auto scheduler_impl() noexcept -> actor_zeta::scheduler_abstract_t* final { return executor_.get(); }
auto enqueue_impl(actor_zeta::message_ptr msg, actor_zeta::execution_unit*) -> void final {
{
set_current_message(std::move(msg));
execute(this,current_message());
}
}

private:
std::unique_ptr<actor_zeta::test::scheduler_test_t> executor_;
std::vector<actor_zeta::actor> actors_;
};

TEST_CASE("timer") {
auto* mr_ptr = actor_zeta::detail::pmr::get_default_resource();
auto supervisor = actor_zeta::spawn_supervisor<supervisor_lite>(mr_ptr);

auto time = supervisor->clock().now() + std::chrono::seconds(10);
supervisor->clock().schedule_message(time, supervisor->address(), actor_zeta::make_message(actor_zeta::address_t::empty_address(), alarm_id));
supervisor->scheduler_test()->advance_time(std::chrono::seconds(10));
auto& clock = supervisor->clock();
auto time = clock.now() + std::chrono::seconds(10);
clock.schedule_message(time, supervisor->address(), actor_zeta::make_message(actor_zeta::address_t::empty_address(), alarm_id));
supervisor->clock().advance_time(std::chrono::seconds(10));

REQUIRE(alarm_counter.load() == 1);
}
4 changes: 2 additions & 2 deletions test/tooltestsuites/clock_test.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

#include <algorithm>
#include <map>
#include <chrono>

#include "actor-zeta/base/forwards.hpp"
#include "actor-zeta/clock/clock.hpp"
#include <actor-zeta/clock/clock.hpp>

namespace actor_zeta { namespace test {

Expand Down
2 changes: 1 addition & 1 deletion test/tooltestsuites/scheduler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace actor_zeta { namespace test {
: super(num_worker_threads, max_throughput) {
}

clock::clock_t& scheduler_test_t::clock() noexcept {
clock_test& scheduler_test_t::clock() noexcept {
return clock_;
}

Expand Down
9 changes: 1 addition & 8 deletions test/tooltestsuites/scheduler_test.hpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
#pragma once

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <limits>

#include <actor-zeta/mailbox/message.hpp>
#include "actor-zeta/base/actor_abstract.hpp"
#include "actor-zeta/scheduler/resumable.hpp"
#include "actor-zeta/scheduler/scheduler_abstract.hpp"
#include "clock_test.hpp"
Expand All @@ -26,7 +19,7 @@ namespace actor_zeta { namespace test {
bool run_once();
size_t run(size_t max_count = std::numeric_limits<size_t>::max());
size_t advance_time(clock::clock_t::duration_type);
clock::clock_t& clock() noexcept override;
clock_test& clock() noexcept;

protected:
void start() override;
Expand Down

0 comments on commit 9597d6c

Please sign in to comment.