Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing latest changes from devel for ceph-csi #207

Merged
merged 4 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 7 additions & 44 deletions internal/cephfs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
// CreateSnapshot creates the snapshot in backend and stores metadata
// in store
//
//nolint:gocognit,gocyclo,cyclop // golangci-lint did not catch this earlier, needs to get fixed late
//nolint:gocyclo,cyclop // golangci-lint did not catch this earlier, needs to get fixed late
func (cs *ControllerServer) CreateSnapshot(
ctx context.Context,
req *csi.CreateSnapshotRequest,
Expand Down Expand Up @@ -830,29 +830,15 @@ func (cs *ControllerServer) CreateSnapshot(
}
defer cs.VolumeLocks.Release(sourceVolID)
snapName := req.GetName()
sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr)
sid, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// check are we able to retrieve the size of parent
// ceph fs subvolume info command got added in 14.2.10 and 15.+
// as we are not able to retrieve the parent size we are rejecting the
// request to create snapshot.
// TODO: For this purpose we could make use of cached clusterAdditionalInfo
// too.
volClient := core.NewSubVolume(parentVolOptions.GetConnection(), &parentVolOptions.SubVolume,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
info, err := volClient.GetSubVolumeInfo(ctx)
if err != nil {
// Check error code value against ErrInvalidCommand to understand the cluster
// support it or not, It's safe to evaluate as the filtering
// is already done from GetSubVolumeInfo() and send out the error here.
if errors.Is(err, cerrors.ErrInvalidCommand) {
return nil, status.Error(
codes.FailedPrecondition,
"subvolume info command not supported in current ceph cluster")
}
if sid != nil {
errDefer := store.UndoSnapReservation(ctx, parentVolOptions, *sid, snapName, cr)
if errDefer != nil {
Expand All @@ -866,22 +852,11 @@ func (cs *ControllerServer) CreateSnapshot(

metadata := k8s.GetSnapshotMetadata(req.GetParameters())
if sid != nil {
// check snapshot is protected
protected := true
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &parentVolOptions.SubVolume)
if !(snapInfo.Protected == core.SnapshotIsProtected) {
err = snapClient.ProtectSnapshot(ctx)
if err != nil {
protected = false
log.WarningLog(ctx, "failed to protect snapshot of snapshot: %s (%s)",
sid.FsSnapshotName, err)
}
}

// Update snapshot-name/snapshot-namespace/snapshotcontent-name details on
// subvolume snapshot as metadata in case snapshot already exist
if len(metadata) != 0 {
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &parentVolOptions.SubVolume)
err = snapClient.SetAllSnapshotMetadata(metadata)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -894,7 +869,7 @@ func (cs *ControllerServer) CreateSnapshot(
SnapshotId: sid.SnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: sid.CreationTime,
ReadyToUse: protected,
ReadyToUse: true,
},
}, nil
}
Expand Down Expand Up @@ -968,10 +943,6 @@ func (cs *ControllerServer) doSnapshot(
return snap, fmt.Errorf("failed to get snapshot info for snapshot:%s", snapID)
}
snap.CreationTime = timestamppb.New(snap.CreatedAt)
err = snapClient.ProtectSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapID, err)
}

// Set snapshot-name/snapshot-namespace/snapshotcontent-name details
// on subvolume snapshot as metadata on create
Expand Down Expand Up @@ -1006,8 +977,6 @@ func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.Cr

// DeleteSnapshot deletes the snapshot in backend and removes the
// snapshot metadata from store.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity
func (cs *ControllerServer) DeleteSnapshot(
ctx context.Context,
req *csi.DeleteSnapshotRequest,
Expand Down Expand Up @@ -1100,14 +1069,6 @@ func (cs *ControllerServer) DeleteSnapshot(
if snapInfo.HasPendingClones == "yes" {
return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID)
}
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName,
volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume)
if snapInfo.Protected == core.SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

needsDelete, err := store.UnrefSelfInSnapshotBackedVolumes(ctx, volOpt, sid.SnapshotID)
if err != nil {
Expand All @@ -1119,6 +1080,8 @@ func (cs *ControllerServer) DeleteSnapshot(
}

if needsDelete {
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName,
volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume)
err = deleteSnapshotAndUndoReservation(
ctx,
snapClient,
Expand Down
65 changes: 5 additions & 60 deletions internal/cephfs/core/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package core

import (
"context"
"errors"
"fmt"

cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
Expand All @@ -34,11 +33,6 @@ type cephFSCloneState struct {
errorMsg string
}

const (
// SnapshotIsProtected string indicates that the snapshot is currently protected.
SnapshotIsProtected = "yes"
)

// CephFSCloneError indicates that fetching the clone state returned an error.
var CephFSCloneError = cephFSCloneState{}

Expand Down Expand Up @@ -73,43 +67,20 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(

return err
}
var (
// if protectErr is not nil we will delete the snapshot as the protect fails
protectErr error
// if cloneErr is not nil we will unprotect the snapshot and delete the snapshot
cloneErr error
)
defer func() {
if protectErr != nil {
err = snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}

// if cloneErr is not nil we will delete the snapshot
var cloneErr error

defer func() {
if cloneErr != nil {
if err = s.PurgeVolume(ctx, true); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, err)
}
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
}
}
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}
}()
protectErr = snapClient.ProtectSnapshot(ctx)
if protectErr != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapshotID, protectErr)

