Skip to content

Commit

Permalink
Include EiT state as part of the desired state hash sent to clients
Browse files Browse the repository at this point in the history
When in-transit encryption is enabled/disabled the kernel mount option
for cephFS needs to be updated between prefer-crc/secure. So the
desired state hash needs to include the EiT state, so that
if the EiT state is changed the desired state hash will change and
the client will reconcile to get the updated mount option.

Signed-off-by: Malay Kumar Parida <[email protected]>
  • Loading branch information
malayparida2000 committed Oct 7, 2024
1 parent f80f43b commit 8805b41
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 11 deletions.
65 changes: 54 additions & 11 deletions services/provider/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"encoding/json"
"encoding/pem"
"fmt"
"k8s.io/utils/ptr"
"math"
"net"
"slices"
"strconv"
"strings"
"time"

"k8s.io/utils/ptr"

"github.com/blang/semver/v4"
nbv1 "github.com/noobaa/noobaa-operator/v5/pkg/apis/noobaa/v1alpha1"
quotav1 "github.com/openshift/api/quota/v1"
Expand Down Expand Up @@ -193,7 +194,13 @@ func (s *OCSProviderServer) GetStorageConfig(ctx context.Context, req *pb.Storag
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to construct status response: %v", err)
}
desiredClientConfigHash := getDesiredClientConfigHash(channelName, consumerObj)

storageCluster, err := s.getStorageCluster(ctx)
if err != nil {
return nil, err
}

desiredClientConfigHash := getDesiredClientConfigHash(channelName, consumerObj, isEncryptionInTransitEnabled(storageCluster.Spec.Network))

klog.Infof("successfully returned the config details to the consumer.")
return &pb.StorageConfigResponse{
Expand Down Expand Up @@ -751,15 +758,13 @@ func (s *OCSProviderServer) GetStorageClaimConfig(ctx context.Context, req *pb.S
"csi.storage.k8s.io/controller-expand-secret-name": provisionerSecretName,
}

storageClusters := &ocsv1.StorageClusterList{}
if err := s.client.List(ctx, storageClusters, client.InNamespace(s.namespace), client.Limit(2)); err != nil {
return nil, status.Errorf(codes.Internal, "failed to get storage cluster: %v", err)
}
if len(storageClusters.Items) != 1 {
return nil, status.Errorf(codes.Internal, "expecting one single storagecluster to exist")
storageCluster, err := s.getStorageCluster(ctx)
if err != nil {
return nil, err
}

var kernelMountOptions map[string]string
for _, option := range strings.Split(util.GetCephFSKernelMountOptions(&storageClusters.Items[0]), ",") {
for _, option := range strings.Split(util.GetCephFSKernelMountOptions(storageCluster), ",") {
if kernelMountOptions == nil {
kernelMountOptions = map[string]string{}
}
Expand Down Expand Up @@ -847,18 +852,24 @@ func (s *OCSProviderServer) ReportStatus(ctx context.Context, req *pb.ReportStat
return nil, status.Errorf(codes.Internal, "Failed to construct status response: %v", err)
}

desiredClientConfigHash := getDesiredClientConfigHash(channelName, storageConsumer)
storageCluster, err := s.getStorageCluster(ctx)
if err != nil {
return nil, err
}

desiredClientConfigHash := getDesiredClientConfigHash(channelName, storageConsumer, isEncryptionInTransitEnabled(storageCluster.Spec.Network))

return &pb.ReportStatusResponse{
DesiredClientOperatorChannel: channelName,
DesiredConfigHash: desiredClientConfigHash,
}, nil
}

func getDesiredClientConfigHash(channelName string, storageConsumer *ocsv1alpha1.StorageConsumer) string {
func getDesiredClientConfigHash(channelName string, storageConsumer *ocsv1alpha1.StorageConsumer, inTransitEncryptionEnabled bool) string {
var arr = []any{
channelName,
storageConsumer.Spec.StorageQuotaInGiB,
inTransitEncryptionEnabled,
}
return util.CalculateMD5Hash(arr)
}
Expand All @@ -878,6 +889,38 @@ func (s *OCSProviderServer) getOCSSubscriptionChannel(ctx context.Context) (stri
return subscription.Spec.Channel, nil
}

func (s *OCSProviderServer) getStorageCluster(ctx context.Context) (*ocsv1.StorageCluster, error) {
scList := &ocsv1.StorageClusterList{}
if err := s.client.List(ctx, scList, client.InNamespace(s.namespace)); err != nil {
return nil, status.Errorf(codes.Internal, "failed to list storage clusters: %v", err)
}

var foundSc *ocsv1.StorageCluster

for i := range scList.Items {
sc := &scList.Items[i]
if sc.Spec.ExternalStorage.Enable { // Filter out external storage clusters
continue
}
if foundSc != nil {
return nil, status.Errorf(codes.Internal, "multiple storage clusters found")
}
foundSc = sc
}

if foundSc == nil {
return nil, status.Errorf(codes.NotFound, "no storage cluster found")
}
return foundSc, nil
}

func isEncryptionInTransitEnabled(networkSpec *rookCephv1.NetworkSpec) bool {
return networkSpec != nil &&
networkSpec.Connections != nil &&
networkSpec.Connections.Encryption != nil &&
networkSpec.Connections.Encryption.Enabled
}

func extractMonitorIps(data string) ([]string, error) {
var ips []string
mons := strings.Split(data, ",")
Expand Down
5 changes: 5 additions & 0 deletions services/provider/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ func TestGetExternalResources(t *testing.T) {
ocsSubscription.Spec = ocsSubscriptionSpec
assert.NoError(t, client.Create(ctx, ocsSubscription))

storageCluster := &ocsv1.StorageCluster{}
storageCluster.Name = "test-storagecluster"
storageCluster.Namespace = serverNamespace
assert.NoError(t, client.Create(ctx, storageCluster))

// When ocsv1alpha1.StorageConsumerStateReady
req := pb.StorageConfigRequest{
StorageConsumerUUID: string(consumerResource.UID),
Expand Down

0 comments on commit 8805b41

Please sign in to comment.