Skip to content

Commit

Permalink
fix: retries for policies cm comprised of multiple dependent files
Browse files Browse the repository at this point in the history
Signed-off-by: Ievgenii Shepeliuk <[email protected]>
  • Loading branch information
eshepelyuk committed Jun 21, 2022
1 parent 7cf023a commit 2ae02bd
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 43 deletions.
13 changes: 11 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ k3d: && _skaffold-ctx
k3d cluster delete kube-mgmt || true
k3d cluster create --config ./test/e2e/k3d.yaml

rebuild: && build
rm -rf {{skaffoldTags}}

# build and publish docker to local registry
build: _skaffold-ctx
skaffold build --file-output={{skaffoldTags}} --platform=linux/amd64

# install into local k8s
up: _skaffold-ctx down
kubectl delete cm -l kube-mgmt/e2e=true || true
skaffold deploy --build-artifacts={{skaffoldTags}}

# remove from local k8s
Expand All @@ -64,6 +68,8 @@ down:

# run only e2e test script
test-e2e-sh:
#!/usr/bin/env bash
set -euo pipefail
kubectl delete cm -l kube-mgmt/e2e=true || true
./test/e2e/{{E2E_TEST}}/test.sh
Expand All @@ -73,7 +79,10 @@ test-e2e: up test-e2e-sh
# run all e2e tests
test-e2e-all: build
#!/usr/bin/env bash
set -euxo pipefail
for E in $(find test/e2e/ -mindepth 1 -maxdepth 1 -type d -printf '%f\n'); do
set -euo pipefail
for E in $(find test/e2e/ -mindepth 1 -maxdepth 1 -type d -printf '%f\n' | sort); do
echo "================"
echo "= Running ${E} "
echo "================"
just E2E_TEST=${E} test-e2e
done
71 changes: 33 additions & 38 deletions pkg/configmap/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,12 @@ import (
)

