Skip to content

Commit

Permalink
add controller concurrency options with elevated defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
eljohnson92 committed Jul 10, 2024
1 parent 598c935 commit 31917b5
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 19 deletions.
55 changes: 41 additions & 14 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
capi "sigs.k8s.io/cluster-api/api/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
crcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

infrastructurev1alpha1 "github.com/linode/cluster-api-provider-linode/api/v1alpha1"
infrastructurev1alpha2 "github.com/linode/cluster-api-provider-linode/api/v1alpha2"
controller2 "github.com/linode/cluster-api-provider-linode/controller"
"github.com/linode/cluster-api-provider-linode/controller"
"github.com/linode/cluster-api-provider-linode/observability/tracing"
"github.com/linode/cluster-api-provider-linode/observability/wrappers/reconciler"
"github.com/linode/cluster-api-provider-linode/version"
Expand All @@ -58,10 +59,12 @@ var (
)

const (
controllerName = "cluster-api-provider-linode.linode.com"
gracePeriod = 5 * time.Second
envK8sNodeName = "K8S_NODE_NAME"
envK8sPodName = "K8S_POD_NAME"
controllerName = "cluster-api-provider-linode.linode.com"
envK8sNodeName = "K8S_NODE_NAME"
envK8sPodName = "K8S_POD_NAME"
concurrencyDefault = 10
qpsDefault = 20
burstDefault = 30
)

