From d42d4dae86e5970eccf5b3b5e2db676f00c8d872 Mon Sep 17 00:00:00 2001
From: Travis Nielsen
Date: Tue, 12 Jul 2022 16:05:44 -0600
Subject: [PATCH] core: operator skips reconcile of mons and osds in debug

During certain maintenance tasks the admin takes ownership of operations
on the Ceph mons and OSDs, and the operator should not interfere with
those operations. If the operator sees any mon in debug mode, every
reconcile and mon health check will be skipped. Thus, mons will not be
updated while any one of them is in maintenance. During OSD reconcile,
individual OSD deployment updates will be skipped only for OSDs that are
actively being debugged. Debug mode for mon and OSD deployments is
signaled by adding the ceph.rook.io/do-not-reconcile label to the
daemon's deployment.

Signed-off-by: Travis Nielsen
(cherry picked from commit 7c56b9328a69a20a6823422e5918cfe31eeb4943)
(cherry picked from commit f34c94051817760c6848d9907d4908c3cfaec894)
---
 pkg/apis/ceph.rook.io/v1/labels.go           |  5 ++++
 pkg/operator/ceph/cluster/mon/health.go      |  9 ++++++
 pkg/operator/ceph/cluster/mon/mon.go         | 29 +++++++++++++++++++
 pkg/operator/ceph/cluster/osd/osd.go         | 24 ++++++++++++++-
 pkg/operator/ceph/cluster/osd/update.go      | 20 +++++++++----
 pkg/operator/ceph/cluster/osd/update_test.go | 23 ++++++++++++++-
 .../ceph/disruption/clusterdisruption/osd.go |  4 ++-
 7 files changed, 105 insertions(+), 9 deletions(-)

diff --git a/pkg/apis/ceph.rook.io/v1/labels.go b/pkg/apis/ceph.rook.io/v1/labels.go
index f395e91a3977..11ec19100d72 100644
--- a/pkg/apis/ceph.rook.io/v1/labels.go
+++ b/pkg/apis/ceph.rook.io/v1/labels.go
@@ -20,6 +20,11 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+const (
+	// SkipReconcileLabelKey is a label indicating that the pod should not be reconciled
+	SkipReconcileLabelKey = "ceph.rook.io/do-not-reconcile"
+)
+
 // LabelsSpec is the main spec label for all daemons
 type LabelsSpec map[KeyType]Labels
 
diff --git a/pkg/operator/ceph/cluster/mon/health.go b/pkg/operator/ceph/cluster/mon/health.go
index 4a34c3933ceb..d341b38886c2 100644
--- a/pkg/operator/ceph/cluster/mon/health.go
+++ b/pkg/operator/ceph/cluster/mon/health.go
@@ -157,6 +157,15 @@ func (c *Cluster) checkHealth(ctx context.Context) error {
 		return errors.New("skipping mon health check since there are no monitors")
 	}
 
+	monsToSkipReconcile, err := c.getMonsToSkipReconcile()
+	if err != nil {
+		return errors.Wrap(err, "failed to check for mons to skip reconcile")
+	}
+	if monsToSkipReconcile.Len() > 0 {
+		logger.Warningf("skipping mon health check since mons are labeled with %s: %v", cephv1.SkipReconcileLabelKey, monsToSkipReconcile.List())
+		return nil
+	}
+
 	logger.Debugf("Checking health for mons in cluster %q", c.ClusterInfo.Namespace)
 
 	// For an external connection we use a special function to get the status
diff --git a/pkg/operator/ceph/cluster/mon/mon.go b/pkg/operator/ceph/cluster/mon/mon.go
index 8ce9e135643a..9b8d34d2f9d5 100644
--- a/pkg/operator/ceph/cluster/mon/mon.go
+++ b/pkg/operator/ceph/cluster/mon/mon.go
@@ -47,6 +47,7 @@ import (
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 )
 
@@ -192,6 +193,15 @@ func (c *Cluster) Start(clusterInfo *cephclient.ClusterInfo, rookVersion string,
 
 	logger.Infof("targeting the mon count %d", c.spec.Mon.Count)
 
+	monsToSkipReconcile, err := c.getMonsToSkipReconcile()
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to check for mons to skip reconcile")
+	}
+	if monsToSkipReconcile.Len() > 0 {
+		logger.Warningf("skipping mon reconcile since mons are labeled with %s: %v", cephv1.SkipReconcileLabelKey, monsToSkipReconcile.List())
+		return c.ClusterInfo, nil
+	}
+
 	// create the mons for a new cluster or ensure mons are running in an existing cluster
 	return c.ClusterInfo, c.startMons(c.spec.Mon.Count)
 }
@@ -1445,3 +1455,22 @@ func (c *Cluster) releaseOrchestrationLock() {
 	c.orchestrationMutex.Unlock()
 	logger.Debugf("Released lock for mon orchestration")
 }
+
+func (c *Cluster) getMonsToSkipReconcile() (sets.String, error) {
+	listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s,%s", k8sutil.AppAttr, AppName, cephv1.SkipReconcileLabelKey)}
+
+	deployments, err := c.context.Clientset.AppsV1().Deployments(c.ClusterInfo.Namespace).List(c.ClusterInfo.Context, listOpts)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to query mons to skip reconcile")
+	}
+
+	result := sets.NewString()
+	for _, deployment := range deployments.Items {
+		if monID, ok := deployment.Labels[config.MonType]; ok {
+			logger.Infof("found mon %q pod to skip reconcile", monID)
+			result.Insert(monID)
+		}
+	}
+
+	return result, nil
+}
diff --git a/pkg/operator/ceph/cluster/osd/osd.go b/pkg/operator/ceph/cluster/osd/osd.go
index 75207531e971..e130e9602455 100644
--- a/pkg/operator/ceph/cluster/osd/osd.go
+++ b/pkg/operator/ceph/cluster/osd/osd.go
@@ -195,8 +195,12 @@ func (c *Cluster) Start() error {
 	if err != nil {
 		return errors.Wrapf(err, "failed to get information about currently-running OSD Deployments in namespace %q", namespace)
 	}
+	osdsToSkipReconcile, err := c.getOSDsToSkipReconcile()
+	if err != nil {
+		logger.Warningf("failed to get osds to skip reconcile. %v", err)
+	}
 	logger.Debugf("%d of %d OSD Deployments need updated", updateQueue.Len(), deployments.Len())
-	updateConfig := c.newUpdateConfig(config, updateQueue, deployments)
+	updateConfig := c.newUpdateConfig(config, updateQueue, deployments, osdsToSkipReconcile)
 
 	// prepare for creating new OSDs
 	statusConfigMaps := sets.NewString()
@@ -257,6 +261,24 @@ func (c *Cluster) getExistingOSDDeploymentsOnPVCs() (sets.String, error) {
 	return result, nil
 }
 
+func (c *Cluster) getOSDsToSkipReconcile() (sets.String, error) {
+	listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s,%s", k8sutil.AppAttr, AppName, cephv1.SkipReconcileLabelKey)}
+
+	deployments, err := c.context.Clientset.AppsV1().Deployments(c.clusterInfo.Namespace).List(c.clusterInfo.Context, listOpts)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to query OSDs to skip reconcile")
+	}
+
+	result := sets.NewString()
+	for _, deployment := range deployments.Items {
+		if osdID, ok := deployment.Labels[OsdIdLabelKey]; ok {
+			result.Insert(osdID)
+		}
+	}
+
+	return result, nil
+}
+
 func deploymentOnNode(c *Cluster, osd OSDInfo, nodeName string, config *provisionConfig) (*appsv1.Deployment, error) {
 	osdLongName := fmt.Sprintf("OSD %d on node %q", osd.ID, nodeName)
 
diff --git a/pkg/operator/ceph/cluster/osd/update.go b/pkg/operator/ceph/cluster/osd/update.go
index 52c985c5f765..576b6f54caf3 100644
--- a/pkg/operator/ceph/cluster/osd/update.go
+++ b/pkg/operator/ceph/cluster/osd/update.go
@@ -28,6 +28,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 )
 
 // THE LIBRARY PROVIDED BY THIS FILE IS NOT THREAD SAFE
@@ -42,17 +43,19 @@ var (
 )
 
 type updateConfig struct {
-	cluster          *Cluster
-	provisionConfig  *provisionConfig
-	queue            *updateQueue   // these OSDs need updated
-	numUpdatesNeeded int            // the number of OSDs that needed updating
-	deployments      *existenceList // these OSDs have existing deployments
+	cluster             *Cluster
+	provisionConfig     *provisionConfig
+	queue               *updateQueue   // these OSDs need updated
+	numUpdatesNeeded    int            // the number of OSDs that needed updating
+	deployments         *existenceList // these OSDs have existing deployments
+	osdsToSkipReconcile sets.String    // these OSDs should not be updated during reconcile
 }
 
 func (c *Cluster) newUpdateConfig(
 	provisionConfig *provisionConfig,
 	queue *updateQueue,
 	deployments *existenceList,
+	osdsToSkipReconcile sets.String,
 ) *updateConfig {
 	return &updateConfig{
 		c,
@@ -60,6 +63,7 @@ func (c *Cluster) newUpdateConfig(
 		queue,
 		queue.Len(),
 		deployments,
+		osdsToSkipReconcile,
 	}
 }
 
@@ -126,6 +130,11 @@ func (c *updateConfig) updateExistingOSDs(errs *provisionErrors) {
 			continue
 		}
 
+		if c.osdsToSkipReconcile.Has(strconv.Itoa(osdID)) {
+			logger.Warningf("Skipping update for OSD %d since labeled with %s", osdID, cephv1.SkipReconcileLabelKey)
+			continue
+		}
+
 		// backward compatibility for old deployments
 		// Checking DeviceClass with None too, because ceph-volume lvm list return crush device class as None
 		// Tracker https://tracker.ceph.com/issues/53425
@@ -154,7 +163,6 @@
 		} else {
 			if !c.cluster.ValidStorage.NodeExists(nodeOrPVCName) {
 				// node will not reconcile, so don't update the deployment
-				// allow the OSD health checker to remove the OSD
 				logger.Warningf(
 					"not updating OSD %d on node %q. node no longer exists in the storage spec. "+
 						"if the user wishes to remove OSDs from the node, they must do so manually. "+
diff --git a/pkg/operator/ceph/cluster/osd/update_test.go b/pkg/operator/ceph/cluster/osd/update_test.go
index 1f76f18d2ca5..677ec69b82e1 100644
--- a/pkg/operator/ceph/cluster/osd/update_test.go
+++ b/pkg/operator/ceph/cluster/osd/update_test.go
@@ -38,6 +38,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
 )
@@ -110,7 +111,7 @@ func Test_updateExistingOSDs(t *testing.T) {
 		}
 		c = New(ctx, clusterInfo, spec, "rook/rook:master")
 		config := c.newProvisionConfig()
-		updateConfig = c.newUpdateConfig(config, updateQueue, existingDeployments)
+		updateConfig = c.newUpdateConfig(config, updateQueue, existingDeployments, sets.NewString())
 
 		// prepare outputs
 		deploymentsUpdated = []string{}
@@ -480,6 +481,26 @@ func Test_updateExistingOSDs(t *testing.T) {
 
 		assert.Equal(t, 0, updateQueue.Len()) // should be done with updates
 	})
+
+	t.Run("skip osd reconcile", func(t *testing.T) {
+		clientset = fake.NewSimpleClientset()
+		updateQueue = newUpdateQueueWithIDs(0, 1)
+		existingDeployments = newExistenceListWithIDs(0)
+		forceUpgradeIfUnhealthy = true
+		updateInjectFailures = k8sutil.Failures{}
+		doSetup()
+		addDeploymentOnNode("node0", 0)
+
+		osdToBeQueried = 0
+		updateConfig.osdsToSkipReconcile.Insert("0")
+		updateConfig.updateExistingOSDs(errs)
+		assert.Zero(t, errs.len())
+		assert.Equal(t, 1, updateQueue.Len())
+		osdIDUpdated, ok := updateQueue.Pop()
+		assert.True(t, ok)
+		assert.Equal(t, 1, osdIDUpdated)
+		updateConfig.osdsToSkipReconcile.Delete("0")
+	})
 }
 
 func Test_getOSDUpdateInfo(t *testing.T) {
diff --git a/pkg/operator/ceph/disruption/clusterdisruption/osd.go b/pkg/operator/ceph/disruption/clusterdisruption/osd.go
index 1359bf8d880c..c0632b205c6c 100644
--- a/pkg/operator/ceph/disruption/clusterdisruption/osd.go
+++ b/pkg/operator/ceph/disruption/clusterdisruption/osd.go
@@ -552,7 +552,9 @@ func (r *ReconcileClusterDisruption) getOSDFailureDomains(clusterInfo *cephclien
 				nodeDrainFailureDomains.Insert(failureDomainName)
 			}
 		} else {
-			logger.Infof("osd %q is down but no node drain is detected", deployment.Name)
+			if !strings.HasSuffix(deployment.Name, "-debug") {
+				logger.Infof("osd %q is down but no node drain is detected", deployment.Name)
+			}
 		}
 	}
 
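
Usage sketch (illustrative, not part of the change): the operator's label
selector only requires the ceph.rook.io/do-not-reconcile key to exist on a
daemon's Deployment, so any value works. A maintenance tool could toggle
debug mode for an OSD with a client-go strategic merge patch as below; the
namespace "rook-ceph" and the deployment name "rook-ceph-osd-0" are assumed
examples of the conventional Rook naming, not values taken from this patch.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a clientset from the local kubeconfig (assumed default location).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Add the label so the operator skips this OSD deployment during reconcile.
	addLabel := []byte(`{"metadata":{"labels":{"ceph.rook.io/do-not-reconcile":"true"}}}`)
	if _, err := client.AppsV1().Deployments("rook-ceph").Patch(
		context.TODO(), "rook-ceph-osd-0", types.StrategicMergePatchType, addLabel, metav1.PatchOptions{}); err != nil {
		panic(err)
	}
	fmt.Println("osd.0 is now skipped by the operator")

	// Remove the label (null deletes the key in a strategic merge patch) so the
	// operator resumes normal reconcile of this OSD.
	removeLabel := []byte(`{"metadata":{"labels":{"ceph.rook.io/do-not-reconcile":null}}}`)
	if _, err := client.AppsV1().Deployments("rook-ceph").Patch(
		context.TODO(), "rook-ceph-osd-0", types.StrategicMergePatchType, removeLabel, metav1.PatchOptions{}); err != nil {
		panic(err)
	}
	fmt.Println("osd.0 returned to normal reconcile")
}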