From 6cb0464a4e0e881d8d282dbb63baecbfde6f6b18 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Tue, 24 Apr 2018 21:44:43 +0100 Subject: [PATCH 1/5] Adjust controller to process one ingress at a time. Use per-item rate limiting workqueue. --- pkg/ingress/ingress.go | 16 ++++----- pkg/kubelego/configure.go | 41 +++++----------------- pkg/kubelego/type.go | 2 +- pkg/kubelego/watch.go | 74 +++++++++++++++++++++++++++++++++------ 4 files changed, 81 insertions(+), 52 deletions(-) diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go index f53723bf..1893ac0c 100644 --- a/pkg/ingress/ingress.go +++ b/pkg/ingress/ingress.go @@ -52,7 +52,7 @@ func IgnoreIngress(ing *k8sExtensions.Ingress) error { func New(client kubelego.KubeLego, namespace string, name string) *Ingress { ingress := &Ingress{ - exists: true, + Exists: true, kubelego: client, } @@ -66,7 +66,7 @@ func New(client kubelego.KubeLego, namespace string, name string) *Ingress { Name: name, }, } - ingress.exists = false + ingress.Exists = false } else { client.Log().Warn("Error while getting ingress: ", err) @@ -88,7 +88,7 @@ func All(client kubelego.KubeLego) (ingresses []kubelego.Ingress, err error) { ingresses, &Ingress{ IngressApi: &ingSlice.Items[i], - exists: true, + Exists: true, kubelego: client, }, ) @@ -100,7 +100,7 @@ var _ kubelego.Ingress = &Ingress{} type Ingress struct { IngressApi *k8sExtensions.Ingress - exists bool + Exists bool kubelego kubelego.KubeLego } @@ -125,14 +125,14 @@ func (o *Ingress) Save() (err error) { // check if it contains rules if len(o.IngressApi.Spec.Rules) > 0 { - if o.exists { + if o.Exists { obj, err = o.client().Update(o.IngressApi) } else { obj, err = o.client().Create(o.IngressApi) - o.exists = true + o.Exists = true } } else { - if o.exists { + if o.Exists { err = o.client().Delete(o.IngressApi.Namespace, &k8sMeta.DeleteOptions{}) obj = nil } @@ -146,7 +146,7 @@ func (o *Ingress) Save() (err error) { func (i *Ingress) Delete() error { - if i.IngressApi == nil || !i.exists { + if i.IngressApi == nil || !i.Exists { return nil } diff --git a/pkg/kubelego/configure.go b/pkg/kubelego/configure.go index cd6d1015..ba2325d3 100644 --- a/pkg/kubelego/configure.go +++ b/pkg/kubelego/configure.go @@ -1,7 +1,6 @@ package kubelego import ( - "github.com/jetstack/kube-lego/pkg/ingress" "github.com/jetstack/kube-lego/pkg/kubelego_const" "fmt" @@ -53,7 +52,7 @@ func (kl *KubeLego) TlsIgnoreDuplicatedSecrets(tlsSlice []kubelego.Tls) []kubele return output } -func (kl *KubeLego) processProvider(ings []kubelego.Ingress) (err error) { +func (kl *KubeLego) processProvider(ing kubelego.Ingress) (err error) { for providerName, provider := range kl.legoIngressProvider { err := provider.Reset() @@ -62,12 +61,10 @@ func (kl *KubeLego) processProvider(ings []kubelego.Ingress) (err error) { continue } - for _, ing := range ings { - if providerName == ing.IngressProvider() { - err = provider.Process(ing) - if err != nil { - provider.Log().Error(err) - } + if providerName == ing.IngressProvider() { + err = provider.Process(ing) + if err != nil { + provider.Log().Error(err) } } @@ -79,24 +76,13 @@ func (kl *KubeLego) processProvider(ings []kubelego.Ingress) (err error) { return nil } -func (kl *KubeLego) reconfigure(ingressesAll []kubelego.Ingress) error { - tlsSlice := []kubelego.Tls{} - ingresses := []kubelego.Ingress{} - - // filter ingresses, collect tls names - for _, ing := range ingressesAll { - if ing.Ignore() { - continue - } - tlsSlice = append(tlsSlice, ing.Tls()...) - ingresses = append(ingresses, ing) - } - +func (kl *KubeLego) reconfigure(ing kubelego.Ingress) error { // setup providers - kl.processProvider(ingresses) + kl.processProvider(ing) // normify tls config - tlsSlice = kl.TlsIgnoreDuplicatedSecrets(tlsSlice) + // NOTE: this no longer performs a global deduplication + tlsSlice := kl.TlsIgnoreDuplicatedSecrets(ing.Tls()) // process certificate validity kl.Log().Info("process certificate requests for ingresses") @@ -115,15 +101,6 @@ func (kl *KubeLego) reconfigure(ingressesAll []kubelego.Ingress) error { return nil } -func (kl *KubeLego) Reconfigure() error { - ingressesAll, err := ingress.All(kl) - if err != nil { - return err - } - - return kl.reconfigure(ingressesAll) -} - func (kl *KubeLego) TlsProcessHosts(tlsSlice []kubelego.Tls) []error { errs := []error{} for _, tlsElem := range tlsSlice { diff --git a/pkg/kubelego/type.go b/pkg/kubelego/type.go index 06813eec..c72af452 100644 --- a/pkg/kubelego/type.go +++ b/pkg/kubelego/type.go @@ -50,5 +50,5 @@ type KubeLego struct { waitGroup sync.WaitGroup // work queue - workQueue *workqueue.Type + workQueue workqueue.RateLimitingInterface } diff --git a/pkg/kubelego/watch.go b/pkg/kubelego/watch.go index 6913e03a..5b0f7011 100644 --- a/pkg/kubelego/watch.go +++ b/pkg/kubelego/watch.go @@ -27,13 +27,25 @@ func ingressWatchFunc(c *kubernetes.Clientset, ns string) func(options k8sMeta.L } } -func (kl *KubeLego) requestReconfigure() { - kl.workQueue.Add(true) +// requestReconfigure will trigger a resync of *all* ingress resources. +func (kl *KubeLego) requestReconfigure() error { + allIng, err := ingress.All(kl) + if err != nil { + return err + } + for _, ing := range allIng { + key, err := cache.MetaNamespaceKeyFunc(ing.Object) + if err != nil { + return err + } + kl.workQueue.AddRateLimited(key) + } + return nil } func (kl *KubeLego) WatchReconfigure() { - kl.workQueue = workqueue.New() + kl.workQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Minute*10, time.Hour*24), "kube-lego") // handle worker shutdown go func() { @@ -49,10 +61,35 @@ func (kl *KubeLego) WatchReconfigure() { if quit { return } - kl.Log().Debugf("worker: begin processing %v", item) - kl.Reconfigure() - kl.Log().Debugf("worker: done processing %v", item) - kl.workQueue.Done(item) + func(item interface{}) { + defer kl.workQueue.Done(item) + key, ok := item.(string) + if !ok { + kl.Log().Errorf("worker: invalid item in workqueue: %v", item) + kl.workQueue.Forget(item) + return + } + name, namespace, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + kl.Log().Errorf("worker: invalid string in workqueue: %s", item) + kl.workQueue.Forget(item) + return + } + kl.Log().Debugf("worker: begin processing %v", key) + ing := ingress.New(kl, namespace, name) + if ing.Exists == false { + kl.Log().Errorf("worker: ingress for key %q no longer exists. Skipping...", key) + kl.workQueue.Forget(item) + return + } + err = kl.reconfigure(ing) + if err != nil { + kl.Log().Errorf("worker: error processing item: %v", err) + return + } + kl.Log().Debugf("worker: done processing %v", key) + kl.workQueue.Forget(item) + }(item) } }() } @@ -61,7 +98,7 @@ func (kl *KubeLego) WatchEvents() { kl.Log().Debugf("start watching ingress objects") - resyncPeriod := 60 * time.Second + resyncPeriod := 10 * time.Minute ingEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -70,7 +107,12 @@ func (kl *KubeLego) WatchEvents() { return } kl.Log().Debugf("CREATE ingress/%s/%s", addIng.Namespace, addIng.Name) - kl.workQueue.Add(true) + if key, err := cache.MetaNamespaceKeyFunc(addIng); err != nil { + kl.Log().Errorf("worker: failed to key ingress: %v", err) + return + } else { + kl.workQueue.AddRateLimited(key) + } }, DeleteFunc: func(obj interface{}) { delIng := obj.(*k8sExtensions.Ingress) @@ -78,7 +120,12 @@ func (kl *KubeLego) WatchEvents() { return } kl.Log().Debugf("DELETE ingress/%s/%s", delIng.Namespace, delIng.Name) - kl.workQueue.Add(true) + if key, err := cache.MetaNamespaceKeyFunc(delIng); err != nil { + kl.Log().Errorf("worker: failed to key ingress: %v", err) + return + } else { + kl.workQueue.AddRateLimited(key) + } }, UpdateFunc: func(old, cur interface{}) { oldIng := old.(*k8sExtensions.Ingress) @@ -94,7 +141,12 @@ func (kl *KubeLego) WatchEvents() { return } kl.Log().Debugf("UPDATE ingress/%s/%s", upIng.Namespace, upIng.Name) - kl.workQueue.Add(true) + if key, err := cache.MetaNamespaceKeyFunc(upIng); err != nil { + kl.Log().Errorf("worker: failed to key ingress: %v", err) + return + } else { + kl.workQueue.AddRateLimited(key) + } } }, } From 38ae381a568606fc890e80afe1ad0a6a906d910e Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Tue, 24 Apr 2018 21:50:20 +0100 Subject: [PATCH 2/5] Remove erroneous call to workQueue.Add --- pkg/kubelego/configure.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/kubelego/configure.go b/pkg/kubelego/configure.go index ba2325d3..5d375473 100644 --- a/pkg/kubelego/configure.go +++ b/pkg/kubelego/configure.go @@ -93,9 +93,6 @@ func (kl *KubeLego) reconfigure(ing kubelego.Ingress) error { errsStr = append(errsStr, fmt.Sprintf("%s", err)) } kl.Log().Error("Error while processing certificate requests: ", strings.Join(errsStr, ", ")) - - // request a rerun of reconfigure - kl.workQueue.Add(true) } return nil From 1e37ef3830b00163e3d081a931f0bc19acd02113 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Tue, 24 Apr 2018 23:05:19 +0100 Subject: [PATCH 3/5] Queue ingress immediately on CREATE --- pkg/kubelego/watch.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/kubelego/watch.go b/pkg/kubelego/watch.go index 5b0f7011..39c5d196 100644 --- a/pkg/kubelego/watch.go +++ b/pkg/kubelego/watch.go @@ -111,7 +111,8 @@ func (kl *KubeLego) WatchEvents() { kl.Log().Errorf("worker: failed to key ingress: %v", err) return } else { - kl.workQueue.AddRateLimited(key) + kl.Log().Infof("Queued item %q to be processed immediately", key) + kl.workQueue.Add(key) } }, DeleteFunc: func(obj interface{}) { @@ -124,6 +125,7 @@ func (kl *KubeLego) WatchEvents() { kl.Log().Errorf("worker: failed to key ingress: %v", err) return } else { + kl.Log().Infof("Queued item %q to be processed (delay of 10m)", key) kl.workQueue.AddRateLimited(key) } }, @@ -145,7 +147,8 @@ func (kl *KubeLego) WatchEvents() { kl.Log().Errorf("worker: failed to key ingress: %v", err) return } else { - kl.workQueue.AddRateLimited(key) + kl.Log().Infof("Detected deleted ingress %q - skipping", key) + // kl.workQueue.Add(key) } } }, From 61acbfd68ba61427066bafc0e275f9580ee778f6 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Tue, 24 Apr 2018 23:35:15 +0100 Subject: [PATCH 4/5] Only apply rate limit upon failure. Only sync ingresses when their spec changes. --- pkg/kubelego/watch.go | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/pkg/kubelego/watch.go b/pkg/kubelego/watch.go index 39c5d196..2643f17c 100644 --- a/pkg/kubelego/watch.go +++ b/pkg/kubelego/watch.go @@ -76,18 +76,27 @@ func (kl *KubeLego) WatchReconfigure() { return } kl.Log().Debugf("worker: begin processing %v", key) + // attempt to get an internal ingress type. ing := ingress.New(kl, namespace, name) + // if it doesn't exist for some reason, exit here and forget the + // item from the workqueue. if ing.Exists == false { kl.Log().Errorf("worker: ingress for key %q no longer exists. Skipping...", key) kl.workQueue.Forget(item) return } + // attempt to process the ingress err = kl.reconfigure(ing) if err != nil { - kl.Log().Errorf("worker: error processing item: %v", err) + kl.Log().Errorf("worker: error processing item, requeuing after rate limit: %v", err) + // we requeue the item and skip calling Forget here to ensure + // a rate limit is applied when adding the item after a failure + kl.workQueue.AddRateLimited(key) return } kl.Log().Debugf("worker: done processing %v", key) + // as this validation was a success, we should forget the item from + // the workqueue. kl.workQueue.Forget(item) }(item) } @@ -112,6 +121,9 @@ func (kl *KubeLego) WatchEvents() { return } else { kl.Log().Infof("Queued item %q to be processed immediately", key) + // immediately queue creation events. + // if we called AddRateLimited here, we would initially wait 10m + // before processing anything at all. kl.workQueue.Add(key) } }, @@ -125,8 +137,10 @@ func (kl *KubeLego) WatchEvents() { kl.Log().Errorf("worker: failed to key ingress: %v", err) return } else { - kl.Log().Infof("Queued item %q to be processed (delay of 10m)", key) - kl.workQueue.AddRateLimited(key) + kl.Log().Infof("Detected deleted ingress %q - skipping", key) + // skip processing deleted items, as there is no reason to due to + // the way kube-lego serialises authorization attempts + // kl.workQueue.AddRateLimited(key) } }, UpdateFunc: func(old, cur interface{}) { @@ -137,7 +151,10 @@ func (kl *KubeLego) WatchEvents() { oldIng.ResourceVersion = "" upIng.ResourceVersion = "" - if !reflect.DeepEqual(oldIng, upIng) { + // we requeue ingresses only when their spec has changed, as the indicates + // a user has updated the specification of their ingress and as such we should + // re-trigger a validation if required. + if !reflect.DeepEqual(oldIng.Spec, upIng.Spec) { upIng := cur.(*k8sExtensions.Ingress) if ingress.IgnoreIngress(upIng) != nil { return @@ -147,8 +164,10 @@ func (kl *KubeLego) WatchEvents() { kl.Log().Errorf("worker: failed to key ingress: %v", err) return } else { - kl.Log().Infof("Detected deleted ingress %q - skipping", key) - // kl.workQueue.Add(key) + kl.Log().Infof("Detected spec change - queued ingress %q to be processed", key) + // immediately queue the item, as its spec has changed so it may now + // be valid + kl.workQueue.Add(key) } } }, From b75fb1dfd9b313e70bf7b163c50aaa6ede6851c2 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Tue, 1 May 2018 12:32:22 +0200 Subject: [PATCH 5/5] Address review comments --- pkg/kubelego/configure.go | 28 ++++++++++++++++------------ pkg/kubelego/watch.go | 33 ++++++++++++++++++++++++++++----- 2 files changed, 44 insertions(+), 17 deletions(-) diff --git a/pkg/kubelego/configure.go b/pkg/kubelego/configure.go index 5d375473..676ec96f 100644 --- a/pkg/kubelego/configure.go +++ b/pkg/kubelego/configure.go @@ -2,6 +2,7 @@ package kubelego import ( "github.com/jetstack/kube-lego/pkg/kubelego_const" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "fmt" "strings" @@ -53,32 +54,37 @@ func (kl *KubeLego) TlsIgnoreDuplicatedSecrets(tlsSlice []kubelego.Tls) []kubele } func (kl *KubeLego) processProvider(ing kubelego.Ingress) (err error) { - + var errs []error for providerName, provider := range kl.legoIngressProvider { err := provider.Reset() if err != nil { - provider.Log().Error(err) - continue + errs = append(errs, err) } if providerName == ing.IngressProvider() { err = provider.Process(ing) if err != nil { - provider.Log().Error(err) + errs = append(errs, err) } } err = provider.Finalize() if err != nil { - provider.Log().Error(err) + errs = append(errs, err) } } - return nil + return utilerrors.NewAggregate(errs) } func (kl *KubeLego) reconfigure(ing kubelego.Ingress) error { + if ing.Ignore() { + return nil + } // setup providers - kl.processProvider(ing) + err := kl.processProvider(ing) + if err != nil { + return err + } // normify tls config // NOTE: this no longer performs a global deduplication @@ -88,11 +94,9 @@ func (kl *KubeLego) reconfigure(ing kubelego.Ingress) error { kl.Log().Info("process certificate requests for ingresses") errs := kl.TlsProcessHosts(tlsSlice) if len(errs) > 0 { - errsStr := []string{} - for _, err := range errs { - errsStr = append(errsStr, fmt.Sprintf("%s", err)) - } - kl.Log().Error("Error while processing certificate requests: ", strings.Join(errsStr, ", ")) + err := utilerrors.NewAggregate(errs) + kl.Log().Errorf("Error while processing certificate requests: %v", err) + return err } return nil diff --git a/pkg/kubelego/watch.go b/pkg/kubelego/watch.go index 2643f17c..d89f8184 100644 --- a/pkg/kubelego/watch.go +++ b/pkg/kubelego/watch.go @@ -5,6 +5,7 @@ import ( "time" "github.com/jetstack/kube-lego/pkg/ingress" + klconst "github.com/jetstack/kube-lego/pkg/kubelego_const" k8sMeta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -71,7 +72,7 @@ func (kl *KubeLego) WatchReconfigure() { } name, namespace, err := cache.SplitMetaNamespaceKey(key) if err != nil { - kl.Log().Errorf("worker: invalid string in workqueue: %s", item) + kl.Log().Errorf("worker: invalid string in workqueue %q: %v", item, err) kl.workQueue.Forget(item) return } @@ -141,20 +142,24 @@ func (kl *KubeLego) WatchEvents() { // skip processing deleted items, as there is no reason to due to // the way kube-lego serialises authorization attempts // kl.workQueue.AddRateLimited(key) + kl.workQueue.Forget(key) } }, UpdateFunc: func(old, cur interface{}) { oldIng := old.(*k8sExtensions.Ingress) upIng := cur.(*k8sExtensions.Ingress) - //ignore resource version in equality check - oldIng.ResourceVersion = "" - upIng.ResourceVersion = "" + shouldForceProcess := anyDifferent(oldIng.Annotations, upIng.Annotations, + klconst.AnnotationIngressClass, + klconst.AnnotationIngressProvider, + klconst.AnnotationKubeLegoManaged, + klconst.AnnotationSslRedirect, + klconst.AnnotationWhitelistSourceRange) // we requeue ingresses only when their spec has changed, as the indicates // a user has updated the specification of their ingress and as such we should // re-trigger a validation if required. - if !reflect.DeepEqual(oldIng.Spec, upIng.Spec) { + if !reflect.DeepEqual(oldIng.Spec, upIng.Spec) || shouldForceProcess { upIng := cur.(*k8sExtensions.Ingress) if ingress.IgnoreIngress(upIng) != nil { return @@ -185,3 +190,21 @@ func (kl *KubeLego) WatchEvents() { go controller.Run(kl.stopCh) } + +// anyDifferent returns true if any of the keys passed are different in the given +// map. +func anyDifferent(left, right map[string]string, keys ...string) bool { + // if either left or right are nil, and the other isn't, then + // return true to kick off re-processing + if left == nil && right == nil || + left == nil && right != nil || + left != nil && right == nil { + return true + } + for _, k := range keys { + if left[k] != right[k] { + return true + } + } + return false +}