func init() {
Expand All @@ -84,6 +87,13 @@ func main() {
metricsAddr string
enableLeaderElection bool
probeAddr string

restConfigQPS int
restConfigBurst int
linodeClusterConcurrency int
linodeMachineConcurrency int
linodeObjectStorageBucketConcurrency int
linodeVPCConcurrency int
)
flag.StringVar(&machineWatchFilter, "machine-watch-filter", "", "The machines to watch by label.")
flag.StringVar(&clusterWatchFilter, "cluster-watch-filter", "", "The clusters to watch by label.")
Expand All @@ -93,6 +103,18 @@ func main() {
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.IntVar(&restConfigQPS, "kube-api-qps", qpsDefault,
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
flag.IntVar(&restConfigBurst, "kube-api-burst", burstDefault,
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
flag.IntVar(&linodeClusterConcurrency, "linodecluster-concurrency", concurrencyDefault,
"Number of LinodeClusters to process simultaneously. Default 10")
flag.IntVar(&linodeMachineConcurrency, "linodemachine-concurrency", concurrencyDefault,
"Number of LinodeMachines to process simultaneously. Default 10")
flag.IntVar(&linodeObjectStorageBucketConcurrency, "linodeobjectstoragebucket-concurrency", concurrencyDefault,
"Number of linodeObjectStorageBuckets to process simultaneously. Default 10")
flag.IntVar(&linodeVPCConcurrency, "linodevpc-concurrency", concurrencyDefault,
"Number of LinodeVPCs to process simultaneously. Default 10")
opts := zap.Options{
Development: true,
}
Expand All @@ -110,7 +132,12 @@ func main() {
linodeDNSToken = linodeToken
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
restConfig := ctrl.GetConfigOrDie()
restConfig.QPS = float32(restConfigQPS)
restConfig.Burst = restConfigBurst
restConfig.UserAgent = fmt.Sprintf("CAPL/%s", version.GetVersion())

mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress: metricsAddr},
HealthProbeBindAddress: probeAddr,
Expand All @@ -134,52 +161,52 @@ func main() {
}

if err = reconciler.NewReconcilerWithTracing(
&controller2.LinodeClusterReconciler{
&controller.LinodeClusterReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("LinodeClusterReconciler"),
WatchFilterValue: clusterWatchFilter,
LinodeApiKey: linodeToken,
},
).SetupWithManager(mgr); err != nil {
).SetupWithManager(mgr, crcontroller.Options{MaxConcurrentReconciles: linodeClusterConcurrency}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "LinodeCluster")
os.Exit(1)
}

if err = reconciler.NewReconcilerWithTracing(
&controller2.LinodeMachineReconciler{
&controller.LinodeMachineReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("LinodeMachineReconciler"),
WatchFilterValue: machineWatchFilter,
LinodeApiKey: linodeToken,
LinodeDNSAPIKey: linodeDNSToken,
},
).SetupWithManager(mgr); err != nil {
).SetupWithManager(mgr, crcontroller.Options{MaxConcurrentReconciles: linodeMachineConcurrency}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "LinodeMachine")
os.Exit(1)
}

if err = reconciler.NewReconcilerWithTracing(
&controller2.LinodeVPCReconciler{
&controller.LinodeVPCReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("LinodeVPCReconciler"),
WatchFilterValue: clusterWatchFilter,
LinodeApiKey: linodeToken,
},
).SetupWithManager(mgr); err != nil {
).SetupWithManager(mgr, crcontroller.Options{MaxConcurrentReconciles: linodeVPCConcurrency}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "LinodeVPC")
os.Exit(1)
}

if err = reconciler.NewReconcilerWithTracing(
&controller2.LinodeObjectStorageBucketReconciler{
&controller.LinodeObjectStorageBucketReconciler{
Client: mgr.GetClient(),
Logger: ctrl.Log.WithName("LinodeObjectStorageBucketReconciler"),
Recorder: mgr.GetEventRecorderFor("LinodeObjectStorageBucketReconciler"),
WatchFilterValue: objectStorageBucketWatchFilter,
LinodeApiKey: linodeToken,
},
).SetupWithManager(mgr); err != nil {
).SetupWithManager(mgr, crcontroller.Options{MaxConcurrentReconciles: linodeObjectStorageBucketConcurrency}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "LinodeObjectStorageBucket")
os.Exit(1)
}
Expand Down
4 changes: 3 additions & 1 deletion controller/linodecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
crcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"

Expand Down Expand Up @@ -288,9 +289,10 @@ func (r *LinodeClusterReconciler) reconcileDelete(ctx context.Context, logger lo
}

// SetupWithManager sets up the controller with the Manager.
func (r *LinodeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *LinodeClusterReconciler) SetupWithManager(mgr ctrl.Manager, options crcontroller.Options) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&infrav1alpha2.LinodeCluster{}).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(mgr.GetLogger(), r.WatchFilterValue)).
Watches(
&clusterv1.Cluster{},
Expand Down
4 changes: 3 additions & 1 deletion controller/linodemachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
crcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -728,14 +729,15 @@ func (r *LinodeMachineReconciler) reconcileDelete(
}

// SetupWithManager sets up the controller with the Manager.
func (r *LinodeMachineReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *LinodeMachineReconciler) SetupWithManager(mgr ctrl.Manager, options crcontroller.Options) error {
linodeMachineMapper, err := kutil.ClusterToTypedObjectsMapper(r.Client, &infrav1alpha1.LinodeMachineList{}, mgr.GetScheme())
if err != nil {
return fmt.Errorf("failed to create mapper for LinodeMachines: %w", err)
}

err = ctrl.NewControllerManagedBy(mgr).
For(&infrav1alpha1.LinodeMachine{}).
WithOptions(options).
Watches(
&clusterv1.Machine{},
handler.EnqueueRequestsFromMapFunc(kutil.MachineToInfrastructureMapFunc(infrav1alpha1.GroupVersion.WithKind("LinodeMachine"))),
Expand Down
4 changes: 3 additions & 1 deletion controller/linodeobjectstoragebucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
crcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -257,14 +258,15 @@ func (r *LinodeObjectStorageBucketReconciler) reconcileDelete(ctx context.Contex
}

// SetupWithManager sets up the controller with the Manager.
func (r *LinodeObjectStorageBucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *LinodeObjectStorageBucketReconciler) SetupWithManager(mgr ctrl.Manager, options crcontroller.Options) error {
linodeObjectStorageBucketMapper, err := kutil.ClusterToTypedObjectsMapper(r.Client, &infrav1alpha1.LinodeObjectStorageBucketList{}, mgr.GetScheme())
if err != nil {
return fmt.Errorf("failed to create mapper for LinodeObjectStorageBuckets: %w", err)
}

err = ctrl.NewControllerManagedBy(mgr).
For(&infrav1alpha1.LinodeObjectStorageBucket{}).
WithOptions(options).
Owns(&corev1.Secret{}).
WithEventFilter(predicate.And(
predicates.ResourceHasFilterLabel(mgr.GetLogger(), r.WatchFilterValue),
Expand Down
4 changes: 3 additions & 1 deletion controller/linodevpc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
crcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -314,14 +315,15 @@ func (r *LinodeVPCReconciler) reconcileDelete(ctx context.Context, logger logr.L
}

// SetupWithManager sets up the controller with the Manager.
func (r *LinodeVPCReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *LinodeVPCReconciler) SetupWithManager(mgr ctrl.Manager, options crcontroller.Options) error {
linodeVPCMapper, err := kutil.ClusterToTypedObjectsMapper(r.Client, &infrav1alpha1.LinodeVPCList{}, mgr.GetScheme())
if err != nil {
return fmt.Errorf("failed to create mapper for LinodeVPCs: %w", err)
}

err = ctrl.NewControllerManagedBy(mgr).
For(&infrav1alpha1.LinodeVPC{}).
WithOptions(options).
WithEventFilter(
predicate.And(
// Filter for objects with a specific WatchLabel.
Expand Down
3 changes: 2 additions & 1 deletion observability/wrappers/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

Expand All @@ -31,5 +32,5 @@ type Reconciler interface {
client.SubResourceClientConstructor

Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
SetupWithManager(mgr manager.Manager) error
SetupWithManager(mgr manager.Manager, options controller.Options) error
}

0 comments on commit 31917b5

Please sign in to comment.