Skip to content

Commit

Permalink
feat(qrm): support numa binding manager
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Nov 8, 2024
1 parent 1017f1c commit fcdde91
Show file tree
Hide file tree
Showing 12 changed files with 1,578 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/validator"
cpuutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/numabinding"
"github.com/kubewharf/katalyst-core/pkg/agent/utilcomponent/periodicalhandler"
"github.com/kubewharf/katalyst-core/pkg/config"
dynamicconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
Expand Down Expand Up @@ -116,6 +117,8 @@ type DynamicPolicy struct {
transitionPeriod time.Duration
cpuNUMAHintPreferPolicy string
cpuNUMAHintPreferLowThreshold float64

sharedNUNMABindingManager numabinding.Manager
}

func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
Expand Down Expand Up @@ -217,6 +220,11 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
return false, agent.ComponentStub{}, fmt.Errorf("dynamic policy initReclaimPool failed with error: %v", err)
}

policyImplement.sharedNUNMABindingManager, err = numabinding.NewSharedNUMABindingManager(conf, wrappedEmitter, agentCtx.MetaServer, NewCPUAllocationUpdater(stateImpl))
if err != nil {
return false, agent.ComponentStub{}, err
}

err = agentCtx.MetaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR)
if err != nil {
return false, nil, err
Expand Down Expand Up @@ -328,6 +336,7 @@ func (p *DynamicPolicy) Start() (err error) {
return
}
go p.advisorMonitor.Run(p.stopCh)
go p.sharedNUNMABindingManager.Run(p.stopCh)

go wait.BackoffUntil(func() { p.serveForAdvisor(p.stopCh) }, wait.NewExponentialBackoffManager(
800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 0, &clock.RealClock{}), true, p.stopCh)
Expand Down Expand Up @@ -676,6 +685,11 @@ func (p *DynamicPolicy) GetTopologyHints(ctx context.Context,
if p.hintHandlers[qosLevel] == nil {
return nil, fmt.Errorf("katalyst QoS level: %s is not supported yet", qosLevel)
}

if p.sharedNUNMABindingManager.IsProcessing() {
return nil, fmt.Errorf("numa binding manager is processing")
}

return p.hintHandlers[qosLevel](ctx, req)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicpolicy

import (
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation"
)

// cpuAllocationUpdater is the allocation.Updater implementation returned by
// NewCPUAllocationUpdater. It keeps a handle to the CPU dynamic policy state
// it was constructed with.
type cpuAllocationUpdater struct {
// state is the CPU plugin state passed to NewCPUAllocationUpdater.
state state.State
}

// NewCPUAllocationUpdater wraps the given CPU plugin state in a
// cpuAllocationUpdater and exposes it through the allocation.Updater
// interface.
//
// Note that inside this function the parameter name shadows the imported
// state package.
func NewCPUAllocationUpdater(state state.State) allocation.Updater {
	updater := new(cpuAllocationUpdater)
	updater.state = state
	return updater
}

// UpdateAllocation satisfies allocation.Updater. It is currently a
// placeholder: the supplied allocations are ignored, the held state is not
// touched, and nil is always returned.
func (u *cpuAllocationUpdater) UpdateAllocation(m allocation.PodAllocations) error {
	// todo
	return nil
}
124 changes: 124 additions & 0 deletions pkg/agent/qrm-plugins/util/allocation/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package allocation

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/memory/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

// PodAllocations maps a pod UID to the Allocation recorded for that pod
// (GetPodAllocations keys the map by the pod UID found in the memory state).
type PodAllocations map[string]*Allocation

// Clone returns a deep copy of the allocations: both the returned map and
// every non-nil *Allocation stored in it are fresh objects, so mutating the
// copy (for example changing BindingNUMA) never affects the receiver.
//
// Nil entries are preserved as nil. A nil receiver yields an empty, non-nil
// map instead of panicking.
func (p *PodAllocations) Clone() PodAllocations {
	if p == nil {
		return make(PodAllocations)
	}

	a := make(PodAllocations, len(*p))
	for uid, allocation := range *p {
		if allocation == nil {
			a[uid] = nil
			continue
		}
		// Copy the pointed-to value; copying only the pointer would leave the
		// clone aliasing the original's Allocation. Allocation holds only
		// value-typed fields, so a plain struct copy is a full copy.
		cloned := *allocation
		a[uid] = &cloned
	}
	return a
}

// Request is the summed resource request of a pod, as computed by
// getPodRequest.
type Request struct {
// CPUMilli is the CPU request taken from Quantity.MilliValue().
CPUMilli int64
// Memory is the memory request taken from Quantity.Value()
// (presumably bytes — TODO confirm).
Memory int64
}

// Allocation describes one pod: its namespace/name, its summed resource
// request, and the NUMA node it is bound to.
type Allocation struct {
// NamespacedName holds the pod's namespace and name.
types.NamespacedName
// Request holds the pod's CPU and memory request.
Request
// BindingNUMA is the NUMA node ID the pod is bound to. GetPodAllocations
// sets it to -1 when the main container is not NUMA binding.
BindingNUMA int
}

// Clone returns a copy of the allocation by value. A nil receiver yields the
// zero Allocation.
func (n *Allocation) Clone() Allocation {
	var out Allocation
	if n != nil {
		// Every field (NamespacedName, Request, BindingNUMA) is a plain value,
		// so a struct copy reproduces them all.
		out = *n
	}
	return out
}

// String renders the allocation as "<NamespacedName>/<BindingNUMA>". A nil
// receiver renders as the empty string.
func (n *Allocation) String() string {
	if n != nil {
		return fmt.Sprintf("%s/%d", n.NamespacedName, n.BindingNUMA)
	}
	return ""
}

// Updater is implemented by components that accept a set of pod allocations.
type Updater interface {
// UpdateAllocation receives the pod allocations and reports any failure
// through the returned error. NOTE(review): presumably implementations
// persist the allocations into their plugin state — confirm with callers.
UpdateAllocation(PodAllocations) error
}

// GetPodAllocations builds a PodAllocations from the memory plugin state.
//
// It walks the v1.ResourceMemory entries of memoryState and, for every
// container that is both the main container and shared (per
// CheckMainContainer and CheckShared), looks the pod up through metaServer.
// Pods that native.PodIsActive reports as inactive are skipped. Each remaining
// pod yields one Allocation keyed by pod UID, carrying the pod's
// namespace/name, its summed resource request and its binding NUMA node.
//
// BindingNUMA is -1 unless the container reports CheckNUMABinding, in which
// case it is the first element of the container's NUMA allocation result. A
// NUMA-binding container with an empty NUMA allocation result is logged and
// kept at -1 rather than indexed, which would panic.
//
// It returns an error if metaServer or memoryState is nil, or if any pod
// lookup fails; in the latter case the lookup error is logged and returned
// unchanged.
func GetPodAllocations(ctx context.Context, metaServer *metaserver.MetaServer, memoryState state.ReadonlyState) (PodAllocations, error) {
	if metaServer == nil {
		return nil, fmt.Errorf("get pod allocations with nil metaServer")
	}
	if memoryState == nil {
		return nil, fmt.Errorf("get pod allocations with nil memoryState")
	}

	allocations := make(PodAllocations)
	podEntries := memoryState.GetPodResourceEntries()[v1.ResourceMemory]
	for uid, podEntry := range podEntries {
		for _, container := range podEntry {
			if !container.CheckMainContainer() || !container.CheckShared() {
				continue
			}

			bindingNUMA := -1
			if container.CheckNUMABinding() {
				// Guard the index: an empty result would otherwise panic with
				// an index-out-of-range error.
				numaIDs := container.NumaAllocationResult.ToSliceNoSortInt()
				if len(numaIDs) > 0 {
					bindingNUMA = numaIDs[0]
				} else {
					general.Errorf("pod %s is numa binding but has empty numa allocation result", uid)
				}
			}

			pod, err := metaServer.GetPod(ctx, uid)
			if err != nil {
				general.Errorf("get pod %s failed: %v", uid, err)
				return nil, err
			}

			if !native.PodIsActive(pod) {
				continue
			}

			allocations[uid] = &Allocation{
				NamespacedName: types.NamespacedName{
					Namespace: pod.Namespace,
					Name:      pod.Name,
				},
				BindingNUMA: bindingNUMA,
				Request:     getPodRequest(pod),
			}
		}
	}

	return allocations, nil
}

// getPodRequest sums up the pod's resource requests and extracts the CPU
// quantity (as milli-units) and the memory quantity (as its integer value)
// into a Request.
func getPodRequest(pod *v1.Pod) Request {
	resources := native.SumUpPodRequestResources(pod)
	cpu := native.CPUQuantityGetter()(resources)
	mem := native.MemoryQuantityGetter()(resources)

	var result Request
	result.CPUMilli = cpu.MilliValue()
	result.Memory = mem.Value()
	return result
}
Loading

0 comments on commit fcdde91

Please sign in to comment.