Skip to content

Commit

Permalink
Merge pull request #207 from red-hat-storage/sync_ds--devel
Browse files Browse the repository at this point in the history
Syncing latest changes from devel for ceph-csi
  • Loading branch information
openshift-ci[bot] authored Nov 6, 2023
2 parents 43688ed + ef69b84 commit eafc5ab
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 196 deletions.
51 changes: 7 additions & 44 deletions internal/cephfs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
// CreateSnapshot creates the snapshot in backend and stores metadata
// in store
//
//nolint:gocognit,gocyclo,cyclop // golangci-lint did not catch this earlier, needs to get fixed late
//nolint:gocyclo,cyclop // golangci-lint did not catch this earlier, needs to get fixed late
func (cs *ControllerServer) CreateSnapshot(
ctx context.Context,
req *csi.CreateSnapshotRequest,
Expand Down Expand Up @@ -830,29 +830,15 @@ func (cs *ControllerServer) CreateSnapshot(
}
defer cs.VolumeLocks.Release(sourceVolID)
snapName := req.GetName()
sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr)
sid, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// check are we able to retrieve the size of parent
// ceph fs subvolume info command got added in 14.2.10 and 15.+
// as we are not able to retrieve the parent size we are rejecting the
// request to create snapshot.
// TODO: For this purpose we could make use of cached clusterAdditionalInfo
// too.
volClient := core.NewSubVolume(parentVolOptions.GetConnection(), &parentVolOptions.SubVolume,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
info, err := volClient.GetSubVolumeInfo(ctx)
if err != nil {
// Check error code value against ErrInvalidCommand to understand the cluster
// support it or not, It's safe to evaluate as the filtering
// is already done from GetSubVolumeInfo() and send out the error here.
if errors.Is(err, cerrors.ErrInvalidCommand) {
return nil, status.Error(
codes.FailedPrecondition,
"subvolume info command not supported in current ceph cluster")
}
if sid != nil {
errDefer := store.UndoSnapReservation(ctx, parentVolOptions, *sid, snapName, cr)
if errDefer != nil {
Expand All @@ -866,22 +852,11 @@ func (cs *ControllerServer) CreateSnapshot(

metadata := k8s.GetSnapshotMetadata(req.GetParameters())
if sid != nil {
// check snapshot is protected
protected := true
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &parentVolOptions.SubVolume)
if !(snapInfo.Protected == core.SnapshotIsProtected) {
err = snapClient.ProtectSnapshot(ctx)
if err != nil {
protected = false
log.WarningLog(ctx, "failed to protect snapshot of snapshot: %s (%s)",
sid.FsSnapshotName, err)
}
}

// Update snapshot-name/snapshot-namespace/snapshotcontent-name details on
// subvolume snapshot as metadata in case snapshot already exist
if len(metadata) != 0 {
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &parentVolOptions.SubVolume)
err = snapClient.SetAllSnapshotMetadata(metadata)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -894,7 +869,7 @@ func (cs *ControllerServer) CreateSnapshot(
SnapshotId: sid.SnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: sid.CreationTime,
ReadyToUse: protected,
ReadyToUse: true,
},
}, nil
}
Expand Down Expand Up @@ -968,10 +943,6 @@ func (cs *ControllerServer) doSnapshot(
return snap, fmt.Errorf("failed to get snapshot info for snapshot:%s", snapID)
}
snap.CreationTime = timestamppb.New(snap.CreatedAt)
err = snapClient.ProtectSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapID, err)
}

// Set snapshot-name/snapshot-namespace/snapshotcontent-name details
// on subvolume snapshot as metadata on create
Expand Down Expand Up @@ -1006,8 +977,6 @@ func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.Cr

// DeleteSnapshot deletes the snapshot in backend and removes the
// snapshot metadata from store.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity
func (cs *ControllerServer) DeleteSnapshot(
ctx context.Context,
req *csi.DeleteSnapshotRequest,
Expand Down Expand Up @@ -1100,14 +1069,6 @@ func (cs *ControllerServer) DeleteSnapshot(
if snapInfo.HasPendingClones == "yes" {
return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID)
}
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName,
volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume)
if snapInfo.Protected == core.SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

needsDelete, err := store.UnrefSelfInSnapshotBackedVolumes(ctx, volOpt, sid.SnapshotID)
if err != nil {
Expand All @@ -1119,6 +1080,8 @@ func (cs *ControllerServer) DeleteSnapshot(
}

if needsDelete {
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName,
volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume)
err = deleteSnapshotAndUndoReservation(
ctx,
snapClient,
Expand Down
65 changes: 5 additions & 60 deletions internal/cephfs/core/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package core

import (
"context"
"errors"
"fmt"

cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
Expand All @@ -34,11 +33,6 @@ type cephFSCloneState struct {
errorMsg string
}

const (
// SnapshotIsProtected string indicates that the snapshot is currently protected.
SnapshotIsProtected = "yes"
)

// CephFSCloneError indicates that fetching the clone state returned an error.
var CephFSCloneError = cephFSCloneState{}

Expand Down Expand Up @@ -73,43 +67,20 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(

return err
}
var (
// if protectErr is not nil we will delete the snapshot as the protect fails
protectErr error
// if cloneErr is not nil we will unprotect the snapshot and delete the snapshot
cloneErr error
)
defer func() {
if protectErr != nil {
err = snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}

// if cloneErr is not nil we will delete the snapshot
var cloneErr error

defer func() {
if cloneErr != nil {
if err = s.PurgeVolume(ctx, true); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, err)
}
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
}
}
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}
}()
protectErr = snapClient.ProtectSnapshot(ctx)
if protectErr != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapshotID, protectErr)