const (
defaultRetries = 2
statusAnnotationKey = "openpolicyagent.org/kube-mgmt-status"
<<<<<<< HEAD
retriesAnnotationKey = "openpolicyagent.org/kube-mgmt-retries"
=======
>>>>>>> c80123be (chore: same status annotation name for data and policies)

// Special namespace in Kubernetes federation that holds scheduling policies.
// commented because staticcheck: 'const kubeFederationSchedulingPolicy is unused (U1000)'
// kubeFederationSchedulingPolicy = "kube-federation-scheduling-policy"

resyncPeriod = time.Second * 60
syncResetBackoffMin = time.Second
syncResetBackoffMax = time.Second * 30
Expand Down Expand Up @@ -139,8 +135,7 @@ func (s *Sync) Run(namespaces []string) (chan struct{}, error) {
}
quit := make(chan struct{})

logrus.Infof("Policy/data ConfigMap processor connected to K8s: namespaces=%v", namespaces)

logrus.Infof("Policy/data ConfigMap processor connected to K8s: namespaces=%v", namespaces)
for _, namespace := range namespaces {
if namespace == "*" {
namespace = v1.NamespaceAll
Expand All @@ -166,22 +161,22 @@ func (s *Sync) Run(namespaces []string) (chan struct{}, error) {

func (s *Sync) add(obj interface{}) {
cm := obj.(*v1.ConfigMap)
match, isPolicy := s.matcher(cm)
logrus.Debugf("OnAdd cm=%v/%v, match=%v, isPolicy=%v", cm.Namespace, cm.Name, match, isPolicy)
if match {
if match, isPolicy := s.matcher(cm); match {
logrus.Debugf("OnAdd cm=%v/%v, isPolicy=%v", cm.Namespace, cm.Name, isPolicy)
s.syncAdd(cm, isPolicy)
}
}

func (s *Sync) update(oldObj, obj interface{}) {
oldCm, cm := oldObj.(*v1.ConfigMap), obj.(*v1.ConfigMap)
match, isPolicy := s.matcher(cm)
logrus.Debugf("OnUpdate cm=%v/%v, match=%v, isPolicy=%v", cm.Namespace, cm.Name, match, isPolicy)
if match {
if match, isPolicy := s.matcher(cm); match {
logrus.Debugf("OnUpdate cm=%v/%v, isPolicy=%v, oldVer=%v, newVer=%v",
cm.Namespace, cm.Name, isPolicy, oldCm.GetResourceVersion(), cm.GetResourceVersion())
if cm.GetResourceVersion() != oldCm.GetResourceVersion() {
fp, oldFp := fingerprint(cm), fingerprint(oldCm)
logrus.Debugf("OnUpdate cm=%v/%v, old fingerprint=%v, new fingeprint=%v", cm.Namespace, cm.Name, fp, oldFp)
if fp != oldFp {
newFp, oldFp := fingerprint(cm), fingerprint(oldCm)
rtrVal := cm.Annotations[retriesAnnotationKey]
logrus.Debugf("OnUpdate cm=%v/%v, retries=%v, oldFp=%v, newFp=%v", cm.Namespace, cm.Name, rtrVal, oldFp, newFp)
if newFp != oldFp || rtrVal != "0" {
s.syncAdd(cm, isPolicy)
}
}
Expand All @@ -198,15 +193,15 @@ func (s *Sync) delete(obj interface{}) {
obj = d.Obj
}
cm := obj.(*v1.ConfigMap)
logrus.Debugf("OnDelete cm=%v/%v", cm.Namespace, cm.Name)
if match, isPolicy := s.matcher(cm); match {
logrus.Debugf("OnDelete cm=%v/%v", cm.Namespace, cm.Name)
s.syncRemove(cm, isPolicy)
}
}

func (s *Sync) syncAdd(cm *v1.ConfigMap, isPolicy bool) {
path := fmt.Sprintf("%v/%v", cm.Namespace, cm.Name)
logrus.Debugf("Attempting to add cm=%v, isPolicy=%v", path, isPolicy)
logrus.Debugf("Adding cm=%v, isPolicy=%v", path, isPolicy)
// sort keys so that errors, if any, are always in the same order
sortedKeys := make([]string, 0, len(cm.Data))
for key := range cm.Data {
Expand All @@ -220,7 +215,7 @@ func (s *Sync) syncAdd(cm *v1.ConfigMap, isPolicy bool) {
var err error
if isPolicy {
err = s.opa.InsertPolicy(id, []byte(value))
logrus.Infof("Add policy %v finished, err=%v", id, err)
logrus.Infof("Added policy %v, err=%v", id, err)
} else {
// We don't need to know the JSON structure, just pass it
// directly to the OPA data store.
Expand All @@ -229,19 +224,30 @@ func (s *Sync) syncAdd(cm *v1.ConfigMap, isPolicy bool) {
logrus.Errorf("Failed to parse JSON data in configmap with id=%s", id)
} else {
err = s.opa.PutData(id, data)
logrus.Infof("Add data %v finished, err=%v", id, err)
logrus.Infof("Added data %v, err=%v", id, err)
}
}

if err != nil {
syncErr = append(syncErr, err)
}
}
if syncErr != nil {
var retries = 3
if cm.Annotations != nil && isPolicy {
logrus.Infof("Error loading policies from cm=%v, retries=%v", path, cm.Annotations[retriesAnnotationKey])
}
var retries int = 0
if isPolicy {
if rStr, ok := cm.Annotations[retriesAnnotationKey]; ok {
r, err := strconv.Atoi(rStr)
if err == nil && r > 0 {
retries = r - 1
logrus.Debugf("Adding policies error cm=%v, old retry=%v, new retry=%v", path, rStr, retries)
} else if err == nil && r == 0 {
retries = defaultRetries
logrus.Debugf("Adding policies error cm=%v, old retry=%v, new retry=%v", path, rStr, retries)
}
} else {
retries = defaultRetries
logrus.Debugf("Adding policies error cm=%v, no retry annotation, new retry=%v", path, retries)
}
}
s.setAnnotations(cm, status{
Status: "error",
Error: syncErr,
Expand All @@ -258,7 +264,6 @@ func (s *Sync) syncRemove(cm *v1.ConfigMap, isPolicy bool) {
path := fmt.Sprintf("%v/%v", cm.Namespace, cm.Name)
for key := range cm.Data {
id := fmt.Sprintf("%v/%v", path, key)

if isPolicy {
if err := s.opa.DeletePolicy(id); err != nil {
logrus.Errorf("Failed to delete policy %v: %v", id, err)
Expand All @@ -275,23 +280,13 @@ func (s *Sync) syncRemove(cm *v1.ConfigMap, isPolicy bool) {
func (s *Sync) setAnnotations(cm *v1.ConfigMap, st status, retries int) {
bs, err := json.Marshal(st)
if err != nil {
logrus.Errorf("Failed to serialize %v for %v/%v: %v", statusAnnotationKey, cm.Namespace, cm.Name, err)
logrus.Errorf("Failed to serialize status for cm=%v/%v, err=%v", cm.Namespace, cm.Name, err)
return
}
annotation := string(bs)
if cm.Annotations != nil {
if existing, ok := cm.Annotations[statusAnnotationKey]; ok {
if existing == annotation {
// If the annotation did not change, do not write it.
// (issue https://github.com/open-policy-agent/kube-mgmt/issues/90)
return
}
}
}
patch := map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]interface{}{
statusAnnotationKey: annotation,
statusAnnotationKey: string(bs),
retriesAnnotationKey: strconv.Itoa(retries),
},
},
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/default/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ set -e
set -x

TOKEN=$(kubectl exec deploy/kube-mgmt-opa-kube-mgmt -c mgmt -- cat /bootstrap/mgmt-token)
OPA="http --default-scheme=https --verify=no -A bearer -a ${TOKEN} :8443/v1"
OPA="http --ignore-stdin --default-scheme=https --verify=no -A bearer -a ${TOKEN} :8443/v1"

${OPA}/data | jq -e '.result.test_helm_kubernetes_quickstart|keys|length==3'

Expand Down
16 changes: 16 additions & 0 deletions test/e2e/fixture-multi-fail.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
kind: ConfigMap
metadata:
name: multi-file-fail-policy
labels:
kube-mgmt/e2e: "true"
openpolicyagent.org/policy: rego
apiVersion: v1
data:
f.rego: |
package my_pkg_fail
import data.my_pkg_fail.functions.my_func
default my_rule := false
my_rule {
my_func(input.hello)
}
19 changes: 19 additions & 0 deletions test/e2e/fixture-multi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
kind: ConfigMap
metadata:
name: multi-file-policy
labels:
kube-mgmt/e2e: "true"
openpolicyagent.org/policy: rego
apiVersion: v1
data:
a.rego: |
package my_pkg
import data.my_pkg.functions.my_func
default my_rule := false
my_rule {
my_func(input.hello)
}
b.rego: |
package my_pkg.functions
my_func(str) := startswith("world", str)
2 changes: 1 addition & 1 deletion test/e2e/labels/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -e
set -x

OPA="http :8080/v1"
OPA="http --ignore-stdin :8080/v1"

${OPA}/data | jq -e '.result.default//{}|keys|length==0'

Expand Down
63 changes: 63 additions & 0 deletions test/e2e/multi/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/bin/bash
set -ex

OPA="http --ignore-stdin :8080/v1"
FX_OK="$(dirname $0)/../fixture-multi.yaml"
FX_KO="$(dirname $0)/../fixture-multi-fail.yaml"

${OPA}/data/my_pkg/my_rule | jq -e '.result==null'

kubectl apply -f ${FX_OK}
sleep 3

${OPA}/policies | jq -e '.result|any(.id=="default/multi-file-policy/a.rego")==true'
${OPA}/policies | jq -e '.result|any(.id=="default/multi-file-policy/b.rego")==true'

kubectl get cm multi-file-policy -ojson | \
jq -e '.metadata.annotations["openpolicyagent.org/kube-mgmt-status"]|fromjson|.status=="ok"'
kubectl get cm multi-file-policy -ojson | \
jq -e '.metadata.annotations["openpolicyagent.org/kube-mgmt-retries"]=="0"'

${OPA}/data/my_pkg/my_rule input[hello]=world | jq -e '.result==true'
${OPA}/data/my_pkg/my_rule input[hello]=incorrect | jq -e '.result==false'

######
#
######

kubectl apply -f ${FX_KO}
sleep 3

${OPA}/policies | jq -e '.result|length==2'

kubectl get cm multi-file-fail-policy -ojson | \
jq -e '.metadata.annotations["openpolicyagent.org/kube-mgmt-status"]|fromjson|.status=="error"'
kubectl get cm multi-file-fail-policy -ojson | \
jq -e '.metadata.annotations["openpolicyagent.org/kube-mgmt-retries"]=="0"'

######
#
######

cat ${FX_OK} | \
yq '.metadata.labels["openpolicyagent.org/policy"]=""' | \
yq '.metadata.annotations["openpolicyagent.org/kube-mgmt-retries"]="0"' | \
kubectl apply -f -
sleep 3

${OPA}/data/my_pkg/my_rule | jq -e '.result==null'

kubectl label --overwrite cm multi-file-policy openpolicyagent.org/policy=rego
sleep 3

${OPA}/policies | jq -e '.result|any(.id=="default/multi-file-policy/a.rego")==true'
${OPA}/policies | jq -e '.result|any(.id=="default/multi-file-policy/b.rego")==true'

kubectl get cm multi-file-policy -ojson | \
jq -e '.metadata.annotations["openpolicyagent.org/kube-mgmt-status"]|fromjson|.status=="ok"'
kubectl get cm multi-file-policy -ojson | \
jq -e '.metadata.annotations["openpolicyagent.org/kube-mgmt-retries"]=="0"'

${OPA}/data/my_pkg/my_rule input[hello]=world | jq -e '.result==true'
${OPA}/data/my_pkg/my_rule input[hello]=incorrect | jq -e '.result==false'

10 changes: 10 additions & 0 deletions test/e2e/multi/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
useHttps: false

opa: null

authz:
enabled: false

mgmt:
extraArgs:
- "--log-level=debug"
2 changes: 1 addition & 1 deletion test/e2e/no_https/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -e
set -x

OPA="http :8080/v1"
OPA="http --ignore-stdin :8080/v1"

${OPA}/data | jq -e '.result.default//{}|keys|length==0'

Expand Down

0 comments on commit 2ae02bd

Please sign in to comment.