diff --git a/configured_oper.go b/configured_oper.go index c652609..7b28c3f 100644 --- a/configured_oper.go +++ b/configured_oper.go @@ -29,6 +29,10 @@ type configuredOper struct { runtime time.Duration results []Result resultFile *os.File + workerWg *sync.WaitGroup + amIdleWorkers int + amSuccess int + workPlanMu *sync.Mutex } type userQuitError string @@ -81,8 +85,14 @@ func New(am, workers int, output: oMode, outputFileMu: &sync.Mutex{}, increment: increment, + amSuccess: 0, + workerWg: &sync.WaitGroup{}, + amIdleWorkers: workers, + workPlanMu: &sync.Mutex{}, } + c.workerWg.Add(workers) + file, err := c.getFile(outputFile, outputFileMode) if err != nil { if errors.Is(err, UserQuitError) { diff --git a/configured_oper_test.go b/configured_oper_test.go index 13755a1..d0f4739 100644 --- a/configured_oper_test.go +++ b/configured_oper_test.go @@ -7,6 +7,7 @@ import ( "io" "os" "strings" + "sync" "testing" "github.com/baalimago/go_away_boilerplate/pkg/testboil" @@ -37,7 +38,10 @@ func Test_configuredOper(t *testing.T) { progress: output.HIDDEN, output: outputMode, outputFile: testFile, + workPlanMu: &sync.Mutex{}, + workerWg: &sync.WaitGroup{}, } + co.workerWg.Add(1) co.run(context.Background()) testFileName := testFile.Name() @@ -71,7 +75,10 @@ func Test_configuredOper(t *testing.T) { progress: outputMode, output: output.HIDDEN, outputFile: testFile, + workPlanMu: &sync.Mutex{}, + workerWg: &sync.WaitGroup{}, } + co.workerWg.Add(1) co.run(context.Background()) testFileName := testFile.Name() @@ -105,7 +112,10 @@ func Test_configuredOper(t *testing.T) { progressFormat: wantFormat, output: output.HIDDEN, outputFile: testFile, + workPlanMu: &sync.Mutex{}, + workerWg: &sync.WaitGroup{}, } + c.workerWg.Add(1) c.run(context.Background()) testFileName := testFile.Name() @@ -126,9 +136,12 @@ func Test_results(t *testing.T) { // This should ouput "test" want := "test" c := configuredOper{ - am: 1, - args: []string{"printf", want}, + am: 1, + args: []string{"printf", want}, + workPlanMu: &sync.Mutex{}, + workerWg: &sync.WaitGroup{}, } + c.workerWg.Add(1) c.run(context.Background()) gotLen := len(c.results) @@ -147,8 +160,11 @@ func Test_results(t *testing.T) { c := configuredOper{ am: wantAm, // Date is most likely to exist in most OS's running this test - args: []string{"date"}, + args: []string{"date"}, + workerWg: &sync.WaitGroup{}, + workPlanMu: &sync.Mutex{}, } + c.workerWg.Add(1) c.run(context.Background()) gotLen := len(c.results) // ensure that the correc amount is output @@ -189,7 +205,7 @@ func Test_configuredOper_New(t *testing.T) { t.Run("it should not return an error if increment is true and one argument is 'INC'", func(t *testing.T) { args := []string{"test", "abc", "INC"} - _, gotErr := New(0, -1, args, output.HIDDEN, "testing", output.HIDDEN, "", "", true, "") + _, gotErr := New(0, 0, args, output.HIDDEN, "testing", output.HIDDEN, "", "", true, "") if gotErr != nil { t.Fatalf("expected nil, got: %v", gotErr) } @@ -197,7 +213,7 @@ func Test_configuredOper_New(t *testing.T) { t.Run("it should not return an error if increment is true and one argument contains 'INC'", func(t *testing.T) { args := []string{"test", "abc", "another-argument/INC"} - _, gotErr := New(0, -1, args, output.HIDDEN, "testing", output.HIDDEN, "", "", true, "") + _, gotErr := New(0, 0, args, output.HIDDEN, "testing", output.HIDDEN, "", "", true, "") if gotErr != nil { t.Fatalf("expected nil, got: %v", gotErr) } diff --git a/configured_oper_work.go b/configured_oper_work.go index 55e07eb..b3c79f6 100644 --- a/configured_oper_work.go +++ b/configured_oper_work.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "io" - "os" "os/exec" "strconv" "strings" "time" + "github.com/baalimago/go_away_boilerplate/pkg/threadsafe" "github.com/baalimago/repeater/pkg/filetools" ) @@ -55,9 +55,29 @@ func (c *configuredOper) setupWorkers(workCtx context.Context, workChan chan int for { select { case <-workCtx.Done(): + c.workerWg.Done() return case taskIdx := <-workChan: + c.workPlanMu.Lock() + workingWorkrs := c.workers - c.amIdleWorkers + requestedTasks := c.am + // The current amount of workers is enough to reach the requested + // amount of tasks in parallel so kill off this worker to not overshoot + // the amount of repetitions + if workingWorkrs+c.amSuccess >= requestedTasks { + c.workerWg.Done() + c.workPlanMu.Unlock() + return + } + c.amIdleWorkers-- + c.workPlanMu.Unlock() res := c.doWork(workerID, taskIdx) + c.workPlanMu.Lock() + c.amIdleWorkers++ + if !res.IsError { + c.amSuccess++ + } + c.workPlanMu.Unlock() resultChan <- res } } @@ -65,41 +85,78 @@ func (c *configuredOper) setupWorkers(workCtx context.Context, workChan chan int } } -// runDelegator in a blocking manner, will append data to stats -func (c *configuredOper) runDelegator(ctx context.Context, resultChan chan Result, workChan chan int, progressStreams []io.Writer) { +func (c *configuredOper) runDelegator(ctx context.Context, workChan chan int) error { i := 0 -WORK_DELEGATOR: for { if ctx.Err() != nil { - printErr(fmt.Sprintf("context error: %v", ctx.Err())) - os.Exit(1) + return nil } select { - case res := <-resultChan: - if strings.Contains(res.Output, "ERROR:") { - printErr(fmt.Sprintf("worker: %v received %v\n", res.WorkerID, res.Output)) + case <-ctx.Done(): + return nil + case workChan <- i: + amSuccess := threadsafe.Read(c.workPlanMu, &c.amSuccess) + if amSuccess >= c.am { + return nil } else { - // This is threadsafe snce only the delegator adds results - c.writeOutput(&res) - c.results = append(c.results, res) - filetools.WriteStringIfPossible(fmt.Sprintf(c.progressFormat, res.Idx, c.am), progressStreams) + i++ + } + } + } +} + +func (c *configuredOper) runResultCollector(ctx context.Context, resultChan chan Result, progressStreams []io.Writer) { + handleRes := func(res Result) int { + c.writeOutput(&res) + c.results = append(c.results, res) + amFails := 0 + for _, r := range c.results { + if r.IsError { + amFails++ } + } + amSuccess := len(c.results) - amFails + filetools.WriteStringIfPossible(fmt.Sprintf(c.progressFormat, amSuccess, amFails, c.am), progressStreams) + return amSuccess + } - if len(c.results) == c.am { - break WORK_DELEGATOR + emptyResChan := func() { + for { + select { + case res := <-resultChan: + handleRes(res) + default: + return } - case workChan <- i: - i++ + } + } + for { + if ctx.Err() != nil { + emptyResChan() + return + } + select { + case <-ctx.Done(): + emptyResChan() + return + case res := <-resultChan: + amSuccess := handleRes(res) + c.workPlanMu.Lock() + // Escape condition so that all results are collected + if amSuccess >= c.am && c.amIdleWorkers == c.workers { + c.workPlanMu.Unlock() + return + } + c.workPlanMu.Unlock() } } } // run the configured command. Blocking operation, errors are handeled internally as the output // depends on the configuration -func (c *configuredOper) run(ctx context.Context) statistics { +func (c *configuredOper) run(rootCtx context.Context) statistics { + ctx, ctxCancel := context.WithCancel(rootCtx) progressStreams := c.setupProgressStreams() - defer filetools.WriteStringIfPossible("\n", progressStreams) - workChan := make(chan int) // Buffer the channel for each worker, so that the workers may leave a result and then quit resultChan := make(chan Result, c.am) @@ -109,9 +166,21 @@ func (c *configuredOper) run(ctx context.Context) statistics { } c.setupWorkers(workCtx, workChan, resultChan) + go func() { + c.workerWg.Wait() + ctxCancel() + }() confOperStart := time.Now() - c.runDelegator(ctx, resultChan, workChan, progressStreams) - workCtxCancel() + go func() { + err := c.runDelegator(ctx, workChan) + if err != nil { + printErr(fmt.Sprintf("work delegator error: %v", err)) + ctxCancel() + } else { + workCtxCancel() + } + }() + c.runResultCollector(ctx, resultChan, progressStreams) c.runtime = time.Since(confOperStart) return c.calcStats() diff --git a/main.go b/main.go index 09fdadc..7eca317 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,7 @@ import ( "github.com/baalimago/repeater/internal/output" ) -const DEFAULT_PROGRESS_FORMAT = "\rProgress: (%v/%v)" +const DEFAULT_PROGRESS_FORMAT = "\rProgress: (Success/Fail/Requested Am)(%v/%v/%v)" var ( amRunsFlag = flag.Int("n", 1, "Amount of times you wish to repeat the command.") @@ -63,7 +63,7 @@ func main() { select { case stats := <-isDone: if *statisticsFlag { - fmt.Printf("== Statistics ==%s\n", &stats) + fmt.Printf("%s\n", &stats) } if c.resultFile != nil { diff --git a/main_test.go b/main_test.go index c4a5ba3..9fb8a59 100644 --- a/main_test.go +++ b/main_test.go @@ -3,20 +3,25 @@ package main import ( "context" "fmt" + "sync" "testing" "github.com/baalimago/repeater/pkg/filetools" ) func Test_do(t *testing.T) { - t.Run("it should run command am amount of times", func(t *testing.T) { + t.Run("it should run command am amount of times, one workers", func(t *testing.T) { expectedCalls := 123 testFilePath := fmt.Sprintf("%v/testFile", t.TempDir()) // Add one line per run, anticipate a certain amount of lines in the test file... cOper := configuredOper{ - am: expectedCalls, - args: []string{"/bin/bash", "-c", fmt.Sprintf("echo 'line' >> %v", testFilePath)}, + am: expectedCalls, + args: []string{"/bin/bash", "-c", fmt.Sprintf("echo 'line' >> %v", testFilePath)}, + amIdleWorkers: 1, + workerWg: &sync.WaitGroup{}, + workPlanMu: &sync.Mutex{}, } + cOper.workerWg.Add(1) cOper.run(context.Background()) // ... anticipate a certain amount of lines in the file when it's done got, err := filetools.CheckAmLines(testFilePath) @@ -28,4 +33,33 @@ func Test_do(t *testing.T) { t.Fatalf("expected: %v, got: %v", expectedCalls, got) } }) + + for i := 0; i < 1000; i++ { + t.Run("it should run command am amount of times, 10 workers", func(t *testing.T) { + expectedCalls := 123 + amWorkers := 10 + testFilePath := fmt.Sprintf("%v/testFile", t.TempDir()) + // Add one line per run, anticipate a certain amount of lines in the test file... + cOper := configuredOper{ + am: expectedCalls, + workers: amWorkers, + amIdleWorkers: amWorkers, + args: []string{"/bin/bash", "-c", fmt.Sprintf("echo 'line' >> %v", testFilePath)}, + workerWg: &sync.WaitGroup{}, + workPlanMu: &sync.Mutex{}, + } + cOper.workerWg.Add(amWorkers) + cOper.run(context.Background()) + // ... anticipate a certain amount of lines in the file when it's done + got, err := filetools.CheckAmLines(testFilePath) + if err != nil { + t.Fatalf("failed to check amount of lines: %v", err) + } + + if got != expectedCalls { + t.Fatalf("expected: %v, got: %v", expectedCalls, got) + } + }) + } + } diff --git a/statistics.go b/statistics.go index c46bc93..11458b9 100644 --- a/statistics.go +++ b/statistics.go @@ -16,6 +16,8 @@ type Result struct { } type statistics struct { + am int + amFails int max Result min Result total time.Duration @@ -35,10 +37,18 @@ func (r *Result) Write(p []byte) (n int, err error) { func (c *configuredOper) calcStats() statistics { tot := time.Duration(0) n := len(c.results) + if n == 0 { + return statistics{} + } minDur := time.Duration(9223372036854775807) maxDur := time.Duration(-9223372036854775808) + amFails := 0 var min, max Result for _, r := range c.results { + if r.IsError { + amFails++ + continue + } if r.Runtime < minDur { min = r minDur = r.Runtime @@ -58,6 +68,8 @@ func (c *configuredOper) calcStats() statistics { variance := varSum / float64(n) stdDeviation := time.Duration(int64(math.Sqrt(variance))) return statistics{ + am: c.am, + amFails: amFails, runtime: c.runtime, min: min, max: max, @@ -70,10 +82,14 @@ func (c *configuredOper) calcStats() statistics { func (s *statistics) String() string { return fmt.Sprintf(` -Runtime: %s, Total routine work time: %v, -Average time per task: %v, Std deviation: %v -Max time, index: %v, time: %v -Min time, index: %v, time: %v`, +== Statistics == +Amount of repitions: %v, amount of failures: %v, +The following is calculated on successful attempts: + Runtime: %s, Total routine work time: %v, + Average time per task: %v, Std deviation: %v + Max time, index: %v, time: %v + Min time, index: %v, time: %v`, + s.am, s.amFails, s.runtime, s.total, s.average, s.stdDev, s.max.Idx, s.max.Runtime,