Skip to content

Commit

Permalink
⚡ prometheus and improvments (#24)
Browse files Browse the repository at this point in the history
modified:   controllers/pod_controller.go; modified:   controllers/pod_controller_functions.go; modified:   pkg/cache/rediscache/rediscache.go
  • Loading branch information
amitai-devops authored Dec 24, 2022
1 parent 59a8e5b commit ffd9a60
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 54 deletions.
89 changes: 60 additions & 29 deletions controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/jatalocks/kube-reqsizer/pkg/cache/localcache"
"github.com/jatalocks/kube-reqsizer/pkg/cache/rediscache"
"github.com/jatalocks/kube-reqsizer/types"
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -36,6 +37,7 @@ import (

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch
Expand Down Expand Up @@ -66,64 +68,90 @@ const (
operatorModeAnnotation = "reqsizer.jatalocks.github.io/mode"
)

var (
cpuOffset = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "cpu_offset",
Help: "Number of milli-cores that have been increased/removed since startup",
},
)
memoryOffset = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "memory_offset",
Help: "Number of megabits that have been increased/removed since startup",
},
)
cacheSize = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "cache_size",
Help: "Number of pod controllers currently in cache",
},
)
)

func init() {
// Register custom metrics with the global prometheus registry
metrics.Registry.MustRegister(cpuOffset, memoryOffset, cacheSize)
}

func cacheKeyFunc(obj interface{}) (string, error) {
return obj.(types.PodRequests).Name + "-" + obj.(types.PodRequests).Namespace, nil
return obj.(types.PodRequests).Name, nil
}

var cacheStore = cache.NewStore(cacheKeyFunc)

// Reconcile handles a reconciliation request for a Pod.
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues("pod", req.NamespacedName)

if r.EnablePersistence {
cacheSize.Set(float64(r.RedisClient.CacheSize()))
} else {
cacheSize.Set(float64(len(cacheStore.List())))
}
/*
Step 0: Fetch the Pod from the Kubernetes API.
*/

var pod corev1.Pod

if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
if apierrors.IsNotFound(err) {
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, nil
}
log.Error(nil, "unable to fetch Pod")
return ctrl.Result{}, err
}

annotation, err := r.NamespaceOrPodHaveAnnotation(pod, ctx)
podReferenceName := r.GetPodCacheName(&pod) + "-" + pod.Namespace
annotation, err := r.NamespaceOrPodHaveAnnotation(&pod, ctx)
if err != nil {
log.Error(nil, "failed to get annotations")
return ctrl.Result{}, err
}
ignoreAnnotation, err := r.NamespaceOrPodHaveIgnoreAnnotation(pod, ctx)
ignoreAnnotation, err := r.NamespaceOrPodHaveIgnoreAnnotation(&pod, ctx)
if err != nil {
log.Error(nil, "failed to get annotations")
return ctrl.Result{}, err
}

