Skip to content

Commit

Permalink
Upgraded worker pattern
Browse files Browse the repository at this point in the history
- The -n flag now means "amount of succeeds", instead of "amount of runs"
  * On exit code not 0, repeater will try again
- Upgraded the control of amount of runs
- Upgraded the result collection and separated the structure so that
  it's less linked to the workers
  • Loading branch information
baalimago committed Mar 2, 2024
1 parent 2471d96 commit b7c46e7
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 36 deletions.
10 changes: 10 additions & 0 deletions configured_oper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type configuredOper struct {
runtime time.Duration
results []Result
resultFile *os.File
workerWg *sync.WaitGroup
amIdleWorkers int
amSuccess int
workPlanMu *sync.Mutex
}

type userQuitError string
Expand Down Expand Up @@ -81,8 +85,14 @@ func New(am, workers int,
output: oMode,
outputFileMu: &sync.Mutex{},
increment: increment,
amSuccess: 0,
workerWg: &sync.WaitGroup{},
amIdleWorkers: workers,
workPlanMu: &sync.Mutex{},
}

c.workerWg.Add(workers)

file, err := c.getFile(outputFile, outputFileMode)
if err != nil {
if errors.Is(err, UserQuitError) {
Expand Down
26 changes: 21 additions & 5 deletions configured_oper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"strings"
"sync"
"testing"

"github.com/baalimago/go_away_boilerplate/pkg/testboil"
Expand Down Expand Up @@ -37,7 +38,10 @@ func Test_configuredOper(t *testing.T) {
progress: output.HIDDEN,
output: outputMode,
outputFile: testFile,
workPlanMu: &sync.Mutex{},
workerWg: &sync.WaitGroup{},
}
co.workerWg.Add(1)

co.run(context.Background())
testFileName := testFile.Name()
Expand Down Expand Up @@ -71,7 +75,10 @@ func Test_configuredOper(t *testing.T) {
progress: outputMode,
output: output.HIDDEN,
outputFile: testFile,
workPlanMu: &sync.Mutex{},
workerWg: &sync.WaitGroup{},
}
co.workerWg.Add(1)

co.run(context.Background())
testFileName := testFile.Name()
Expand Down Expand Up @@ -105,7 +112,10 @@ func Test_configuredOper(t *testing.T) {
progressFormat: wantFormat,
output: output.HIDDEN,
outputFile: testFile,
workPlanMu: &sync.Mutex{},
workerWg: &sync.WaitGroup{},
}
c.workerWg.Add(1)

c.run(context.Background())
testFileName := testFile.Name()
Expand All @@ -126,9 +136,12 @@ func Test_results(t *testing.T) {
// This should ouput "test"
want := "test"
c := configuredOper{
am: 1,
args: []string{"printf", want},
am: 1,
args: []string{"printf", want},
workPlanMu: &sync.Mutex{},
workerWg: &sync.WaitGroup{},
}
c.workerWg.Add(1)

c.run(context.Background())
gotLen := len(c.results)
Expand All @@ -147,8 +160,11 @@ func Test_results(t *testing.T) {
c := configuredOper{
am: wantAm,
// Date is most likely to exist in most OS's running this test
args: []string{"date"},
args: []string{"date"},
workerWg: &sync.WaitGroup{},
workPlanMu: &sync.Mutex{},
}
c.workerWg.Add(1)
c.run(context.Background())
gotLen := len(c.results)
// ensure that the correc amount is output
Expand Down Expand Up @@ -189,15 +205,15 @@ func Test_configuredOper_New(t *testing.T) {

t.Run("it should not return an error if increment is true and one argument is 'INC'", func(t *testing.T) {
args := []string{"test", "abc", "INC"}
_, gotErr := New(0, -1, args, output.HIDDEN, "testing", output.HIDDEN, "", "", true, "")
_, gotErr := New(0, 0, args, output.HIDDEN, "testing", output.HIDDEN, "", "", true, "")
if gotErr != nil {
t.Fatalf("expected nil, got: %v", gotErr)
}
})

t.Run("it should not return an error if increment is true and one argument contains 'INC'", func(t *testing.T) {
args := []string{"test", "abc", "another-argument/INC"}
_, gotErr := New(0, -1, args, output.HIDDEN, "testing", output.HIDDEN, "", "", true, "")
_, gotErr := New(0, 0, args, output.HIDDEN, "testing", output.HIDDEN, "", "", true, "")
if gotErr != nil {
t.Fatalf("expected nil, got: %v", gotErr)
}
Expand Down
113 changes: 91 additions & 22 deletions configured_oper_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"time"

"github.com/baalimago/go_away_boilerplate/pkg/threadsafe"
"github.com/baalimago/repeater/pkg/filetools"
)

Expand Down Expand Up @@ -55,51 +55,108 @@ func (c *configuredOper) setupWorkers(workCtx context.Context, workChan chan int
for {
select {
case <-workCtx.Done():
c.workerWg.Done()
return
case taskIdx := <-workChan:
c.workPlanMu.Lock()
workingWorkrs := c.workers - c.amIdleWorkers
requestedTasks := c.am
// The current amount of workers is enough to reach the requested
// amount of tasks in parallel so kill off this worker to not overshoot
// the amount of repetitions
if workingWorkrs+c.amSuccess >= requestedTasks {
c.workerWg.Done()
c.workPlanMu.Unlock()
return
}
c.amIdleWorkers--
c.workPlanMu.Unlock()
res := c.doWork(workerID, taskIdx)
c.workPlanMu.Lock()
c.amIdleWorkers++
if !res.IsError {
c.amSuccess++
}
c.workPlanMu.Unlock()
resultChan <- res
}
}
}(i)
}
}

// runDelegator in a blocking manner, will append data to stats
func (c *configuredOper) runDelegator(ctx context.Context, resultChan chan Result, workChan chan int, progressStreams []io.Writer) {
func (c *configuredOper) runDelegator(ctx context.Context, workChan chan int) error {
i := 0
WORK_DELEGATOR:
for {
if ctx.Err() != nil {
printErr(fmt.Sprintf("context error: %v", ctx.Err()))
os.Exit(1)
return nil
}
select {
case res := <-resultChan:
if strings.Contains(res.Output, "ERROR:") {
printErr(fmt.Sprintf("worker: %v received %v\n", res.WorkerID, res.Output))
case <-ctx.Done():
return nil
case workChan <- i:
amSuccess := threadsafe.Read(c.workPlanMu, &c.amSuccess)
if amSuccess >= c.am {
return nil
} else {
// This is threadsafe snce only the delegator adds results
c.writeOutput(&res)
c.results = append(c.results, res)
filetools.WriteStringIfPossible(fmt.Sprintf(c.progressFormat, res.Idx, c.am), progressStreams)
i++
}
}
}
}

func (c *configuredOper) runResultCollector(ctx context.Context, resultChan chan Result, progressStreams []io.Writer) {
handleRes := func(res Result) int {
c.writeOutput(&res)
c.results = append(c.results, res)
amFails := 0
for _, r := range c.results {
if r.IsError {
amFails++
}
}
amSuccess := len(c.results) - amFails
filetools.WriteStringIfPossible(fmt.Sprintf(c.progressFormat, amSuccess, amFails, c.am), progressStreams)
return amSuccess
}

if len(c.results) == c.am {
break WORK_DELEGATOR
emptyResChan := func() {
for {
select {
case res := <-resultChan:
handleRes(res)
default:
return
}
case workChan <- i:
i++
}
}
for {
if ctx.Err() != nil {
emptyResChan()
return
}
select {
case <-ctx.Done():
emptyResChan()
return
case res := <-resultChan:
amSuccess := handleRes(res)
c.workPlanMu.Lock()
// Escape condition so that all results are collected
if amSuccess >= c.am && c.amIdleWorkers == c.workers {
c.workPlanMu.Unlock()
return
}
c.workPlanMu.Unlock()
}
}
}

// run the configured command. Blocking operation, errors are handeled internally as the output
// depends on the configuration
func (c *configuredOper) run(ctx context.Context) statistics {
func (c *configuredOper) run(rootCtx context.Context) statistics {
ctx, ctxCancel := context.WithCancel(rootCtx)
progressStreams := c.setupProgressStreams()
defer filetools.WriteStringIfPossible("\n", progressStreams)

workChan := make(chan int)
// Buffer the channel for each worker, so that the workers may leave a result and then quit
resultChan := make(chan Result, c.am)
Expand All @@ -109,9 +166,21 @@ func (c *configuredOper) run(ctx context.Context) statistics {
}
c.setupWorkers(workCtx, workChan, resultChan)

go func() {
c.workerWg.Wait()
ctxCancel()
}()
confOperStart := time.Now()
c.runDelegator(ctx, resultChan, workChan, progressStreams)
workCtxCancel()
go func() {
err := c.runDelegator(ctx, workChan)
if err != nil {
printErr(fmt.Sprintf("work delegator error: %v", err))
ctxCancel()
} else {
workCtxCancel()
}
}()
c.runResultCollector(ctx, resultChan, progressStreams)
c.runtime = time.Since(confOperStart)

return c.calcStats()
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/baalimago/repeater/internal/output"
)

const DEFAULT_PROGRESS_FORMAT = "\rProgress: (%v/%v)"
const DEFAULT_PROGRESS_FORMAT = "\rProgress: (Success/Fail/Requested Am)(%v/%v/%v)"

var (
amRunsFlag = flag.Int("n", 1, "Amount of times you wish to repeat the command.")
Expand Down Expand Up @@ -63,7 +63,7 @@ func main() {
select {
case stats := <-isDone:
if *statisticsFlag {
fmt.Printf("== Statistics ==%s\n", &stats)
fmt.Printf("%s\n", &stats)
}

if c.resultFile != nil {
Expand Down
40 changes: 37 additions & 3 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@ package main
import (
"context"
"fmt"
"sync"
"testing"

"github.com/baalimago/repeater/pkg/filetools"
)

func Test_do(t *testing.T) {
t.Run("it should run command am amount of times", func(t *testing.T) {
t.Run("it should run command am amount of times, one workers", func(t *testing.T) {
expectedCalls := 123
testFilePath := fmt.Sprintf("%v/testFile", t.TempDir())
// Add one line per run, anticipate a certain amount of lines in the test file...
cOper := configuredOper{
am: expectedCalls,
args: []string{"/bin/bash", "-c", fmt.Sprintf("echo 'line' >> %v", testFilePath)},
am: expectedCalls,
args: []string{"/bin/bash", "-c", fmt.Sprintf("echo 'line' >> %v", testFilePath)},
amIdleWorkers: 1,
workerWg: &sync.WaitGroup{},
workPlanMu: &sync.Mutex{},
}
cOper.workerWg.Add(1)
cOper.run(context.Background())
// ... anticipate a certain amount of lines in the file when it's done
got, err := filetools.CheckAmLines(testFilePath)
Expand All @@ -28,4 +33,33 @@ func Test_do(t *testing.T) {
t.Fatalf("expected: %v, got: %v", expectedCalls, got)
}
})

for i := 0; i < 1000; i++ {
t.Run("it should run command am amount of times, 10 workers", func(t *testing.T) {
expectedCalls := 123
amWorkers := 10
testFilePath := fmt.Sprintf("%v/testFile", t.TempDir())
// Add one line per run, anticipate a certain amount of lines in the test file...
cOper := configuredOper{
am: expectedCalls,
workers: amWorkers,
amIdleWorkers: amWorkers,
args: []string{"/bin/bash", "-c", fmt.Sprintf("echo 'line' >> %v", testFilePath)},
workerWg: &sync.WaitGroup{},
workPlanMu: &sync.Mutex{},
}
cOper.workerWg.Add(amWorkers)
cOper.run(context.Background())
// ... anticipate a certain amount of lines in the file when it's done
got, err := filetools.CheckAmLines(testFilePath)
if err != nil {
t.Fatalf("failed to check amount of lines: %v", err)
}

if got != expectedCalls {
t.Fatalf("expected: %v, got: %v", expectedCalls, got)
}
})
}

}
Loading

0 comments on commit b7c46e7

Please sign in to comment.