Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: yaroslavborbat <[email protected]>
  • Loading branch information
yaroslavborbat committed Dec 5, 2024
1 parent 5c5875a commit d8e0a60
Showing 1 changed file with 93 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
diff --git a/pkg/indexers/indexers.go b/pkg/indexers/indexers.go
new file mode 100644
index 0000000000..2729cd89a6
--- /dev/null
+++ b/pkg/indexers/indexers.go
@@ -0,0 +1,23 @@
+package indexers
+
+import (
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/cache"
+)
+
+const PodByNode = "PodByNode"
+
+func NewPodByNodeIndexer() cache.Indexers {
+ return cache.Indexers{
+ "PodByNode": func(obj interface{}) ([]string, error) {
+ pod, ok := obj.(*corev1.Pod)
+ if !ok {
+ return nil, nil
+ }
+ if pod.Spec.NodeName == "" {
+ return nil, nil
+ }
+ return []string{pod.Spec.NodeName}, nil
+ },
+ }
+}
diff --git a/pkg/controller/virtinformers.go b/pkg/controller/virtinformers.go
index 72d94b53b5..3a0e46da4a 100644
--- a/pkg/controller/virtinformers.go
+++ b/pkg/controller/virtinformers.go
@@ -1385,7 +1385,19 @@ func (f *kubeInformerFactory) StorageClass() cache.SharedIndexInformer {
func (f *kubeInformerFactory) Pod() cache.SharedIndexInformer {
return f.getInformer("podInformer", func() cache.SharedIndexInformer {
lw := cache.NewListWatchFromClient(f.clientSet.CoreV1().RESTClient(), "pods", k8sv1.NamespaceAll, fields.Everything())
- return cache.NewSharedIndexInformer(lw, &k8sv1.Pod{}, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+ return cache.NewSharedIndexInformer(lw, &k8sv1.Pod{}, f.defaultResync, cache.Indexers{
+ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
+ "node": func(obj interface{}) ([]string, error) {
+ pod, ok := obj.(*k8sv1.Pod)
+ if !ok {
+ return nil, nil
+ }
+ if pod.Spec.NodeName == "" {
+ return nil, nil
+ }
+ return []string{pod.Spec.NodeName}, nil
+ },
+ })
})
}

diff --git a/pkg/util/affinity/nodeaffinity.go b/pkg/util/affinity/nodeaffinity.go
new file mode 100644
index 0000000000..eeadaa6a99
Expand Down Expand Up @@ -417,8 +413,22 @@ index 0000000000..b16c2f365f
+ }
+ return false
+}
diff --git a/pkg/virt-controller/watch/application.go b/pkg/virt-controller/watch/application.go
index f80a0653ad..d2f0bd36f4 100644
--- a/pkg/virt-controller/watch/application.go
+++ b/pkg/virt-controller/watch/application.go
@@ -638,6 +638,9 @@ func (vca *VirtControllerApp) initCommon() {
vca.cdiConfigInformer,
vca.clusterConfig,
topologyHinter,
+ vca.allPodInformer,
+ vca.namespaceInformer,
+ vca.nodeInformer,
)
if err != nil {
panic(err)
diff --git a/pkg/virt-controller/watch/vmi.go b/pkg/virt-controller/watch/vmi.go
index 0c4bfca389..6bd1ecf76a 100644
index 0c4bfca389..fa4e86ee17 100644
--- a/pkg/virt-controller/watch/vmi.go
+++ b/pkg/virt-controller/watch/vmi.go
@@ -69,6 +69,10 @@ import (
Expand All @@ -427,31 +437,52 @@ index 0c4bfca389..6bd1ecf76a 100644
"kubevirt.io/kubevirt/pkg/virt-controller/watch/descheduler"
+
+ "k8s.io/apimachinery/pkg/labels"
+ "kubevirt.io/kubevirt/pkg/indexers"
+
+ "kubevirt.io/kubevirt/pkg/util/affinity"
)

const (
@@ -114,6 +118,10 @@ func NewVMIController(templateService services.TemplateService,
@@ -92,6 +96,9 @@ func NewVMIController(templateService services.TemplateService,
cdiConfigInformer cache.SharedIndexInformer,
clusterConfig *virtconfig.ClusterConfig,
topologyHinter topology.Hinter,
+ allPodInformer cache.SharedIndexInformer,
+ namespaceInformer cache.SharedIndexInformer,
+ nodeInformer cache.SharedIndexInformer,
) (*VMIController, error) {

c := &VMIController{
@@ -112,12 +119,17 @@ func NewVMIController(templateService services.TemplateService,
topologyHinter: topologyHinter,
cidsMap: newCIDsMap(),
backendStorage: backendstorage.NewBackendStorage(clientset, clusterConfig, storageClassInformer.GetStore(), storageProfileInformer.GetStore(), pvcInformer.GetIndexer()),
+
+ allPodIndexer: allPodInformer.GetIndexer(),
+ namespaceIndexer: namespaceInformer.GetIndexer(),
+ nodeIndexer: nodeInformer.GetIndexer(),
}

+ if err := podInformer.AddIndexers(indexers.NewPodByNodeIndexer()); err != nil {
+ return nil, fmt.Errorf("failed to add pod indexers: %w", err)
+ }
+
c.hasSynced = func() bool {
return vmInformer.HasSynced() && vmiInformer.HasSynced() && podInformer.HasSynced() &&
dataVolumeInformer.HasSynced() && cdiConfigInformer.HasSynced() && cdiInformer.HasSynced() &&
@@ -208,6 +216,7 @@ type VMIController struct {
vmiIndexer cache.Indexer
vmStore cache.Store
podIndexer cache.Indexer
+ nodeIndexer cache.Indexer
pvcIndexer cache.Indexer
storageClassStore cache.Store
topologyHinter topology.Hinter
@@ -691,6 +700,10 @@ func (c *VMIController) updateStatus(vmi *virtv1.VirtualMachineInstance, pod *k8
- pvcInformer.HasSynced() && storageClassInformer.HasSynced() && storageProfileInformer.HasSynced()
+ pvcInformer.HasSynced() && storageClassInformer.HasSynced() && storageProfileInformer.HasSynced() &&
+ allPodInformer.HasSynced() && namespaceInformer.HasSynced() && nodeInformer.HasSynced()
}

_, err := vmiInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -221,6 +233,10 @@ type VMIController struct {
cidsMap *cidsMap
backendStorage *backendstorage.BackendStorage
hasSynced func() bool
+
+ allPodIndexer cache.Indexer
+ namespaceIndexer cache.Indexer
+ nodeIndexer cache.Indexer
}

func (c *VMIController) Run(threadiness int, stopCh <-chan struct{}) {
@@ -691,6 +707,10 @@ func (c *VMIController) updateStatus(vmi *virtv1.VirtualMachineInstance, pod *k8
c.syncVolumesUpdate(vmiCopy)
}

Expand All @@ -462,7 +493,7 @@ index 0c4bfca389..6bd1ecf76a 100644
case vmi.IsScheduled():
if !vmiPodExists {
vmiCopy.Status.Phase = virtv1.Failed
@@ -2416,6 +2429,166 @@ func (c *VMIController) syncVolumesUpdate(vmi *virtv1.VirtualMachineInstance) {
@@ -2416,6 +2436,172 @@ func (c *VMIController) syncVolumesUpdate(vmi *virtv1.VirtualMachineInstance) {
vmiConditions.UpdateCondition(vmi, &condition)
}

Expand Down Expand Up @@ -533,10 +564,16 @@ index 0c4bfca389..6bd1ecf76a 100644
+ if pod == nil || templatePod == nil {
+ return false, fmt.Errorf("pod or templatePod must not be nil")
+ }
+ node, err := c.clientset.CoreV1().Nodes().Get(context.Background(), pod.Spec.NodeName, v1.GetOptions{})
+ templatePod.Namespace = pod.Namespace
+ templatePod.Name = pod.Name
+ obj, exist, err := c.nodeIndexer.GetByKey(pod.Spec.NodeName)
+ if err != nil {
+ return false, err
+ }
+ node := obj.(*k8sv1.Node)
+ if !exist || node == nil {
+ return false, fmt.Errorf("not found node %s", pod.Spec.NodeName)
+ }
+
+ requiredNodeSelectorAndAffinity := affinity.GetRequiredNodeAffinity(templatePod)
+ match, err := requiredNodeSelectorAndAffinity.Match(node)
Expand All @@ -556,12 +593,10 @@ index 0c4bfca389..6bd1ecf76a 100644
+ for _, p := range pods {
+ podNamespaces[p.GetNamespace()] = struct{}{}
+ }
+ allNamespaces, err := c.clientset.CoreV1().Namespaces().List(context.Background(), v1.ListOptions{})
+ if err != nil {
+ return false, err
+ }
+ allNamespaces := c.namespaceIndexer.List()
+ namespaceLabels := make(map[string]labels.Set, len(podNamespaces))
+ for _, ns := range allNamespaces.Items {
+ for _, o := range allNamespaces {
+ ns := o.(*k8sv1.Namespace)
+ if _, ok := podNamespaces[ns.GetName()]; ok {
+ namespaceLabels[ns.GetName()] = ns.GetLabels()
+ }
Expand Down Expand Up @@ -604,14 +639,16 @@ index 0c4bfca389..6bd1ecf76a 100644
+
+// listPodsByNode takes a node and returns all Pods from the pod cache which run on this node
+func (c *VMIController) listPodsByNode(node string) ([]*k8sv1.Pod, error) {
+ objs, err := c.podIndexer.ByIndex(indexers.PodByNode, node)
+ objs, err := c.allPodIndexer.ByIndex("node", node)
+ if err != nil {
+ return nil, err
+ }
+ var pods []*k8sv1.Pod
+ pods := make([]*k8sv1.Pod, 0, len(objs))
+ for _, obj := range objs {
+ pod := obj.(*k8sv1.Pod)
+ pods = append(pods, pod)
+ pod, ok := obj.(*k8sv1.Pod)
+ if ok {
+ pods = append(pods, pod)
+ }
+ }
+ return pods, nil
+}
Expand Down

0 comments on commit d8e0a60

Please sign in to comment.