Commit
Merge pull request rook#14258 from subhamkrai/add-cephfs-fencing
mds: block only the active mds IP
travisn authored Jul 17, 2024
2 parents b287aa9 + 9f1b64b commit 3ccdb53
Showing 5 changed files with 311 additions and 61 deletions.
12 changes: 7 additions & 5 deletions pkg/operator/ceph/cluster/controller.go
@@ -99,16 +99,17 @@ type ReconcileCephCluster struct {
context *clusterd.Context
clusterController *ClusterController
opManagerContext context.Context
opConfig opcontroller.OperatorConfig
}

// Add creates a new CephCluster Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, ctx *clusterd.Context, clusterController *ClusterController, opManagerContext context.Context) error {
return add(opManagerContext, mgr, newReconciler(mgr, ctx, clusterController, opManagerContext), ctx)
func Add(mgr manager.Manager, ctx *clusterd.Context, clusterController *ClusterController, opManagerContext context.Context, opConfig opcontroller.OperatorConfig) error {
return add(opManagerContext, mgr, newReconciler(mgr, ctx, clusterController, opManagerContext, opConfig), ctx, opConfig)
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager, ctx *clusterd.Context, clusterController *ClusterController, opManagerContext context.Context) reconcile.Reconciler {
func newReconciler(mgr manager.Manager, ctx *clusterd.Context, clusterController *ClusterController, opManagerContext context.Context, opConfig opcontroller.OperatorConfig) reconcile.Reconciler {
// add "rook-" prefix to the controller name to make sure it is clear to all reading the events
// that they are coming from Rook. The controller name already has context that it is for Ceph
// and from the cluster controller.
@@ -118,12 +119,13 @@ func newReconciler(mgr manager.Manager, ctx *clusterd.Context, clusterController
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
context: ctx,
opConfig: opConfig,
clusterController: clusterController,
opManagerContext: opManagerContext,
}
}

func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reconciler, context *clusterd.Context) error {
func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reconciler, context *clusterd.Context, opConfig opcontroller.OperatorConfig) error {
// Create a new controller
c, err := controller.New(controllerName, mgr, controller.Options{Reconciler: r})
if err != nil {
@@ -170,7 +172,7 @@ func add(opManagerContext context.Context, mgr manager.Manager, r reconcile.Reco
mgr.GetCache(),
&corev1.Node{TypeMeta: metav1.TypeMeta{Kind: "Node", APIVersion: corev1.SchemeGroupVersion.String()}},
handler.EnqueueRequestsFromMapFunc(handlerFunc),
predicateForNodeWatcher(opManagerContext, mgr.GetClient(), context))
predicateForNodeWatcher(opManagerContext, mgr.GetClient(), context, opConfig.OperatorNamespace))
err = c.Watch(nodeKind)
if err != nil {
return err
6 changes: 3 additions & 3 deletions pkg/operator/ceph/cluster/predicate.go
@@ -52,11 +52,11 @@ func shouldReconcileChangedNode(objOld, objNew *corev1.Node) bool {
}

// predicateForNodeWatcher is the predicate function to trigger reconcile on Node events
func predicateForNodeWatcher(ctx context.Context, client client.Client, context *clusterd.Context) predicate.Funcs {
func predicateForNodeWatcher(ctx context.Context, client client.Client, context *clusterd.Context, opNamespace string) predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
clientCluster := newClientCluster(client, e.Object.GetNamespace(), context)
return clientCluster.onK8sNode(ctx, e.Object)
return clientCluster.onK8sNode(ctx, e.Object, opNamespace)
},

UpdateFunc: func(e event.UpdateEvent) bool {
@@ -67,7 +67,7 @@ func predicateForNodeWatcher(ctx context.Context, client client.Client, context
}

clientCluster := newClientCluster(client, e.ObjectNew.GetNamespace(), context)
return clientCluster.onK8sNode(ctx, e.ObjectNew)
return clientCluster.onK8sNode(ctx, e.ObjectNew, opNamespace)
}
}
return false
191 changes: 173 additions & 18 deletions pkg/operator/ceph/cluster/watcher.go
@@ -60,7 +60,8 @@ var (

// drivers that support fencing, used in naming the networkFence object
const (
rbdDriver = "rbd"
rbdDriver = "rbd"
cephfsDriver = "cephfs"
)

func newClientCluster(client client.Client, namespace string, context *clusterd.Context) *clientCluster {
@@ -80,7 +81,7 @@ func checkStorageForNode(cluster *cephv1.CephCluster) bool {
}

// onK8sNode is triggered when a node is added to the Kubernetes cluster
func (c *clientCluster) onK8sNode(ctx context.Context, object runtime.Object) bool {
func (c *clientCluster) onK8sNode(ctx context.Context, object runtime.Object, opNamespace string) bool {
node, ok := object.(*corev1.Node)
if !ok {
return false
@@ -90,7 +91,7 @@ func (c *clientCluster) onK8sNode(ctx context.Context, object runtime.Object) bo
cluster := c.getCephCluster()

// Continue the reconcile even on failure, since we don't want to block reconciling other nodes
if err := c.handleNodeFailure(ctx, cluster, node); err != nil {
if err := c.handleNodeFailure(ctx, cluster, node, opNamespace); err != nil {
logger.Errorf("failed to handle node failure. %v", err)
}

@@ -157,7 +158,7 @@ func (c *clientCluster) onK8sNode(ctx context.Context, object runtime.Object) bo
return false
}

func (c *clientCluster) handleNodeFailure(ctx context.Context, cluster *cephv1.CephCluster, node *corev1.Node) error {
func (c *clientCluster) handleNodeFailure(ctx context.Context, cluster *cephv1.CephCluster, node *corev1.Node, opNamespace string) error {
watchForNodeLoss, err := k8sutil.GetOperatorSetting(ctx, c.context.Clientset, opcontroller.OperatorSettingConfigMapName, "ROOK_WATCH_FOR_NODE_FAILURE", "true")
if err != nil {
return pkgerror.Wrapf(err, "failed to get configmap value `ROOK_WATCH_FOR_NODE_FAILURE`.")
@@ -198,7 +199,7 @@ func (c *clientCluster) handleNodeFailure(ctx context.Context, cluster *cephv1.C
}

if nodeHasOutOfServiceTaint {
err := c.fenceNode(ctx, node, cluster)
err := c.fenceNode(ctx, node, cluster, opNamespace)
if err != nil {
return pkgerror.Wrapf(err, "failed to create network fence for node %q.", node.Name)
}
@@ -210,20 +211,24 @@ func (c *clientCluster) handleNodeFailure(ctx context.Context, cluster *cephv1.C
return pkgerror.Wrapf(err, "failed to delete rbd network fence for node %q.", node.Name)
}

err = c.unfenceAndDeleteNetworkFence(ctx, *node, cluster, cephfsDriver)
if err != nil {
return pkgerror.Wrapf(err, "failed to delete cephFS network fence for node %q.", node.Name)
}
return nil
}
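Note on the trigger above: fencing only runs when the node carries the Kubernetes non-graceful-shutdown taint. Below is a minimal stand-alone sketch of that check; the real helper lives outside this hunk, and the taint key and sample value follow the upstream Kubernetes convention rather than code from this PR.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// hasOutOfServiceTaint is a simplified stand-in for the taint check that gates
// fencing: the node must carry the out-of-service taint applied during a
// non-graceful node shutdown.
func hasOutOfServiceTaint(node *corev1.Node) bool {
	for _, taint := range node.Spec.Taints {
		if taint.Key == "node.kubernetes.io/out-of-service" {
			return true
		}
	}
	return false
}

func main() {
	node := &corev1.Node{} // hypothetical node for illustration
	node.Name = "worker-1"
	node.Spec.Taints = []corev1.Taint{
		{Key: "node.kubernetes.io/out-of-service", Value: "nodeshutdown", Effect: corev1.TaintEffectNoExecute},
	}
	fmt.Printf("node %q requires fencing: %v\n", node.Name, hasOutOfServiceTaint(node))
}
```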

func (c *clientCluster) fenceNode(ctx context.Context, node *corev1.Node, cluster *cephv1.CephCluster) error {
func (c *clientCluster) fenceNode(ctx context.Context, node *corev1.Node, cluster *cephv1.CephCluster, opNamespace string) error {
volumesInuse := node.Status.VolumesInUse
if len(volumesInuse) == 0 {
logger.Debugf("no volumes in use for node %q", node.Name)
return nil
}
logger.Debugf("volumesInuse %s", volumesInuse)

rbdVolumesInUse := getCephVolumesInUse(cluster, volumesInuse)
if len(rbdVolumesInUse) == 0 {
logger.Debugf("no rbd volumes in use for out of service node %q", node.Name)
rbdVolumesInUse, cephFSVolumeInUse := getCephVolumesInUse(cluster, volumesInuse, opNamespace)
if len(rbdVolumesInUse) == 0 && len(cephFSVolumeInUse) == 0 {
logger.Debugf("no rbd or cephfs subvolumes in use for out of service node %q", node.Name)
return nil
}

@@ -233,7 +238,7 @@ func (c *clientCluster) fenceNode(ctx context.Context, node *corev1.Node, cluste
}

if len(rbdVolumesInUse) != 0 {
rbdPVList := listRBDPV(listPVs, cluster, rbdVolumesInUse)
rbdPVList := listRBDPV(listPVs, cluster, rbdVolumesInUse, opNamespace)
if len(rbdPVList) == 0 {
logger.Debug("No rbd PVs found on the node")
} else {
@@ -262,22 +267,59 @@ func (c *clientCluster) fenceNode(ctx context.Context, node *corev1.Node, cluste
}
}

if len(cephFSVolumeInUse) != 0 {
cephFSVolumeInUseMap := make(map[string]struct{})
for _, vol := range cephFSVolumeInUse {
cephFSVolumeInUseMap[vol] = struct{}{}
}
cephFSPVList := listRWOCephFSPV(listPVs, cluster, cephFSVolumeInUseMap, opNamespace)
if len(cephFSPVList) == 0 {
logger.Debugf("no cephfs PVs found on the node %q", node.Name)
return nil
}
logger.Infof("node %q requires fencing, found cephfs subvolumes in use", node.Name)
clusterInfo, _, _, err := opcontroller.LoadClusterInfo(c.context, ctx, cluster.Namespace, &cluster.Spec)
if err != nil {
return pkgerror.Wrap(err, "failed to load cluster info")
}

for i := range cephFSPVList {
err = c.fenceCephFSSubvolume(ctx, node, cluster, clusterInfo, cephFSPVList[i])
// We only need to create the network fence for any one of the cephFS PVs.
if err == nil {
break
}

// continue with the next cephfs subvolume if no active client was found
if stderrors.Is(err, errActiveClientNotFound) {
continue
}
if i == len(cephFSPVList)-1 {
return pkgerror.Wrap(err, "failed to fence cephfs subvolumes")
}
logger.Errorf("failed to fence cephfs subvolume %q, trying next cephfs subvolume", cephFSPVList[i].Name)
}

}
return nil
}

func getCephVolumesInUse(cluster *cephv1.CephCluster, volumesInUse []corev1.UniqueVolumeName) []string {
var rbdVolumesInUse []string
func getCephVolumesInUse(cluster *cephv1.CephCluster, volumesInUse []corev1.UniqueVolumeName, opNamespace string) ([]string, []string) {
var rbdVolumesInUse, cephFSVolumeInUse []string

for _, volume := range volumesInUse {
splitVolumeInUseBased := trimeVolumeInUse(volume)
logger.Infof("volumeInUse after split based on '^' %v", splitVolumeInUseBased)

if len(splitVolumeInUseBased) == 2 && splitVolumeInUseBased[0] == fmt.Sprintf("%s.rbd.csi.ceph.com", cluster.Namespace) {
if len(splitVolumeInUseBased) == 2 && splitVolumeInUseBased[0] == fmt.Sprintf("%s.rbd.csi.ceph.com", opNamespace) {
rbdVolumesInUse = append(rbdVolumesInUse, splitVolumeInUseBased[1])
}
}

return rbdVolumesInUse
if len(splitVolumeInUseBased) == 2 && splitVolumeInUseBased[0] == fmt.Sprintf("%s.cephfs.csi.ceph.com", opNamespace) {
cephFSVolumeInUse = append(cephFSVolumeInUse, splitVolumeInUseBased[1])
}
}
return rbdVolumesInUse, cephFSVolumeInUse
}

func trimeVolumeInUse(volume corev1.UniqueVolumeName) []string {
@@ -286,7 +328,7 @@ func trimeVolumeInUse(volume corev1.UniqueVolumeName) []string {
return splitVolumeInUseBased
}

func listRBDPV(listPVs *corev1.PersistentVolumeList, cluster *cephv1.CephCluster, rbdVolumesInUse []string) []corev1.PersistentVolume {
func listRBDPV(listPVs *corev1.PersistentVolumeList, cluster *cephv1.CephCluster, rbdVolumesInUse []string, opNamespace string) []corev1.PersistentVolume {
var listRbdPV []corev1.PersistentVolume

for _, pv := range listPVs.Items {
@@ -296,7 +338,7 @@ func listRBDPV(listPVs *corev1.PersistentVolumeList, cluster *cephv1.CephCluster
continue
}

if pv.Spec.CSI.Driver == fmt.Sprintf("%s.rbd.csi.ceph.com", cluster.Namespace) {
if pv.Spec.CSI.Driver == fmt.Sprintf("%s.rbd.csi.ceph.com", opNamespace) {
// Ignore PVs that support multinode access (RWX, ROX), since they can be mounted on multiple nodes.
if pvSupportsMultiNodeAccess(pv.Spec.AccessModes) {
continue
Expand All @@ -316,6 +358,35 @@ func listRBDPV(listPVs *corev1.PersistentVolumeList, cluster *cephv1.CephCluster
return listRbdPV
}

func listRWOCephFSPV(listPVs *corev1.PersistentVolumeList, cluster *cephv1.CephCluster, cephFSVolumesInUse map[string]struct{}, opNamespace string) []corev1.PersistentVolume {
var listCephFSPV []corev1.PersistentVolume

for _, pv := range listPVs.Items {
// Skip if pv is not provisioned by CSI
if pv.Spec.CSI == nil {
logger.Debugf("PV %q is not provisioned by CSI", pv.Name)
continue
}

if pv.Spec.CSI.Driver == fmt.Sprintf("%s.cephfs.csi.ceph.com", opNamespace) {
// Ignore PVs that support multinode access (RWX, ROX), since they can be mounted on multiple nodes.
if pvSupportsMultiNodeAccess(pv.Spec.AccessModes) {
continue
}

if pv.Spec.CSI.VolumeAttributes["staticVolume"] == "true" {
logger.Debugf("skipping, static PV %q", pv.Name)
continue
}
// Check if the volume is in use
if _, exists := cephFSVolumesInUse[pv.Spec.CSI.VolumeHandle]; exists {
listCephFSPV = append(listCephFSPV, pv)
}
}
}
return listCephFSPV
}

// pvSupportsMultiNodeAccess returns true if the PV access modes contain ReadWriteMany or ReadOnlyMany.
func pvSupportsMultiNodeAccess(accessModes []corev1.PersistentVolumeAccessMode) bool {
for _, accessMode := range accessModes {
@@ -358,6 +429,89 @@ func (c *clientCluster) fenceRbdImage(
return nil
}

func (c *clientCluster) fenceCephFSSubvolume(
ctx context.Context, node *corev1.Node, cluster *cephv1.CephCluster,
clusterInfo *cephclient.ClusterInfo, cephFSPV corev1.PersistentVolume) error {

logger.Infof("fencing cephfs subvolume %q on node %q", cephFSPV.Name, node.Name)

status, err := cephclient.StatusWithUser(c.context, clusterInfo)
if err != nil {
return pkgerror.Wrap(err, "failed to get ceph status to check the active mds")
}

var activeMDS string
for _, fsRank := range status.Fsmap.ByRank {
if fsRank.Status == "up:active" {
activeMDS = fsRank.Name
}
}

args := []string{"tell", fmt.Sprintf("mds.%s", activeMDS), "client", "ls", "--format", "json"}
cmd := cephclient.NewCephCommand(c.context, clusterInfo, args)
cmd.JsonOutput = true

buf, err := cmd.Run()
if err != nil {
return fmt.Errorf("failed to list watchers for cephfs subvolumeName %s. %v", cephFSPV.Spec.CSI.VolumeAttributes["subvolumeName"], err)
}
ips, err := cephFSMDSClientMarshal(buf, node.Name, cephFSPV)
if err != nil {
return pkgerror.Wrap(err, "failed to unmarshal cephfs mds output")
}

if len(ips) == 0 {
logger.Infof("no active mds clients found for cephfs subvolume %q", cephFSPV.Name)
return errActiveClientNotFound
}

err = c.createNetworkFence(ctx, cephFSPV, node, cluster, ips, cephfsDriver)
if err != nil {
return pkgerror.Wrapf(err, "failed to create network fence for node %q", node.Name)
}

return nil
}

func cephFSMDSClientMarshal(output []byte, nodeName string, cephFSPV corev1.PersistentVolume) ([]string, error) {
type entity struct {
Addr struct {
Addr string `json:"addr"`
Nonce int `json:"nonce"`
} `json:"addr"`
}

type clientMetadata struct {
Root string `json:"root"`
Hostname string `json:"hostname"`
}

type cephFSData struct {
Entity entity `json:"entity"`
ClientMetadata clientMetadata `json:"client_metadata"`
}

var data []cephFSData
err := json.Unmarshal([]byte(output), &data)
if err != nil {
return []string{}, pkgerror.Wrapf(err, "failed to unmarshal cephFS data output")
}

watcherIPlist := []string{}
for _, d := range data {
if d.ClientMetadata.Hostname != "" {
if strings.Contains(nodeName, d.ClientMetadata.Hostname) && cephFSPV.Spec.CSI.VolumeAttributes["subvolumePath"] == d.ClientMetadata.Root {
logger.Infof("cephfs mds client ips to fence %v", d.Entity.Addr)
watcherIP := concatenateWatcherIp(d.Entity.Addr.Addr)
watcherIPlist = append(watcherIPlist, watcherIP)
break
}
}
}

return watcherIPlist, nil
}
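To illustrate the matching in cephFSMDSClientMarshal, here is a self-contained sketch that runs a hypothetical `ceph tell mds.<active mds> client ls --format json` response through the same logic: a client is selected when its client_metadata.hostname matches the node name and its client_metadata.root equals the PV's subvolumePath attribute, and its entity address is then reduced to a /32 CIDR, as concatenateWatcherIp does below. All field values are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type mdsClient struct {
	Entity struct {
		Addr struct {
			Addr  string `json:"addr"`
			Nonce int    `json:"nonce"`
		} `json:"addr"`
	} `json:"entity"`
	ClientMetadata struct {
		Root     string `json:"root"`
		Hostname string `json:"hostname"`
	} `json:"client_metadata"`
}

func main() {
	// hypothetical output of `ceph tell mds.<active mds> client ls --format json`
	raw := []byte(`[
	  {
	    "entity": {"addr": {"addr": "10.244.0.12:0", "nonce": 3247243972}},
	    "client_metadata": {"root": "/volumes/csi/csi-vol-1234/a1b2", "hostname": "worker-1"}
	  },
	  {
	    "entity": {"addr": {"addr": "10.244.0.27:0", "nonce": 1133455}},
	    "client_metadata": {"root": "/volumes/csi/csi-vol-9999/c3d4", "hostname": "worker-2"}
	  }
	]`)

	nodeName := "worker-1"                            // the node carrying the out-of-service taint
	subvolumePath := "/volumes/csi/csi-vol-1234/a1b2" // pv.Spec.CSI.VolumeAttributes["subvolumePath"]

	var clients []mdsClient
	if err := json.Unmarshal(raw, &clients); err != nil {
		panic(err)
	}

	for _, c := range clients {
		if strings.Contains(nodeName, c.ClientMetadata.Hostname) && c.ClientMetadata.Root == subvolumePath {
			// strip the ":0" nonce suffix and pin the address to a single-IP network
			cidr := strings.Split(c.Entity.Addr.Addr, ":0")[0] + "/32"
			fmt.Println("cephfs client IP to fence:", cidr)
		}
	}
}
```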

func rbdStatusUnMarshal(output []byte) ([]string, error) {
type rbdStatus struct {
Watchers []struct {
@@ -380,7 +534,8 @@ func rbdStatusUnMarshal(output []byte) ([]string, error) {
}

func concatenateWatcherIp(address string) string {
// address is in format `10.63.0.5:0/1254753579` for rbd
// address is in format `10.63.0.5:0/1254753579` for rbd and
// in the format '10.244.0.12:0' for cephfs
// split on ':0' to remove the nonce and concatenate `/32` to define a network with only one IP address
watcherIP := strings.Split(address, ":0")[0] + "/32"
return watcherIP