Skip to content

Commit

Permalink
Add a workaround volume suspension for statefulset startup (#820)
Browse files Browse the repository at this point in the history
As per the k8s limitation, the pod will come up only when all of its
volumes are successfully mounted. So, the pod will remain in the
"ContainerCreating" state forever if the drives have gone faulty or
been detached from the node.

By suspending such faulty volumes, DirectPV can fake the CSI calls which
will allow pods to come up with these mocked volumes and the pod
restarts will be resilient.

Mocked volumes will be mounted as "read-only" and will be treated as
"offline" volumes.

Fixes #578
  • Loading branch information
Praveenrajmani authored Aug 16, 2023
1 parent 8efdfac commit 3a1fc0c
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 32 deletions.
11 changes: 11 additions & 0 deletions cmd/directpv/node-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var nodeServerCmd = &cobra.Command{
if err := sys.Mkdir(consts.MountRootDir, 0o755); err != nil && !errors.Is(err, os.ErrExist) {
return err
}
if err := mountTempDir(); err != nil {
klog.ErrorS(err, "unable to make tmpfs mount", "Target", consts.TmpMountDir)
}
if err := device.Sync(c.Context(), nodeID); err != nil {
return err
}
Expand Down Expand Up @@ -113,3 +116,11 @@ func startNodeServer(ctx context.Context) error {

return <-errCh
}

// mountTempDir creates the tmpfs mount point used for suspended volumes
// and mounts a minimal read-only tmpfs on it.
func mountTempDir() error {
	err := sys.Mkdir(consts.TmpMountDir, 0o777)
	switch {
	case err == nil, errors.Is(err, os.ErrExist):
		// Mount point is present; proceed to mount.
	default:
		return err
	}
	// Suspended volumes are served from this read-only tmpfs, so keep
	// it as small as possible (size=1).
	return sys.Mount("tmpfs", consts.TmpMountDir, "tmpfs", []string{"ro"}, "size=1")
}
12 changes: 12 additions & 0 deletions docs/volume-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,15 @@ $ kubectl directpv clean --all
```

Refer [clean command](./command-reference.md#clean-command) for more information.

## Suspend volumes

***CAUTION: THIS IS A DANGEROUS OPERATION WHICH LEADS TO DATA LOSS***

By Kubernetes design, [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) workload is active only if all of its pods are in running state. Any faulty volumes will prevent the statefulset from starting up. DirectPV provides a workaround to suspend failed volumes by mounting them on empty `/var/lib/directpv/tmp` directory with read-only access. This can be done by adding `directpv.min.io/suspend: "true"` label on the respective volumes. Below is an example:

```sh
> kubectl directpv label volumes --nodes node-1 --drives dm-3 directpv.min.io/suspend=true
```

This label can be removed once the failed volumes are fixed. Upon removal, the volumes will resume using the respective allocated drives.
26 changes: 26 additions & 0 deletions functests/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,32 @@ function test_force_delete() {
done
}

# usage: test_volume_supending <plugin>
# NOTE: the misspelling "supending" is kept because callers (tests.sh) use
# this exact name.
function test_volume_supending() {
    # This test is only meaningful in the GitHub CI environment.
    if ! is_github_workflow; then
        return
    fi

    echo "* Testing volume suspending"

    directpv_client="$1"

    # Suspend all volumes placed on the LVM test device.
    "${directpv_client}" label volumes --drives "${LV_DEVICE}" directpv.min.io/suspend=true

    running_count=0
    required_count=$(kubectl get pods --field-selector=status.phase=Running --no-headers 2>/dev/null | grep -c '^minio-' || true)

    # Force pod restarts; with the volumes suspended, the pods must still
    # come back up (mounted read-only from tmpfs).
    kubectl delete pods --all --force

    until [[ $running_count -ge $required_count ]]; do
        echo " ...waiting for $(( required_count - running_count )) minio pods to come up after suspending the volumes"
        sleep 30
        running_count=$(kubectl get pods --field-selector=status.phase=Running --no-headers 2>/dev/null | grep -c '^minio-' || true)
    done

    # Resume the volumes by removing the suspend label.
    "${directpv_client}" label volumes --drives "${LV_DEVICE}" directpv.min.io/suspend-
}

# usage: delete_minio <minio-yaml>
function delete_minio() {
echo "* Deleting minio"
Expand Down
2 changes: 2 additions & 0 deletions functests/tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ function run_tests() {
add_drives "${DIRECTPV_DIR}/kubectl-directpv"
deploy_minio minio.yaml
test_force_delete
test_volume_supending "${DIRECTPV_DIR}/kubectl-directpv"
uninstall_minio "${DIRECTPV_DIR}/kubectl-directpv" minio.yaml
test_volume_expansion "${DIRECTPV_DIR}/kubectl-directpv" sleep.yaml
remove_drives "${DIRECTPV_DIR}/kubectl-directpv"
uninstall_directpv "${DIRECTPV_DIR}/kubectl-directpv" "${pod_count}"
unmount_directpv
remove_luks
remove_lvm
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/directpv.min.io/types/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ const (

// RequestIDLabelKey label key for request ID
RequestIDLabelKey LabelKey = consts.GroupName + "/request-id"

// SuspendLabelKey denotes if the volume is suspended.
SuspendLabelKey LabelKey = consts.GroupName + "/suspend"
)

// LabelValue is a type definition for label value
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/directpv.min.io/v1beta1/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package v1beta1

import (
"strconv"

"github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/consts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -286,6 +288,11 @@ func (volume DirectPVVolume) GetTenantName() string {
return string(volume.getLabel(types.LabelKey(Group + "/tenant")))
}

// IsSuspended returns if the volume is suspended.
//
// A volume is considered suspended when its suspend label carries a
// boolean-true value. Using strconv.ParseBool accepts the documented
// value "true" as well as equivalent spellings ("True", "1", "t"),
// while an absent or unparseable label value yields false.
func (volume DirectPVVolume) IsSuspended() bool {
	suspended, _ := strconv.ParseBool(string(volume.getLabel(types.SuspendLabelKey)))
	return suspended
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// DirectPVVolumeList denotes list of volumes.
Expand Down
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,7 @@ const (

LegacyNodeServerName = "legacy-node-server"
LegacyControllerServerName = "legacy-controller"

// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"
)
3 changes: 3 additions & 0 deletions pkg/consts/consts.go.in
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,7 @@ const (

LegacyNodeServerName = "legacy-node-server"
LegacyControllerServerName = "legacy-controller"

// TmpFS mount
TmpMountDir = AppRootDir + "/tmp"
)
71 changes: 39 additions & 32 deletions pkg/csi/node/publish_unpublish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import (
"fmt"
"os"
"strings"
"syscall"

"github.com/container-storage-interface/spec/lib/go/csi"
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/drive"
"github.com/minio/directpv/pkg/k8s"
"github.com/minio/directpv/pkg/types"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -101,17 +99,21 @@ func (server *Server) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.InvalidArgument, "target path must not be empty")
}

podName, podNS, podLabels := getPodInfo(ctx, req)

volume, err := client.VolumeClient().Get(ctx, req.GetVolumeId(), metav1.GetOptions{TypeMeta: types.NewVolumeTypeMeta()})
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}

if volume.Status.StagingTargetPath != req.GetStagingTargetPath() {
if !volume.IsSuspended() && volume.Status.StagingTargetPath != req.GetStagingTargetPath() {
return nil, status.Errorf(codes.FailedPrecondition, "volume %v is not yet staged, but requested with %v", volume.Name, req.GetStagingTargetPath())
}

if err := server.publishVolume(req, volume.IsSuspended()); err != nil {
klog.Errorf("unable to publish volume %s; %v", volume.Name, err)
return nil, status.Errorf(codes.Internal, "unable to publish volume; %v", err)
}

podName, podNS, podLabels := getPodInfo(ctx, req)
volume.SetPodName(podName)
volume.SetPodNS(podNS)
for key, value := range podLabels {
Expand All @@ -120,46 +122,51 @@ func (server *Server) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
}

mountPointMap, _, err := server.getMounts()
volume.Status.TargetPath = req.GetTargetPath()
_, err = client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{
TypeMeta: types.NewVolumeTypeMeta(),
})
if err != nil {
klog.ErrorS(err, "unable to get mounts")
return nil, status.Error(codes.Internal, err.Error())
return nil, status.Errorf(codes.Internal, "unable to update volume: %v", err)
}

stagingTargetPathDevices, found := mountPointMap[req.GetStagingTargetPath()]
if !found {
klog.Errorf("stagingPath %v is not mounted", req.GetStagingTargetPath())
return nil, status.Error(codes.Internal, fmt.Sprintf("stagingPath %v is not mounted", req.GetStagingTargetPath()))
}
return &csi.NodePublishVolumeResponse{}, nil
}

func (server *Server) publishVolume(req *csi.NodePublishVolumeRequest, isSuspended bool) error {
if err := server.mkdir(req.GetTargetPath()); err != nil && !errors.Is(err, os.ErrExist) {
if errors.Unwrap(err) == syscall.EIO {
if err := drive.SetIOError(ctx, volume.GetDriveID()); err != nil {
return nil, status.Errorf(codes.Internal, "unable to set drive error; %v", err)
}
return fmt.Errorf("unable to create target path; %v", err)
}
mountPointMap, _, err := server.getMounts()
if err != nil {
return err
}
if isSuspended {
// Suspended volumes will be bind-mounted as read-only to tmpfs mount (/var/lib/directpv/tmp).
if _, found := mountPointMap[consts.TmpMountDir]; !found {
return fmt.Errorf("%v is not mounted; restart this node server", consts.TmpMountDir)
}
if targetPathDevices, found := mountPointMap[req.GetTargetPath()]; found && targetPathDevices.Exist("tmpfs") {
klog.V(5).InfoS("stagingTargetPath is already bind-mounted to tmpfs mount", "stagingTargetPath", req.GetStagingTargetPath(), "targetPath", req.GetTargetPath())
return nil
}
klog.ErrorS(err, "unable to create target path", "TargetPath", req.GetTargetPath())
return nil, status.Errorf(codes.Internal, "unable to create target path: %v", err)
if err := server.bindMount(consts.TmpMountDir, req.GetTargetPath(), true); err != nil {
return fmt.Errorf("unable to bind mount target path %v to %v; %v", req.GetTargetPath(), consts.TmpMountDir, err)
}
return nil
}
stagingTargetPathDevices, found := mountPointMap[req.GetStagingTargetPath()]
if !found {
return fmt.Errorf("stagingPath %v is not mounted", req.GetStagingTargetPath())
}

if targetPathDevices, found := mountPointMap[req.GetTargetPath()]; found && targetPathDevices.Equal(stagingTargetPathDevices) {
klog.V(5).InfoS("stagingTargetPath is already bind-mounted to targetPath", "stagingTargetPath", req.GetStagingTargetPath(), "targetPath", req.GetTargetPath())
} else {
if err := server.bindMount(req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly()); err != nil {
klog.ErrorS(err, "unable to bind mount staging target path to target path", "StagingTargetPath", req.GetStagingTargetPath(), "TargetPath", req.GetTargetPath())
return nil, status.Errorf(codes.Internal, "unable to bind mount staging target path to target path; %v", err)
return fmt.Errorf("unable to bind mount staging target path to target path; %v", err)
}
}

volume.Status.TargetPath = req.GetTargetPath()
_, err = client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{
TypeMeta: types.NewVolumeTypeMeta(),
})
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to update volume: %v", err)
}

return &csi.NodePublishVolumeResponse{}, nil
return nil
}

// NodeUnpublishVolume is node unpublish volume handler.
Expand Down
5 changes: 5 additions & 0 deletions pkg/csi/node/stage_unstage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (server *Server) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Error(codes.NotFound, err.Error())
}

if volume.IsSuspended() {
// Suspended volumes doesn't require staging.
return &csi.NodeStageVolumeResponse{}, nil
}

code, err := drive.StageVolume(
ctx,
volume,
Expand Down

0 comments on commit 3a1fc0c

Please sign in to comment.