return protectErr
}
cloneErr = snapClient.CloneSnapshot(ctx, s.SubVolume)
if cloneErr != nil {
log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, cloneErr)
Expand Down Expand Up @@ -139,16 +110,6 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
}

// As we completed clone, remove the intermediate snap
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)

return err
}
}
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)

Expand All @@ -166,24 +127,8 @@ func (s *subVolumeClient) CleanupSnapshotFromSubvolume(
// identified during PVC-PVC cloning.
snapShotID := s.VolID
snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, s.clusterName, s.enableMetadata, parentVol)
snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil
}

return err
}

if snapInfo.Protected == SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapShotID, err)

return err
}
}
err = snapClient.DeleteSnapshot(ctx)
err := snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapShotID, err)

Expand Down
81 changes: 0 additions & 81 deletions internal/cephfs/core/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
)

// autoProtect points to the snapshot auto-protect feature of
// the subvolume.
const (
autoProtect = "snapshot-autoprotect"
)

// SnapshotClient is the interface that holds the signature of snapshot methods
// that interacts with CephFS snapshot API's.
type SnapshotClient interface {
Expand All @@ -45,10 +39,6 @@ type SnapshotClient interface {
DeleteSnapshot(ctx context.Context) error
// GetSnapshotInfo returns the snapshot info of the subvolume.
GetSnapshotInfo(ctx context.Context) (SnapshotInfo, error)
// ProtectSnapshot protects the snapshot of the subvolume.
ProtectSnapshot(ctx context.Context) error
// UnprotectSnapshot unprotects the snapshot of the subvolume.
UnprotectSnapshot(ctx context.Context) error
// CloneSnapshot clones the snapshot of the subvolume.
CloneSnapshot(ctx context.Context, cloneVolOptions *SubVolume) error
// SetAllSnapshotMetadata set all the metadata from arg parameters on
Expand Down Expand Up @@ -139,7 +129,6 @@ type SnapshotInfo struct {
CreatedAt time.Time
CreationTime *timestamp.Timestamp
HasPendingClones string
Protected string
}

// GetSnapshotInfo returns the snapshot info of the subvolume.
Expand Down Expand Up @@ -169,80 +158,10 @@ func (s *snapshotClient) GetSnapshotInfo(ctx context.Context) (SnapshotInfo, err
}
snap.CreatedAt = info.CreatedAt.Time
snap.HasPendingClones = info.HasPendingClones
snap.Protected = info.Protected

return snap, nil
}

// ProtectSnapshot protects the snapshot of the subvolume.
func (s *snapshotClient) ProtectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The ProtectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)

return err
}

err = fsa.ProtectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID)
if err != nil {
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to protect subvolume snapshot %s %s in fs %s with error: %s",
s.VolID,
s.SnapshotID,
s.FsName,
err)

return err
}

return nil
}

// UnprotectSnapshot unprotects the snapshot of the subvolume.
func (s *snapshotClient) UnprotectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The UnprotectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)

return err
}

err = fsa.UnprotectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID,
s.SnapshotID)
if err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error.
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to unprotect subvolume snapshot %s %s in fs %s with error: %s",
s.VolID,
s.SnapshotID,
s.FsName,
err)

return err
}

return nil
}

// CloneSnapshot clones the snapshot of the subvolume.
func (s *snapshotClient) CloneSnapshot(
ctx context.Context,
Expand Down
3 changes: 0 additions & 3 deletions internal/cephfs/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ var (
// statically provisioned.
ErrNonStaticVolume = coreError.New("volume not static")

// ErrSnapProtectionExist is returned when the snapshot is already protected.
ErrSnapProtectionExist = coreError.New("snapshot protection already exists")

// ErrSnapNotFound is returned when snap name passed is not found in the list
// of snapshots for the given image.
ErrSnapNotFound = coreError.New("snapshot not found")
Expand Down
Loading

0 comments on commit eafc5ab

Please sign in to comment.