From cdadf3c9b6a4432964377ddea1c1a216565c2baa Mon Sep 17 00:00:00 2001 From: nasusoba <108400027+nasusoba@users.noreply.github.com> Date: Fri, 23 Aug 2024 13:21:57 +0800 Subject: [PATCH] support ssa patch (#136) Signed-off-by: nasusoba finished implementation Signed-off-by: nasusoba add test Signed-off-by: nasusoba fix ssaCache init Signed-off-by: nasusoba increase control plane replicas for test Signed-off-by: nasusoba fix typo Signed-off-by: nasusoba fix typo --- .golangci.yml | 1 + controlplane/controllers/const.go | 2 + .../kthreescontrolplane_controller.go | 97 +++++++ controlplane/controllers/scale.go | 157 +++++++--- go.mod | 6 +- pkg/k3s/control_plane.go | 20 +- pkg/util/contract/types.go | 199 +++++++++++++ pkg/util/hash/hash.go | 45 +++ pkg/util/ssa/cache.go | 114 ++++++++ pkg/util/ssa/doc.go | 20 ++ pkg/util/ssa/filterintent.go | 147 ++++++++++ pkg/util/ssa/managedfields.go | 180 ++++++++++++ pkg/util/ssa/matchers.go | 113 ++++++++ pkg/util/ssa/patch.go | 159 +++++++++++ test/e2e/config/k3s-docker.yaml | 4 +- test/e2e/e2e_suite_test.go | 4 +- test/e2e/helpers.go | 2 +- test/e2e/inplace_rollout_test.go | 267 ++++++++++++++++++ 18 files changed, 1491 insertions(+), 46 deletions(-) create mode 100644 pkg/util/contract/types.go create mode 100644 pkg/util/hash/hash.go create mode 100644 pkg/util/ssa/cache.go create mode 100644 pkg/util/ssa/doc.go create mode 100644 pkg/util/ssa/filterintent.go create mode 100644 pkg/util/ssa/managedfields.go create mode 100644 pkg/util/ssa/matchers.go create mode 100644 pkg/util/ssa/patch.go create mode 100644 test/e2e/inplace_rollout_test.go diff --git a/.golangci.yml b/.golangci.yml index cffdca26..9e5d938c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -71,6 +71,7 @@ linters-settings: - github.com/go-logr/logr - github.com/coredns/corefile-migration/migration - github.com/pkg/errors + - github.com/davecgh/go-spew/spew - k8s.io/api - k8s.io/apimachinery/pkg diff --git a/controlplane/controllers/const.go b/controlplane/controllers/const.go index c8224515..b13662b8 100644 --- a/controlplane/controllers/const.go +++ b/controlplane/controllers/const.go @@ -36,4 +36,6 @@ const ( etcdRemovalRequeueAfter = 30 * time.Second k3sHookName = "k3s" + + kcpManagerName = "capi-kthreescontrolplane" ) diff --git a/controlplane/controllers/kthreescontrolplane_controller.go b/controlplane/controllers/kthreescontrolplane_controller.go index 34ee91f0..7ae3096a 100644 --- a/controlplane/controllers/kthreescontrolplane_controller.go +++ b/controlplane/controllers/kthreescontrolplane_controller.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/controllers/external" "sigs.k8s.io/cluster-api/util" @@ -53,6 +54,8 @@ import ( "github.com/k3s-io/cluster-api-k3s/pkg/machinefilters" "github.com/k3s-io/cluster-api-k3s/pkg/secret" "github.com/k3s-io/cluster-api-k3s/pkg/token" + "github.com/k3s-io/cluster-api-k3s/pkg/util/contract" + "github.com/k3s-io/cluster-api-k3s/pkg/util/ssa" ) // KThreesControlPlaneReconciler reconciles a KThreesControlPlane object. 
@@ -68,6 +71,7 @@ type KThreesControlPlaneReconciler struct { managementCluster k3s.ManagementCluster managementClusterUncached k3s.ManagementCluster + ssaCache ssa.Cache } // +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch @@ -302,6 +306,7 @@ func (r *KThreesControlPlaneReconciler) SetupWithManager(ctx context.Context, mg r.Scheme = mgr.GetScheme() r.controller = c r.recorder = mgr.GetEventRecorderFor("k3s-control-plane-controller") + r.ssaCache = ssa.NewCache() if r.managementCluster == nil { r.managementCluster = &k3s.Management{ @@ -516,6 +521,10 @@ func (r *KThreesControlPlaneReconciler) reconcile(ctx context.Context, cluster * return reconcile.Result{}, err } + if err := r.syncMachines(ctx, controlPlane); err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to sync Machines") + } + // Aggregate the operational state of all the machines; while aggregating we are adding the // source ref (reason@machine/name) so the problem can be easily tracked down to its source machine. conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef(), conditions.WithStepCounterIf(false)) @@ -673,6 +682,94 @@ func (r *KThreesControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, return reconcile.Result{}, nil } +// syncMachines updates Machines, InfrastructureMachines and KThreesConfigs to propagate in-place mutable fields from KCP. +// Note: It also cleans up managed fields of all Machines so that Machines that were +// created/patched before (<= v0.2.0) the controller adopted Server-Side-Apply (SSA) can also work with SSA. +// Note: For InfrastructureMachines and KThreesConfigs it also drops ownership of "metadata.labels" and +// "metadata.annotations" from "manager" so that "capi-kthreescontrolplane" can own these fields and can work with SSA. +// Otherwise, fields would be co-owned by our "old" "manager" and "capi-kthreescontrolplane" and then we would not be +// able to e.g. drop labels and annotations. +func (r *KThreesControlPlaneReconciler) syncMachines(ctx context.Context, controlPlane *k3s.ControlPlane) error { + patchHelpers := map[string]*patch.Helper{} + for machineName := range controlPlane.Machines { + m := controlPlane.Machines[machineName] + // If the machine is already being deleted, we don't need to update it. + if !m.DeletionTimestamp.IsZero() { + continue + } + + // Cleanup managed fields of all Machines. + // We do this so that Machines that were created/patched before the controller adopted Server-Side-Apply (SSA) + // (<= v0.2.0) can also work with SSA. Otherwise, fields would be co-owned by our "old" "manager" and + // "capi-kthreescontrolplane" and then we would not be able to e.g. drop labels and annotations. + if err := ssa.CleanUpManagedFieldsForSSAAdoption(ctx, r.Client, m, kcpManagerName); err != nil { + return errors.Wrapf(err, "failed to update Machine: failed to adjust the managedFields of the Machine %s", klog.KObj(m)) + } + // Update Machine to propagate in-place mutable fields from KCP. + updatedMachine, err := r.updateMachine(ctx, m, controlPlane.KCP, controlPlane.Cluster) + if err != nil { + return errors.Wrapf(err, "failed to update Machine: %s", klog.KObj(m)) + } + controlPlane.Machines[machineName] = updatedMachine + // Since the machine is updated, re-create the patch helper so that any subsequent + // Patch calls use the correct base machine object to calculate the diffs. 
+ // Example: reconcileControlPlaneConditions patches the machine objects in a subsequent call + // and, it should use the updated machine to calculate the diff. + // Note: If the patchHelpers are not re-computed based on the new updated machines, subsequent + // Patch calls will fail because the patch will be calculated based on an outdated machine and will error + // because of outdated resourceVersion. + // TODO: This should be cleaned-up to have a more streamline way of constructing and using patchHelpers. + patchHelper, err := patch.NewHelper(updatedMachine, r.Client) + if err != nil { + return err + } + patchHelpers[machineName] = patchHelper + + labelsAndAnnotationsManagedFieldPaths := []contract.Path{ + {"f:metadata", "f:annotations"}, + {"f:metadata", "f:labels"}, + } + infraMachine, infraMachineFound := controlPlane.InfraResources[machineName] + // Only update the InfraMachine if it is already found, otherwise just skip it. + // This could happen e.g. if the cache is not up-to-date yet. + if infraMachineFound { + // Cleanup managed fields of all InfrastructureMachines to drop ownership of labels and annotations + // from "manager". We do this so that InfrastructureMachines that are created using the Create method + // can also work with SSA. Otherwise, labels and annotations would be co-owned by our "old" "manager" + // and "capi-kthreescontrolplane" and then we would not be able to e.g. drop labels and annotations. + if err := ssa.DropManagedFields(ctx, r.Client, infraMachine, kcpManagerName, labelsAndAnnotationsManagedFieldPaths); err != nil { + return errors.Wrapf(err, "failed to clean up managedFields of InfrastructureMachine %s", klog.KObj(infraMachine)) + } + // Update in-place mutating fields on InfrastructureMachine. + if err := r.updateExternalObject(ctx, infraMachine, controlPlane.KCP, controlPlane.Cluster); err != nil { + return errors.Wrapf(err, "failed to update InfrastructureMachine %s", klog.KObj(infraMachine)) + } + } + + kthreesConfigs, kthreesConfigsFound := controlPlane.KthreesConfigs[machineName] + // Only update the kthreesConfigs if it is already found, otherwise just skip it. + // This could happen e.g. if the cache is not up-to-date yet. + if kthreesConfigsFound { + // Note: Set the GroupVersionKind because updateExternalObject depends on it. + kthreesConfigs.SetGroupVersionKind(m.Spec.Bootstrap.ConfigRef.GroupVersionKind()) + // Cleanup managed fields of all KThreesConfigs to drop ownership of labels and annotations + // from "manager". We do this so that KThreesConfigs that are created using the Create method + // can also work with SSA. Otherwise, labels and annotations would be co-owned by our "old" "manager" + // and "capi-kthreescontrolplane" and then we would not be able to e.g. drop labels and annotations. + if err := ssa.DropManagedFields(ctx, r.Client, kthreesConfigs, kcpManagerName, labelsAndAnnotationsManagedFieldPaths); err != nil { + return errors.Wrapf(err, "failed to clean up managedFields of kthreesConfigs %s", klog.KObj(kthreesConfigs)) + } + // Update in-place mutating fields on BootstrapConfig. + if err := r.updateExternalObject(ctx, kthreesConfigs, controlPlane.KCP, controlPlane.Cluster); err != nil { + return errors.Wrapf(err, "failed to update KThreesConfigs %s", klog.KObj(kthreesConfigs)) + } + } + } + // Update the patch helpers. 
+ controlPlane.SetPatchHelpers(patchHelpers) + return nil +} + // reconcileControlPlaneConditions is responsible of reconciling conditions reporting the status of static pods and // the status of the etcd cluster. func (r *KThreesControlPlaneReconciler) reconcileControlPlaneConditions(ctx context.Context, controlPlane *k3s.ControlPlane) error { diff --git a/controlplane/controllers/scale.go b/controlplane/controllers/scale.go index 48d25384..1fe1afd4 100644 --- a/controlplane/controllers/scale.go +++ b/controlplane/controllers/scale.go @@ -27,6 +27,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/storage/names" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -36,10 +37,12 @@ import ( "sigs.k8s.io/cluster-api/util/conditions" "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" bootstrapv1 "github.com/k3s-io/cluster-api-k3s/bootstrap/api/v1beta2" controlplanev1 "github.com/k3s-io/cluster-api-k3s/controlplane/api/v1beta2" k3s "github.com/k3s-io/cluster-api-k3s/pkg/k3s" + "github.com/k3s-io/cluster-api-k3s/pkg/util/ssa" ) var ErrPreConditionFailed = errors.New("precondition check failed") @@ -253,6 +256,12 @@ func selectMachineForScaleDown(ctx context.Context, controlPlane *k3s.ControlPla func (r *KThreesControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KThreesControlPlane, bootstrapSpec *bootstrapv1.KThreesConfigSpec, failureDomain *string) error { var errs []error + // Compute desired Machine + machine, err := r.computeDesiredMachine(kcp, cluster, failureDomain, nil) + if err != nil { + return errors.Wrap(err, "failed to create Machine: failed to compute desired Machine") + } + // Since the cloned resource should eventually have a controller ref for the Machine, we create an // OwnerReference here without the Controller field set infraCloneOwner := &metav1.OwnerReference{ @@ -275,6 +284,7 @@ func (r *KThreesControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx conte // Safe to return early here since no resources have been created yet. 
return fmt.Errorf("failed to clone infrastructure template: %w", err) } + machine.Spec.InfrastructureRef = *infraRef // Clone the bootstrap configuration bootstrapRef, err := r.generateKThreesConfig(ctx, kcp, cluster, bootstrapSpec) @@ -284,8 +294,9 @@ func (r *KThreesControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx conte // Only proceed to generating the Machine if we haven't encountered an error if len(errs) == 0 { - if err := r.generateMachine(ctx, kcp, cluster, infraRef, bootstrapRef, failureDomain); err != nil { - errs = append(errs, fmt.Errorf("failed to create Machine: %w", err)) + machine.Spec.Bootstrap.ConfigRef = bootstrapRef + if err := r.createMachine(ctx, kcp, machine); err != nil { + errs = append(errs, errors.Wrap(err, "failed to create Machine")) } } @@ -355,23 +366,117 @@ func (r *KThreesControlPlaneReconciler) generateKThreesConfig(ctx context.Contex return bootstrapRef, nil } -func (r *KThreesControlPlaneReconciler) generateMachine(ctx context.Context, kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster, infraRef, bootstrapRef *corev1.ObjectReference, failureDomain *string) error { - machine := &clusterv1.Machine{ +// updateExternalObject updates the external object with the labels and annotations from KCP. +func (r *KThreesControlPlaneReconciler) updateExternalObject(ctx context.Context, obj client.Object, kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster) error { + updatedObject := &unstructured.Unstructured{} + updatedObject.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind()) + updatedObject.SetNamespace(obj.GetNamespace()) + updatedObject.SetName(obj.GetName()) + // Set the UID to ensure that Server-Side-Apply only performs an update + // and does not perform an accidental create. + updatedObject.SetUID(obj.GetUID()) + + // Update labels + updatedObject.SetLabels(k3s.ControlPlaneLabelsForCluster(cluster.Name, kcp.Spec.MachineTemplate)) + // Update annotations + updatedObject.SetAnnotations(kcp.Spec.MachineTemplate.ObjectMeta.Annotations) + + if err := ssa.Patch(ctx, r.Client, kcpManagerName, updatedObject, ssa.WithCachingProxy{Cache: r.ssaCache, Original: obj}); err != nil { + return errors.Wrapf(err, "failed to update %s", obj.GetObjectKind().GroupVersionKind().Kind) + } + return nil +} + +func (r *KThreesControlPlaneReconciler) createMachine(ctx context.Context, kcp *controlplanev1.KThreesControlPlane, machine *clusterv1.Machine) error { + if err := ssa.Patch(ctx, r.Client, kcpManagerName, machine); err != nil { + return errors.Wrap(err, "failed to create Machine") + } + // Remove the annotation tracking that a remediation is in progress (the remediation completed when + // the replacement machine has been created above). 
+ delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation) + return nil +} + +func (r *KThreesControlPlaneReconciler) updateMachine(ctx context.Context, machine *clusterv1.Machine, kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster) (*clusterv1.Machine, error) { + updatedMachine, err := r.computeDesiredMachine(kcp, cluster, machine.Spec.FailureDomain, machine) + if err != nil { + return nil, errors.Wrap(err, "failed to update Machine: failed to compute desired Machine") + } + + err = ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine}) + if err != nil { + return nil, errors.Wrap(err, "failed to update Machine") + } + return updatedMachine, nil +} + +// computeDesiredMachine computes the desired Machine. +// This Machine will be used during reconciliation to: +// * create a new Machine +// * update an existing Machine +// Because we are using Server-Side-Apply we always have to calculate the full object. +// There are small differences in how we calculate the Machine depending on if it +// is a create or update. Example: for a new Machine we have to calculate a new name, +// while for an existing Machine we have to use the name of the existing Machine. +// Also, for an existing Machine, we will not copy its labels, as they are not managed by the KThreesControlPlane controller. +func (r *KThreesControlPlaneReconciler) computeDesiredMachine(kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster, failureDomain *string, existingMachine *clusterv1.Machine) (*clusterv1.Machine, error) { + var machineName string + var machineUID types.UID + var version *string + annotations := map[string]string{} + if existingMachine == nil { + // Creating a new machine + machineName = names.SimpleNameGenerator.GenerateName(kcp.Name + "-") + version = &kcp.Spec.Version + + // Machine's bootstrap config may be missing ClusterConfiguration if it is not the first machine in the control plane. + // We store ClusterConfiguration as annotation here to detect any changes in KCP ClusterConfiguration and rollout the machine if any. + serverConfig, err := json.Marshal(kcp.Spec.KThreesConfigSpec.ServerConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal cluster configuration") + } + annotations[controlplanev1.KThreesServerConfigurationAnnotation] = string(serverConfig) + + // In case this machine is being created as a consequence of a remediation, then add an annotation + // tracking remediating data. + // NOTE: This is required in order to track remediation retries. + if remediationData, ok := kcp.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok { + annotations[controlplanev1.RemediationForAnnotation] = remediationData + } + } else { + // Updating an existing machine + machineName = existingMachine.Name + machineUID = existingMachine.UID + version = existingMachine.Spec.Version + + // For existing machine only set the ClusterConfiguration annotation if the machine already has it. + // We should not add the annotation if it was missing in the first place because we do not have enough + // information. + if serverConfig, ok := existingMachine.Annotations[controlplanev1.KThreesServerConfigurationAnnotation]; ok { + annotations[controlplanev1.KThreesServerConfigurationAnnotation] = serverConfig + } + + // If the machine already has remediation data then preserve it. + // NOTE: This is required in order to track remediation retries. 
+ if remediationData, ok := existingMachine.Annotations[controlplanev1.RemediationForAnnotation]; ok { + annotations[controlplanev1.RemediationForAnnotation] = remediationData + } + } + + // Construct the basic Machine. + desiredMachine := &clusterv1.Machine{ ObjectMeta: metav1.ObjectMeta{ - Name: names.SimpleNameGenerator.GenerateName(kcp.Name + "-"), + Name: machineName, Namespace: kcp.Namespace, + UID: machineUID, Labels: k3s.ControlPlaneLabelsForCluster(cluster.Name, kcp.Spec.MachineTemplate), OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("KThreesControlPlane")), }, }, Spec: clusterv1.MachineSpec{ - ClusterName: cluster.Name, - Version: &kcp.Spec.Version, - InfrastructureRef: *infraRef, - Bootstrap: clusterv1.Bootstrap{ - ConfigRef: bootstrapRef, - }, + ClusterName: cluster.Name, + Version: version, FailureDomain: failureDomain, NodeDrainTimeout: kcp.Spec.MachineTemplate.NodeDrainTimeout, NodeVolumeDetachTimeout: kcp.Spec.MachineTemplate.NodeVolumeDetachTimeout, @@ -379,31 +484,17 @@ func (r *KThreesControlPlaneReconciler) generateMachine(ctx context.Context, kcp }, } - annotations := map[string]string{} - - // Machine's bootstrap config may be missing ClusterConfiguration if it is not the first machine in the control plane. - // We store ClusterConfiguration as annotation here to detect any changes in KCP ClusterConfiguration and rollout the machine if any. - serverConfig, err := json.Marshal(kcp.Spec.KThreesConfigSpec.ServerConfig) - if err != nil { - return fmt.Errorf("failed to marshal cluster configuration: %w", err) - } - annotations[controlplanev1.KThreesServerConfigurationAnnotation] = string(serverConfig) - - // In case this machine is being created as a consequence of a remediation, then add an annotation - // tracking remediating data. - // NOTE: This is required in order to track remediation retries. - if remediationData, ok := kcp.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok { - annotations[controlplanev1.RemediationForAnnotation] = remediationData + // Set annotations + for k, v := range kcp.Spec.MachineTemplate.ObjectMeta.Annotations { + annotations[k] = v } - machine.SetAnnotations(annotations) + desiredMachine.SetAnnotations(annotations) - if err := r.Client.Create(ctx, machine); err != nil { - return fmt.Errorf("failed to create machine: %w", err) + if existingMachine != nil { + desiredMachine.Spec.InfrastructureRef = existingMachine.Spec.InfrastructureRef + desiredMachine.Spec.Bootstrap.ConfigRef = existingMachine.Spec.Bootstrap.ConfigRef } - // Remove the annotation tracking that a remediation is in progress (the remediation completed when - // the replacement machine has been created above). 
- delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation) - return nil + return desiredMachine, nil } diff --git a/go.mod b/go.mod index 34123939..3777f483 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/coredns/corefile-migration v1.0.21 + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/go-logr/logr v1.4.1 github.com/google/uuid v1.4.0 github.com/onsi/ginkgo v1.16.5 @@ -12,7 +13,9 @@ require ( github.com/pkg/errors v0.9.1 go.etcd.io/etcd/api/v3 v3.5.13 go.etcd.io/etcd/client/v3 v3.5.13 + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 google.golang.org/grpc v1.60.1 + google.golang.org/protobuf v1.33.0 k8s.io/api v0.29.3 k8s.io/apimachinery v0.29.3 k8s.io/apiserver v0.29.3 @@ -44,7 +47,6 @@ require ( github.com/coredns/caddy v1.1.0 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect - github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/docker v25.0.5+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect @@ -120,7 +122,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.21.0 // indirect - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/oauth2 v0.18.0 // indirect golang.org/x/sync v0.6.0 // indirect @@ -134,7 +135,6 @@ require ( google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect diff --git a/pkg/k3s/control_plane.go b/pkg/k3s/control_plane.go index c29dbfab..4d93e3d7 100644 --- a/pkg/k3s/control_plane.go +++ b/pkg/k3s/control_plane.go @@ -64,8 +64,8 @@ type ControlPlane struct { // TODO: we should see if we can combine these with the Machine objects so we don't have all these separate lookups // See discussion on https://github.com/kubernetes-sigs/cluster-api/pull/3405 - kthreesConfigs map[string]*bootstrapv1.KThreesConfig - infraResources map[string]*unstructured.Unstructured + KthreesConfigs map[string]*bootstrapv1.KThreesConfig + InfraResources map[string]*unstructured.Unstructured } // NewControlPlane returns an instantiated ControlPlane. @@ -106,8 +106,8 @@ func NewControlPlane(ctx context.Context, client client.Client, cluster *cluster Machines: ownedMachines, machinesPatchHelpers: patchHelpers, hasEtcdCA: hasEtcdCA, - kthreesConfigs: kthreesConfigs, - infraResources: infraObjects, + KthreesConfigs: kthreesConfigs, + InfraResources: infraObjects, reconciliationTime: metav1.Now(), }, nil } @@ -287,7 +287,7 @@ func (c *ControlPlane) MachinesNeedingRollout() collections.Machines { // Machines that are scheduled for rollout (KCP.Spec.RolloutAfter set, the RolloutAfter deadline is expired, and the machine was created before the deadline). collections.ShouldRolloutAfter(&c.reconciliationTime, c.KCP.Spec.RolloutAfter), // Machines that do not match with KCP config. 
- collections.Not(machinefilters.MatchesKCPConfiguration(c.infraResources, c.kthreesConfigs, c.KCP)), + collections.Not(machinefilters.MatchesKCPConfiguration(c.InfraResources, c.KthreesConfigs, c.KCP)), ) } @@ -371,3 +371,13 @@ func (c *ControlPlane) PatchMachines(ctx context.Context) error { return kerrors.NewAggregate(errList) } + +// SetPatchHelpers updates the patch helpers. +func (c *ControlPlane) SetPatchHelpers(patchHelpers map[string]*patch.Helper) { + if c.machinesPatchHelpers == nil { + c.machinesPatchHelpers = map[string]*patch.Helper{} + } + for machineName, patchHelper := range patchHelpers { + c.machinesPatchHelpers[machineName] = patchHelper + } +} diff --git a/pkg/util/contract/types.go b/pkg/util/contract/types.go new file mode 100644 index 00000000..e6f1f6e8 --- /dev/null +++ b/pkg/util/contract/types.go @@ -0,0 +1,199 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package contract + +import ( + "strconv" + "strings" + + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// ErrFieldNotFound is returned when a field is not found in the object. +var ErrFieldNotFound = errors.New("field not found") + +// Path defines a how to access a field in an Unstructured object. +type Path []string + +// Append a field name to a path. +func (p Path) Append(k string) Path { + return append(p, k) +} + +// IsParentOf check if one path is Parent of the other. +func (p Path) IsParentOf(other Path) bool { + if len(p) >= len(other) { + return false + } + for i := range p { + if p[i] != other[i] { + return false + } + } + return true +} + +// Equal check if two path are equal (exact match). +func (p Path) Equal(other Path) bool { + if len(p) != len(other) { + return false + } + for i := range p { + if p[i] != other[i] { + return false + } + } + return true +} + +// Overlaps return true if two paths are Equal or one IsParentOf the other. +func (p Path) Overlaps(other Path) bool { + return other.Equal(p) || other.IsParentOf(p) || p.IsParentOf(other) +} + +// String returns the path as a dotted string. +func (p Path) String() string { + return strings.Join(p, ".") +} + +// Int64 represents an accessor to an int64 path value. +type Int64 struct { + path Path +} + +// Path returns the path to the int64 value. +func (i *Int64) Path() Path { + return i.path +} + +// Get gets the int64 value. +func (i *Int64) Get(obj *unstructured.Unstructured) (*int64, error) { + value, ok, err := unstructured.NestedInt64(obj.UnstructuredContent(), i.path...) + if err != nil { + return nil, errors.Wrapf(err, "failed to get %s from object", "."+strings.Join(i.path, ".")) + } + if !ok { + return nil, errors.Wrapf(ErrFieldNotFound, "path %s", "."+strings.Join(i.path, ".")) + } + return &value, nil +} + +// Set sets the int64 value in the path. 
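// An illustrative sketch of the accessor pair (construction uses the package-private
// path field, so this only works inside the contract package; the "spec.replicas"
// path is a hypothetical example):
//
//	replicas := &Int64{path: Path{"spec", "replicas"}}
//	if err := replicas.Set(obj, 3); err != nil {
//		return err
//	}
//	v, err := replicas.Get(obj) // returns ErrFieldNotFound if the field is absent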
+func (i *Int64) Set(obj *unstructured.Unstructured, value int64) error { + if err := unstructured.SetNestedField(obj.UnstructuredContent(), value, i.path...); err != nil { + return errors.Wrapf(err, "failed to set path %s of object %v", "."+strings.Join(i.path, "."), obj.GroupVersionKind()) + } + return nil +} + +// Bool represents an accessor to an bool path value. +type Bool struct { + path Path +} + +// Path returns the path to the bool value. +func (b *Bool) Path() Path { + return b.path +} + +// Get gets the bool value. +func (b *Bool) Get(obj *unstructured.Unstructured) (*bool, error) { + value, ok, err := unstructured.NestedBool(obj.UnstructuredContent(), b.path...) + if err != nil { + return nil, errors.Wrapf(err, "failed to get %s from object", "."+strings.Join(b.path, ".")) + } + if !ok { + return nil, errors.Wrapf(ErrFieldNotFound, "path %s", "."+strings.Join(b.path, ".")) + } + return &value, nil +} + +// Set sets the bool value in the path. +func (b *Bool) Set(obj *unstructured.Unstructured, value bool) error { + if err := unstructured.SetNestedField(obj.UnstructuredContent(), value, b.path...); err != nil { + return errors.Wrapf(err, "failed to set path %s of object %v", "."+strings.Join(b.path, "."), obj.GroupVersionKind()) + } + return nil +} + +// String represents an accessor to a string path value. +type String struct { + path Path +} + +// Path returns the path to the string value. +func (s *String) Path() Path { + return s.path +} + +// Get gets the string value. +func (s *String) Get(obj *unstructured.Unstructured) (*string, error) { + value, ok, err := unstructured.NestedString(obj.UnstructuredContent(), s.path...) + if err != nil { + return nil, errors.Wrapf(err, "failed to get %s from object", "."+strings.Join(s.path, ".")) + } + if !ok { + return nil, errors.Wrapf(ErrFieldNotFound, "path %s", "."+strings.Join(s.path, ".")) + } + return &value, nil +} + +// Set sets the string value in the path. +func (s *String) Set(obj *unstructured.Unstructured, value string) error { + if err := unstructured.SetNestedField(obj.UnstructuredContent(), value, s.path...); err != nil { + return errors.Wrapf(err, "failed to set path %s of object %v", "."+strings.Join(s.path, "."), obj.GroupVersionKind()) + } + return nil +} + +// Duration represents an accessor to a metav1.Duration path value. +type Duration struct { + path Path +} + +// Path returns the path to the metav1.Duration value. +func (i *Duration) Path() Path { + return i.path +} + +// Get gets the metav1.Duration value. +func (i *Duration) Get(obj *unstructured.Unstructured) (*metav1.Duration, error) { + durationString, ok, err := unstructured.NestedString(obj.UnstructuredContent(), i.path...) + if err != nil { + return nil, errors.Wrapf(err, "failed to get %s from object", "."+strings.Join(i.path, ".")) + } + if !ok { + return nil, errors.Wrapf(ErrFieldNotFound, "path %s", "."+strings.Join(i.path, ".")) + } + + d := &metav1.Duration{} + if err := d.UnmarshalJSON([]byte(strconv.Quote(durationString))); err != nil { + return nil, errors.Wrapf(err, "failed to unmarshal duration %s from object", "."+strings.Join(i.path, ".")) + } + + return d, nil +} + +// Set sets the metav1.Duration value in the path. 
+func (i *Duration) Set(obj *unstructured.Unstructured, value metav1.Duration) error { + if err := unstructured.SetNestedField(obj.UnstructuredContent(), value.Duration.String(), i.path...); err != nil { + return errors.Wrapf(err, "failed to set path %s of object %v", "."+strings.Join(i.path, "."), obj.GroupVersionKind()) + } + return nil +} diff --git a/pkg/util/hash/hash.go b/pkg/util/hash/hash.go new file mode 100644 index 00000000..06eaabbd --- /dev/null +++ b/pkg/util/hash/hash.go @@ -0,0 +1,45 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package hash provides utils to calculate hashes. +package hash + +import ( + "fmt" + "hash/fnv" + + "github.com/davecgh/go-spew/spew" +) + +// Compute computes the hash of an object using the spew library. +// Note: spew follows pointers and prints actual values of the nested objects +// ensuring the hash does not change when a pointer changes. +func Compute(obj interface{}) (uint32, error) { + hasher := fnv.New32a() + + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + + if _, err := printer.Fprintf(hasher, "%#v", obj); err != nil { + return 0, fmt.Errorf("failed to calculate hash") + } + + return hasher.Sum32(), nil +} diff --git a/pkg/util/ssa/cache.go b/pkg/util/ssa/cache.go new file mode 100644 index 00000000..9f60507a --- /dev/null +++ b/pkg/util/ssa/cache.go @@ -0,0 +1,114 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ssa + +import ( + "fmt" + "time" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + + "github.com/k3s-io/cluster-api-k3s/pkg/util/hash" +) + +const ( + // ttl is the duration for which we keep the keys in the cache. + ttl = 10 * time.Minute + + // expirationInterval is the interval in which we will remove expired keys + // from the cache. + expirationInterval = 10 * time.Hour +) + +// Cache caches SSA request results. +// Specifically we only use it to cache that a certain request +// doesn't have to be repeated anymore because there was no diff. +type Cache interface { + // Add adds the given key to the Cache. + // Note: keys expire after the ttl. + Add(key string) + + // Has checks if the given key (still) exists in the Cache. + // Note: keys expire after the ttl. + Has(key string) bool +} + +// NewCache creates a new cache. 
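// Entries are keyed by ComputeRequestIdentifier and expire after the ttl. A hedged
// sketch of the intended flow (scheme, original and modified are placeholders):
//
//	c := NewCache()
//	id, err := ComputeRequestIdentifier(scheme, original, modified)
//	if err != nil {
//		return err
//	}
//	if !c.Has(id) {
//		// perform the SSA request here; if it produced no diff, remember that:
//		c.Add(id)
//	}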
+func NewCache() Cache { + r := &ssaCache{ + Store: cache.NewTTLStore(func(obj interface{}) (string, error) { + // We only add strings to the cache, so it's safe to cast to string. + return obj.(string), nil + }, ttl), + } + go func() { + for { + // Call list to clear the cache of expired items. + // We have to do this periodically as the cache itself only expires + // items lazily. If we don't do this the cache grows indefinitely. + r.List() + + time.Sleep(expirationInterval) + } + }() + return r +} + +type ssaCache struct { + cache.Store +} + +// Add adds the given key to the Cache. +// Note: keys expire after the ttl. +func (r *ssaCache) Add(key string) { + // Note: We can ignore the error here because by only allowing strings + // and providing the corresponding keyFunc ourselves we can guarantee that + // the error never occurs. + _ = r.Store.Add(key) +} + +// Has checks if the given key (still) exists in the Cache. +// Note: keys expire after the ttl. +func (r *ssaCache) Has(key string) bool { + // Note: We can ignore the error here because GetByKey never returns an error. + _, exists, _ := r.Store.GetByKey(key) + return exists +} + +// ComputeRequestIdentifier computes a request identifier for the cache. +// The identifier is unique for a specific request to ensure we don't have to re-run the request +// once we found out that it would not produce a diff. +// The identifier consists of: gvk, namespace, name and resourceVersion of the original object and a hash of the modified +// object. This ensures that we re-run the request as soon as either original or modified changes. +func ComputeRequestIdentifier(scheme *runtime.Scheme, original, modified client.Object) (string, error) { + modifiedObjectHash, err := hash.Compute(modified) + if err != nil { + return "", errors.Wrapf(err, "failed to calculate request identifier: failed to compute hash for modified object") + } + + gvk, err := apiutil.GVKForObject(original, scheme) + if err != nil { + return "", errors.Wrapf(err, "failed to calculate request identifier: failed to get GroupVersionKind of original object %s", klog.KObj(original)) + } + + return fmt.Sprintf("%s.%s.%s.%d", gvk.String(), klog.KObj(original), original.GetResourceVersion(), modifiedObjectHash), nil +} diff --git a/pkg/util/ssa/doc.go b/pkg/util/ssa/doc.go new file mode 100644 index 00000000..43e5c913 --- /dev/null +++ b/pkg/util/ssa/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package ssa provides functionality to perform Server-Side Apply operations. +*/ +package ssa diff --git a/pkg/util/ssa/filterintent.go b/pkg/util/ssa/filterintent.go new file mode 100644 index 00000000..a4741c47 --- /dev/null +++ b/pkg/util/ssa/filterintent.go @@ -0,0 +1,147 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ssa + +import ( + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/k3s-io/cluster-api-k3s/pkg/util/contract" +) + +// FilterObjectInput holds info required while filtering the object. +type FilterObjectInput struct { + // AllowedPaths instruct FilterObject to ignore everything except given paths. + AllowedPaths []contract.Path + + // IgnorePaths instruct FilterObject to ignore given paths. + // NOTE: IgnorePaths are used to filter out fields nested inside AllowedPaths, e.g. + // spec.ControlPlaneEndpoint. + // NOTE: ignore paths which point to an array are not supported by the current implementation. + IgnorePaths []contract.Path +} + +// FilterObject filter out changes not relevant for the controller. +func FilterObject(obj *unstructured.Unstructured, input *FilterObjectInput) { + // filter out changes not in the allowed paths (fields to not consider, e.g. status); + if len(input.AllowedPaths) > 0 { + FilterIntent(&FilterIntentInput{ + Path: contract.Path{}, + Value: obj.Object, + ShouldFilter: IsPathNotAllowed(input.AllowedPaths), + }) + } + + // filter out changes for ignore paths (well known fields owned by other controllers, e.g. + // spec.controlPlaneEndpoint in the InfrastructureCluster object); + if len(input.IgnorePaths) > 0 { + FilterIntent(&FilterIntentInput{ + Path: contract.Path{}, + Value: obj.Object, + ShouldFilter: IsPathIgnored(input.IgnorePaths), + }) + } +} + +// FilterIntent ensures that object only includes the fields and values for which the controller has an opinion, +// and filter out everything else by removing it from the Value. +// NOTE: This func is called recursively only for fields of type Map, but this is ok given the current use cases +// this func has to address. More specifically, we are using this func for filtering out not allowed paths and for ignore paths; +// all of them are defined in reconcile_state.go and are targeting well-known fields inside nested maps. +// Allowed paths / ignore paths which point to an array are not supported by the current implementation. +func FilterIntent(ctx *FilterIntentInput) bool { + value, ok := ctx.Value.(map[string]interface{}) + if !ok { + return false + } + + gotDeletions := false + for field := range value { + fieldCtx := &FilterIntentInput{ + // Compose the Path for the nested field. + Path: ctx.Path.Append(field), + // Gets the original and the modified Value for the field. + Value: value[field], + // Carry over global values from the context. + ShouldFilter: ctx.ShouldFilter, + } + + // If the field should be filtered out, delete it from the modified object. + if fieldCtx.ShouldFilter(fieldCtx.Path) { + delete(value, field) + gotDeletions = true + continue + } + + // Process nested fields and get in return if FilterIntent removed fields. + if FilterIntent(fieldCtx) { + // Ensure we are not leaving empty maps around. + if v, ok := fieldCtx.Value.(map[string]interface{}); ok && len(v) == 0 { + delete(value, field) + gotDeletions = true + } + } + } + return gotDeletions +} + +// FilterIntentInput holds info required while filtering the intent for server side apply. 
+// NOTE: in server side apply an intent is a partial object that only includes the fields and values for which the user has an opinion. +type FilterIntentInput struct { + // the Path of the field being processed. + Path contract.Path + + // the Value for the current Path. + Value interface{} + + // ShouldFilter handle the func that determine if the current Path should be dropped or not. + ShouldFilter func(path contract.Path) bool +} + +// IsPathAllowed returns true when the Path is one of the AllowedPaths. +func IsPathAllowed(allowedPaths []contract.Path) func(path contract.Path) bool { + return func(path contract.Path) bool { + for _, p := range allowedPaths { + // NOTE: we allow everything Equal or one IsParentOf one of the allowed paths. + // e.g. if allowed Path is metadata.labels, we allow both metadata and metadata.labels; + // this is required because allowed Path is called recursively. + if path.Overlaps(p) { + return true + } + } + return false + } +} + +// IsPathNotAllowed returns true when the Path is NOT one of the AllowedPaths. +func IsPathNotAllowed(allowedPaths []contract.Path) func(path contract.Path) bool { + return func(path contract.Path) bool { + isAllowed := IsPathAllowed(allowedPaths) + return !isAllowed(path) + } +} + +// IsPathIgnored returns true when the Path is one of the IgnorePaths. +func IsPathIgnored(ignorePaths []contract.Path) func(path contract.Path) bool { + return func(path contract.Path) bool { + for _, p := range ignorePaths { + if path.Equal(p) { + return true + } + } + return false + } +} diff --git a/pkg/util/ssa/managedfields.go b/pkg/util/ssa/managedfields.go new file mode 100644 index 00000000..f7986d19 --- /dev/null +++ b/pkg/util/ssa/managedfields.go @@ -0,0 +1,180 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package ssa contains utils related to Server-Side-Apply. +package ssa + +import ( + "context" + "encoding/json" + + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + + "github.com/k3s-io/cluster-api-k3s/pkg/util/contract" +) + +const classicManager = "manager" + +// DropManagedFields modifies the managedFields entries on the object that belong to "manager" (Operation=Update) +// to drop ownership of the given paths if there is no field yet that is managed by `ssaManager`. +// +// If we want to be able to drop fields that were previously owned by the "manager" we have to ensure that +// fields are not co-owned by "manager" and `ssaManager`. Otherwise, when we drop the fields with SSA +// (i.e. `ssaManager`) the fields would remain as they are still owned by "manager". +// The following code will do a one-time update on the managed fields. +// We won't do this on subsequent reconciles. This case will be identified by checking if `ssaManager` owns any fields. 
+// Dropping ownership in paths for existing "manager" entries (which could also be from other controllers) is safe, +// as we assume that if other controllers are still writing fields on the object they will just do it again and thus +// gain ownership again. +func DropManagedFields(ctx context.Context, c client.Client, obj client.Object, ssaManager string, paths []contract.Path) error { + // Return if `ssaManager` already owns any fields. + if hasFieldsManagedBy(obj, ssaManager) { + return nil + } + + // Since there is no field managed by `ssaManager` it means that + // this is the first time this object is being processed after the controller calling this function + // started to use SSA patches. + // It is required to clean-up managedFields from entries created by the regular patches. + // This will ensure that `ssaManager` will be able to modify the fields that + // were originally owned by "manager". + base := obj.DeepCopyObject().(client.Object) + + // Modify managedFieldEntry for manager=manager and operation=update to drop ownership + // for the given paths to avoid having two managers holding values. + originalManagedFields := obj.GetManagedFields() + managedFields := make([]metav1.ManagedFieldsEntry, 0, len(originalManagedFields)) + for _, managedField := range originalManagedFields { + if managedField.Manager == classicManager && + managedField.Operation == metav1.ManagedFieldsOperationUpdate { + // Unmarshal the managed fields into a map[string]interface{} + fieldsV1 := map[string]interface{}{} + if err := json.Unmarshal(managedField.FieldsV1.Raw, &fieldsV1); err != nil { + return errors.Wrap(err, "failed to unmarshal managed fields") + } + + // Filter out the ownership for the given paths. + FilterIntent(&FilterIntentInput{ + Path: contract.Path{}, + Value: fieldsV1, + ShouldFilter: IsPathIgnored(paths), + }) + + fieldsV1Raw, err := json.Marshal(fieldsV1) + if err != nil { + return errors.Wrap(err, "failed to marshal managed fields") + } + managedField.FieldsV1.Raw = fieldsV1Raw + + managedFields = append(managedFields, managedField) + } else { + // Do not modify the entry. Use as is. + managedFields = append(managedFields, managedField) + } + } + + obj.SetManagedFields(managedFields) + + return c.Patch(ctx, obj, client.MergeFrom(base)) +} + +// CleanUpManagedFieldsForSSAAdoption deletes the managedFields entries on the object that belong to "manager" (Operation=Update) +// if there is no field yet that is managed by `ssaManager`. +// It adds an "empty" entry in managedFields of the object if no field is currently managed by `ssaManager`. +// +// In previous versions of Cluster API K3S (<= v0.2.0) we were writing objects with Create and Patch which resulted in fields +// being owned by the "manager". After switching to Server-Side-Apply (SSA), fields will be owned by `ssaManager`. +// +// If we want to be able to drop fields that were previously owned by the "manager" we have to ensure that +// fields are not co-owned by "manager" and `ssaManager`. Otherwise, when we drop the fields with SSA +// (i.e. `ssaManager`) the fields would remain as they are still owned by "manager". +// The following code will do a one-time update on the managed fields to drop all entries for "manager". +// We won't do this on subsequent reconciles. This case will be identified by checking if `ssaManager` owns any fields. 
+// Dropping all existing "manager" entries (which could also be from other controllers) is safe, as we assume that if +// other controllers are still writing fields on the object they will just do it again and thus gain ownership again. +func CleanUpManagedFieldsForSSAAdoption(ctx context.Context, c client.Client, obj client.Object, ssaManager string) error { + // Return if `ssaManager` already owns any fields. + if hasFieldsManagedBy(obj, ssaManager) { + return nil + } + + // Since there is no field managed by `ssaManager` it means that + // this is the first time this object is being processed after the controller calling this function + // started to use SSA patches. + // It is required to clean-up managedFields from entries created by the regular patches. + // This will ensure that `ssaManager` will be able to modify the fields that + // were originally owned by "manager". + base := obj.DeepCopyObject().(client.Object) + + // Remove managedFieldEntry for manager=manager and operation=update to prevent having two managers holding values. + originalManagedFields := obj.GetManagedFields() + managedFields := make([]metav1.ManagedFieldsEntry, 0, len(originalManagedFields)) + for i := range originalManagedFields { + if originalManagedFields[i].Manager == classicManager && + originalManagedFields[i].Operation == metav1.ManagedFieldsOperationUpdate { + continue + } + managedFields = append(managedFields, originalManagedFields[i]) + } + + // Add a seeding managedFieldEntry for SSA executed by the management controller, to prevent SSA to create/infer + // a default managedFieldEntry when the first SSA is applied. + // More specifically, if an existing object doesn't have managedFields when applying the first SSA the API server + // creates an entry with operation=Update (kind of guessing where the object comes from), but this entry ends up + // acting as a co-ownership and we want to prevent this. + // NOTE: fieldV1Map cannot be empty, so we add metadata.name which will be cleaned up at the first SSA patch. + fieldV1Map := map[string]interface{}{ + "f:metadata": map[string]interface{}{ + "f:name": map[string]interface{}{}, + }, + } + fieldV1, err := json.Marshal(fieldV1Map) + if err != nil { + return errors.Wrap(err, "failed to create seeding fieldV1Map for cleaning up legacy managed fields") + } + now := metav1.Now() + gvk, err := apiutil.GVKForObject(obj, c.Scheme()) + if err != nil { + return errors.Wrapf(err, "failed to get GroupVersionKind of object %s", klog.KObj(obj)) + } + managedFields = append(managedFields, metav1.ManagedFieldsEntry{ + Manager: ssaManager, + Operation: metav1.ManagedFieldsOperationApply, + APIVersion: gvk.GroupVersion().String(), + Time: &now, + FieldsType: "FieldsV1", + FieldsV1: &metav1.FieldsV1{Raw: fieldV1}, + }) + + obj.SetManagedFields(managedFields) + + return c.Patch(ctx, obj, client.MergeFrom(base)) +} + +// hasFieldsManagedBy returns true if any of the fields in obj are managed by manager. +func hasFieldsManagedBy(obj client.Object, manager string) bool { + managedFields := obj.GetManagedFields() + for _, mf := range managedFields { + if mf.Manager == manager { + return true + } + } + return false +} diff --git a/pkg/util/ssa/matchers.go b/pkg/util/ssa/matchers.go new file mode 100644 index 00000000..834ce1d9 --- /dev/null +++ b/pkg/util/ssa/matchers.go @@ -0,0 +1,113 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ssa + +import ( + "encoding/json" + "fmt" + + "github.com/onsi/gomega/types" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/k3s-io/cluster-api-k3s/pkg/util/contract" +) + +// MatchManagedFieldsEntry is a gomega Matcher to check if a ManagedFieldsEntry has the given name and operation. +func MatchManagedFieldsEntry(manager string, operation metav1.ManagedFieldsOperationType) types.GomegaMatcher { + return &managedFieldMatcher{ + manager: manager, + operation: operation, + } +} + +type managedFieldMatcher struct { + manager string + operation metav1.ManagedFieldsOperationType +} + +func (mf *managedFieldMatcher) Match(actual interface{}) (bool, error) { + managedFieldsEntry, ok := actual.(metav1.ManagedFieldsEntry) + if !ok { + return false, fmt.Errorf("expecting metav1.ManagedFieldsEntry got %T", actual) + } + + return managedFieldsEntry.Manager == mf.manager && managedFieldsEntry.Operation == mf.operation, nil +} + +func (mf *managedFieldMatcher) FailureMessage(actual interface{}) string { + managedFieldsEntry := actual.(metav1.ManagedFieldsEntry) + return fmt.Sprintf("Expected ManagedFieldsEntry to match Manager:%s and Operation:%s, got Manager:%s, Operation:%s", + mf.manager, mf.operation, managedFieldsEntry.Manager, managedFieldsEntry.Operation) +} + +func (mf *managedFieldMatcher) NegatedFailureMessage(actual interface{}) string { + managedFieldsEntry := actual.(metav1.ManagedFieldsEntry) + return fmt.Sprintf("Expected ManagedFieldsEntry to not match Manager:%s and Operation:%s, got Manager:%s, Operation:%s", + mf.manager, mf.operation, managedFieldsEntry.Manager, managedFieldsEntry.Operation) +} + +// MatchFieldOwnership is a gomega Matcher to check if path is owned by the given manager and operation. +// Note: The path has to be specified as is observed in managed fields. Example: to check if the labels are owned +// by the correct manager the correct way to pass the path is contract.Path{"f:metadata","f:labels"}. 
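// An illustrative Gomega assertion (the machine variable is hypothetical; the manager
// name mirrors kcpManagerName used elsewhere in this change):
//
//	Expect(machine.GetManagedFields()).To(
//		MatchFieldOwnership("capi-kthreescontrolplane", metav1.ManagedFieldsOperationApply,
//			contract.Path{"f:metadata", "f:labels"}))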
+func MatchFieldOwnership(manager string, operation metav1.ManagedFieldsOperationType, path contract.Path) types.GomegaMatcher { + return &fieldOwnershipMatcher{ + path: path, + manager: manager, + operation: operation, + } +} + +type fieldOwnershipMatcher struct { + path contract.Path + manager string + operation metav1.ManagedFieldsOperationType +} + +func (fom *fieldOwnershipMatcher) Match(actual interface{}) (bool, error) { + managedFields, ok := actual.([]metav1.ManagedFieldsEntry) + if !ok { + return false, fmt.Errorf("expecting []metav1.ManagedFieldsEntry got %T", actual) + } + for _, managedFieldsEntry := range managedFields { + if managedFieldsEntry.Manager == fom.manager && managedFieldsEntry.Operation == fom.operation { + fieldsV1 := map[string]interface{}{} + if err := json.Unmarshal(managedFieldsEntry.FieldsV1.Raw, &fieldsV1); err != nil { + return false, errors.Wrap(err, "failed to parse managedFieldsEntry.FieldsV1") + } + FilterIntent(&FilterIntentInput{ + Path: contract.Path{}, + Value: fieldsV1, + ShouldFilter: IsPathNotAllowed([]contract.Path{fom.path}), + }) + return len(fieldsV1) > 0, nil + } + } + return false, nil +} + +func (fom *fieldOwnershipMatcher) FailureMessage(actual interface{}) string { + managedFields := actual.([]metav1.ManagedFieldsEntry) + return fmt.Sprintf("Expected Path %s to be owned by Manager:%s and Operation:%s, did not find correct ownership: %s", + fom.path, fom.manager, fom.operation, managedFields) +} + +func (fom *fieldOwnershipMatcher) NegatedFailureMessage(actual interface{}) string { + managedFields := actual.([]metav1.ManagedFieldsEntry) + return fmt.Sprintf("Expected Path %s to not be owned by Manager:%s and Operation:%s, did not find correct ownership: %s", + fom.path, fom.manager, fom.operation, managedFields) +} diff --git a/pkg/util/ssa/patch.go b/pkg/util/ssa/patch.go new file mode 100644 index 00000000..dcd5f2a8 --- /dev/null +++ b/pkg/util/ssa/patch.go @@ -0,0 +1,159 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ssa + +import ( + "context" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + + "github.com/k3s-io/cluster-api-k3s/pkg/util/contract" +) + +// Option is the interface for configuration that modifies Options for a patch request. +type Option interface { + // ApplyToOptions applies this configuration to the given Options. + ApplyToOptions(*Options) +} + +// WithCachingProxy enables caching for the patch request. +// The original and modified object will be used to generate an +// identifier for the request. +// The cache will be used to cache the result of the request. +type WithCachingProxy struct { + Cache Cache + Original client.Object +} + +// ApplyToOptions applies WithCachingProxy to the given Options. 
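// Callers normally pass the option to Patch rather than invoking ApplyToOptions
// directly. A hedged sketch mirroring the control plane controller usage in this
// change (c, ssaCache, original and modified are placeholders):
//
//	err := Patch(ctx, c, "capi-kthreescontrolplane", modified,
//		WithCachingProxy{Cache: ssaCache, Original: original})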
+func (w WithCachingProxy) ApplyToOptions(in *Options) { + in.WithCachingProxy = true + in.Cache = w.Cache + in.Original = w.Original +} + +// Options contains the options for the Patch func. +type Options struct { + WithCachingProxy bool + Cache Cache + Original client.Object +} + +// Patch executes an SSA patch. +// If WithCachingProxy is set and the request didn't change the object +// we will cache this result, so subsequent calls don't have to run SSA again. +func Patch(ctx context.Context, c client.Client, fieldManager string, modified client.Object, opts ...Option) error { + // Calculate the options. + options := &Options{} + for _, opt := range opts { + opt.ApplyToOptions(options) + } + + // Convert the object to unstructured and filter out fields we don't + // want to set (e.g. metadata creationTimestamp). + // Note: This is necessary to avoid continuous reconciles. + modifiedUnstructured, err := prepareModified(c.Scheme(), modified) + if err != nil { + return err + } + + gvk, err := apiutil.GVKForObject(modifiedUnstructured, c.Scheme()) + if err != nil { + return errors.Wrapf(err, "failed to apply object: failed to get GroupVersionKind of modified object %s", klog.KObj(modifiedUnstructured)) + } + + var requestIdentifier string + if options.WithCachingProxy { + // Check if the request is cached. + requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original, modifiedUnstructured) + if err != nil { + return errors.Wrapf(err, "failed to apply object") + } + if options.Cache.Has(requestIdentifier) { + // If the request is cached return the original object. + if err := c.Scheme().Convert(options.Original, modified, ctx); err != nil { + return errors.Wrapf(err, "failed to write original into modified object") + } + // Recover gvk e.g. for logging. + modified.GetObjectKind().SetGroupVersionKind(gvk) + return nil + } + } + + patchOptions := []client.PatchOption{ + client.ForceOwnership, + client.FieldOwner(fieldManager), + } + if err := c.Patch(ctx, modifiedUnstructured, client.Apply, patchOptions...); err != nil { + return errors.Wrapf(err, "failed to apply %s %s", gvk.Kind, klog.KObj(modifiedUnstructured)) + } + + // Write back the modified object so callers can access the patched object. + if err := c.Scheme().Convert(modifiedUnstructured, modified, ctx); err != nil { + return errors.Wrapf(err, "failed to write modified object") + } + + // Recover gvk e.g. for logging. + modified.GetObjectKind().SetGroupVersionKind(gvk) + + if options.WithCachingProxy { + // If the SSA call did not update the object, add the request to the cache. + if options.Original.GetResourceVersion() == modifiedUnstructured.GetResourceVersion() { + options.Cache.Add(requestIdentifier) + } + } + + return nil +} + +// prepareModified converts obj into an Unstructured and filters out undesired fields. +func prepareModified(scheme *runtime.Scheme, obj client.Object) (*unstructured.Unstructured, error) { + u := &unstructured.Unstructured{} + switch obj.(type) { + case *unstructured.Unstructured: + u = obj.DeepCopyObject().(*unstructured.Unstructured) + default: + if err := scheme.Convert(obj, u, nil); err != nil { + return nil, errors.Wrap(err, "failed to convert object to Unstructured") + } + } + + // Only keep the paths that we have opinions on. + FilterObject(u, &FilterObjectInput{ + AllowedPaths: []contract.Path{ + // apiVersion, kind, name and namespace are required field for a server side apply intent. 
+ {"apiVersion"}, + {"kind"}, + {"metadata", "name"}, + {"metadata", "namespace"}, + // uid is optional for a server side apply intent but sets the expectation of an object getting created or a specific one updated. + {"metadata", "uid"}, + // our controllers only have an opinion on labels, annotation, finalizers ownerReferences and spec. + {"metadata", "labels"}, + {"metadata", "annotations"}, + {"metadata", "finalizers"}, + {"metadata", "ownerReferences"}, + {"spec"}, + }, + }) + return u, nil +} diff --git a/test/e2e/config/k3s-docker.yaml b/test/e2e/config/k3s-docker.yaml index 71bdf58d..a2dbed2d 100644 --- a/test/e2e/config/k3s-docker.yaml +++ b/test/e2e/config/k3s-docker.yaml @@ -65,7 +65,7 @@ providers: # ${ProjectRoot}/metadata.yaml to init the management cluster # this version should be updated when ${ProjectRoot}/metadata.yaml # is modified - - name: v0.1.99 # next; use manifest from source files + - name: v0.2.99 # next; use manifest from source files value: "../../../bootstrap/config/default" files: - sourcePath: "../../../metadata.yaml" @@ -73,7 +73,7 @@ providers: - name: k3s type: ControlPlaneProvider versions: - - name: v0.1.99 # next; use manifest from source files + - name: v0.2.99 # next; use manifest from source files value: "../../../controlplane/config/default" files: - sourcePath: "../../../metadata.yaml" diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index 90719446..90a49149 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -41,8 +41,8 @@ import ( "sigs.k8s.io/cluster-api/test/framework/clusterctl" "sigs.k8s.io/cluster-api/test/framework/ginkgoextensions" - bootstrapv1 "github.com/k3s-io/cluster-api-k3s/bootstrap/api/v1beta1" - controlplanev1 "github.com/k3s-io/cluster-api-k3s/controlplane/api/v1beta1" + bootstrapv1 "github.com/k3s-io/cluster-api-k3s/bootstrap/api/v1beta2" + controlplanev1 "github.com/k3s-io/cluster-api-k3s/controlplane/api/v1beta2" dockerinfrav1 "sigs.k8s.io/cluster-api/test/infrastructure/docker/api/v1beta1" ) diff --git a/test/e2e/helpers.go b/test/e2e/helpers.go index 0fb6b693..1a035073 100644 --- a/test/e2e/helpers.go +++ b/test/e2e/helpers.go @@ -34,7 +34,7 @@ import ( "sigs.k8s.io/cluster-api/util/patch" "sigs.k8s.io/controller-runtime/pkg/client" - controlplanev1 "github.com/k3s-io/cluster-api-k3s/controlplane/api/v1beta1" + controlplanev1 "github.com/k3s-io/cluster-api-k3s/controlplane/api/v1beta2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" diff --git a/test/e2e/inplace_rollout_test.go b/test/e2e/inplace_rollout_test.go new file mode 100644 index 00000000..70b206bc --- /dev/null +++ b/test/e2e/inplace_rollout_test.go @@ -0,0 +1,267 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "context" + "fmt" + "path/filepath" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "golang.org/x/exp/rand" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + controlplanev1 "github.com/k3s-io/cluster-api-k3s/controlplane/api/v1beta2" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/test/framework" + "sigs.k8s.io/cluster-api/test/framework/clusterctl" + "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/labels" + "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// This test follows CAPI's clusterclass_rollout test to test the inplace mutable fields +// setting on ControlPlane object could be rollout to underlying machines. +// The original test does not apply to k3s cluster as it modified controlPlane fields specific to KubeadmControlPlane. +// Link to CAPI clusterclass_rollout test: https://github.com/kubernetes-sigs/cluster-api/blob/main/test/e2e/clusterclass_rollout.go +var _ = Describe("Inplace mutable fields rollout test", func() { + var ( + ctx = context.TODO() + specName = "inplace-rollout" + namespace *corev1.Namespace + cancelWatches context.CancelFunc + result *ApplyClusterTemplateAndWaitResult + clusterName string + clusterctlLogFolder string + infrastructureProvider string + ) + + BeforeEach(func() { + Expect(e2eConfig.Variables).To(HaveKey(KubernetesVersion)) + + clusterName = fmt.Sprintf("capik3s-inplace-%s", util.RandomString(6)) + infrastructureProvider = "docker" + + // Setup a Namespace where to host objects for this spec and create a watcher for the namespace events. + namespace, cancelWatches = setupSpecNamespace(ctx, specName, bootstrapClusterProxy, artifactFolder) + + result = new(ApplyClusterTemplateAndWaitResult) + + clusterctlLogFolder = filepath.Join(artifactFolder, "clusters", bootstrapClusterProxy.GetName()) + }) + + AfterEach(func() { + cleanInput := cleanupInput{ + SpecName: specName, + Cluster: result.Cluster, + ClusterProxy: bootstrapClusterProxy, + Namespace: namespace, + CancelWatches: cancelWatches, + IntervalsGetter: e2eConfig.GetIntervals, + SkipCleanup: skipCleanup, + ArtifactFolder: artifactFolder, + } + + dumpSpecResourcesAndCleanup(ctx, cleanInput) + }) + + Context("Modifying inplace mutable fields", func() { + It("Should apply new value without triggering rollout", func() { + By("Creating a workload cluster with topology") + ApplyClusterTemplateAndWait(ctx, ApplyClusterTemplateAndWaitInput{ + ClusterProxy: bootstrapClusterProxy, + ConfigCluster: clusterctl.ConfigClusterInput{ + LogFolder: clusterctlLogFolder, + ClusterctlConfigPath: clusterctlConfigPath, + KubeconfigPath: bootstrapClusterProxy.GetKubeconfigPath(), + InfrastructureProvider: infrastructureProvider, + Flavor: "topology", + Namespace: namespace.Name, + ClusterName: clusterName, + KubernetesVersion: e2eConfig.GetVariable(KubernetesVersion), + ControlPlaneMachineCount: pointer.Int64Ptr(3), + WorkerMachineCount: pointer.Int64Ptr(1), + }, + WaitForClusterIntervals: e2eConfig.GetIntervals(specName, "wait-cluster"), + WaitForControlPlaneIntervals: e2eConfig.GetIntervals(specName, "wait-control-plane"), + WaitForMachineDeployments: e2eConfig.GetIntervals(specName, "wait-worker-nodes"), + }, result) + + By("Rolling out changes to control plane (in-place)") + machinesBeforeUpgrade := getMachinesByCluster(ctx, bootstrapClusterProxy.GetClient(), result.Cluster) + By("Modifying the control plane configuration via Cluster topology and wait for changes to be 
applied to the control plane object (in-place)") + modifyControlPlaneViaClusterAndWait(ctx, modifyControlPlaneViaClusterAndWaitInput{ + ClusterProxy: bootstrapClusterProxy, + Cluster: result.Cluster, + ModifyControlPlaneTopology: func(topology *clusterv1.ControlPlaneTopology) { + // Drop existing labels and annotations and set new ones. + topology.Metadata.Labels = map[string]string{ + "Cluster.topology.controlPlane.newLabel": "Cluster.topology.controlPlane.newLabelValue", + } + topology.Metadata.Annotations = map[string]string{ + "Cluster.topology.controlPlane.newAnnotation": "Cluster.topology.controlPlane.newAnnotationValue", + } + topology.NodeDrainTimeout = &metav1.Duration{Duration: time.Duration(rand.Intn(20)) * time.Second} //nolint:gosec + topology.NodeDeletionTimeout = &metav1.Duration{Duration: time.Duration(rand.Intn(20)) * time.Second} //nolint:gosec + topology.NodeVolumeDetachTimeout = &metav1.Duration{Duration: time.Duration(rand.Intn(20)) * time.Second} //nolint:gosec + }, + WaitForControlPlane: e2eConfig.GetIntervals(specName, "wait-control-plane"), + }) + + By("Verifying there are no unexpected rollouts through in-place rollout") + Consistently(func(g Gomega) { + machinesAfterUpgrade := getMachinesByCluster(ctx, bootstrapClusterProxy.GetClient(), result.Cluster) + g.Expect(machinesAfterUpgrade.Equal(machinesBeforeUpgrade)).To(BeTrue(), "Machines must not be replaced through in-place rollout") + }, 30*time.Second, 1*time.Second).Should(Succeed()) + }) + }) +}) + +// modifyControlPlaneViaClusterAndWaitInput is the input type for modifyControlPlaneViaClusterAndWait. +type modifyControlPlaneViaClusterAndWaitInput struct { + ClusterProxy framework.ClusterProxy + Cluster *clusterv1.Cluster + ModifyControlPlaneTopology func(topology *clusterv1.ControlPlaneTopology) + WaitForControlPlane []interface{} +} + +// modifyControlPlaneViaClusterAndWait modifies the ControlPlaneTopology of a Cluster topology via ModifyControlPlaneTopology. +// It then waits until the changes are rolled out to the ControlPlane of the Cluster. +func modifyControlPlaneViaClusterAndWait(ctx context.Context, input modifyControlPlaneViaClusterAndWaitInput) { + Expect(ctx).NotTo(BeNil(), "ctx is required for modifyControlPlaneViaClusterAndWait") + Expect(input.ClusterProxy).ToNot(BeNil(), "Invalid argument. input.ClusterProxy can't be nil when calling modifyControlPlaneViaClusterAndWait") + Expect(input.Cluster).ToNot(BeNil(), "Invalid argument. input.Cluster can't be nil when calling modifyControlPlaneViaClusterAndWait") + + mgmtClient := input.ClusterProxy.GetClient() + + Byf("Modifying the control plane topology of Cluster %s", klog.KObj(input.Cluster)) + + // Patch the control plane topology in the Cluster. + patchHelper, err := patch.NewHelper(input.Cluster, mgmtClient) + Expect(err).ToNot(HaveOccurred()) + input.ModifyControlPlaneTopology(&input.Cluster.Spec.Topology.ControlPlane) + Expect(patchHelper.Patch(ctx, input.Cluster)).To(Succeed()) + + // NOTE: We only wait until the change is rolled out to the control plane object and not to the control plane machines. + Byf("Waiting for control plane rollout to complete.") + Eventually(func(g Gomega) { + // Get the ControlPlane. + controlPlaneRef := input.Cluster.Spec.ControlPlaneRef + controlPlaneTopology := input.Cluster.Spec.Topology.ControlPlane + + // Get KThreesControlPlane object. 
+ cpObj := &controlplanev1.KThreesControlPlane{} + kcpObjKey := client.ObjectKey{ + Namespace: input.Cluster.Namespace, + Name: controlPlaneRef.Name, + } + err = mgmtClient.Get(ctx, kcpObjKey, cpObj) + g.Expect(err).ToNot(HaveOccurred()) + + // Verify that the fields from Cluster topology are set on the control plane. + assertControlPlaneTopologyFields(g, cpObj, controlPlaneTopology) + + // Verify that the control plane machines have the required fields. + cluster := input.Cluster + controlPlaneMachineList := &clusterv1.MachineList{} + g.Expect(mgmtClient.List(ctx, controlPlaneMachineList, client.InNamespace(cluster.Namespace), client.MatchingLabels{ + clusterv1.MachineControlPlaneLabel: "", + clusterv1.ClusterNameLabel: cluster.Name, + })).To(Succeed()) + for _, m := range controlPlaneMachineList.Items { + metadata := m.ObjectMeta + for k, v := range controlPlaneTopology.Metadata.Labels { + g.Expect(metadata.Labels).To(HaveKeyWithValue(k, v)) + } + for k, v := range controlPlaneTopology.Metadata.Annotations { + g.Expect(metadata.Annotations).To(HaveKeyWithValue(k, v)) + } + + if controlPlaneTopology.NodeDrainTimeout != nil { + nodeDrainTimeout := m.Spec.NodeDrainTimeout + g.Expect(nodeDrainTimeout).To(Equal(controlPlaneTopology.NodeDrainTimeout)) + } + + if controlPlaneTopology.NodeDeletionTimeout != nil { + nodeDeletionTimeout := m.Spec.NodeDeletionTimeout + g.Expect(nodeDeletionTimeout).To(Equal(controlPlaneTopology.NodeDeletionTimeout)) + } + + if controlPlaneTopology.NodeVolumeDetachTimeout != nil { + nodeVolumeDetachTimeout := m.Spec.NodeVolumeDetachTimeout + g.Expect(nodeVolumeDetachTimeout).To(Equal(controlPlaneTopology.NodeVolumeDetachTimeout)) + } + } + }, input.WaitForControlPlane...).Should(Succeed()) +} + +// assertControlPlaneTopologyFields asserts that all fields set in the ControlPlaneTopology have been set on the ControlPlane. +// Note: We intentionally focus on the fields set in the ControlPlaneTopology and ignore the ones set through ClusterClass or +// ControlPlane template as we want to validate that the fields of the ControlPlaneTopology have been propagated correctly. +func assertControlPlaneTopologyFields(g Gomega, controlPlane *controlplanev1.KThreesControlPlane, controlPlaneTopology clusterv1.ControlPlaneTopology) { + metadata := controlPlane.ObjectMeta + for k, v := range controlPlaneTopology.Metadata.Labels { + g.Expect(metadata.Labels).To(HaveKeyWithValue(k, v)) + } + for k, v := range controlPlaneTopology.Metadata.Annotations { + g.Expect(metadata.Annotations).To(HaveKeyWithValue(k, v)) + } + + if controlPlaneTopology.NodeDrainTimeout != nil { + nodeDrainTimeout := controlPlane.Spec.MachineTemplate.NodeDrainTimeout + g.Expect(nodeDrainTimeout).To(Equal(controlPlaneTopology.NodeDrainTimeout)) + } + + if controlPlaneTopology.NodeDeletionTimeout != nil { + nodeDeletionTimeout := controlPlane.Spec.MachineTemplate.NodeDeletionTimeout + g.Expect(nodeDeletionTimeout).To(Equal(controlPlaneTopology.NodeDeletionTimeout)) + } + + if controlPlaneTopology.NodeVolumeDetachTimeout != nil { + nodeVolumeDetachTimeout := controlPlane.Spec.MachineTemplate.NodeVolumeDetachTimeout + g.Expect(nodeVolumeDetachTimeout).To(Equal(controlPlaneTopology.NodeVolumeDetachTimeout)) + } +} + +// getMachinesByCluster gets the Machines of a Cluster and returns them as a Set of Machine names. +// Note: This excludes MachinePool Machines as they are not replaced by rollout yet. 
+func getMachinesByCluster(ctx context.Context, client client.Client, cluster *clusterv1.Cluster) sets.Set[string] { + machines := sets.Set[string]{} + machinesByCluster := framework.GetMachinesByCluster(ctx, framework.GetMachinesByClusterInput{ + Lister: client, + ClusterName: cluster.Name, + Namespace: cluster.Namespace, + }) + for i := range machinesByCluster { + m := machinesByCluster[i] + if !labels.IsMachinePoolOwned(&m) { + machines.Insert(m.Name) + } + } + return machines +}
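
As a usage illustration for reviewers, here is a minimal sketch of how a controller could drive ssa.Patch with the caching proxy. Only ssa.Patch, ssa.WithCachingProxy, ssa.Cache and the "capi-kthreescontrolplane" field manager come from this patch; the helper name, the label key and the client/cache wiring are illustrative assumptions rather than code from the PR.

package example

import (
	"context"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/k3s-io/cluster-api-k3s/pkg/util/ssa"
)

// syncMachineLabels keeps the object as read from the cache as "original", mutates a
// deep copy, and lets ssa.Patch apply the intent with forced ownership under the
// controller's field manager. If a previous apply left the object unchanged, the cached
// request identifier lets Patch return early without another API call.
func syncMachineLabels(ctx context.Context, c client.Client, ssaCache ssa.Cache, machine *clusterv1.Machine) error {
	original := machine.DeepCopy()

	modified := machine.DeepCopy()
	if modified.Labels == nil {
		modified.Labels = map[string]string{}
	}
	modified.Labels["example.cluster.x-k8s.io/owned"] = "true" // illustrative label only

	return ssa.Patch(ctx, c, "capi-kthreescontrolplane", modified,
		ssa.WithCachingProxy{Cache: ssaCache, Original: original})
}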
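
The allowed-path filtering that prepareModified performs can also be exercised directly through FilterObject. A rough sketch with a made-up object; the filtering behaviour itself is provided by filterintent.go in this patch:

package example

import (
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

	"github.com/k3s-io/cluster-api-k3s/pkg/util/contract"
	"github.com/k3s-io/cluster-api-k3s/pkg/util/ssa"
)

func filterExample() *unstructured.Unstructured {
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "cluster.x-k8s.io/v1beta1",
		"kind":       "Machine",
		"metadata": map[string]interface{}{
			"name":              "machine-a",
			"namespace":         "default",
			"creationTimestamp": "2024-01-01T00:00:00Z", // not an allowed path, dropped
		},
		"status": map[string]interface{}{"phase": "Running"}, // dropped as well
	}}

	// Keep only the fields the server side apply intent should express.
	ssa.FilterObject(u, &ssa.FilterObjectInput{
		AllowedPaths: []contract.Path{
			{"apiVersion"}, {"kind"},
			{"metadata", "name"}, {"metadata", "namespace"},
		},
	})
	// u now only carries apiVersion, kind, metadata.name and metadata.namespace.
	return u
}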
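
Finally, a sketch of how the new gomega matchers could be used in a unit test. The managed fields entry below is hand-written and trimmed; a real object would carry an equivalent entry after an SSA apply by the controller.

package ssa_test

import (
	"testing"

	. "github.com/onsi/gomega"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/k3s-io/cluster-api-k3s/pkg/util/contract"
	"github.com/k3s-io/cluster-api-k3s/pkg/util/ssa"
)

func TestManagedFieldAssertions(t *testing.T) {
	g := NewWithT(t)

	// A managed fields entry as it could appear on a Machine after the control plane
	// controller applied labels via SSA (FieldsV1 content trimmed for brevity).
	managedFields := []metav1.ManagedFieldsEntry{{
		Manager:   "capi-kthreescontrolplane",
		Operation: metav1.ManagedFieldsOperationApply,
		FieldsV1:  &metav1.FieldsV1{Raw: []byte(`{"f:metadata":{"f:labels":{"f:foo":{}}}}`)},
	}}

	// An entry with the expected manager and operation exists.
	g.Expect(managedFields).To(ContainElement(
		ssa.MatchManagedFieldsEntry("capi-kthreescontrolplane", metav1.ManagedFieldsOperationApply)))

	// metadata.labels is owned by that manager; note the "f:" prefixes, the path is
	// written exactly as it appears in managed fields.
	g.Expect(managedFields).To(ssa.MatchFieldOwnership(
		"capi-kthreescontrolplane", metav1.ManagedFieldsOperationApply,
		contract.Path{"f:metadata", "f:labels"}))
}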