From c75f1969d7c1b5ae64fec35252d1e8134d8de94f Mon Sep 17 00:00:00 2001 From: yaroslavborbat Date: Wed, 13 Nov 2024 17:26:12 +0300 Subject: [PATCH] fix Signed-off-by: yaroslavborbat --- ...uto-migrate-if-nodeplacement-changed.patch | 662 +++++++++++++++++- .../controller/vmchange/pod_placement_ce.go | 6 +- .../pkg/version/edition.go | 6 +- templates/kubevirt/kubevirt.yaml | 6 + 4 files changed, 654 insertions(+), 26 deletions(-) diff --git a/images/virt-artifact/patches/024-auto-migrate-if-nodeplacement-changed.patch b/images/virt-artifact/patches/024-auto-migrate-if-nodeplacement-changed.patch index 52fb571cc..d1492b344 100644 --- a/images/virt-artifact/patches/024-auto-migrate-if-nodeplacement-changed.patch +++ b/images/virt-artifact/patches/024-auto-migrate-if-nodeplacement-changed.patch @@ -1,52 +1,631 @@ +diff --git a/pkg/indexers/indexers.go b/pkg/indexers/indexers.go +new file mode 100644 +index 0000000000..2729cd89a6 +--- /dev/null ++++ b/pkg/indexers/indexers.go +@@ -0,0 +1,23 @@ ++package indexers ++ ++import ( ++ corev1 "k8s.io/api/core/v1" ++ "k8s.io/client-go/tools/cache" ++) ++ ++const PodByNode = "PodByNode" ++ ++func NewPodByNodeIndexer() cache.Indexers { ++ return cache.Indexers{ ++ "PodByNode": func(obj interface{}) ([]string, error) { ++ pod, ok := obj.(*corev1.Pod) ++ if !ok { ++ return nil, nil ++ } ++ if pod.Spec.NodeName == "" { ++ return nil, nil ++ } ++ return []string{pod.Spec.NodeName}, nil ++ }, ++ } ++} +diff --git a/pkg/util/affinity/nodeaffinity.go b/pkg/util/affinity/nodeaffinity.go +new file mode 100644 +index 0000000000..eeadaa6a99 +--- /dev/null ++++ b/pkg/util/affinity/nodeaffinity.go +@@ -0,0 +1,253 @@ ++/* ++Copyright 2020 The Kubernetes Authors. ++ ++Licensed under the Apache License, Version 2.0 (the "License"); ++you may not use this file except in compliance with the License. ++You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, software ++distributed under the License is distributed on an "AS IS" BASIS, ++WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++See the License for the specific language governing permissions and ++limitations under the License. ++*/ ++ ++package affinity ++ ++import ( ++ v1 "k8s.io/api/core/v1" ++ "k8s.io/apimachinery/pkg/fields" ++ "k8s.io/apimachinery/pkg/labels" ++ "k8s.io/apimachinery/pkg/selection" ++ "k8s.io/apimachinery/pkg/util/errors" ++ "k8s.io/apimachinery/pkg/util/validation/field" ++) ++ ++// LazyErrorNodeSelector is a runtime representation of v1.NodeSelector that ++// only reports parse errors when no terms match. ++type LazyErrorNodeSelector struct { ++ terms []nodeSelectorTerm ++} ++ ++// NewLazyErrorNodeSelector creates a NodeSelector that only reports parse ++// errors when no terms match. ++func NewLazyErrorNodeSelector(ns *v1.NodeSelector, opts ...field.PathOption) *LazyErrorNodeSelector { ++ p := field.ToPath(opts...) ++ parsedTerms := make([]nodeSelectorTerm, 0, len(ns.NodeSelectorTerms)) ++ path := p.Child("nodeSelectorTerms") ++ for i, term := range ns.NodeSelectorTerms { ++ // nil or empty term selects no objects ++ if isEmptyNodeSelectorTerm(&term) { ++ continue ++ } ++ p := path.Index(i) ++ parsedTerms = append(parsedTerms, newNodeSelectorTerm(&term, p)) ++ } ++ return &LazyErrorNodeSelector{ ++ terms: parsedTerms, ++ } ++} ++ ++// Match checks whether the node labels and fields match the selector terms, ORed; ++// nil or empty term matches no objects. 
++// Parse errors are only returned if no terms matched. ++func (ns *LazyErrorNodeSelector) Match(node *v1.Node) (bool, error) { ++ if node == nil { ++ return false, nil ++ } ++ nodeLabels := labels.Set(node.Labels) ++ nodeFields := extractNodeFields(node) ++ ++ var errs []error ++ for _, term := range ns.terms { ++ match, tErrs := term.match(nodeLabels, nodeFields) ++ if len(tErrs) > 0 { ++ errs = append(errs, tErrs...) ++ continue ++ } ++ if match { ++ return true, nil ++ } ++ } ++ return false, errors.Flatten(errors.NewAggregate(errs)) ++} ++ ++func isEmptyNodeSelectorTerm(term *v1.NodeSelectorTerm) bool { ++ return len(term.MatchExpressions) == 0 && len(term.MatchFields) == 0 ++} ++ ++func extractNodeFields(n *v1.Node) fields.Set { ++ f := make(fields.Set) ++ if len(n.Name) > 0 { ++ f["metadata.name"] = n.Name ++ } ++ return f ++} ++ ++type nodeSelectorTerm struct { ++ matchLabels labels.Selector ++ matchFields fields.Selector ++ parseErrs []error ++} ++ ++func newNodeSelectorTerm(term *v1.NodeSelectorTerm, path *field.Path) nodeSelectorTerm { ++ var parsedTerm nodeSelectorTerm ++ var errs []error ++ if len(term.MatchExpressions) != 0 { ++ p := path.Child("matchExpressions") ++ parsedTerm.matchLabels, errs = nodeSelectorRequirementsAsSelector(term.MatchExpressions, p) ++ if errs != nil { ++ parsedTerm.parseErrs = append(parsedTerm.parseErrs, errs...) ++ } ++ } ++ if len(term.MatchFields) != 0 { ++ p := path.Child("matchFields") ++ parsedTerm.matchFields, errs = nodeSelectorRequirementsAsFieldSelector(term.MatchFields, p) ++ if errs != nil { ++ parsedTerm.parseErrs = append(parsedTerm.parseErrs, errs...) ++ } ++ } ++ return parsedTerm ++} ++ ++func (t *nodeSelectorTerm) match(nodeLabels labels.Set, nodeFields fields.Set) (bool, []error) { ++ if t.parseErrs != nil { ++ return false, t.parseErrs ++ } ++ if t.matchLabels != nil && !t.matchLabels.Matches(nodeLabels) { ++ return false, nil ++ } ++ if t.matchFields != nil && len(nodeFields) > 0 && !t.matchFields.Matches(nodeFields) { ++ return false, nil ++ } ++ return true, nil ++} ++ ++var validSelectorOperators = []v1.NodeSelectorOperator{ ++ v1.NodeSelectorOpIn, ++ v1.NodeSelectorOpNotIn, ++ v1.NodeSelectorOpExists, ++ v1.NodeSelectorOpDoesNotExist, ++ v1.NodeSelectorOpGt, ++ v1.NodeSelectorOpLt, ++} ++ ++// nodeSelectorRequirementsAsSelector converts the []NodeSelectorRequirement api type into a struct that implements ++// labels.Selector. 
++func nodeSelectorRequirementsAsSelector(nsm []v1.NodeSelectorRequirement, path *field.Path) (labels.Selector, []error) { ++ if len(nsm) == 0 { ++ return labels.Nothing(), nil ++ } ++ var errs []error ++ selector := labels.NewSelector() ++ for i, expr := range nsm { ++ p := path.Index(i) ++ var op selection.Operator ++ switch expr.Operator { ++ case v1.NodeSelectorOpIn: ++ op = selection.In ++ case v1.NodeSelectorOpNotIn: ++ op = selection.NotIn ++ case v1.NodeSelectorOpExists: ++ op = selection.Exists ++ case v1.NodeSelectorOpDoesNotExist: ++ op = selection.DoesNotExist ++ case v1.NodeSelectorOpGt: ++ op = selection.GreaterThan ++ case v1.NodeSelectorOpLt: ++ op = selection.LessThan ++ default: ++ errs = append(errs, field.NotSupported(p.Child("operator"), expr.Operator, validSelectorOperators)) ++ continue ++ } ++ r, err := labels.NewRequirement(expr.Key, op, expr.Values, field.WithPath(p)) ++ if err != nil { ++ errs = append(errs, err) ++ } else { ++ selector = selector.Add(*r) ++ } ++ } ++ if len(errs) != 0 { ++ return nil, errs ++ } ++ return selector, nil ++} ++ ++var validFieldSelectorOperators = []v1.NodeSelectorOperator{ ++ v1.NodeSelectorOpIn, ++ v1.NodeSelectorOpNotIn, ++} ++ ++// nodeSelectorRequirementsAsFieldSelector converts the []NodeSelectorRequirement core type into a struct that implements ++// fields.Selector. ++func nodeSelectorRequirementsAsFieldSelector(nsr []v1.NodeSelectorRequirement, path *field.Path) (fields.Selector, []error) { ++ if len(nsr) == 0 { ++ return fields.Nothing(), nil ++ } ++ var errs []error ++ ++ var selectors []fields.Selector ++ for i, expr := range nsr { ++ p := path.Index(i) ++ switch expr.Operator { ++ case v1.NodeSelectorOpIn: ++ if len(expr.Values) != 1 { ++ errs = append(errs, field.Invalid(p.Child("values"), expr.Values, "must have one element")) ++ } else { ++ selectors = append(selectors, fields.OneTermEqualSelector(expr.Key, expr.Values[0])) ++ } ++ ++ case v1.NodeSelectorOpNotIn: ++ if len(expr.Values) != 1 { ++ errs = append(errs, field.Invalid(p.Child("values"), expr.Values, "must have one element")) ++ } else { ++ selectors = append(selectors, fields.OneTermNotEqualSelector(expr.Key, expr.Values[0])) ++ } ++ ++ default: ++ errs = append(errs, field.NotSupported(p.Child("operator"), expr.Operator, validFieldSelectorOperators)) ++ } ++ } ++ ++ if len(errs) != 0 { ++ return nil, errs ++ } ++ return fields.AndSelectors(selectors...), nil ++} ++ ++type RequiredNodeAffinity struct { ++ labelSelector labels.Selector ++ nodeSelector *LazyErrorNodeSelector ++} ++ ++// GetRequiredNodeAffinity returns the parsing result of pod's nodeSelector and nodeAffinity. ++func GetRequiredNodeAffinity(pod *v1.Pod) RequiredNodeAffinity { ++ var selector labels.Selector ++ if len(pod.Spec.NodeSelector) > 0 { ++ selector = labels.SelectorFromSet(pod.Spec.NodeSelector) ++ } ++ // Use LazyErrorNodeSelector for backwards compatibility of parsing errors. ++ var affinity *LazyErrorNodeSelector ++ if pod.Spec.Affinity != nil && ++ pod.Spec.Affinity.NodeAffinity != nil && ++ pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { ++ affinity = NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution) ++ } ++ return RequiredNodeAffinity{labelSelector: selector, nodeSelector: affinity} ++} ++ ++// Match checks whether the pod is schedulable onto nodes according to ++// the requirements in both nodeSelector and nodeAffinity. 
++func (s RequiredNodeAffinity) Match(node *v1.Node) (bool, error) { ++ if s.labelSelector != nil { ++ if !s.labelSelector.Matches(labels.Set(node.Labels)) { ++ return false, nil ++ } ++ } ++ if s.nodeSelector != nil { ++ return s.nodeSelector.Match(node) ++ } ++ return true, nil ++} +diff --git a/pkg/util/affinity/podaffinity.go b/pkg/util/affinity/podaffinity.go +new file mode 100644 +index 0000000000..f3fd29a831 +--- /dev/null ++++ b/pkg/util/affinity/podaffinity.go +@@ -0,0 +1,104 @@ ++/* ++Copyright 2015 The Kubernetes Authors. ++ ++Licensed under the Apache License, Version 2.0 (the "License"); ++you may not use this file except in compliance with the License. ++You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, software ++distributed under the License is distributed on an "AS IS" BASIS, ++WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++See the License for the specific language governing permissions and ++limitations under the License. ++*/ ++ ++package affinity ++ ++import ( ++ v1 "k8s.io/api/core/v1" ++ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ++ "k8s.io/apimachinery/pkg/labels" ++ "k8s.io/apimachinery/pkg/util/sets" ++) ++ ++// AffinityTerm is a processed version of v1.PodAffinityTerm. ++type AffinityTerm struct { ++ Namespaces sets.Set[string] ++ Selector labels.Selector ++ TopologyKey string ++ NamespaceSelector labels.Selector ++} ++ ++// Matches returns true if the pod matches the label selector and namespaces or namespace selector. ++func (at *AffinityTerm) Matches(pod *v1.Pod, nsLabels labels.Set) bool { ++ if at.Namespaces.Has(pod.Namespace) || at.NamespaceSelector.Matches(nsLabels) { ++ return at.Selector.Matches(labels.Set(pod.Labels)) ++ } ++ return false ++} ++ ++func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) (*AffinityTerm, error) { ++ selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) ++ if err != nil { ++ return nil, err ++ } ++ ++ namespaces := getNamespacesFromPodAffinityTerm(pod, term) ++ nsSelector, err := metav1.LabelSelectorAsSelector(term.NamespaceSelector) ++ if err != nil { ++ return nil, err ++ } ++ ++ return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey, NamespaceSelector: nsSelector}, nil ++} ++ ++// GetAffinityTerms receives a Pod and affinity terms and returns the namespaces and ++// selectors of the terms. ++func GetAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) ([]AffinityTerm, error) { ++ if v1Terms == nil { ++ return nil, nil ++ } ++ ++ var terms []AffinityTerm ++ for i := range v1Terms { ++ t, err := newAffinityTerm(pod, &v1Terms[i]) ++ if err != nil { ++ // We get here if the label selector failed to process ++ return nil, err ++ } ++ terms = append(terms, *t) ++ } ++ return terms, nil ++} ++ ++// returns a set of names according to the namespaces indicated in podAffinityTerm. ++// If namespaces is empty it considers the given pod's namespace. ++func getNamespacesFromPodAffinityTerm(pod *v1.Pod, podAffinityTerm *v1.PodAffinityTerm) sets.Set[string] { ++ names := sets.Set[string]{} ++ if len(podAffinityTerm.Namespaces) == 0 && podAffinityTerm.NamespaceSelector == nil { ++ names.Insert(pod.Namespace) ++ } else { ++ names.Insert(podAffinityTerm.Namespaces...) 
++ } ++ return names ++} ++ ++func GetPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { ++ if affinity != nil && affinity.PodAffinity != nil { ++ if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { ++ terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution ++ } ++ } ++ return terms ++} ++ ++func GetPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { ++ if affinity != nil && affinity.PodAntiAffinity != nil { ++ if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { ++ terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution ++ } ++ } ++ return terms ++} diff --git a/pkg/virt-controller/watch/vmi.go b/pkg/virt-controller/watch/vmi.go -index 0c4bfca389..cf7440e84f 100644 +index 0c4bfca389..cc5c6a6c3e 100644 --- a/pkg/virt-controller/watch/vmi.go +++ b/pkg/virt-controller/watch/vmi.go -@@ -691,6 +691,10 @@ func (c *VMIController) updateStatus(vmi *virtv1.VirtualMachineInstance, pod *k8 +@@ -30,16 +30,20 @@ import ( + "strings" + "time" + ++ "k8s.io/apimachinery/pkg/labels" + "k8s.io/utils/ptr" + ++ "kubevirt.io/kubevirt/pkg/indexers" + "kubevirt.io/kubevirt/pkg/virt-controller/network" + ++ networkv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" ++ ++ "kubevirt.io/kubevirt/pkg/util/affinity" ++ + "kubevirt.io/kubevirt/pkg/virt-controller/watch/topology" + + backendstorage "kubevirt.io/kubevirt/pkg/storage/backend-storage" + +- networkv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" +- + k8sv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + k8serrors "k8s.io/apimachinery/pkg/api/errors" +@@ -51,7 +55,6 @@ import ( + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/trace" +- + virtv1 "kubevirt.io/api/core/v1" + "kubevirt.io/client-go/kubecli" + "kubevirt.io/client-go/log" +@@ -114,6 +117,10 @@ func NewVMIController(templateService services.TemplateService, + backendStorage: backendstorage.NewBackendStorage(clientset, clusterConfig, storageClassInformer.GetStore(), storageProfileInformer.GetStore(), pvcInformer.GetIndexer()), + } + ++ if err := podInformer.AddIndexers(indexers.NewPodByNodeIndexer()); err != nil { ++ return nil, fmt.Errorf("failed to add pod indexers: %w", err) ++ } ++ + c.hasSynced = func() bool { + return vmInformer.HasSynced() && vmiInformer.HasSynced() && podInformer.HasSynced() && + dataVolumeInformer.HasSynced() && cdiConfigInformer.HasSynced() && cdiInformer.HasSynced() && +@@ -208,6 +215,7 @@ type VMIController struct { + vmiIndexer cache.Indexer + vmStore cache.Store + podIndexer cache.Indexer ++ nodeIndexer cache.Indexer + pvcIndexer cache.Indexer + storageClassStore cache.Store + topologyHinter topology.Hinter +@@ -691,6 +699,10 @@ func (c *VMIController) updateStatus(vmi *virtv1.VirtualMachineInstance, pod *k8 c.syncVolumesUpdate(vmiCopy) } -+ if err := c.updateNodePlacementCondition(vmiCopy, pod); err != nil { -+ return fmt.Errorf("failed to update condition %s", virtv1.VirtualMachineInstanceNodePlacementChange) ++ if err := c.syncNodePlacementCondition(vmiCopy, pod); err != nil { ++ return fmt.Errorf("failed to update condition %s", virtv1.VirtualMachineInstanceNodePlacementNotMatched) + } + case vmi.IsScheduled(): if !vmiPodExists { vmiCopy.Status.Phase = virtv1.Failed -@@ -2416,6 +2420,42 @@ func (c *VMIController) syncVolumesUpdate(vmi 
*virtv1.VirtualMachineInstance) { +@@ -2416,6 +2428,173 @@ func (c *VMIController) syncVolumesUpdate(vmi *virtv1.VirtualMachineInstance) { vmiConditions.UpdateCondition(vmi, &condition) } -+func (c *VMIController) updateNodePlacementCondition(vmi *virtv1.VirtualMachineInstance, pod *k8sv1.Pod) error { ++func (c *VMIController) syncNodePlacementCondition(vmi *virtv1.VirtualMachineInstance, pod *k8sv1.Pod) error { + status := k8sv1.ConditionFalse -+ changed, err := c.isChangedNodePlacement(vmi, pod) ++ templatePod, err := c.templateService.RenderLaunchManifest(vmi) ++ if err != nil { ++ return fmt.Errorf("failed to render pod manifest: %w", err) ++ } ++ changed, err := c.isChangedNodePlacement(pod, templatePod) + if err != nil { + return fmt.Errorf("could not verify if NodePlacement update is required: %w", err) + } + if changed { -+ status = k8sv1.ConditionTrue ++ matched, err := c.nodePlacementIsMatched(pod, templatePod) ++ if err != nil { ++ return fmt.Errorf("failed to verify if NodePlacement update is matched: %w", err) ++ } ++ if !matched { ++ status = k8sv1.ConditionTrue ++ } + } -+ c.syncNodePlacementUpdate(vmi, status) ++ c.syncNodePlacementNotMatchedCondition(vmi, status) + return nil +} + -+func (c *VMIController) isChangedNodePlacement(vmi *virtv1.VirtualMachineInstance, pod *k8sv1.Pod) (bool, error) { -+ if vmi == nil || pod == nil { ++func (c *VMIController) isChangedNodePlacement(pod, templatePod *k8sv1.Pod) (bool, error) { ++ if pod == nil || templatePod == nil { + return false, nil + } -+ templatePod, err := c.templateService.RenderLaunchManifest(vmi) -+ if err != nil { -+ return false, err ++ ++ // when migration controller creating target pod. It will be created with PodAntiAffinity ++ { ++ var antiAffinityTerm *k8sv1.PodAffinityTerm ++ ++ if pod.Spec.Affinity != nil && ++ pod.Spec.Affinity.PodAntiAffinity != nil && ++ len(pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) > 0 { ++ for _, rd := range pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution { ++ if rd.LabelSelector != nil { ++ if _, found := rd.LabelSelector.MatchLabels[virtv1.CreatedByLabel]; found { ++ antiAffinityTerm = rd.DeepCopy() ++ } ++ } ++ } ++ } ++ if antiAffinityTerm != nil { ++ antiAffinityRule := &k8sv1.PodAntiAffinity{ ++ RequiredDuringSchedulingIgnoredDuringExecution: []k8sv1.PodAffinityTerm{*antiAffinityTerm}, ++ } ++ if templatePod.Spec.Affinity == nil { ++ templatePod.Spec.Affinity = &k8sv1.Affinity{ ++ PodAntiAffinity: antiAffinityRule, ++ } ++ } else if templatePod.Spec.Affinity.PodAntiAffinity == nil { ++ templatePod.Spec.Affinity.PodAntiAffinity = antiAffinityRule ++ } else { ++ templatePod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(templatePod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution, *antiAffinityTerm) ++ } ++ } + } + + return !equality.Semantic.DeepEqual(pod.Spec.NodeSelector, templatePod.Spec.NodeSelector) || + !equality.Semantic.DeepEqual(pod.Spec.Affinity, templatePod.Spec.Affinity), nil +} + -+func (c *VMIController) syncNodePlacementUpdate(vmi *virtv1.VirtualMachineInstance, status k8sv1.ConditionStatus) { ++func (c *VMIController) nodePlacementIsMatched(pod, templatePod *k8sv1.Pod) (bool, error) { ++ if pod == nil || templatePod == nil { ++ return false, fmt.Errorf("pod or templatePod must not be nil") ++ } ++ node, err := c.clientset.CoreV1().Nodes().Get(context.Background(), pod.Spec.NodeName, v1.GetOptions{}) ++ if err != nil { ++ return 
false, err ++ } ++ pods, err := c.listPodsByNode(pod.Spec.NodeName) ++ if err != nil { ++ return false, err ++ } ++ ++ requiredNodeSelectorAndAffinity := affinity.GetRequiredNodeAffinity(templatePod) ++ match, err := requiredNodeSelectorAndAffinity.Match(node) ++ if err != nil { ++ return false, fmt.Errorf("failed to match required node selector and affinity: %w", err) ++ } ++ if !match { ++ return false, nil ++ } ++ ++ podNamespaces := make(map[string]struct{}) ++ for _, p := range pods { ++ podNamespaces[p.GetName()] = struct{}{} ++ } ++ allNamespaces, err := c.clientset.CoreV1().Namespaces().List(context.Background(), v1.ListOptions{}) ++ if err != nil { ++ return false, err ++ } ++ namespaceLabels := make(map[string]labels.Set, len(podNamespaces)) ++ for _, ns := range allNamespaces.Items { ++ if _, ok := podNamespaces[ns.GetName()]; ok { ++ namespaceLabels[ns.GetNamespace()] = ns.GetLabels() ++ } ++ } ++ ++ podAffinityTerms, err := affinity.GetAffinityTerms(templatePod, affinity.GetPodAffinityTerms(templatePod.Spec.Affinity)) ++ if err != nil { ++ return false, err ++ } ++ podAntiAffinityTerms, err := affinity.GetAffinityTerms(templatePod, affinity.GetPodAntiAffinityTerms(templatePod.Spec.Affinity)) ++ if err != nil { ++ return false, err ++ } ++ ++ var podMatchedByPodAffinityFound bool ++ ++ for _, p := range pods { ++ if p.GetUID() == pod.GetUID() { ++ continue ++ } ++ if p.Status.Phase == k8sv1.PodSucceeded || p.Status.Phase == k8sv1.PodFailed { ++ continue ++ } ++ nsLabels := namespaceLabels[p.GetNamespace()] ++ ++ // If at least one matches the podAffinity, then node placement is suitable. ++ if !podMatchedByPodAffinityFound { ++ matched := true ++ for _, podAffinityTerm := range podAffinityTerms { ++ if !podAffinityTerm.Matches(p, nsLabels) { ++ matched = false ++ break ++ } ++ } ++ podMatchedByPodAffinityFound = matched ++ } ++ // If at least one matches the podAntiAffinity, then node placement is not suitable. 
return false ++ for _, podAntiAffinityTerm := range podAntiAffinityTerms { ++ if podAntiAffinityTerm.Matches(p, nsLabels) { ++ return false, nil ++ } ++ } ++ ++ } ++ ++ return podMatchedByPodAffinityFound, nil ++} ++ ++// listPodsByNode takes a node and returns all Pods from the pod cache which run on this node ++func (c *VMIController) listPodsByNode(node string) ([]*k8sv1.Pod, error) { ++ objs, err := c.podIndexer.ByIndex(indexers.PodByNode, node) ++ if err != nil { ++ return nil, err ++ } ++ var pods []*k8sv1.Pod ++ for _, obj := range objs { ++ pod := obj.(*k8sv1.Pod) ++ pods = append(pods, pod) ++ } ++ return pods, nil ++} ++ ++func (c *VMIController) syncNodePlacementNotMatchedCondition(vmi *virtv1.VirtualMachineInstance, status k8sv1.ConditionStatus) { + vmiConditions := controller.NewVirtualMachineInstanceConditionManager() + condition := virtv1.VirtualMachineInstanceCondition{ -+ Type: virtv1.VirtualMachineInstanceNodePlacementChange, ++ Type: virtv1.VirtualMachineInstanceNodePlacementNotMatched, + Status: status, + LastTransitionTime: v1.Now(), + } @@ -57,7 +636,7 @@ index 0c4bfca389..cf7440e84f 100644 if len(dvs) == 0 { return diff --git a/pkg/virt-controller/watch/workload-updater/workload-updater.go b/pkg/virt-controller/watch/workload-updater/workload-updater.go -index a7d0f76e24..0482b732fe 100644 +index a7d0f76e24..e9205679de 100644 --- a/pkg/virt-controller/watch/workload-updater/workload-updater.go +++ b/pkg/virt-controller/watch/workload-updater/workload-updater.go @@ -214,7 +214,7 @@ func (c *WorkloadUpdateController) updateVmi(_, obj interface{}) { @@ -75,14 +654,57 @@ index a7d0f76e24..0482b732fe 100644 +func isNodePlacementInProgress(vmi *virtv1.VirtualMachineInstance) bool { + return controller.NewVirtualMachineInstanceConditionManager().HasConditionWithStatus(vmi, -+ virtv1.VirtualMachineInstanceNodePlacementChange, k8sv1.ConditionTrue) ++ virtv1.VirtualMachineInstanceNodePlacementNotMatched, k8sv1.ConditionTrue) +} + func (c *WorkloadUpdateController) doesRequireMigration(vmi *virtv1.VirtualMachineInstance) bool { if vmi.IsFinal() || migrationutils.IsMigrating(vmi) { return false +@@ -337,6 +342,9 @@ func (c *WorkloadUpdateController) doesRequireMigration(vmi *virtv1.VirtualMachi + if isVolumesUpdateInProgress(vmi) { + return true + } ++ if isNodePlacementInProgress(vmi) { ++ return true ++ } + + return false + } +@@ -352,6 +360,9 @@ func (c *WorkloadUpdateController) shouldAbortMigration(vmi *virtv1.VirtualMachi + if isVolumesUpdateInProgress(vmi) { + return false + } ++ if isNodePlacementInProgress(vmi) { ++ return false ++ } + if vmi.Status.MigrationState != nil && vmi.Status.MigrationState.TargetNodeDomainReadyTimestamp != nil { + return false + } +diff --git a/pkg/virt-handler/vm.go b/pkg/virt-handler/vm.go +index cdc1f815c3..24352cf6e9 100644 +--- a/pkg/virt-handler/vm.go ++++ b/pkg/virt-handler/vm.go +@@ -3468,6 +3468,7 @@ func (d *VirtualMachineController) finalizeMigration(vmi *v1.VirtualMachineInsta + d.recorder.Event(vmi, k8sv1.EventTypeWarning, err.Error(), "failed to update guest memory") + } + removeMigratedVolumes(vmi) ++ finalizeNodePlacement(vmi) + + options := &cmdv1.VirtualMachineOptions{} + options.InterfaceMigration = domainspec.BindingMigrationByInterfaceName(vmi.Spec.Domain.Devices.Interfaces, d.clusterConfig.GetNetworkBindings()) +@@ -3684,6 +3685,10 @@ func (d *VirtualMachineController) hotplugMemory(vmi *v1.VirtualMachineInstance, + return nil + } + ++func finalizeNodePlacement(vmi *v1.VirtualMachineInstance) { ++ 
controller.NewVirtualMachineInstanceConditionManager().RemoveCondition(vmi, v1.VirtualMachineInstanceNodePlacementNotMatched) ++} ++ + func removeMigratedVolumes(vmi *v1.VirtualMachineInstance) { + vmiConditions := controller.NewVirtualMachineInstanceConditionManager() + vmiConditions.RemoveCondition(vmi, v1.VirtualMachineInstanceVolumesChange) diff --git a/staging/src/kubevirt.io/api/core/v1/types.go b/staging/src/kubevirt.io/api/core/v1/types.go -index 7aa814d8f1..b7e5792a71 100644 +index 7aa814d8f1..841387d304 100644 --- a/staging/src/kubevirt.io/api/core/v1/types.go +++ b/staging/src/kubevirt.io/api/core/v1/types.go @@ -568,6 +568,9 @@ const ( @@ -91,7 +713,7 @@ index 7aa814d8f1..b7e5792a71 100644 VirtualMachineInstanceDataVolumesReady VirtualMachineInstanceConditionType = "DataVolumesReady" + + // Indicates that the VMI has affinity or nodeSelector changes -+ VirtualMachineInstanceNodePlacementChange VirtualMachineInstanceConditionType = "NodePlacementChange" ++ VirtualMachineInstanceNodePlacementNotMatched VirtualMachineInstanceConditionType = "NodePlacementNotMatched" ) // These are valid reasons for VMI conditions. diff --git a/images/virtualization-artifact/pkg/controller/vmchange/pod_placement_ce.go b/images/virtualization-artifact/pkg/controller/vmchange/pod_placement_ce.go index 5823cf867..442d68276 100644 --- a/images/virtualization-artifact/pkg/controller/vmchange/pod_placement_ce.go +++ b/images/virtualization-artifact/pkg/controller/vmchange/pod_placement_ce.go @@ -1,3 +1,6 @@ +//go:build !EE +// +build !EE + /* Copyright 2024 Flant JSC @@ -14,9 +17,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -//go:build !EE -// +build !EE - package vmchange const placementAction = ActionRestart diff --git a/images/virtualization-artifact/pkg/version/edition.go b/images/virtualization-artifact/pkg/version/edition.go index d3b50731d..cc15b792f 100644 --- a/images/virtualization-artifact/pkg/version/edition.go +++ b/images/virtualization-artifact/pkg/version/edition.go @@ -1,3 +1,6 @@ +//go:build !EE +// +build !EE + /* Copyright 2024 Flant JSC @@ -14,9 +17,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -//go:build !EE -// +build !EE - package version const edition = "CE" diff --git a/templates/kubevirt/kubevirt.yaml b/templates/kubevirt/kubevirt.yaml index 6f3213798..b80339b32 100644 --- a/templates/kubevirt/kubevirt.yaml +++ b/templates/kubevirt/kubevirt.yaml @@ -22,6 +22,12 @@ spec: tokenBucketRateLimiter: qps: 5000 burst: 6000 + migrations: + bandwidthPerMigration: 64Mi + completionTimeoutPerGiB: 800 + parallelMigrationsPerCluster: 5 + parallelOutboundMigrationsPerNode: 2 + progressTimeout: 150 smbios: manufacturer: Flant family: Deckhouse
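
The sketch below is illustrative only and not part of the patch: it mirrors the new pkg/indexers PodByNode index and shows the lookup path that listPodsByNode relies on (podIndexer.ByIndex(indexers.PodByNode, node)), using a plain client-go indexer. The pod name, namespace, and node name are invented for the example.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// podByNodeIndexer mirrors pkg/indexers.NewPodByNodeIndexer from the patch:
// it keys cached Pods by spec.nodeName so all Pods on a node can be listed
// without scanning the whole cache.
func podByNodeIndexer() cache.Indexers {
	return cache.Indexers{
		"PodByNode": func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*corev1.Pod)
			if !ok || pod.Spec.NodeName == "" {
				return nil, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	}
}

func main() {
	// A standalone indexer (no informer) is enough to demonstrate the lookup.
	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, podByNodeIndexer())
	_ = store.Add(&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "virt-launcher-example", Namespace: "default"},
		Spec:       corev1.PodSpec{NodeName: "node-1"},
	})

	objs, _ := store.ByIndex("PodByNode", "node-1")
	for _, obj := range objs {
		fmt.Println("pod on node-1:", obj.(*corev1.Pod).Name)
	}
}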
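
A second standalone sketch, assuming the pkg/util/affinity helpers added above are importable as kubevirt.io/kubevirt/pkg/util/affinity: it shows the check that syncNodePlacementCondition builds on, namely whether the node the virt-launcher pod currently runs on still satisfies the placement of the freshly rendered template pod. The node name and labels are invented for illustration.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"kubevirt.io/kubevirt/pkg/util/affinity"
)

func main() {
	// Template pod rendered from the updated VMI spec: it now requires nodes
	// labeled disktype=ssd (values are made up for the example).
	templatePod := &corev1.Pod{
		Spec: corev1.PodSpec{
			NodeSelector: map[string]string{"disktype": "ssd"},
		},
	}

	// Node the current virt-launcher pod is running on.
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "node-1",
			Labels: map[string]string{"disktype": "hdd"},
		},
	}

	required := affinity.GetRequiredNodeAffinity(templatePod)
	match, err := required.Match(node)
	if err != nil {
		panic(err)
	}

	// match is false here: the controller would set the NodePlacementNotMatched
	// condition to True, and the workload updater would trigger an automatic
	// migration of the VMI to a node that does satisfy the new placement.
	fmt.Println("current node still satisfies the new placement:", match)
}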
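
Finally, a minimal sketch of reading the new condition straight from VMI status. The helper name needsAutoMigration is hypothetical; the condition type and its True/False semantics come from the types.go and workload-updater changes above.

package main

import (
	"fmt"

	k8sv1 "k8s.io/api/core/v1"
	virtv1 "kubevirt.io/api/core/v1"
)

// needsAutoMigration (hypothetical helper) reports whether the workload
// updater would treat this VMI as a migration candidate because its node
// placement no longer matches the rendered pod template.
func needsAutoMigration(vmi *virtv1.VirtualMachineInstance) bool {
	for _, cond := range vmi.Status.Conditions {
		if cond.Type == virtv1.VirtualMachineInstanceNodePlacementNotMatched &&
			cond.Status == k8sv1.ConditionTrue {
			return true
		}
	}
	return false
}

func main() {
	vmi := &virtv1.VirtualMachineInstance{}
	vmi.Status.Conditions = append(vmi.Status.Conditions, virtv1.VirtualMachineInstanceCondition{
		Type:   virtv1.VirtualMachineInstanceNodePlacementNotMatched,
		Status: k8sv1.ConditionTrue,
	})
	fmt.Println("needs auto migration:", needsAutoMigration(vmi))
}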