Skip to content
This repository has been archived by the owner on Aug 26, 2021. It is now read-only.

Commit

Permalink
Merge pull request #329 from jetstack/fix-workqueue
Browse files Browse the repository at this point in the history
Adjust controller to process one ingress at a time. Use per-item rate limiting workqueue.
  • Loading branch information
munnerz authored May 8, 2018
2 parents 119f8db + b75fb1d commit 87ffd39
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 69 deletions.
16 changes: 8 additions & 8 deletions pkg/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func IgnoreIngress(ing *k8sExtensions.Ingress) error {

func New(client kubelego.KubeLego, namespace string, name string) *Ingress {
ingress := &Ingress{
exists: true,
Exists: true,
kubelego: client,
}

Expand All @@ -66,7 +66,7 @@ func New(client kubelego.KubeLego, namespace string, name string) *Ingress {
Name: name,
},
}
ingress.exists = false
ingress.Exists = false

} else {
client.Log().Warn("Error while getting ingress: ", err)
Expand All @@ -88,7 +88,7 @@ func All(client kubelego.KubeLego) (ingresses []kubelego.Ingress, err error) {
ingresses,
&Ingress{
IngressApi: &ingSlice.Items[i],
exists: true,
Exists: true,
kubelego: client,
},
)
Expand All @@ -100,7 +100,7 @@ var _ kubelego.Ingress = &Ingress{}

type Ingress struct {
IngressApi *k8sExtensions.Ingress
exists bool
Exists bool
kubelego kubelego.KubeLego
}

Expand All @@ -125,14 +125,14 @@ func (o *Ingress) Save() (err error) {

// check if it contains rules
if len(o.IngressApi.Spec.Rules) > 0 {
if o.exists {
if o.Exists {
obj, err = o.client().Update(o.IngressApi)
} else {
obj, err = o.client().Create(o.IngressApi)
o.exists = true
o.Exists = true
}
} else {
if o.exists {
if o.Exists {
err = o.client().Delete(o.IngressApi.Namespace, &k8sMeta.DeleteOptions{})
obj = nil
}
Expand All @@ -146,7 +146,7 @@ func (o *Ingress) Save() (err error) {

func (i *Ingress) Delete() error {

if i.IngressApi == nil || !i.exists {
if i.IngressApi == nil || !i.Exists {
return nil
}

Expand Down
66 changes: 22 additions & 44 deletions pkg/kubelego/configure.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package kubelego

import (
"github.com/jetstack/kube-lego/pkg/ingress"
"github.com/jetstack/kube-lego/pkg/kubelego_const"
utilerrors "k8s.io/apimachinery/pkg/util/errors"

"fmt"
"strings"
Expand Down Expand Up @@ -53,75 +53,53 @@ func (kl *KubeLego) TlsIgnoreDuplicatedSecrets(tlsSlice []kubelego.Tls) []kubele
return output
}

func (kl *KubeLego) processProvider(ings []kubelego.Ingress) (err error) {

func (kl *KubeLego) processProvider(ing kubelego.Ingress) (err error) {
var errs []error
for providerName, provider := range kl.legoIngressProvider {
err := provider.Reset()
if err != nil {
provider.Log().Error(err)
continue
errs = append(errs, err)
}

for _, ing := range ings {
if providerName == ing.IngressProvider() {
err = provider.Process(ing)
if err != nil {
provider.Log().Error(err)
}
if providerName == ing.IngressProvider() {
err = provider.Process(ing)
if err != nil {
errs = append(errs, err)
}
}

err = provider.Finalize()
if err != nil {
provider.Log().Error(err)
errs = append(errs, err)
}
}
return nil
return utilerrors.NewAggregate(errs)
}

func (kl *KubeLego) reconfigure(ingressesAll []kubelego.Ingress) error {
tlsSlice := []kubelego.Tls{}
ingresses := []kubelego.Ingress{}

// filter ingresses, collect tls names
for _, ing := range ingressesAll {
if ing.Ignore() {
continue
}
tlsSlice = append(tlsSlice, ing.Tls()...)
ingresses = append(ingresses, ing)
func (kl *KubeLego) reconfigure(ing kubelego.Ingress) error {
if ing.Ignore() {
return nil
}

// setup providers
kl.processProvider(ingresses)
err := kl.processProvider(ing)
if err != nil {
return err
}

// normify tls config
tlsSlice = kl.TlsIgnoreDuplicatedSecrets(tlsSlice)
// NOTE: this no longer performs a global deduplication
tlsSlice := kl.TlsIgnoreDuplicatedSecrets(ing.Tls())

// process certificate validity
kl.Log().Info("process certificate requests for ingresses")
errs := kl.TlsProcessHosts(tlsSlice)
if len(errs) > 0 {
errsStr := []string{}
for _, err := range errs {
errsStr = append(errsStr, fmt.Sprintf("%s", err))
}
kl.Log().Error("Error while processing certificate requests: ", strings.Join(errsStr, ", "))

// request a rerun of reconfigure
kl.workQueue.Add(true)
}

return nil
}

func (kl *KubeLego) Reconfigure() error {
ingressesAll, err := ingress.All(kl)
if err != nil {
err := utilerrors.NewAggregate(errs)
kl.Log().Errorf("Error while processing certificate requests: %v", err)
return err
}

return kl.reconfigure(ingressesAll)
return nil
}

func (kl *KubeLego) TlsProcessHosts(tlsSlice []kubelego.Tls) []error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelego/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ type KubeLego struct {
waitGroup sync.WaitGroup

// work queue
workQueue *workqueue.Type
workQueue workqueue.RateLimitingInterface
}
129 changes: 113 additions & 16 deletions pkg/kubelego/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/jetstack/kube-lego/pkg/ingress"
klconst "github.com/jetstack/kube-lego/pkg/kubelego_const"

k8sMeta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -27,13 +28,25 @@ func ingressWatchFunc(c *kubernetes.Clientset, ns string) func(options k8sMeta.L
}
}

func (kl *KubeLego) requestReconfigure() {
kl.workQueue.Add(true)
// requestReconfigure will trigger a resync of *all* ingress resources.
func (kl *KubeLego) requestReconfigure() error {
allIng, err := ingress.All(kl)
if err != nil {
return err
}
for _, ing := range allIng {
key, err := cache.MetaNamespaceKeyFunc(ing.Object)
if err != nil {
return err
}
kl.workQueue.AddRateLimited(key)
}
return nil
}

func (kl *KubeLego) WatchReconfigure() {

kl.workQueue = workqueue.New()
kl.workQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Minute*10, time.Hour*24), "kube-lego")

// handle worker shutdown
go func() {
Expand All @@ -49,10 +62,44 @@ func (kl *KubeLego) WatchReconfigure() {
if quit {
return
}
kl.Log().Debugf("worker: begin processing %v", item)
kl.Reconfigure()
kl.Log().Debugf("worker: done processing %v", item)
kl.workQueue.Done(item)
func(item interface{}) {
defer kl.workQueue.Done(item)
key, ok := item.(string)
if !ok {
kl.Log().Errorf("worker: invalid item in workqueue: %v", item)
kl.workQueue.Forget(item)
return
}
name, namespace, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
kl.Log().Errorf("worker: invalid string in workqueue %q: %v", item, err)
kl.workQueue.Forget(item)
return
}
kl.Log().Debugf("worker: begin processing %v", key)
// attempt to get an internal ingress type.
ing := ingress.New(kl, namespace, name)
// if it doesn't exist for some reason, exit here and forget the
// item from the workqueue.
if ing.Exists == false {
kl.Log().Errorf("worker: ingress for key %q no longer exists. Skipping...", key)
kl.workQueue.Forget(item)
return
}
// attempt to process the ingress
err = kl.reconfigure(ing)
if err != nil {
kl.Log().Errorf("worker: error processing item, requeuing after rate limit: %v", err)
// we requeue the item and skip calling Forget here to ensure
// a rate limit is applied when adding the item after a failure
kl.workQueue.AddRateLimited(key)
return
}
kl.Log().Debugf("worker: done processing %v", key)
// as this validation was a success, we should forget the item from
// the workqueue.
kl.workQueue.Forget(item)
}(item)
}
}()
}
Expand All @@ -61,7 +108,7 @@ func (kl *KubeLego) WatchEvents() {

kl.Log().Debugf("start watching ingress objects")

resyncPeriod := 60 * time.Second
resyncPeriod := 10 * time.Minute

ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -70,31 +117,63 @@ func (kl *KubeLego) WatchEvents() {
return
}
kl.Log().Debugf("CREATE ingress/%s/%s", addIng.Namespace, addIng.Name)
kl.workQueue.Add(true)
if key, err := cache.MetaNamespaceKeyFunc(addIng); err != nil {
kl.Log().Errorf("worker: failed to key ingress: %v", err)
return
} else {
kl.Log().Infof("Queued item %q to be processed immediately", key)
// immediately queue creation events.
// if we called AddRateLimited here, we would initially wait 10m
// before processing anything at all.
kl.workQueue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
delIng := obj.(*k8sExtensions.Ingress)
if ingress.IgnoreIngress(delIng) != nil {
return
}
kl.Log().Debugf("DELETE ingress/%s/%s", delIng.Namespace, delIng.Name)
kl.workQueue.Add(true)
if key, err := cache.MetaNamespaceKeyFunc(delIng); err != nil {
kl.Log().Errorf("worker: failed to key ingress: %v", err)
return
} else {
kl.Log().Infof("Detected deleted ingress %q - skipping", key)
// skip processing deleted items, as there is no reason to due to
// the way kube-lego serialises authorization attempts
// kl.workQueue.AddRateLimited(key)
kl.workQueue.Forget(key)
}
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*k8sExtensions.Ingress)
upIng := cur.(*k8sExtensions.Ingress)

//ignore resource version in equality check
oldIng.ResourceVersion = ""
upIng.ResourceVersion = ""

if !reflect.DeepEqual(oldIng, upIng) {
shouldForceProcess := anyDifferent(oldIng.Annotations, upIng.Annotations,
klconst.AnnotationIngressClass,
klconst.AnnotationIngressProvider,
klconst.AnnotationKubeLegoManaged,
klconst.AnnotationSslRedirect,
klconst.AnnotationWhitelistSourceRange)

// we requeue ingresses only when their spec has changed, as the indicates
// a user has updated the specification of their ingress and as such we should
// re-trigger a validation if required.
if !reflect.DeepEqual(oldIng.Spec, upIng.Spec) || shouldForceProcess {
upIng := cur.(*k8sExtensions.Ingress)
if ingress.IgnoreIngress(upIng) != nil {
return
}
kl.Log().Debugf("UPDATE ingress/%s/%s", upIng.Namespace, upIng.Name)
kl.workQueue.Add(true)
if key, err := cache.MetaNamespaceKeyFunc(upIng); err != nil {
kl.Log().Errorf("worker: failed to key ingress: %v", err)
return
} else {
kl.Log().Infof("Detected spec change - queued ingress %q to be processed", key)
// immediately queue the item, as its spec has changed so it may now
// be valid
kl.workQueue.Add(key)
}
}
},
}
Expand All @@ -111,3 +190,21 @@ func (kl *KubeLego) WatchEvents() {

go controller.Run(kl.stopCh)
}

// anyDifferent returns true if any of the keys passed are different in the given
// map.
func anyDifferent(left, right map[string]string, keys ...string) bool {
// if either left or right are nil, and the other isn't, then
// return true to kick off re-processing
if left == nil && right == nil ||
left == nil && right != nil ||
left != nil && right == nil {
return true
}
for _, k := range keys {
if left[k] != right[k] {
return true
}
}
return false
}

0 comments on commit 87ffd39

Please sign in to comment.