if ((!r.EnableAnnotation) || (r.EnableAnnotation && annotation)) && !ignoreAnnotation {
log.Info("Cache Reference Name: " + podReferenceName)

data, err := r.ClientSet.RESTClient().Get().AbsPath(fmt.Sprintf("apis/metrics.k8s.io/v1beta1/namespaces/%v/pods/%v", pod.Namespace, pod.Name)).DoRaw(ctx)

if err != nil {
log.Error(nil, "failed to get stats from pod")
return ctrl.Result{}, err
}
PodUsageData := GeneratePodRequestsObjectFromRestData(data)
err, _, _, deploymentName := r.GetPodParentKind(pod, ctx)
if err != nil {
deploymentName = pod.Name
}
SumPodRequest := types.PodRequests{Name: deploymentName, Namespace: pod.Namespace, ContainerRequests: []types.ContainerRequests{}}
SumPodRequest := types.PodRequests{Name: podReferenceName, Namespace: pod.Namespace, ContainerRequests: []types.ContainerRequests{}}

SumPodRequest.ContainerRequests = PodUsageData.ContainerRequests
var LatestPodRequest types.PodRequests
if r.EnablePersistence {
LatestPodRequest, err = r.RedisClient.FetchFromCache(deploymentName + "-" + pod.Namespace)
LatestPodRequest, err = r.RedisClient.FetchFromCache(podReferenceName)
} else {
LatestPodRequest, err = localcache.FetchFromCache(cacheStore, deploymentName+"-"+pod.Namespace)
LatestPodRequest, err = localcache.FetchFromCache(cacheStore, podReferenceName)
}

if err != nil {
SumPodRequest.Sample = 0
log.Info(fmt.Sprint("Adding cache sample ", SumPodRequest.Sample))
Expand Down Expand Up @@ -184,7 +212,6 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}
}
}
log.Info(fmt.Sprint(SumPodRequest))
if (SumPodRequest.Sample >= r.SampleSize) && r.MinimumUptimeOfPodInParent(pod, ctx) {
log.Info("Sample Size and Minimum Time have been reached")
PodChange := false
Expand Down Expand Up @@ -216,28 +243,34 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
case "average":
if r.ValidateCPU(currentC.CPU, AverageUsageCPU) {
pod.Spec.Containers[i].Resources.Requests[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%dm", int(float64(AverageUsageCPU)*r.CPUFactor)))
cpuOffset.Add(float64(int(float64(AverageUsageCPU)*r.CPUFactor) - int(currentC.CPU)))
PodChange = true
}
if r.ValidateMemory(currentC.Memory, AverageUsageMemory) {
pod.Spec.Containers[i].Resources.Requests[v1.ResourceMemory] = resource.MustParse(fmt.Sprintf("%dMi", int(float64(AverageUsageMemory)*r.MemoryFactor)))
memoryOffset.Add(float64(int(float64(AverageUsageMemory)*r.MemoryFactor) - int(currentC.Memory)))
PodChange = true
}
case "min":
if r.ValidateCPU(currentC.CPU, c.MinCPU) {
pod.Spec.Containers[i].Resources.Requests[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%dm", int(float64(c.MinCPU)*r.CPUFactor)))
cpuOffset.Add(float64(int(float64(c.MinCPU)*r.CPUFactor) - int(currentC.CPU)))
PodChange = true
}
if r.ValidateMemory(currentC.Memory, c.MinMemory) {
pod.Spec.Containers[i].Resources.Requests[v1.ResourceMemory] = resource.MustParse(fmt.Sprintf("%dMi", int(float64(c.MinMemory)*r.MemoryFactor)))
memoryOffset.Add(float64(int(float64(c.MinMemory)*r.MemoryFactor) - int(currentC.Memory)))
PodChange = true
}
case "max":
if r.ValidateCPU(currentC.CPU, c.MaxCPU) {
pod.Spec.Containers[i].Resources.Requests[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%dm", int(float64(c.MaxCPU)*r.CPUFactor)))
cpuOffset.Add(float64(int(float64(c.MaxCPU)*r.CPUFactor) - int(currentC.CPU)))
PodChange = true
}
if r.ValidateMemory(currentC.Memory, c.MaxMemory) {
pod.Spec.Containers[i].Resources.Requests[v1.ResourceMemory] = resource.MustParse(fmt.Sprintf("%dMi", int(float64(c.MaxMemory)*r.MemoryFactor)))
memoryOffset.Add(float64(int(float64(c.MaxMemory)*r.MemoryFactor) - int(currentC.Memory)))
PodChange = true
}
}
Expand All @@ -249,6 +282,15 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}
}
}
if r.EnablePersistence {
if err := r.RedisClient.DeleteFromCache(SumPodRequest); err != nil {
log.Error(err, err.Error())
}
} else {
if err := localcache.DeleteFromCache(cacheStore, LatestPodRequest); err != nil {
log.Error(err, err.Error())
}
}
if PodChange {
pod.Annotations["reqsizer.jatalocks.github.io/changed"] = "true"

Expand All @@ -267,20 +309,9 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
UpdatePodController(podSpec, Requests, ctx)

return r.UpdateKubeObject(deployment.(client.Object), ctx)

}