return protectErr
}
cloneErr = snapClient.CloneSnapshot(ctx, s.SubVolume)
if cloneErr != nil {
log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, cloneErr)
Expand Down Expand Up @@ -139,16 +110,6 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
}

// As we completed clone, remove the intermediate snap
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)

return err
}
}
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)

Expand All @@ -166,24 +127,8 @@ func (s *subVolumeClient) CleanupSnapshotFromSubvolume(
// identified during PVC-PVC cloning.
snapShotID := s.VolID
snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, s.clusterName, s.enableMetadata, parentVol)
snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil
}

return err
}

if snapInfo.Protected == SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapShotID, err)

return err
}
}
err = snapClient.DeleteSnapshot(ctx)
err := snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapShotID, err)

Expand Down
81 changes: 0 additions & 81 deletions internal/cephfs/core/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
)

// autoProtect points to the snapshot auto-protect feature of
// the subvolume.
const (
autoProtect = "snapshot-autoprotect"
)

// SnapshotClient is the interface that holds the signature of snapshot methods
// that interacts with CephFS snapshot API's.
type SnapshotClient interface {
Expand All @@ -45,10 +39,6 @@ type SnapshotClient interface {
DeleteSnapshot(ctx context.Context) error
// GetSnapshotInfo returns the snapshot info of the subvolume.
GetSnapshotInfo(ctx context.Context) (SnapshotInfo, error)
// ProtectSnapshot protects the snapshot of the subvolume.
ProtectSnapshot(ctx context.Context) error
// UnprotectSnapshot unprotects the snapshot of the subvolume.
UnprotectSnapshot(ctx context.Context) error
// CloneSnapshot clones the snapshot of the subvolume.
CloneSnapshot(ctx context.Context, cloneVolOptions *SubVolume) error
// SetAllSnapshotMetadata set all the metadata from arg parameters on
Expand Down Expand Up @@ -139,7 +129,6 @@ type SnapshotInfo struct {
CreatedAt time.Time
CreationTime *timestamp.Timestamp
HasPendingClones string
Protected string
}

// GetSnapshotInfo returns the snapshot info of the subvolume.
Expand Down Expand Up @@ -169,80 +158,10 @@ func (s *snapshotClient) GetSnapshotInfo(ctx context.Context) (SnapshotInfo, err
}
snap.CreatedAt = info.CreatedAt.Time
snap.HasPendingClones = info.HasPendingClones
snap.Protected = info.Protected

return snap, nil
}

// ProtectSnapshot protects the snapshot of the subvolume.
func (s *snapshotClient) ProtectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The ProtectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)

return err
}

err = fsa.ProtectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID)
if err != nil {
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to protect subvolume snapshot %s %s in fs %s with error: %s",
s.VolID,
s.SnapshotID,
s.FsName,
err)

return err
}

return nil
}

// UnprotectSnapshot unprotects the snapshot of the subvolume.
func (s *snapshotClient) UnprotectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The UnprotectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)

return err
}

err = fsa.UnprotectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID,
s.SnapshotID)
if err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error.
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to unprotect subvolume snapshot %s %s in fs %s with error: %s",
s.VolID,
s.SnapshotID,
s.FsName,
err)

return err
}

return nil
}

// CloneSnapshot clones the snapshot of the subvolume.
func (s *snapshotClient) CloneSnapshot(
ctx context.Context,
Expand Down
3 changes: 0 additions & 3 deletions internal/cephfs/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ var (
// statically provisioned.
ErrNonStaticVolume = coreError.New("volume not static")

// ErrSnapProtectionExist is returned when the snapshot is already protected.
ErrSnapProtectionExist = coreError.New("snapshot protection already exists")

// ErrSnapNotFound is returned when snap name passed is not found in the list
// of snapshots for the given image.
ErrSnapNotFound = coreError.New("snapshot not found")
Expand Down
Loading