Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add job storage #607

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type executor struct {

distributedLocker Locker // support running jobs across multiple instances
distributedElector Elector // support running jobs across multiple instances

resultReporter JobResultReporter // support reporting job results
}

func newExecutor() executor {
Expand All @@ -69,7 +71,7 @@ func newExecutor() executor {
return e
}

func runJob(f jobFunction) {
func runJob(f jobFunction, reporter JobResultReporter) {
panicHandlerMutex.RLock()
defer panicHandlerMutex.RUnlock()

Expand All @@ -87,16 +89,22 @@ func runJob(f jobFunction) {
err := callJobFuncWithParams(f.function, f.parameters)
if err != nil {
_ = callJobFuncWithParams(f.eventListeners.onError, []interface{}{f.getName(), err})
if reporter != nil {
reporter.ReportJobResult(JobResult{UUID: f.id, JobName: f.jobName, RunTimes: f.jobRunTimes, ReportTime: time.Now(), Err: err})
}
} else {
_ = callJobFuncWithParams(f.eventListeners.noError, []interface{}{f.getName()})
if reporter != nil {
reporter.ReportJobResult(JobResult{UUID: f.id, JobName: f.jobName, RunTimes: f.jobRunTimes, ReportTime: time.Now()})
}
}
_ = callJobFuncWithParams(f.eventListeners.afterJobRuns, []interface{}{f.getName()})
callJobFunc(f.eventListeners.onAfterJobExecution)
f.isRunning.Store(false)
f.runFinishCount.Add(1)
}

func (jf *jobFunction) singletonRunner() {
func (jf *jobFunction) singletonRunner(reporter JobResultReporter) {
jf.singletonRunnerOn.Store(true)
jf.singletonWgMu.Lock()
jf.singletonWg.Add(1)
Expand All @@ -113,7 +121,7 @@ func (jf *jobFunction) singletonRunner() {
return
case <-jf.singletonQueue:
if !jf.stopped.Load() {
runJob(*jf)
runJob(*jf, reporter)
}
}
}
Expand Down Expand Up @@ -174,7 +182,7 @@ func (e *executor) runJob(f jobFunction) {
if err != nil {
return
}
runJob(f)
runJob(f, e.resultReporter)
return
}
if e.distributedLocker != nil {
Expand Down Expand Up @@ -207,15 +215,15 @@ func (e *executor) runJob(f jobFunction) {
}
_ = l.Unlock(f.ctx)
}()
runJob(f)
runJob(f, e.resultReporter)
return
}
runJob(f)
runJob(f, e.resultReporter)
case singletonMode:
e.singletonWgs.Store(f.singletonWg, f.singletonWgMu)

if !f.singletonRunnerOn.Load() {
go f.singletonRunner()
go f.singletonRunner(e.resultReporter)
}
f.singletonQueueMu.Lock()
f.singletonQueue <- struct{}{}
Expand Down
72 changes: 72 additions & 0 deletions executor_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package gocron

import (
"errors"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -65,3 +68,72 @@ func Test_ExecutorPanicHandling(t *testing.T) {
state := <-panicHandled
assert.Equal(t, state, true)
}

type testReporter struct {
lock sync.Mutex
results []JobResult
}

func (r *testReporter) ReportJobResult(result JobResult) {
r.lock.Lock()
defer r.lock.Unlock()
r.results = append(r.results, result)
}

func Test_ExecutorJobReport(t *testing.T) {
reporter := &testReporter{lock: sync.Mutex{}, results: make([]JobResult, 0)}
e := newExecutor()
e.resultReporter = reporter

wg := &sync.WaitGroup{}
wg.Add(2)
e.start()

uuids := []uuid.UUID{uuid.New(), uuid.New()}

e.jobFunctions <- jobFunction{
id: uuids[0],
jobName: "test_fn",
funcName: "test_fn",
function: func() {
wg.Done()
},
parameters: nil,
isRunning: atomic.NewBool(false),
runStartCount: atomic.NewInt64(0),
runFinishCount: atomic.NewInt64(0),
jobRunTimes: &jobRunTimes{nextRun: time.Now()},
}

mockedErr := errors.New("mocked error")
e.jobFunctions <- jobFunction{
id: uuids[1],
jobName: "test_fn",
funcName: "test_fn",
function: func() error {
wg.Done()
return mockedErr
},
parameters: nil,
isRunning: atomic.NewBool(false),
runStartCount: atomic.NewInt64(0),
runFinishCount: atomic.NewInt64(0),
jobRunTimes: &jobRunTimes{nextRun: time.Now()},
}

wg.Wait()
e.stop()

assert.Len(t, reporter.results, 2)
var result1 JobResult
var result2 JobResult
if reporter.results[0].UUID == uuids[0] {
result1 = reporter.results[0]
result2 = reporter.results[1]
} else {
result1 = reporter.results[1]
result2 = reporter.results[0]
}
assert.NoError(t, result1.Err)
assert.Error(t, result2.Err)
}
7 changes: 6 additions & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,18 @@ const (

// newJob creates a new Job with the provided interval
func newJob(interval int, startImmediately bool, singletonMode bool) *Job {
return newJobWithUUID(interval, startImmediately, singletonMode, uuid.New())
}

// newJobWithUUID creates a new Job with the provided interval and uuid
func newJobWithUUID(interval int, startImmediately bool, singletonMode bool, id uuid.UUID) *Job {
ctx, cancel := context.WithCancel(context.Background())
job := &Job{
mu: &jobMutex{},
interval: interval,
unit: seconds,
jobFunction: jobFunction{
id: uuid.New(),
id: id,
jobRunTimes: &jobRunTimes{
jobRunTimesMu: &sync.Mutex{},
lastRun: time.Time{},
Expand Down
61 changes: 61 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,62 @@ func (s *Scheduler) newJob(interval int) *Job {
return newJob(interval, !s.waitForInterval, s.singletonMode)
}

// LoadFromJobObject loads a job from a JobObject, loading the job's UUID,name,schedule, and last run time.
// if cron is set, it will be used first.
func (s *Scheduler) LoadFromJobObject(obj JobObject) *Scheduler {
j := newJobWithUUID(0, !s.waitForInterval, s.singletonMode, obj.UUID)
s.jobsMutex.Lock()
s.jobs[j.id] = j
s.jobsMutex.Unlock()
s.inScheduleChain = &j.id

s.Name(obj.JobName)

if obj.Cron != "" {
if obj.CronWithSeconds {
s.CronWithSeconds(obj.Cron)
} else {
s.Cron(obj.Cron)
}
return s
}

s.Every(obj.Every)
if obj.Unit != "" {
switch obj.Unit {
case "Millisecond", "Milliseconds":
s.Millisecond()
case "Second", "Seconds":
s.Second()
case "Minute", "Minutes":
s.Minute()
case "Hour", "Hours":
s.Hour()
case "Day", "Days":
s.Day()
case "Week", "Weeks":
s.Week()
}
}
if obj.WeekDay != nil {
s.Weekday(*obj.WeekDay)
}
if len(obj.Months) > 0 {
s.Months(obj.Months...)
}

if len(obj.At) > 0 {
for _, at := range obj.At {
s.At(at)
}
}

j.lastRun = obj.LastRun
j.nextRun = obj.LastRun

return s
}

// WaitForScheduleAll defaults the scheduler to create all
// new jobs with the WaitForSchedule option as true.
// The jobs will not start immediately but rather will
Expand Down Expand Up @@ -1502,6 +1558,11 @@ func (s *Scheduler) WithDistributedLocker(l Locker) {
s.executor.distributedLocker = l
}

// WithResultReporter allows the caller to receive the result of a job
func (s *Scheduler) WithResultReporter(r JobResultReporter) {
s.executor.resultReporter = r
}

// WithDistributedElector prevents the same job from being run more than once
// when multiple schedulers are trying to schedule the same job, by allowing only
// the leader to run jobs. Non-leaders wait until the leader instance goes down
Expand Down
80 changes: 80 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
Expand Down Expand Up @@ -2945,3 +2946,82 @@ func runTestWithDistributedElector(t *testing.T, maxConcurrentJobs int) {
assert.Equal(t, leaderIndex, r)
}
}

func TestLoadJobFromObject(t *testing.T) {
// test suite 1: job schedule expression should be parsed correctly
monday := time.Monday
uid := []uuid.UUID{
uuid.New(),
uuid.New(),
uuid.New(),
uuid.New(),
uuid.New(),
uuid.New(),
}
objects := []JobObject{
{JobName: "cron", UUID: (uid[0]), Cron: "*/1 * * * *"},
{JobName: "cronSeconds", UUID: (uid[1]), CronWithSeconds: true, Cron: "*/1 * * * * *"},
{JobName: "everyDayAt", UUID: (uid[2]), Every: 1, Unit: "Day", At: []interface{}{"10:00", time.Date(2022, 2, 16, 10, 30, 0, 0, time.UTC)}},
{JobName: "every", UUID: (uid[3]), Every: "5m"},
{JobName: "everyWeekdayAt", UUID: (uid[4]), Every: 1, WeekDay: &monday, At: []interface{}{"12:00;13:00"}},
{JobName: "everyMonthDay", UUID: (uid[5]), Every: 2, Months: []int{1, 2, 3}},
}

s1 := NewScheduler(time.UTC)
for _, jo := range objects {
_, err := s1.LoadFromJobObject(jo).Do(func() {})
assert.NoError(t, err, jo.JobName)
}
s2 := NewScheduler(time.UTC)
s2.Cron("*/1 * * * *").Name("cron").Do(func() {})
s2.CronWithSeconds("*/1 * * * * *").Name("cronSeconds").Do(func() {})
s2.Every(1).Day().At("10:00").At(time.Date(2022, 2, 16, 10, 30, 0, 0, time.UTC)).Name("everyDayAt").Do(func() {})
s2.Every("5m").Name("every").Do(func() {})
s2.Every(1).Monday().At("12:00").At("13:00").Name("everyWeekdayAt").Do(func() {})
s2.Every(2).Months(1, 2, 3).Name("everyMonthDay").Do(func() {})

findJob2 := func(name string) *Job {
for _, job := range s2.jobs {
if job.jobName == name {
return job
}
}
return nil
}
now := time.Now()
job1 := s1.jobs[uid[0]]
job2 := findJob2(job1.jobName)
assert.NotNil(t, job2)
assert.Equal(t, job1.jobName, job2.jobName)
assert.Equal(t, job1.cronSchedule.Next(now), job2.cronSchedule.Next(now))

job1 = s1.jobs[uid[1]]
job2 = findJob2(job1.jobName)
assert.NotNil(t, job2)
assert.Equal(t, job1.jobName, job2.jobName)
assert.Equal(t, job1.cronSchedule.Next(now), job2.cronSchedule.Next(now))

job1 = s1.jobs[uid[2]]
job2 = findJob2(job1.jobName)
assert.NotNil(t, job2)
assert.Equal(t, job1.jobName, job2.jobName)
assert.Equal(t, job1.interval, job2.interval)
assert.Equal(t, job1.unit, job2.unit)
assert.ElementsMatch(t, job1.atTimes, job2.atTimes)

job1 = s1.jobs[uid[3]]
job2 = findJob2(job1.jobName)
assert.NotNil(t, job2)
assert.Equal(t, job1.jobName, job2.jobName)
assert.Equal(t, job1.interval, job2.interval)
assert.Equal(t, job1.unit, job2.unit)
assert.ElementsMatch(t, job1.scheduledWeekdays, job2.scheduledWeekdays)

job1 = s1.jobs[uid[4]]
job2 = findJob2(job1.jobName)
assert.NotNil(t, job2)
assert.Equal(t, job1.jobName, job2.jobName)
assert.Equal(t, job1.interval, job2.interval)
assert.Equal(t, job1.unit, job2.unit)
assert.ElementsMatch(t, job1.daysOfTheMonth, job2.daysOfTheMonth)
}
40 changes: 40 additions & 0 deletions storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package gocron

import (
"time"

"github.com/google/uuid"
)

type JobObject struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the actual job function? Right now, only the job run information is stored and not the actual function to be run 🤔

UUID uuid.UUID
JobName string
LastRun time.Time

Cron string
CronWithSeconds bool

Every interface{}

Unit string
WeekDay *time.Weekday
Months []int

At []interface{}
}

type JobStorage interface {
LoadJobs() ([]JobObject, error)
}

type JobResult struct {
UUID uuid.UUID
JobName string
RunTimes *jobRunTimes
ReportTime time.Time
Err error
}

type JobResultReporter interface {
ReportJobResult(JobResult)
}