if r.EnablePersistence {
if err := r.RedisClient.DeleteFromCache(SumPodRequest); err != nil {
log.Error(err, err.Error())
}
} else {
if err := localcache.DeleteFromCache(cacheStore, LatestPodRequest); err != nil {
log.Error(err, err.Error())
}
}
}
}

return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
79 changes: 56 additions & 23 deletions controllers/pod_controller_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

func (r *PodReconciler) NamespaceOrPodHaveAnnotation(pod corev1.Pod, ctx context.Context) (bool, error) {
func (r *PodReconciler) NamespaceOrPodHaveAnnotation(pod *corev1.Pod, ctx context.Context) (bool, error) {
podHasAnnotation := pod.Annotations[operatorAnnotation] == "true"
namespace, err := r.ClientSet.CoreV1().Namespaces().Get(ctx, pod.Namespace, metav1.GetOptions{})
if err != nil {
Expand All @@ -30,7 +31,7 @@ func (r *PodReconciler) NamespaceOrPodHaveAnnotation(pod corev1.Pod, ctx context
return (podHasAnnotation || namespaceHasAnnotation), nil
}

func (r *PodReconciler) NamespaceOrPodHaveIgnoreAnnotation(pod corev1.Pod, ctx context.Context) (bool, error) {
func (r *PodReconciler) NamespaceOrPodHaveIgnoreAnnotation(pod *corev1.Pod, ctx context.Context) (bool, error) {
podHasIgnoreAnnotation := pod.Annotations[operatorAnnotation] == "false"
namespace, err := r.ClientSet.CoreV1().Namespaces().Get(ctx, pod.Namespace, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -68,7 +69,7 @@ func (r *PodReconciler) UpdateKubeObject(pod client.Object, ctx context.Context)
if apierrors.IsConflict(err) {
// The Pod has been updated since we read it.
// Requeue the Pod to try to reconciliate again.
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
if apierrors.IsNotFound(err) {
// The Pod has been deleted since we read it.
Expand All @@ -78,7 +79,7 @@ func (r *PodReconciler) UpdateKubeObject(pod client.Object, ctx context.Context)
log.Error(err, "unable to update pod")
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

func UpdatePodController(podspec *corev1.PodSpec, Requests []types.NewContainerRequests, ctx context.Context) {
Expand Down Expand Up @@ -190,11 +191,22 @@ func (r *PodReconciler) MinimumUptimeOfPodInParent(pod corev1.Pod, ctx context.C
}
err, _, _, deploymentName := r.GetPodParentKind(pod, ctx)
if err != nil {
log.Error(err)
return false
}

// Create the label selector
labelSelector := labels.Set{
"app": deploymentName,
"app.kubernetes.io/name": deploymentName,
"app.kubernetes.io/instance": deploymentName,
"app.kubernetes.io/component": deploymentName,
}

options := metav1.ListOptions{
LabelSelector: "app=" + deploymentName,
LabelSelector: labelSelector.AsSelector().String(),
}

podList, _ := r.ClientSet.CoreV1().Pods(pod.Namespace).List(ctx, options)
// List() returns a pointer to slice, derefernce it, before iterating
for _, podInfo := range (*podList).Items {
Expand All @@ -207,26 +219,47 @@ func (r *PodReconciler) MinimumUptimeOfPodInParent(pod corev1.Pod, ctx context.C
}

func (r *PodReconciler) GetPodParentKind(pod corev1.Pod, ctx context.Context) (error, *v1.PodSpec, interface{}, string) {
switch pod.OwnerReferences[0].Kind {
case "ReplicaSet":
replica, err := r.ClientSet.AppsV1().ReplicaSets(pod.Namespace).Get(ctx, pod.OwnerReferences[0].Name, metav1.GetOptions{})
if err != nil {
log.Error(err, err.Error())
return err, nil, nil, ""
}
deployment, err := r.ClientSet.AppsV1().Deployments(pod.Namespace).Get(ctx, replica.OwnerReferences[0].Name, metav1.GetOptions{})
if replica.OwnerReferences[0].Kind == "Deployment" {
if len(pod.OwnerReferences) > 0 {
switch pod.OwnerReferences[0].Kind {
case "ReplicaSet":
replica, err := r.ClientSet.AppsV1().ReplicaSets(pod.Namespace).Get(ctx, pod.OwnerReferences[0].Name, metav1.GetOptions{})
if err != nil {
log.Error(err, err.Error())
return err, nil, nil, ""
}
deployment, err := r.ClientSet.AppsV1().Deployments(pod.Namespace).Get(ctx, replica.OwnerReferences[0].Name, metav1.GetOptions{})
if replica.OwnerReferences[0].Kind == "Deployment" {
return err, &deployment.Spec.Template.Spec, deployment, deployment.Name
} else {
return errors.New("Is Owned by Unknown CRD"), nil, nil, ""
}
case "DaemonSet":
deployment, err := r.ClientSet.AppsV1().DaemonSets(pod.Namespace).Get(ctx, pod.OwnerReferences[0].Name, metav1.GetOptions{})
return err, &deployment.Spec.Template.Spec, deployment, deployment.Name
} else {
case "StatefulSet":
deployment, err := r.ClientSet.AppsV1().StatefulSets(pod.Namespace).Get(ctx, pod.OwnerReferences[0].Name, metav1.GetOptions{})
return err, &deployment.Spec.Template.Spec, deployment, deployment.Name
default:
return errors.New("Is Owned by Unknown CRD"), nil, nil, ""
}
case "DaemonSet":
deployment, err := r.ClientSet.AppsV1().DaemonSets(pod.Namespace).Get(ctx, pod.OwnerReferences[0].Kind, metav1.GetOptions{})
return err, &deployment.Spec.Template.Spec, deployment, deployment.Name
case "StatefulSet":
deployment, err := r.ClientSet.AppsV1().StatefulSets(pod.Namespace).Get(ctx, pod.OwnerReferences[0].Kind, metav1.GetOptions{})
return err, &deployment.Spec.Template.Spec, deployment, deployment.Name
default:
return errors.New("Is Owned by Unknown CRD"), nil, nil, ""
} else {
return errors.New("Pod Has No Owner"), nil, nil, ""
}
}

func (r *PodReconciler) GetPodCacheName(pod *corev1.Pod) string {
val, ok := pod.Labels["app"]
if !ok {
val, ok = pod.Labels["app.kubernetes.io/name"]
if !ok {
val, ok = pod.Labels["app.kubernetes.io/instance"]
if !ok {
val, ok = pod.Labels["app.kubernetes.io/component"]
if !ok {
val = strings.Split(pod.Name, "-")[0]
}
}
}
}
return val
}
4 changes: 2 additions & 2 deletions pkg/cache/rediscache/rediscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (client RedisClient) AddToCache(object types.PodRequests) error {
klog.Errorf("failed to add key value to cache error", err)
return err
}
err = client.Client.Set(object.Name+"-"+object.Namespace, val, 0).Err()
err = client.Client.Set(object.Name, val, 0).Err()
if err != nil {
klog.Errorf("failed to add key value to cache error", err)
return err
Expand All @@ -41,7 +41,7 @@ func (client RedisClient) FetchFromCache(key string) (types.PodRequests, error)
}

func (client RedisClient) DeleteFromCache(object types.PodRequests) error {
return client.Client.Del(object.Name + "-" + object.Namespace).Err()
return client.Client.Del(object.Name).Err()
}

func (client RedisClient) CacheSize() int64 {
Expand Down

0 comments on commit ffd9a60

Please sign in to comment.