PoC: Dynamically add watch for clusterclaim CRD #2743

Draft · wants to merge 1 commit into base: main
63 changes: 60 additions & 3 deletions controllers/ocsinitialization/ocsinitialization_controller.go
@@ -22,6 +22,7 @@ import (
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -33,10 +34,14 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// operatorNamespace is the namespace the operator is running in
@@ -63,6 +68,8 @@ func InitNamespacedName() types.NamespacedName {
// nolint:revive
type OCSInitializationReconciler struct {
client.Client
cluster cluster.Cluster
Member commented:
Can you add a comment on what it holds?
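One possible doc comment answering this; the description is an assumption inferred from how the field is used in this diff rather than something stated by the author.

	// cluster is a secondary controller-runtime Cluster; its cache backs the
	// watch on the ClusterClaim CRD that Reconcile registers once the CRD is
	// detected on the cluster.
	cluster cluster.Cluster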

controller controller.Controller
ctx context.Context
Log logr.Logger
Scheme *runtime.Scheme
@@ -152,6 +159,45 @@ func (r *OCSInitializationReconciler) Reconcile(ctx context.Context, request rec
return reconcile.Result{}, err
}

	if _, ok := r.availableCrds[ClusterClaimCrdName]; ok {
		crdHandler := handler.TypedEnqueueRequestsFromMapFunc(func(_ context.Context, obj client.Object) []reconcile.Request {
			if obj.GetName() == ClusterClaimCrdName {
				return []reconcile.Request{
					{
						NamespacedName: initNamespacedName,
					},
				}
			}
			return []reconcile.Request{}
		})

		crdPredicate := predicate.Funcs{
			CreateFunc: func(e event.CreateEvent) bool {
				if metaObj, ok := e.Object.(metav1.Object); ok {
					return metaObj.GetName() == ClusterClaimCrdName
				}
				return false
			},
			DeleteFunc: func(_ event.DeleteEvent) bool {
				return false
			},
			UpdateFunc: func(_ event.UpdateEvent) bool {
				return false
			},
			GenericFunc: func(_ event.GenericEvent) bool {
				return false
			},
		}
		if err := r.controller.Watch(source.Kind[client.Object](r.cluster.GetCache(),
			&apiextensionsv1.CustomResourceDefinition{
				ObjectMeta: metav1.ObjectMeta{
					Name: ClusterClaimCrdName,
				},
			}, crdHandler, crdPredicate)); err != nil {
			return reconcile.Result{}, fmt.Errorf("unable to watch CRD")
		}
Comment on lines +191 to +198
Contributor commented:
@umangachapagain This will not achieve the desired result. You cannot just add watches to a controller after the manager has started; the cache will become incoherent and inconsistent. The only way to do it within the process is to stop all controllers, reset the manager, and then start the manager again. What you are doing here can only work if the client we use is cacheless, but then the informers will not work properly and there will be a runtime penalty for lists and gets.
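A rough sketch of the cacheless alternative described above (not part of this diff): instead of registering a watch, the reconciler could read the CRD directly from the API server on each reconcile, using an uncached reader such as the one returned by mgr.GetAPIReader(). The r.apiReader field below is a hypothetical name used only for illustration.

	// Hypothetical sketch: uncached CRD check, at the cost of an extra API call per reconcile.
	crd := &apiextensionsv1.CustomResourceDefinition{}
	getErr := r.apiReader.Get(ctx, types.NamespacedName{Name: ClusterClaimCrdName}, crd)
	if getErr != nil && !errors.IsNotFound(getErr) {
		return reconcile.Result{}, getErr
	}
	if getErr == nil {
		// CRD exists on the cluster: ClusterClaim-related resources can be reconciled on this pass.
	}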

}

r.clusters, err = util.GetClusters(ctx, r.Client)
if err != nil {
r.Log.Error(err, "Failed to get clusters")
@@ -264,14 +310,21 @@ func (r *OCSInitializationReconciler) Reconcile(ctx context.Context, request rec

// SetupWithManager sets up a controller with a manager
func (r *OCSInitializationReconciler) SetupWithManager(mgr ctrl.Manager) error {
cluster, err := cluster.New(mgr.GetConfig(), func(options *cluster.Options) {
options.Scheme = mgr.GetScheme()
})
Comment on lines +313 to +315
Member commented:
How about we fetch it in main and pass it to all controllers, so it can be used in the storagecluster controller as well?
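A minimal sketch of that suggestion (not part of this diff): create the secondary cluster once in main.go, let the manager run it, and inject it into each reconciler. It assumes the reconciler field is exported (Cluster instead of cluster) and reuses main.go's usual setupLog/os.Exit error handling; the exact names are assumptions.

	// main.go, after the manager is created (sketch only):
	watchCluster, err := cluster.New(mgr.GetConfig(), func(o *cluster.Options) {
		o.Scheme = mgr.GetScheme()
	})
	if err != nil {
		setupLog.Error(err, "unable to create secondary cluster")
		os.Exit(1)
	}
	// cluster.Cluster is a Runnable, so the manager starts and stops its cache with everything else.
	if err := mgr.Add(watchCluster); err != nil {
		setupLog.Error(err, "unable to add secondary cluster to manager")
		os.Exit(1)
	}
	if err := (&ocsinitialization.OCSInitializationReconciler{
		Client:  mgr.GetClient(),
		Scheme:  mgr.GetScheme(),
		Cluster: watchCluster, // field would need to be exported for injection from main
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create OCSInitialization controller")
		os.Exit(1)
	}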

Comment on lines +313 to +315
Contributor commented:
@umangachapagain it's been some time since I looked into controller-runtime (during ODFMS), so please excuse me if I'm a bit off here.

You are using only the Scheme from the manager, but not the manager's cluster, which is created automatically and has a cache attached to it. The controller being saved on the reconciler can probably be configured against the manager's cluster cache, but may not work against this newly created cluster's cache.

If the above is correct, we may want to use a single cache across all controllers?
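If a single shared cache is the goal, one option (a sketch, not something this PR does) is to drop the extra cluster.New call and back the watch with the manager's own cache, which SetupWithManager can obtain from mgr.GetCache(); r.cache below is a hypothetical field name. The earlier caveat about adding watches to a controller after the manager has started would still apply.

	// In SetupWithManager (sketch): keep a handle to the manager's cache.
	r.cache = mgr.GetCache()

	// In Reconcile (sketch): mirror the existing Watch call, but back it with that shared cache.
	if err := r.controller.Watch(source.Kind[client.Object](r.cache,
		&apiextensionsv1.CustomResourceDefinition{},
		crdHandler, crdPredicate)); err != nil {
		return reconcile.Result{}, err
	}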

if err != nil {
return err
}
r.cluster = cluster
operatorNamespace = r.OperatorNamespace
prometheusPredicate := predicate.NewPredicateFuncs(
func(client client.Object) bool {
return strings.HasPrefix(client.GetName(), PrometheusOperatorCSVNamePrefix)
},
)

ocsInitializationController := ctrl.NewControllerManagedBy(mgr).
ocsInitializationController, err := ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.OCSInitialization{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&corev1.Service{}).
Owns(&corev1.Secret{}).
@@ -352,8 +405,12 @@ func (r *OCSInitializationReconciler) SetupWithManager(mgr ctrl.Manager) error {
},
),
builder.WithPredicates(prometheusPredicate),
)
return ocsInitializationController.Complete(r)
).Build(r)
if err != nil {
return err
}
r.controller = ocsInitializationController
return nil
}

func (r *OCSInitializationReconciler) ensureClusterClaimExists() error {
2 changes: 2 additions & 0 deletions main.go
@@ -42,6 +42,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -95,6 +96,7 @@ func init() {
utilruntime.Must(operatorsv1alpha1.AddToScheme(scheme))
utilruntime.Must(nadscheme.AddToScheme(scheme))
utilruntime.Must(ocsclientv1a1.AddToScheme(scheme))
utilruntime.Must(apiextensions.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}
