Skip to content

Commit

Permalink
Feature/order output (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
F-Amaral authored Oct 27, 2023
1 parent 528343e commit 0a88db7
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 25 deletions.
17 changes: 14 additions & 3 deletions async/processor.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package async

import "github.com/f-amaral/go-async/sort"

// Processor is a generic interface to process asynchronously a list of inputs.
type Processor[I any, O any] interface {
Process([]I) ProcessResult
Expand All @@ -8,9 +10,14 @@ type Processor[I any, O any] interface {

// JobResult is the result of a single job.
type JobResult struct {
Input any
Output any
Err error
Input any
Output any
Err error
ExecIndex int
}

func (j JobResult) GetIndex() int {
return j.ExecIndex
}

// ProcessResult is the result of a list of jobs, the hasError flag indicates if any job has failed.
Expand All @@ -30,3 +37,7 @@ func (p *ProcessResult) GetErrors() (errs []error) {
}
return errs
}

func (p *ProcessResult) Sort() {
p.Results = sort.Merge(p.Results)
}
57 changes: 41 additions & 16 deletions async/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,55 @@ package async_test

import (
"errors"
"sort"
"testing"

"github.com/f-amaral/go-async/async"
"github.com/stretchr/testify/assert"
)

func TestProcessResult_GetErrors_WhenJobsHaveErrors(t *testing.T) {
jobResults := buildJobResultsWithErrs()
processResult := async.ProcessResult{
Results: jobResults,
HasError: true,
}
errs := processResult.GetErrors()
assert.Equal(t, len(jobResults), len(errs))
func TestProcessResult(t *testing.T) {
t.Run("Should get errors when jobs have errors", func(t *testing.T) {

jobResults := buildJobResultsWithErrs()
processResult := async.ProcessResult{
Results: jobResults,
HasError: true,
}
errs := processResult.GetErrors()
assert.Equal(t, len(jobResults), len(errs))
})
t.Run("Should get errors when jobs dont have errors", func(t *testing.T) {

jobResults := buildJobResultsWithoutErrs()
processResult := async.ProcessResult{
Results: jobResults,
HasError: false,
}
errs := processResult.GetErrors()
assert.Equal(t, 0, len(errs))
})
t.Run("Should return correct sorted outputs", func(t *testing.T) {
jobResults := buildJobResultsWithoutErrs()
processResult := async.ProcessResult{
Results: jobResults,
HasError: false,
}
processResult.Sort()
assert.True(t, sort.SliceIsSorted(processResult.Results, func(i, j int) bool {
return processResult.Results[i].GetIndex() < processResult.Results[j].GetIndex()
}))
})
}

func TestProcessResult_GetErrors_WhenJobsDontHaveErrors(t *testing.T) {
jobResults := buildJobResultsWithoutErrs()
processResult := async.ProcessResult{
Results: jobResults,
HasError: false,
}
errs := processResult.GetErrors()
assert.Equal(t, 0, len(errs))
func TestJobResult(t *testing.T) {
t.Run("Should return correct index when getting index", func(t *testing.T) {

jobResult := async.JobResult{
ExecIndex: 10,
}
assert.Equal(t, 10, jobResult.GetIndex())
})
}

// region Benchmarks
Expand Down
38 changes: 32 additions & 6 deletions pool/jobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,60 @@ type JobFn[I any, O any] func(I) (O, error)
type jobParams[I any] struct {
input I
resultsCh chan async.JobResult
execIndex int
}

type jobPool[I any, O any] struct {
jobCh chan<- jobParams[I]
sort bool
}

func NewPool[I any, O any](workers int, jobFunc JobFn[I, O]) async.Processor[I, O] {
func (s *jobParams[I]) GetIndex() int {
return s.execIndex
}

type option[I any, O any] func(*jobPool[I, O])

func NewPool[I any, O any](workers int, jobFunc JobFn[I, O], options ...option[I, O]) async.Processor[I, O] {
jobs := make(chan jobParams[I], workers)

for w := 0; w < workers; w++ {
go func() {
for params := range jobs {
output, err := jobFunc(params.input)
params.resultsCh <- async.JobResult{
Input: params.input,
Output: output,
Err: err,
Input: params.input,
Output: output,
Err: err,
ExecIndex: params.GetIndex(),
}
}
}()
}

return &jobPool[I, O]{
p := &jobPool[I, O]{
jobCh: jobs,
}

for _, opt := range options {
opt(p)
}
return p
}

func WithSortingOutput[I any, O any]() option[I, O] {
return func(p *jobPool[I, O]) {
p.sort = true
}
}

func (s *jobPool[I, O]) Process(inputs []I) async.ProcessResult {
results := make(chan async.JobResult, len(inputs))
for _, input := range inputs {
for executionIndex, input := range inputs {
s.jobCh <- jobParams[I]{
input: input,
resultsCh: results,
execIndex: executionIndex,
}
}

Expand All @@ -62,7 +83,12 @@ func (s *jobPool[I, O]) Process(inputs []I) async.ProcessResult {
output.Results = append(output.Results, result)
}

if s.sort {
output.Sort()
}

return output

}

func (s *jobPool[I, O]) Close() {
Expand Down
22 changes: 22 additions & 0 deletions pool/jobpool_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pool_test

import (
"sort"
"testing"

"github.com/f-amaral/go-async/async"
Expand Down Expand Up @@ -46,6 +47,27 @@ func TestJobPool_Process(t *testing.T) {
assert.Equal(t, len(dataSet), len(res.Results))
assert.True(t, res.HasError)
})
t.Run("Should return sorted output when created with sorting output", func(t *testing.T) {
// arrange
dataSet := make([]int, 100)
for i := 0; i < 100; i++ {
dataSet[i] = i
}

sut := pool.NewPool[int, int](10, func(i int) (int, error) {
return i, assert.AnError
}, pool.WithSortingOutput[int, int]())

// act
res := sut.Process(dataSet)

// assert
assert.Equal(t, len(dataSet), len(res.Results))
assert.True(t, res.HasError)
assert.True(t, sort.SliceIsSorted(res.Results, func(i, j int) bool {
return res.Results[i].ExecIndex < res.Results[j].ExecIndex
}))
})
}

func TestJobPool_Close(t *testing.T) {
Expand Down

0 comments on commit 0a88db7

Please sign in to comment.