Skip to content

Commit

Permalink
feat: cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
hum committed Jan 15, 2024
1 parent b6ebeb3 commit dfccafd
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 66 deletions.
71 changes: 25 additions & 46 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type Scheduler struct {
tasks map[uuid.UUID]*Task
tasks map[uuid.UUID]*task
taskLock sync.Mutex
}

Expand All @@ -18,23 +18,22 @@ type Scheduler struct {
// To use the scheduler, add a task via `scheduler.RegisterTask` or `scheduler.RunOnce`
func NewScheduler() *Scheduler {
return &Scheduler{
tasks: make(map[uuid.UUID]*Task),
tasks: make(map[uuid.UUID]*task),
}
}

// Register allows for a task to be added within the execution loop of the scheduler
func (s *Scheduler) RegisterTask(t *Task) (uuid.UUID, error) {
// RegisterTask allows for a task to be added within the execution loop of the scheduler
func (s *Scheduler) RegisterTask(t *task) error {
return s.registerTask(t, false)
}

// RunOnce allows a one-time execution of a task directly within the runtime of the scheduler
func (s *Scheduler) RunOnce(t *Task) error {
_, err := s.registerTask(t, true)
return err
func (s *Scheduler) RunOnce(t *task) error {
return s.registerTask(t, true)
}

// GetTask returns a task registered under provided uuid.UUID. If the task is not registered, the function returns an error.
func (s *Scheduler) GetTask(idx uuid.UUID) (*Task, error) {
func (s *Scheduler) GetTask(idx uuid.UUID) (*task, error) {
s.taskLock.Lock()
defer s.taskLock.Unlock()

Expand All @@ -52,6 +51,9 @@ func (s *Scheduler) RemoveTask(idx uuid.UUID) error {
return err
}

s.taskLock.Lock()
defer s.taskLock.Unlock()

delete(s.tasks, idx)
return nil
}
Expand All @@ -73,49 +75,36 @@ func (s *Scheduler) Stop() {

// registerTask validates task's correctness before adding it to the group of tasks.
// Tasks can run either on a schedule, or be executed only once within the context of the scheduler via the parameter `runOnce`.
func (s *Scheduler) registerTask(t *Task, runOnce bool) (uuid.UUID, error) {
func (s *Scheduler) registerTask(t *task, runOnce bool) error {
s.taskLock.Lock()
defer s.taskLock.Unlock()

// Go panics with duration <0
if t.Interval <= time.Duration(0) {
return uuid.UUID{}, fmt.Errorf("invalid interval=%d", t.Interval)
}

// Tasks are identified with Google's UUID, but long-term it would be nice
// to have internal ID generation to eliminate external dependency
taskid, _ := uuid.NewUUID()
if _, ok := s.tasks[taskid]; ok {
return uuid.UUID{}, fmt.Errorf("taskid=%s is already registered", taskid)
}

// Only add the task to the hash-map of tasks if it's not a one-time run.
if !runOnce {
s.tasks[taskid] = t
if _, ok := s.tasks[t.ID]; ok {
return fmt.Errorf("taskid=%s is already registered", t.ID)
}

s.tasks[t.ID] = t
s.execTask(t, runOnce)
return taskid, nil
return nil
}

// exec is the entrypoint for the execution of the task. It only accepts a task's identifier.
// execTask is the entrypoint for the execution of the task. It only accepts a task's identifier.
// Tasks are ran in goroutines, which belong to each task respectively.
//
// @TODO: Should exec accept a task pointer to decouple it from the internal array buffer of tasks?
func (s *Scheduler) execTask(task *Task, runOnce bool) {
func (s *Scheduler) execTask(task *task, runOnce bool) {
go func() {
time.AfterFunc(time.Until(task.StartTime), func() {
if err := task.ctx.Err(); err != nil {
// @TODO: Add IDs to the task itself
// @TODO: Never print stuff to a console directly
// A) Return as a slice of errors to the caller
// B) Expose a logging interface for the caller to have a control over
// C) Ignore (?)
fmt.Printf("err: task is cancelled but wanted to be ran\n")
fmt.Printf("err: task=%s is cancelled but wanted to be ran\n", task.ID)

// Make sure to also stop the tick timer
if task.t != nil {
task.t.Stop()
if task.timer != nil {
task.timer.Stop()
}
return
}
Expand All @@ -128,23 +117,13 @@ func (s *Scheduler) execTask(task *Task, runOnce bool) {
tick = time.Until(task.cron.Next())
}

task.t = time.AfterFunc(tick, func() {
task.timer = time.AfterFunc(tick, func() {
go task.run()
defer func() {
if !runOnce {
// Reset the internal timer.
// @TODO: Is there a cleaner way to set this?
if task.cron != nil {
// CRON reset
task.t.Reset(time.Until(task.cron.Next()))
} else {
// Interval reset
task.t.Reset(task.Interval)
}
task.resetTimer()
} else {
// Cancel the context and stop the internal timer for a runOnce task
task.cancel()
task.t.Stop()
s.RemoveTask(task.ID)
}
}()
})
Expand All @@ -167,8 +146,8 @@ func (s *Scheduler) stopTask(taskid uuid.UUID) error {
s.tasks[taskid].cancel()

// Stops the internal timer of the task
if s.tasks[taskid].t != nil {
s.tasks[taskid].t.Stop()
if s.tasks[taskid].timer != nil {
s.tasks[taskid].timer.Stop()
}
return nil
}
23 changes: 11 additions & 12 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@ func TestSchedulerAddsTask(t *testing.T) {
fmt.Println("hello, world!")
return nil
},
// Large enough interval for the whole test to finish so that we do not print anything to the console
Interval: 15 * time.Second,
})

scheduler := scheduled.NewScheduler()
taskid, err := scheduler.RegisterTask(task)
err := scheduler.RegisterTask(task)
require.NoError(t, err)

registeredTask, err := scheduler.GetTask(taskid)
registeredTask, err := scheduler.GetTask(task.ID)
require.NoError(t, err)
require.Equal(t, task, registeredTask)

err = scheduler.RemoveTask(taskid)
err = scheduler.RemoveTask(task.ID)
require.NoError(t, err)
}

Expand All @@ -41,13 +40,13 @@ func TestSchedulerRemovesTask(t *testing.T) {
})

scheduler := scheduled.NewScheduler()
taskid, err := scheduler.RegisterTask(task)
err := scheduler.RegisterTask(task)
require.NoError(t, err)

err = scheduler.RemoveTask(taskid)
err = scheduler.RemoveTask(task.ID)
require.NoError(t, err)

_, err = scheduler.GetTask(taskid)
_, err = scheduler.GetTask(task.ID)
require.Error(t, err)
}

Expand All @@ -63,7 +62,7 @@ func TestSchedulerExecutesTaskAtLeastOnce(t *testing.T) {
})

scheduler := scheduled.NewScheduler()
_, err := scheduler.RegisterTask(task)
err := scheduler.RegisterTask(task)
require.NoError(t, err)

test_loop:
Expand Down Expand Up @@ -120,7 +119,7 @@ func TestSchedulerExecutesTaskMultipleTimes(t *testing.T) {
})

scheduler := scheduled.NewScheduler()
idx, err := scheduler.RegisterTask(task)
err := scheduler.RegisterTask(task)
require.NoError(t, err)

var count = 0
Expand All @@ -138,7 +137,7 @@ test_loop:
}
}

err = scheduler.RemoveTask(idx)
err = scheduler.RemoveTask(task.ID)
require.NoError(t, err)
}

Expand All @@ -155,7 +154,7 @@ func TestSchedulerExecutesTaskAtStartTime(t *testing.T) {
})

scheduler := scheduled.NewScheduler()
_, err := scheduler.RegisterTask(task)
err := scheduler.RegisterTask(task)
require.NoError(t, err)

test_loop:
Expand Down Expand Up @@ -186,7 +185,7 @@ func TestSchedulerExecutesCRONTaskAtLeastOnce(t *testing.T) {
})

scheduler := scheduled.NewScheduler()
_, err := scheduler.RegisterTask(task)
err := scheduler.RegisterTask(task)
require.NoError(t, err)

test_loop:
Expand Down
42 changes: 34 additions & 8 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/essentialkaos/ek/v12/cron"
"github.com/google/uuid"
)

type TaskOpts struct {
Expand All @@ -28,7 +29,10 @@ type TaskOpts struct {
type TaskFunc func() error
type TaskErrFunc func(err error)

type Task struct {
type task struct {
// Task identifier is used to be able to get and stop tasks already registered in the scheduler
ID uuid.UUID

// Underlying function to be ran within the scheduler
Fn TaskFunc

Expand All @@ -47,13 +51,14 @@ type Task struct {
ctx context.Context
cancel context.CancelFunc

t *time.Timer
timer *time.Timer
}

func NewTask(opts TaskOpts) *Task {
var (
ctx, cancel = context.WithCancel(context.Background())
)
func NewTask(opts TaskOpts) *task {
// If the caller has picked neither CRON nor interval, then we have no idea how to schedule the task
if opts.Cron == "" && opts.Interval <= 0 {
panic("neither cron nor interval is set")
}

// Only set the cron expression if the value is set
var cronExpr *cron.Expr = nil
Expand All @@ -69,7 +74,18 @@ func NewTask(opts TaskOpts) *Task {
cronExpr = c
}

return &Task{
var (
// uuid.NewUUID() only returns an (uuid, error) for historical reasons.
// It never returns an error anymore, so can be safely ignored.
// [Issue](https://github.com/google/uuid/issues/63)
taskid, _ = uuid.NewUUID()

// Internal context for the task. Used for cancellation.
ctx, cancel = context.WithCancel(context.Background())
)

return &task{
ID: taskid,
Fn: opts.Fn,
ErrFn: opts.ErrFn,
Interval: opts.Interval,
Expand All @@ -80,7 +96,7 @@ func NewTask(opts TaskOpts) *Task {
}
}

func (t *Task) run() {
func (t *task) run() {
if err := t.Fn(); err != nil {
if t.ErrFn != nil {
// While it may not be pretty, it works.
Expand All @@ -93,3 +109,13 @@ func (t *Task) run() {
return
}
}

// resetTimer sets the next tick for the task to run with its internal timer.
func (t *task) resetTimer() {
var next = t.Interval
if t.cron != nil {
// CRON has a precedence over interval
next = time.Until(t.cron.Next())
}
t.timer.Reset(next)
}

0 comments on commit dfccafd

Please sign in to comment.