Merge pull request #338 from red-hat-storage/sync_us--devel

Syncing latest changes from upstream devel for ceph-csi

openshift-merge-bot[bot] authored Jul 12, 2024
2 parents 5ddb243 + e7762ac commit 748ffd4

Showing 16 changed files with 2,241 additions and 141 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -9,7 +9,7 @@ require (
 	github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000
 	github.com/ceph/go-ceph v0.28.0
 	github.com/container-storage-interface/spec v1.10.0
-	github.com/csi-addons/spec v0.2.1-0.20240619103729-12c61f25a2a5
+	github.com/csi-addons/spec v0.2.1-0.20240627093359-0dd74d521e67
 	github.com/gemalto/kmip-go v0.0.10
 	github.com/golang/protobuf v1.5.4
 	github.com/google/fscrypt v0.3.6-0.20240502174735-068b9f8f5dec

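(A pseudo-version bump like this is normally produced by the Go toolchain rather than edited by hand: running "go get github.com/csi-addons/spec@0dd74d521e67" followed by "go mod tidy" resolves the short commit hash to the v0.2.1-0.20240627093359-0dd74d521e67 pseudo-version above and rewrites the matching go.sum entries shown in the next hunk.)
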
4 changes: 2 additions & 2 deletions go.sum
@@ -911,8 +911,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
-github.com/csi-addons/spec v0.2.1-0.20240619103729-12c61f25a2a5 h1:/pXa+X+YKDPRI2JG8WEnxGKk6PcVZRhcLqdPks+bQa8=
-github.com/csi-addons/spec v0.2.1-0.20240619103729-12c61f25a2a5/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
+github.com/csi-addons/spec v0.2.1-0.20240627093359-0dd74d521e67 h1:UAcAhE1pTkWaFBS0kvhHUcUsoEv5fsieD0tl8psQMCs=
+github.com/csi-addons/spec v0.2.1-0.20240627093359-0dd74d521e67/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=

65 changes: 62 additions & 3 deletions internal/cephfs/nodeserver.go
@@ -23,6 +23,8 @@ import (
 	"os"
 	"path"
 	"strings"
+	"syscall"
+	"time"
 
 	cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
 	"github.com/ceph/ceph-csi/internal/cephfs/mounter"
@@ -127,15 +129,72 @@ func maybeUnlockFileEncryption(
 	stagingTargetPath string,
 	volID fsutil.VolumeID,
 ) error {
-	if volOptions.IsEncrypted() {
-		log.DebugLog(ctx, "cephfs: unlocking fscrypt on volume %q path %s", volID, stagingTargetPath)
+	if !volOptions.IsEncrypted() {
+		return nil
+	}
+
+	// Define Mutex Lock variables
+	lockName := string(volID) + "-mutexLock"
+	lockDesc := "Lock for " + string(volID)
+	lockDuration := 150 * time.Second
+	// Generate a consistent lock cookie for the client using hostname and process ID
+	lockCookie := generateLockCookie()
+	var flags byte = 0
+
+	log.DebugLog(ctx, "Creating lock for the following volume ID %s", volID)
+
+	ioctx, err := volOptions.GetConnection().GetIoctx(volOptions.MetadataPool)
+	if err != nil {
+		log.ErrorLog(ctx, "Failed to create ioctx: %s", err)
 
-		return fscrypt.Unlock(ctx, volOptions.Encryption, stagingTargetPath, string(volID))
+		return err
 	}
+	defer ioctx.Destroy()
+
+	res, err := ioctx.LockExclusive(volOptions.VolID, lockName, lockCookie, lockDesc, lockDuration, &flags)
+	if res != 0 {
+		switch res {
+		case -int(syscall.EBUSY):
+			return fmt.Errorf("Lock is already held by another client and cookie pair for %v volume", volID)
+		case -int(syscall.EEXIST):
+			return fmt.Errorf("Lock is already held by the same client and cookie pair for %v volume", volID)
+		default:
+			return fmt.Errorf("Failed to lock volume ID %v: %w", volID, err)
+		}
+	}
+	log.DebugLog(ctx, "Lock successfully created for volume ID %s", volID)
+
+	log.DebugLog(ctx, "cephfs: unlocking fscrypt on volume %q path %s", volID, stagingTargetPath)
+	err = fscrypt.Unlock(ctx, volOptions.Encryption, stagingTargetPath, string(volID))
+	if err != nil {
+		return err
+	}
+
+	ret, err := ioctx.Unlock(string(volID), lockName, lockCookie)
+	switch ret {
+	case 0:
+		log.DebugLog(ctx, "Lock %s successfully released", lockName)
+	case -int(syscall.ENOENT):
+		log.DebugLog(ctx, "Lock is not held by the specified %s, %s pair", lockCookie, lockName)
+	default:
+		log.ErrorLog(ctx, "Failed to release following lock, this will lead to orphan lock %s: %v",
+			lockName, err)
+	}
 
 	return nil
 }
 
+// generateLockCookie generates a consistent lock cookie for the client.
+func generateLockCookie() string {
+	hostname, err := os.Hostname()
+	if err != nil {
+		hostname = "unknown-host"
+	}
+	pid := os.Getpid()
+
+	return fmt.Sprintf("%s-%d", hostname, pid)
+}
+
 // maybeInitializeFileEncryption initializes KMS and node specifics, if volContext enables encryption.
 func maybeInitializeFileEncryption(
 	ctx context.Context,

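The flow above relies on go-ceph's advisory-lock API for RADOS objects: LockExclusive takes an object ID, lock name, cookie, description, expiry duration, and flags, and Unlock must be called with the same object/name/cookie triple. A minimal standalone sketch of that round-trip, assuming a reachable cluster with a default ceph.conf and using illustrative pool ("csi-metadata") and object ("vol-1") names:

package main

import (
	"fmt"
	"time"

	"github.com/ceph/go-ceph/rados"
)

func main() {
	// Connect using the default ceph.conf and credentials.
	conn, err := rados.NewConn()
	if err != nil {
		panic(err)
	}
	if err = conn.ReadDefaultConfigFile(); err != nil {
		panic(err)
	}
	if err = conn.Connect(); err != nil {
		panic(err)
	}
	defer conn.Shutdown()

	// "csi-metadata" stands in for volOptions.MetadataPool in the driver.
	ioctx, err := conn.OpenIOContext("csi-metadata")
	if err != nil {
		panic(err)
	}
	defer ioctx.Destroy()

	var flags byte
	// Exclusive advisory lock that expires on its own after 150s (the same
	// duration the driver uses), so a crashed client cannot hold it forever.
	ret, err := ioctx.LockExclusive("vol-1", "vol-1-mutexLock", "host-1234",
		"demo lock", 150*time.Second, &flags)
	if ret != 0 {
		// -EBUSY / -EEXIST correspond to the switch cases in the diff above.
		fmt.Println("lock not acquired:", ret, err)

		return
	}

	// ... critical section; the driver runs fscrypt.Unlock here ...

	// Release with the same object/name/cookie triple that took the lock.
	if ret, err = ioctx.Unlock("vol-1", "vol-1-mutexLock", "host-1234"); ret != 0 {
		fmt.Println("unlock failed:", ret, err)
	}
}
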
18 changes: 18 additions & 0 deletions internal/csi-addons/rbd/identity.go
@@ -96,6 +96,24 @@ func (is *IdentityServer) GetCapabilities(
 				Type: identity.Capability_VolumeReplication_VOLUME_REPLICATION,
 			},
 		},
+	}, &identity.Capability{
+		Type: &identity.Capability_VolumeGroup_{
+			VolumeGroup: &identity.Capability_VolumeGroup{
+				Type: identity.Capability_VolumeGroup_VOLUME_GROUP,
+			},
+		},
+	}, &identity.Capability{
+		Type: &identity.Capability_VolumeGroup_{
+			VolumeGroup: &identity.Capability_VolumeGroup{
+				Type: identity.Capability_VolumeGroup_DO_NOT_ALLOW_VG_TO_DELETE_VOLUMES,
+			},
+		},
+	}, &identity.Capability{
+		Type: &identity.Capability_VolumeGroup_{
+			VolumeGroup: &identity.Capability_VolumeGroup{
+				Type: identity.Capability_VolumeGroup_LIMIT_VOLUME_TO_ONE_VOLUME_GROUP,
+			},
+		},
 	})
 }

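With these three capabilities advertised, CSI-Addons sidecars can discover volume-group support at runtime instead of being configured for it. A rough client-side probe (the UNIX socket path is illustrative, not part of this change):

package main

import (
	"context"
	"fmt"

	"github.com/csi-addons/spec/lib/go/identity"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial("unix:///csi/csi-addons.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := identity.NewIdentityClient(conn)
	resp, err := client.GetCapabilities(context.Background(), &identity.GetCapabilitiesRequest{})
	if err != nil {
		panic(err)
	}

	for _, c := range resp.GetCapabilities() {
		// GetVolumeGroup() returns nil for capabilities of other types.
		if vg := c.GetVolumeGroup(); vg != nil {
			fmt.Println("volume-group capability:", vg.GetType())
		}
	}
}
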
204 changes: 204 additions & 0 deletions internal/csi-addons/rbd/volumegroup.go
@@ -0,0 +1,204 @@
/*
Copyright 2024 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rbd

import (
	"context"
	"fmt"

	"github.com/ceph/ceph-csi/internal/rbd"
	"github.com/ceph/ceph-csi/internal/rbd/types"
	"github.com/ceph/ceph-csi/internal/util/log"

	"github.com/csi-addons/spec/lib/go/volumegroup"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// VolumeGroupServer struct of rbd CSI driver with supported methods of
// VolumeGroup controller server spec.
type VolumeGroupServer struct {
	// added UnimplementedControllerServer as a member of VolumeGroupServer.
	// If the volumegroup spec adds more RPC services in the proto file, we
	// don't need to implement all RPC methods, keeping forward compatibility.
	*volumegroup.UnimplementedControllerServer
}

// NewVolumeGroupServer creates a new VolumeGroupServer which handles the
// VolumeGroup Service requests from the CSI-Addons specification.
func NewVolumeGroupServer() *VolumeGroupServer {
	return &VolumeGroupServer{}
}

func (vs *VolumeGroupServer) RegisterService(server grpc.ServiceRegistrar) {
	volumegroup.RegisterControllerServer(server, vs)
}

// CreateVolumeGroup RPC call to create a volume group.
//
// From the spec:
// This RPC will be called by the CO to create a new volume group on behalf of
// a user. This operation MUST be idempotent. If a volume group corresponding
// to the specified volume group name already exists and is compatible with
// the specified parameters in the CreateVolumeGroupRequest, the Plugin MUST
// reply 0 OK with the corresponding CreateVolumeGroupResponse. CSI Plugins
// MAY create the following types of volume groups:
//
// Create a new empty volume group or a group with specific volumes. Note that
// N volumes with some backend label Y could be considered to be in "group Y"
// which might not be a physical group on the storage backend. In this case,
// an empty group can still be created by the CO to hold volumes. After the
// empty group is created, create a new volume. The CO may call
// ModifyVolumeGroupMembership to add new volumes to the group.
//
// Implementation steps:
// 1. resolve all volumes given in the volume_ids list (can be empty)
// 2. create the Volume Group
// 3. add all volumes to the Volume Group
//
// Idempotency should be handled by the rbd.Manager, keeping this function and
// the potential error handling as simple as possible.
func (vs *VolumeGroupServer) CreateVolumeGroup(
	ctx context.Context,
	req *volumegroup.CreateVolumeGroupRequest,
) (*volumegroup.CreateVolumeGroupResponse, error) {
	mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets())
	defer mgr.Destroy(ctx)

	// resolve all volumes
	volumes := make([]types.Volume, len(req.GetVolumeIds()))
	for i, id := range req.GetVolumeIds() {
		vol, err := mgr.GetVolumeByID(ctx, id)
		if err != nil {
			return nil, status.Errorf(
				codes.InvalidArgument,
				"failed to find required volume %q for volume group %q: %s",
				id,
				req.GetName(),
				err.Error())
		}

		//nolint:gocritic // need to call .Destroy() for all volumes
		defer vol.Destroy(ctx)
		volumes[i] = vol
	}

	log.DebugLog(ctx, fmt.Sprintf("all %d Volumes for VolumeGroup %q have been found", len(volumes), req.GetName()))

	// create an RBD volume group
	vg, err := mgr.CreateVolumeGroup(ctx, req.GetName())
	if err != nil {
		return nil, status.Errorf(
			codes.Internal,
			"failed to create volume group %q: %s",
			req.GetName(),
			err.Error())
	}

	log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q has been created", req.GetName()))

	// add each rbd-image to the volume group
	for _, vol := range volumes {
		err = vg.AddVolume(ctx, vol)
		if err != nil {
			return nil, status.Errorf(
				codes.Internal,
				"failed to add volume %q to volume group %q: %s",
				vol,
				req.GetName(),
				err.Error())
		}
	}

	log.DebugLog(ctx, fmt.Sprintf("all %d Volumes have been added to VolumeGroup %q", len(volumes), req.GetName()))

	return &volumegroup.CreateVolumeGroupResponse{
		VolumeGroup: vg.ToCSI(ctx),
	}, nil
}

// DeleteVolumeGroup RPC call to delete a volume group.
//
// From the spec:
// This RPC will be called by the CO to delete a volume group on behalf of a
// user. This operation MUST be idempotent.
//
// If a volume group corresponding to the specified volume_group_id does not
// exist or the artifacts associated with the volume group do not exist
// anymore, the Plugin MUST reply 0 OK.
//
// A volume cannot be deleted individually when it is part of the group. It
// has to be removed from the group first. Deleting a volume group will
// delete all volumes in the group.
//
// Note:
// The undocumented DO_NOT_ALLOW_VG_TO_DELETE_VOLUMES capability is set.
// There is no need to delete each volume that may be part of the volume
// group. If the volume group is not empty, a FAILED_PRECONDITION error will
// be returned.
func (vs *VolumeGroupServer) DeleteVolumeGroup(
	ctx context.Context,
	req *volumegroup.DeleteVolumeGroupRequest,
) (*volumegroup.DeleteVolumeGroupResponse, error) {
	mgr := rbd.NewManager(nil, req.GetSecrets())
	defer mgr.Destroy(ctx)

	// resolve the volume group
	vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId())
	if err != nil {
		return nil, status.Errorf(
			codes.NotFound,
			"could not find volume group %q: %s",
			req.GetVolumeGroupId(),
			err.Error())
	}
	defer vg.Destroy(ctx)

	log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q has been found", req.GetVolumeGroupId()))

	// verify that the volume group is empty
	volumes, err := vg.ListVolumes(ctx)
	if err != nil {
		return nil, status.Errorf(
			codes.NotFound,
			"could not list volumes for volume group %q: %s",
			req.GetVolumeGroupId(),
			err.Error())
	}

	log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes)))

	if len(volumes) != 0 {
		return nil, status.Errorf(
			codes.FailedPrecondition,
			"rejecting to delete non-empty volume group %q",
			req.GetVolumeGroupId())
	}

	// delete the volume group
	err = mgr.DeleteVolumeGroup(ctx, vg)
	if err != nil {
		return nil, status.Errorf(codes.Internal,
			"failed to delete volume group %q: %s",
			req.GetVolumeGroupId(),
			err.Error())
	}

	log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q has been deleted", req.GetVolumeGroupId()))

	return &volumegroup.DeleteVolumeGroupResponse{}, nil
}
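
For context, this is roughly how a CSI-Addons client could drive the two new RPCs end to end. The endpoint, group name, volume handles, parameters, and secrets below are all placeholders, and the request/response accessors (including the VolumeGroup message's GetVolumeGroupId) are assumed from the csi-addons volumegroup proto that the server code above is generated against:

package main

import (
	"context"
	"fmt"

	"github.com/csi-addons/spec/lib/go/volumegroup"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial("unix:///csi/csi-addons.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := volumegroup.NewControllerClient(conn)
	ctx := context.Background()
	secrets := map[string]string{"userID": "admin", "userKey": "***"} // placeholders

	// Create (or idempotently look up) a group containing two existing volumes.
	created, err := client.CreateVolumeGroup(ctx, &volumegroup.CreateVolumeGroupRequest{
		Name:       "group-1",                                      // placeholder
		VolumeIds:  []string{"volume-handle-1", "volume-handle-2"}, // placeholders
		Parameters: map[string]string{"clusterID": "ceph", "pool": "rbd"},
		Secrets:    secrets,
	})
	if err != nil {
		panic(err)
	}
	vgID := created.GetVolumeGroup().GetVolumeGroupId()
	fmt.Println("created volume group:", vgID)

	// Deleting now fails with FAILED_PRECONDITION: the group still holds
	// volumes and DO_NOT_ALLOW_VG_TO_DELETE_VOLUMES is advertised. Once the
	// volumes have been removed from the group, the same call returns OK.
	if _, err = client.DeleteVolumeGroup(ctx, &volumegroup.DeleteVolumeGroupRequest{
		VolumeGroupId: vgID,
		Secrets:       secrets,
	}); err != nil {
		fmt.Println("delete:", err)
	}
}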