Skip to content

Commit

Permalink
fix(security): panic due to concurrent map write on handled go routine (
Browse files Browse the repository at this point in the history
#92)

* clean: remove not used fields and add it again when is usefull

* fix(security): panic due to concurrent map write on handled go routine

* test(security): implement deepcopy test suite and rename result to wantedresult
  • Loading branch information
42atomys authored Jul 9, 2022
1 parent 2d21f17 commit 805accf
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 24 deletions.
13 changes: 7 additions & 6 deletions internal/server/v1alpha1/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,16 @@ func (s *Server) runSecurity(spec *config.WebhookSpec, r *http.Request, body []b
return config.ErrSpecNotFound
}

pipeline := spec.SecurityPipeline
if pipeline == nil {
if spec.SecurityPipeline == nil {
return errors.New("no pipeline to run. security is not configured")
}

pipeline.Inputs["request"] = r
pipeline.Inputs["payload"] = string(body)

pipeline.WantResult(true).Run()
pipeline := spec.SecurityPipeline.DeepCopy()
pipeline.
WithInput("request", r).
WithInput("payload", string(body)).
WantResult(true).
Run()

log.Debug().Msgf("security pipeline result: %t", pipeline.CheckResult())
if !pipeline.CheckResult() {
Expand Down
60 changes: 45 additions & 15 deletions pkg/factory/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,23 @@ import (
// NewPipeline initializes a new pipeline
func NewPipeline() *Pipeline {
return &Pipeline{
Outputs: make(map[string]map[string]interface{}),
Variables: make(map[string]interface{}),
Config: make(map[string]interface{}),
Inputs: make(map[string]interface{}),
Outputs: make(map[string]map[string]interface{}),
Inputs: make(map[string]interface{}),
}
}

// DeepCopy creates a deep copy of the pipeline.
func (p *Pipeline) DeepCopy() *Pipeline {
deepCopy := NewPipeline().WantResult(p.WantedResult)
for _, f := range p.factories {
deepCopy.AddFactory(f)
}
for k, v := range p.Inputs {
deepCopy.WithInput(k, v)
}
return deepCopy
}

// AddFactory adds a new factory to the pipeline. New Factory is added to the
// end of the pipeline.
func (p *Pipeline) AddFactory(f *Factory) *Pipeline {
Expand All @@ -38,19 +48,19 @@ func (p *Pipeline) FactoryCount() int {
// the result is compared to the last result of the pipeline.
// type and value of the result must be the same as the last result
func (p *Pipeline) WantResult(result interface{}) *Pipeline {
p.Result = result
p.WantedResult = result
return p
}

// CheckResult checks if the pipeline result is the same as the wanted result.
// type and value of the result must be the same as the last result
func (p *Pipeline) CheckResult() bool {
for _, lr := range p.LastResults {
if reflect.TypeOf(lr) != reflect.TypeOf(p.Result) {
if reflect.TypeOf(lr) != reflect.TypeOf(p.WantedResult) {
log.Warn().Msgf("pipeline result is not the same type as wanted result")
return false
}
if lr == p.Result {
if lr == p.WantedResult {
return true
}
}
Expand Down Expand Up @@ -82,19 +92,14 @@ func (p *Pipeline) Run() *Factory {
log.Debug().Msgf("factory %s output %s = %+v", f.Name, v.Name, v.Value)
}

var key = f.Identifier()
if p.Outputs[key] == nil {
p.Outputs[key] = make(map[string]interface{})
}

if p.Result != nil {
if p.WantedResult != nil {
p.LastResults = make([]interface{}, 0)
}

for _, v := range f.Outputs {
p.Outputs[key][v.Name] = v.Value
p.writeOutputSafely(f.Identifier(), v.Name, v.Value)

if p.Result != nil {
if p.WantedResult != nil {
p.LastResults = append(p.LastResults, v.Value)
}
}
Expand All @@ -106,3 +111,28 @@ func (p *Pipeline) Run() *Factory {

return nil
}

// WithInput adds a new input to the pipeline. The input is added safely to prevent
// concurrent map writes error.
func (p *Pipeline) WithInput(name string, value interface{}) *Pipeline {
p.mu.Lock()
defer p.mu.Unlock()

p.Inputs[name] = value
return p
}

// writeOutputSafely writes the output to the pipeline output map. If the key
// already exists, the value is overwritten. This is principally used to
// write on the map withtout create a new map or PANIC due to concurrency map writes.
func (p *Pipeline) writeOutputSafely(factoryIdentifier, factoryOutputName string, value interface{}) {
p.mu.Lock()
defer p.mu.Unlock()

// Ensure the factory output map exists
if p.Outputs[factoryIdentifier] == nil {
p.Outputs[factoryIdentifier] = make(map[string]interface{})
}

p.Outputs[factoryIdentifier][factoryOutputName] = value
}
20 changes: 20 additions & 0 deletions pkg/factory/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func (suite *testSuitePipeline) TestPipelineRun() {
suite.Equal(wantedResult, pipeline.Outputs["fake"]["message"])
}

func (suite *testSuitePipeline) TestPipelineWithInput() {
var pipeline = NewPipeline()
pipeline.WithInput("test", true)

suite.True(pipeline.Inputs["test"].(bool))
}

func (suite *testSuitePipeline) TestPipelineResultWithInvalidType() {
var pipeline = NewPipeline()

Expand Down Expand Up @@ -100,5 +107,18 @@ func (suite *testSuitePipeline) TestPipelineFailedDueToFactoryErr() {
pipeline.AddFactory(factory).AddFactory(factory2)
ret := pipeline.Run()
suite.Equal(factory, ret)
}

func (suite *testSuitePipeline) TestPipelineDeepCopy() {
var pipeline = NewPipeline()
var factory = newFactory(&fakeFactory{})
var factory2 = newFactory(&fakeFactory{})
factory.Inputs = make([]*Var, 0)

pipeline.AddFactory(factory).AddFactory(factory2)
pipeline.Inputs["name"] = "test"
pipeline.WantResult("test")

var pipeline2 = pipeline.DeepCopy()
suite.Equal(pipeline, pipeline2)
}
8 changes: 5 additions & 3 deletions pkg/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package factory
import (
"context"
"reflect"
"sync"

"atomys.codes/webhooked/internal/valuable"
)
Expand All @@ -24,12 +25,13 @@ type InputConfig struct {
// It is used to store the inputs and outputs of all factories executed
// by the pipeline and secure the result of the pipeline.
type Pipeline struct {
mu sync.RWMutex
factories []*Factory

Result interface{}
LastResults []interface{}
WantedResult interface{}
LastResults []interface{}

Variables, Config, Inputs map[string]interface{}
Inputs map[string]interface{}

Outputs map[string]map[string]interface{}
}
Expand Down

0 comments on commit 805accf

Please sign in to comment.