Skip to content

Commit

Permalink
feat: worker选取及恢复机制优化 TencentBlueKing#311
Browse files Browse the repository at this point in the history
  • Loading branch information
flyy1012 committed Nov 20, 2024
1 parent 885fb00 commit d25d27e
Showing 1 changed file with 21 additions and 21 deletions.
42 changes: 21 additions & 21 deletions src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,9 +649,7 @@ func (wr *resource) handleLock(ctx context.Context) {
case <-wr.emptyChan:
wr.onSlotEmpty()
case <-ticker.C:
if wr.waitingList.Len() > 0 {
go wr.occupyWaitList()
}
wr.occupyWaitList()
}
}
}
Expand Down Expand Up @@ -733,7 +731,7 @@ func (wr *resource) putSlot(msg lockWorkerMessage) {
} else if !hasAvailableWorker {
msg.result <- nil
wr.waitingList.Remove(e)
blog.Debugf("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
} else {
blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
}
Expand All @@ -754,23 +752,25 @@ func (wr *resource) onSlotEmpty() {
}

func (wr *resource) occupyWaitList() {
for e := wr.waitingList.Front(); e != nil; e = e.Next() {
msg := e.Value.(*lockWorkerMessage)
set := wr.getUsageSet(msg.jobUsage)
if wr.isIdle(set) {
h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
if h != nil {
set.occupied++
wr.occupiedSlots++
msg.result <- h
wr.waitingList.Remove(e)
blog.Debugf("remote slot: occupy waiting list")
} else if !hasAvailableWorker { // no slot available for ban worker list, turn it local
blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
msg.result <- nil
wr.waitingList.Remove(e)
} else {
blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
if wr.waitingList.Len() > 0 {
for e := wr.waitingList.Front(); e != nil; e = e.Next() {
msg := e.Value.(*lockWorkerMessage)
set := wr.getUsageSet(msg.jobUsage)
if wr.isIdle(set) {
h, hasAvailableWorker := wr.occupyWorkerSlots(msg.largeFile, msg.banWorkerList)
if h != nil {
set.occupied++
wr.occupiedSlots++
msg.result <- h
wr.waitingList.Remove(e)
blog.Debugf("remote slot: occupy waiting list")
} else if !hasAvailableWorker { // no slot available for ban worker list, turn it local
blog.Infof("remote slot: occupy waiting list, but no slot available for ban worker list %v, just turn it local", msg.banWorkerList)
msg.result <- nil
wr.waitingList.Remove(e)
} else {
blog.Debugf("remote slot: occupy waiting list, but no slot available %v", msg.banWorkerList)
}
}
}
}
Expand Down

0 comments on commit d25d27e

Please sign in to comment.