Skip to content

Commit

Permalink
chore(qrm): refactor back tracking calculator to only consider per-po…
Browse files Browse the repository at this point in the history
…d one numa and stop iterating only when the one satisfied solution is found
  • Loading branch information
luomingmeng committed Nov 22, 2024
1 parent 6e451b7 commit ced90fa
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation"
Expand All @@ -43,11 +42,11 @@ const (
)

type backTrackingCalculator struct {
emitter metrics.MetricEmitter
metaServer *metaserver.MetaServer
reservedCPUs machine.CPUSet
maxNUMANum int
numaNodes []int
emitter metrics.MetricEmitter
metaServer *metaserver.MetaServer
numaNodes []int
reservedCPUs machine.CPUSet
maxIterateTime time.Duration

simpleCalculator NUMABindingCalculator

Expand All @@ -62,12 +61,11 @@ func NewBackTrackingCalculator(
reservedCPUs machine.CPUSet,
) NUMABindingCalculator {
return &backTrackingCalculator{
emitter: emitter,
metaServer: metaServer,
// todo: use numa num from conf
maxNUMANum: 1,
emitter: emitter,
metaServer: metaServer,
numaNodes: metaServer.CPUDetails.NUMANodes().ToSliceInt(),
reservedCPUs: reservedCPUs,
maxIterateTime: 30 * time.Second,
simpleCalculator: NewGreedyCalculator(),
}
}
Expand Down Expand Up @@ -175,12 +173,11 @@ func (b *backTrackingCalculator) sync(ctx context.Context) {
func (b *backTrackingCalculator) getAllNUMABindingResults(podAllocation allocation.PodAllocations,
numaAllocatable state.NUMAResource,
) ([][]numaBindingResult, map[int]string, error) {
numaBindingNUMAs := b.getNUMABindingNUMAs(podAllocation)
resultsIndex := make(map[int]string)
allNUMABindingResults := make([][]numaBindingResult, 0, len(podAllocation))
for podUID, alloc := range podAllocation {
resultsIndex[len(allNUMABindingResults)] = podUID
results, err := getNUMABindingResults(alloc, b.numaNodes, numaAllocatable, b.maxNUMANum, numaBindingNUMAs)
results, err := getNUMABindingResults(alloc, b.numaNodes, numaAllocatable)
if err != nil {
general.Errorf("get numa allocation for %s failed: %v", alloc.String(), err)
return nil, nil, err
Expand All @@ -194,83 +191,48 @@ func (b *backTrackingCalculator) mergeNUMABindingResults(results [][]numaBinding
podAllocation allocation.PodAllocations, numaAllocatable state.NUMAResource,
) []numaBindingResult {
var optimalResults []numaBindingResult
optimalNUMACount := 0
minNonNUMABindingCount := 0
withAllPodNUMABindingResult := false
iterateAllNUMABindingResults(results, func(results []numaBindingResult) {
current := numaAllocatable.Clone()
if b.maxNUMANum > 1 {
maxNUMACount := 1
maxNUMANodeAffinity, _ := bitmask.NewBitMask(1)
for _, result := range results {
if result.numaNodeAffinity.Count() > maxNUMACount {
maxNUMACount = result.numaNodeAffinity.Count()
maxNUMANodeAffinity = result.numaNodeAffinity
}
}

if maxNUMACount > 1 {
if withAllPodNUMABindingResult {
return
}

totalAllocatable := state.Resource{}
for i := 0; i < maxNUMACount; i++ {
totalAllocatable.AddResource(numaAllocatable[i])
}

nonNUMABindingCount := 0
for i, result := range results {
if bitmask.And(result.numaNodeAffinity, maxNUMANodeAffinity).Count() != 0 {
if !totalAllocatable.IsSatisfied(podAllocation[index[i]]) {
return
}
totalAllocatable.SubAllocation(podAllocation[index[i]])
nonNUMABindingCount += 1
} else {
numaID := result.numaNodeAffinity.GetBits()[0]
if !current[numaID].IsSatisfied(podAllocation[index[i]]) {
return
}
current[numaID].SubAllocation(podAllocation[index[i]])
}
}
optimalNUMACount = len(b.numaNodes) - maxNUMACount
if nonNUMABindingCount < minNonNUMABindingCount {
minNonNUMABindingCount = nonNUMABindingCount
optimalResults = deepCopyNUMABindingResults(results)
}
return
optimalNotSatisfiedPodCount := len(results)
iterateCount := int64(0)
begin := time.Now()
iterateAllNUMABindingResults(results, func(results []numaBindingResult) bool {
iterateCount++
if iterateCount > 10000 {
iterateCount = 0
d := time.Since(begin)
if d > b.maxIterateTime {
general.Infof("iterate all numa binding results %d times, costs %v", iterateCount, d)
return false
}
}

numaMask := bitmask.NewEmptyBitMask()
var currentResults []numaBindingResult
current := numaAllocatable.Clone()
notSatisfiedPodCount := 0
for i, result := range results {
numaID := result.numaNodeAffinity.GetBits()[0]
if !current[numaID].IsSatisfied(podAllocation[index[i]]) {
return
notSatisfiedPodCount += 1
if notSatisfiedPodCount > optimalNotSatisfiedPodCount {
return true
}
continue
}
numaMask.Or(result.numaNodeAffinity)
current[numaID].SubAllocation(podAllocation[index[i]])
currentResults = append(currentResults, result)
}
numaCount := numaMask.Count()
if !withAllPodNUMABindingResult || numaCount < optimalNUMACount {
optimalNUMACount = numaCount
optimalResults = deepCopyNUMABindingResults(results)
withAllPodNUMABindingResult = true

if notSatisfiedPodCount < optimalNotSatisfiedPodCount {
optimalResults = deepCopyNUMABindingResults(currentResults)
optimalNotSatisfiedPodCount = notSatisfiedPodCount
}
})
return optimalResults
}

func (b *backTrackingCalculator) getNUMABindingNUMAs(podAllocation allocation.PodAllocations) sets.Int {
numaSet := sets.NewInt()
for _, alloc := range podAllocation {
if alloc.BindingNUMA != -1 {
numaSet.Insert(alloc.BindingNUMA)
if notSatisfiedPodCount == 0 {
return false
}
}
return numaSet
return true
})

return optimalResults
}

// numaBindingResult is a struct containing the numaNodeAffinity for a pod
Expand All @@ -287,7 +249,7 @@ func deepCopyNUMABindingResults(results []numaBindingResult) []numaBindingResult
}

func getNUMABindingResults(allocation *allocation.Allocation, numaNodes []int,
numaAllocatable state.NUMAResource, maxNUMANum int, numaBindingNUMAs sets.Int,
numaAllocatable state.NUMAResource,
) ([]numaBindingResult, error) {
numaBindingResults := make([]numaBindingResult, 0, len(numaNodes))
for _, n := range numaNodes {
Expand All @@ -305,36 +267,6 @@ func getNUMABindingResults(allocation *allocation.Allocation, numaNodes []int,
})
}

if allocation.BindingNUMA != -1 {
return numaBindingResults, nil
}

maxNUMANum = general.Min(maxNUMANum, len(numaNodes))
if maxNUMANum > 1 {
for i := 2; i < maxNUMANum; i++ {
m := bitmask.NewEmptyBitMask()
totalResource := state.Resource{}
for j := 0; j < i; j++ {
err := m.Add(j)
if err != nil {
return nil, err
}
totalResource.AddResource(numaAllocatable[j])
}

if numaBindingNUMAs.Intersection(sets.NewInt(m.GetBits()...)).Len() > 0 {
continue
}

if !totalResource.IsSatisfied(allocation) {
continue
}

numaBindingResults = append(numaBindingResults, numaBindingResult{
numaNodeAffinity: m,
})
}
}
return numaBindingResults, nil
}

Expand All @@ -358,21 +290,25 @@ func getNUMABindingResults(allocation *allocation.Allocation, numaNodes []int,
// allNUMABindingResults[-1][z]
// }
// callback(permutation)
func iterateAllNUMABindingResults(allNUMABindingResults [][]numaBindingResult, callback func([]numaBindingResult)) {
func iterateAllNUMABindingResults(allNUMABindingResults [][]numaBindingResult, callback func([]numaBindingResult) bool) {
// Internal helper function to accumulate the permutation before calling the callback.
var iterate func(i int, accum []numaBindingResult)
iterate = func(i int, accum []numaBindingResult) {
var iterate func(i int, accum []numaBindingResult) bool
iterate = func(i int, accum []numaBindingResult) bool {
// Base case: we have looped through all providers and have a full permutation.
if i == len(allNUMABindingResults) {
callback(accum)
return
return callback(accum)
}

// Loop through all hints for provider 'i', and recurse to build the
// the permutation of this hint with all hints from providers 'i++'.
for j := range allNUMABindingResults[i] {
iterate(i+1, append(accum, allNUMABindingResults[i][j]))
con := iterate(i+1, append(accum, allNUMABindingResults[i][j]))
if !con {
return false
}
}
return true
}

iterate(0, []numaBindingResult{})
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package calculator

import (
"github.com/kubewharf/katalyst-core/pkg/metrics"
"reflect"
"testing"

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/allocation"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/state"
Expand All @@ -36,11 +36,9 @@ func newBitMask(bits ...int) bitmask.BitMask {
func Test_getNUMABindingResults(t *testing.T) {
t.Parallel()
type args struct {
allocation *allocation.Allocation
numaNodes []int
numaAllocatable state.NUMAResource
maxNUMANum int
numaBindingNUMAs sets.Int
allocation *allocation.Allocation
numaNodes []int
numaAllocatable state.NUMAResource
}
tests := []struct {
name string
Expand Down Expand Up @@ -80,11 +78,9 @@ func Test_getNUMABindingResults(t *testing.T) {
Memory: 10000,
},
},
maxNUMANum: 1,
numaNodes: []int{
0, 1, 2, 3,
},
numaBindingNUMAs: sets.NewInt(),
},
want: []numaBindingResult{
{
Expand All @@ -106,7 +102,7 @@ func Test_getNUMABindingResults(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
got, err := getNUMABindingResults(tt.args.allocation, tt.args.numaNodes, tt.args.numaAllocatable, tt.args.maxNUMANum, tt.args.numaBindingNUMAs)
got, err := getNUMABindingResults(tt.args.allocation, tt.args.numaNodes, tt.args.numaAllocatable)
if (err != nil) != tt.wantErr {
t.Errorf("getNUMABindingResults() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -229,7 +225,7 @@ func Test_backTrackingCalculator_asyncCalculateNUMABindingResult(t *testing.T) {
CPUMilli: 2000,
Memory: 1000,
},
BindingNUMA: 1,
BindingNUMA: 0,
},
"pod3": &allocation.Allocation{
NamespacedName: types.NamespacedName{
Expand All @@ -251,7 +247,7 @@ func Test_backTrackingCalculator_asyncCalculateNUMABindingResult(t *testing.T) {
CPUMilli: 4000,
Memory: 1000,
},
BindingNUMA: 0,
BindingNUMA: 2,
},
},
want1: true,
Expand All @@ -262,8 +258,8 @@ func Test_backTrackingCalculator_asyncCalculateNUMABindingResult(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
b := &backTrackingCalculator{
maxNUMANum: tt.fields.maxNUMANum,
numaNodes: tt.fields.numaNodes,
emitter: metrics.DummyMetrics{},
numaNodes: tt.fields.numaNodes,
}
got, got1, err := b.asyncCalculateNUMABindingResult(tt.args.current, tt.args.numaAllocatable)
if (err != nil) != tt.wantErr {
Expand Down

0 comments on commit ced90fa

Please sign in to comment.