From 94322435031be79b3a0990d588cb84abe777e2a5 Mon Sep 17 00:00:00 2001 From: pkoukk Date: Thu, 9 Nov 2023 15:54:46 +0800 Subject: [PATCH] add job storage --- executor.go | 22 ++++++++----- executor_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++ job.go | 7 ++++- scheduler.go | 61 ++++++++++++++++++++++++++++++++++++ scheduler_test.go | 80 +++++++++++++++++++++++++++++++++++++++++++++++ storage.go | 40 ++++++++++++++++++++++++ 6 files changed, 274 insertions(+), 8 deletions(-) create mode 100644 storage.go diff --git a/executor.go b/executor.go index 894468c4..4cb22b77 100644 --- a/executor.go +++ b/executor.go @@ -55,6 +55,8 @@ type executor struct { distributedLocker Locker // support running jobs across multiple instances distributedElector Elector // support running jobs across multiple instances + + resultReporter JobResultReporter // support reporting job results } func newExecutor() executor { @@ -69,7 +71,7 @@ func newExecutor() executor { return e } -func runJob(f jobFunction) { +func runJob(f jobFunction, reporter JobResultReporter) { panicHandlerMutex.RLock() defer panicHandlerMutex.RUnlock() @@ -87,8 +89,14 @@ func runJob(f jobFunction) { err := callJobFuncWithParams(f.function, f.parameters) if err != nil { _ = callJobFuncWithParams(f.eventListeners.onError, []interface{}{f.getName(), err}) + if reporter != nil { + reporter.ReportJobResult(JobResult{UUID: f.id, JobName: f.jobName, RunTimes: f.jobRunTimes, ReportTime: time.Now(), Err: err}) + } } else { _ = callJobFuncWithParams(f.eventListeners.noError, []interface{}{f.getName()}) + if reporter != nil { + reporter.ReportJobResult(JobResult{UUID: f.id, JobName: f.jobName, RunTimes: f.jobRunTimes, ReportTime: time.Now()}) + } } _ = callJobFuncWithParams(f.eventListeners.afterJobRuns, []interface{}{f.getName()}) callJobFunc(f.eventListeners.onAfterJobExecution) @@ -96,7 +104,7 @@ func runJob(f jobFunction) { f.runFinishCount.Add(1) } -func (jf *jobFunction) singletonRunner() { +func (jf *jobFunction) singletonRunner(reporter JobResultReporter) { jf.singletonRunnerOn.Store(true) jf.singletonWgMu.Lock() jf.singletonWg.Add(1) @@ -113,7 +121,7 @@ func (jf *jobFunction) singletonRunner() { return case <-jf.singletonQueue: if !jf.stopped.Load() { - runJob(*jf) + runJob(*jf, reporter) } } } @@ -174,7 +182,7 @@ func (e *executor) runJob(f jobFunction) { if err != nil { return } - runJob(f) + runJob(f, e.resultReporter) return } if e.distributedLocker != nil { @@ -207,15 +215,15 @@ func (e *executor) runJob(f jobFunction) { } _ = l.Unlock(f.ctx) }() - runJob(f) + runJob(f, e.resultReporter) return } - runJob(f) + runJob(f, e.resultReporter) case singletonMode: e.singletonWgs.Store(f.singletonWg, f.singletonWgMu) if !f.singletonRunnerOn.Load() { - go f.singletonRunner() + go f.singletonRunner(e.resultReporter) } f.singletonQueueMu.Lock() f.singletonQueue <- struct{}{} diff --git a/executor_test.go b/executor_test.go index 03ce53cc..cef4e4ea 100644 --- a/executor_test.go +++ b/executor_test.go @@ -1,9 +1,12 @@ package gocron import ( + "errors" "sync" "testing" + "time" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "go.uber.org/atomic" ) @@ -65,3 +68,72 @@ func Test_ExecutorPanicHandling(t *testing.T) { state := <-panicHandled assert.Equal(t, state, true) } + +type testReporter struct { + lock sync.Mutex + results []JobResult +} + +func (r *testReporter) ReportJobResult(result JobResult) { + r.lock.Lock() + defer r.lock.Unlock() + r.results = append(r.results, result) +} + +func Test_ExecutorJobReport(t *testing.T) { + reporter := &testReporter{lock: sync.Mutex{}, results: make([]JobResult, 0)} + e := newExecutor() + e.resultReporter = reporter + + wg := &sync.WaitGroup{} + wg.Add(2) + e.start() + + uuids := []uuid.UUID{uuid.New(), uuid.New()} + + e.jobFunctions <- jobFunction{ + id: uuids[0], + jobName: "test_fn", + funcName: "test_fn", + function: func() { + wg.Done() + }, + parameters: nil, + isRunning: atomic.NewBool(false), + runStartCount: atomic.NewInt64(0), + runFinishCount: atomic.NewInt64(0), + jobRunTimes: &jobRunTimes{nextRun: time.Now()}, + } + + mockedErr := errors.New("mocked error") + e.jobFunctions <- jobFunction{ + id: uuids[1], + jobName: "test_fn", + funcName: "test_fn", + function: func() error { + wg.Done() + return mockedErr + }, + parameters: nil, + isRunning: atomic.NewBool(false), + runStartCount: atomic.NewInt64(0), + runFinishCount: atomic.NewInt64(0), + jobRunTimes: &jobRunTimes{nextRun: time.Now()}, + } + + wg.Wait() + e.stop() + + assert.Len(t, reporter.results, 2) + var result1 JobResult + var result2 JobResult + if reporter.results[0].UUID == uuids[0] { + result1 = reporter.results[0] + result2 = reporter.results[1] + } else { + result1 = reporter.results[1] + result2 = reporter.results[0] + } + assert.NoError(t, result1.Err) + assert.Error(t, result2.Err) +} diff --git a/job.go b/job.go index 13f01979..85fc94aa 100644 --- a/job.go +++ b/job.go @@ -138,13 +138,18 @@ const ( // newJob creates a new Job with the provided interval func newJob(interval int, startImmediately bool, singletonMode bool) *Job { + return newJobWithUUID(interval, startImmediately, singletonMode, uuid.New()) +} + +// newJobWithUUID creates a new Job with the provided interval and uuid +func newJobWithUUID(interval int, startImmediately bool, singletonMode bool, id uuid.UUID) *Job { ctx, cancel := context.WithCancel(context.Background()) job := &Job{ mu: &jobMutex{}, interval: interval, unit: seconds, jobFunction: jobFunction{ - id: uuid.New(), + id: id, jobRunTimes: &jobRunTimes{ jobRunTimesMu: &sync.Mutex{}, lastRun: time.Time{}, diff --git a/scheduler.go b/scheduler.go index f51e0bde..16b560e1 100644 --- a/scheduler.go +++ b/scheduler.go @@ -1427,6 +1427,62 @@ func (s *Scheduler) newJob(interval int) *Job { return newJob(interval, !s.waitForInterval, s.singletonMode) } +// LoadFromJobObject loads a job from a JobObject, loading the job's UUID,name,schedule, and last run time. +// if cron is set, it will be used first. +func (s *Scheduler) LoadFromJobObject(obj JobObject) *Scheduler { + j := newJobWithUUID(0, !s.waitForInterval, s.singletonMode, obj.UUID) + s.jobsMutex.Lock() + s.jobs[j.id] = j + s.jobsMutex.Unlock() + s.inScheduleChain = &j.id + + s.Name(obj.JobName) + + if obj.Cron != "" { + if obj.CronWithSeconds { + s.CronWithSeconds(obj.Cron) + } else { + s.Cron(obj.Cron) + } + return s + } + + s.Every(obj.Every) + if obj.Unit != "" { + switch obj.Unit { + case "Millisecond", "Milliseconds": + s.Millisecond() + case "Second", "Seconds": + s.Second() + case "Minute", "Minutes": + s.Minute() + case "Hour", "Hours": + s.Hour() + case "Day", "Days": + s.Day() + case "Week", "Weeks": + s.Week() + } + } + if obj.WeekDay != nil { + s.Weekday(*obj.WeekDay) + } + if len(obj.Months) > 0 { + s.Months(obj.Months...) + } + + if len(obj.At) > 0 { + for _, at := range obj.At { + s.At(at) + } + } + + j.lastRun = obj.LastRun + j.nextRun = obj.LastRun + + return s +} + // WaitForScheduleAll defaults the scheduler to create all // new jobs with the WaitForSchedule option as true. // The jobs will not start immediately but rather will @@ -1502,6 +1558,11 @@ func (s *Scheduler) WithDistributedLocker(l Locker) { s.executor.distributedLocker = l } +// WithResultReporter allows the caller to receive the result of a job +func (s *Scheduler) WithResultReporter(r JobResultReporter) { + s.executor.resultReporter = r +} + // WithDistributedElector prevents the same job from being run more than once // when multiple schedulers are trying to schedule the same job, by allowing only // the leader to run jobs. Non-leaders wait until the leader instance goes down diff --git a/scheduler_test.go b/scheduler_test.go index 9edfbd87..bdaee2d5 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -2945,3 +2946,82 @@ func runTestWithDistributedElector(t *testing.T, maxConcurrentJobs int) { assert.Equal(t, leaderIndex, r) } } + +func TestLoadJobFromObject(t *testing.T) { + // test suite 1: job schedule expression should be parsed correctly + monday := time.Monday + uid := []uuid.UUID{ + uuid.New(), + uuid.New(), + uuid.New(), + uuid.New(), + uuid.New(), + uuid.New(), + } + objects := []JobObject{ + {JobName: "cron", UUID: (uid[0]), Cron: "*/1 * * * *"}, + {JobName: "cronSeconds", UUID: (uid[1]), CronWithSeconds: true, Cron: "*/1 * * * * *"}, + {JobName: "everyDayAt", UUID: (uid[2]), Every: 1, Unit: "Day", At: []interface{}{"10:00", time.Date(2022, 2, 16, 10, 30, 0, 0, time.UTC)}}, + {JobName: "every", UUID: (uid[3]), Every: "5m"}, + {JobName: "everyWeekdayAt", UUID: (uid[4]), Every: 1, WeekDay: &monday, At: []interface{}{"12:00;13:00"}}, + {JobName: "everyMonthDay", UUID: (uid[5]), Every: 2, Months: []int{1, 2, 3}}, + } + + s1 := NewScheduler(time.UTC) + for _, jo := range objects { + _, err := s1.LoadFromJobObject(jo).Do(func() {}) + assert.NoError(t, err, jo.JobName) + } + s2 := NewScheduler(time.UTC) + s2.Cron("*/1 * * * *").Name("cron").Do(func() {}) + s2.CronWithSeconds("*/1 * * * * *").Name("cronSeconds").Do(func() {}) + s2.Every(1).Day().At("10:00").At(time.Date(2022, 2, 16, 10, 30, 0, 0, time.UTC)).Name("everyDayAt").Do(func() {}) + s2.Every("5m").Name("every").Do(func() {}) + s2.Every(1).Monday().At("12:00").At("13:00").Name("everyWeekdayAt").Do(func() {}) + s2.Every(2).Months(1, 2, 3).Name("everyMonthDay").Do(func() {}) + + findJob2 := func(name string) *Job { + for _, job := range s2.jobs { + if job.jobName == name { + return job + } + } + return nil + } + now := time.Now() + job1 := s1.jobs[uid[0]] + job2 := findJob2(job1.jobName) + assert.NotNil(t, job2) + assert.Equal(t, job1.jobName, job2.jobName) + assert.Equal(t, job1.cronSchedule.Next(now), job2.cronSchedule.Next(now)) + + job1 = s1.jobs[uid[1]] + job2 = findJob2(job1.jobName) + assert.NotNil(t, job2) + assert.Equal(t, job1.jobName, job2.jobName) + assert.Equal(t, job1.cronSchedule.Next(now), job2.cronSchedule.Next(now)) + + job1 = s1.jobs[uid[2]] + job2 = findJob2(job1.jobName) + assert.NotNil(t, job2) + assert.Equal(t, job1.jobName, job2.jobName) + assert.Equal(t, job1.interval, job2.interval) + assert.Equal(t, job1.unit, job2.unit) + assert.ElementsMatch(t, job1.atTimes, job2.atTimes) + + job1 = s1.jobs[uid[3]] + job2 = findJob2(job1.jobName) + assert.NotNil(t, job2) + assert.Equal(t, job1.jobName, job2.jobName) + assert.Equal(t, job1.interval, job2.interval) + assert.Equal(t, job1.unit, job2.unit) + assert.ElementsMatch(t, job1.scheduledWeekdays, job2.scheduledWeekdays) + + job1 = s1.jobs[uid[4]] + job2 = findJob2(job1.jobName) + assert.NotNil(t, job2) + assert.Equal(t, job1.jobName, job2.jobName) + assert.Equal(t, job1.interval, job2.interval) + assert.Equal(t, job1.unit, job2.unit) + assert.ElementsMatch(t, job1.daysOfTheMonth, job2.daysOfTheMonth) +} diff --git a/storage.go b/storage.go new file mode 100644 index 00000000..0eb2ba49 --- /dev/null +++ b/storage.go @@ -0,0 +1,40 @@ +package gocron + +import ( + "time" + + "github.com/google/uuid" +) + +type JobObject struct { + UUID uuid.UUID + JobName string + LastRun time.Time + + Cron string + CronWithSeconds bool + + Every interface{} + + Unit string + WeekDay *time.Weekday + Months []int + + At []interface{} +} + +type JobStorage interface { + LoadJobs() ([]JobObject, error) +} + +type JobResult struct { + UUID uuid.UUID + JobName string + RunTimes *jobRunTimes + ReportTime time.Time + Err error +} + +type JobResultReporter interface { + ReportJobResult(JobResult) +}