lxd/devices: fix deviceTaskBalance() to handle cpu pinning on isolated CPUs

Signed-off-by: Alexander Mikhalitsyn <aleksandr.mikhalitsyn@canonical.com>
mihalicyn committed Jan 20, 2025
1 parent 0b7b0f6 commit 8ab389b
Showing 1 changed file with 82 additions and 51 deletions.
133 changes: 82 additions & 51 deletions lxd/devices.go
@@ -25,9 +25,10 @@ import (
)

type deviceTaskCPU struct {
id int64
strID string
count *int
id int64
strID string
count *int
isolated bool
}

type deviceTaskCPUs []deviceTaskCPU
@@ -36,6 +37,19 @@ func (c deviceTaskCPUs) Len() int { return len(c) }
func (c deviceTaskCPUs) Less(i, j int) bool { return *c[i].count < *c[j].count }
func (c deviceTaskCPUs) Swap(i, j int) { c[i], c[j] = c[j], c[i] }

type instanceCPUPinningMap map[instance.Instance][]string

func (pinning instanceCPUPinningMap) add(inst instance.Instance, cpu deviceTaskCPU) {
id := cpu.strID
_, ok := pinning[inst]
if ok {
pinning[inst] = append(pinning[inst], id)
} else {
pinning[inst] = []string{id}
}
*cpu.count += 1
}

func deviceNetlinkListener() (chan []string, chan []string, chan device.USBEvent, chan device.UnixHotplugEvent, error) {
NETLINK_KOBJECT_UEVENT := 15 //nolint:revive
UEVENT_BUFFER_SIZE := 2048 //nolint:revive
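
For reference, a minimal usage sketch of the new instanceCPUPinningMap helper (illustrative only, not part of this diff; inst stands for any instance.Instance value):

pinning := instanceCPUPinningMap{}

count0, count2 := 0, 0
cpu0 := deviceTaskCPU{id: 0, strID: "0", count: &count0}
cpu2 := deviceTaskCPU{id: 2, strID: "2", count: &count2}

pinning.add(inst, cpu0) // pinning[inst] == []string{"0"}, count0 == 1
pinning.add(inst, cpu2) // pinning[inst] == []string{"0", "2"}, count2 == 1
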
@@ -298,7 +312,7 @@ func deviceNetlinkListener() (chan []string, chan []string, chan device.USBEvent
* The `loadBalancing` flag indicates whether the CPU pinning should be load balanced or not (e.g., NUMA placement when `limits.cpu` is a single number, which means
* a required number of vCPUs per instance that can be chosen within a CPU pool).
*/
func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst instance.Instance, effectiveCpus []int64, targetCPUPool []int64, targetCPUNum int, loadBalancing bool) {
func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst instance.Instance, usableCpus []int64, targetCPUPool []int64, targetCPUNum int, loadBalancing bool) {
if len(targetCPUPool) < targetCPUNum {
diffCount := len(targetCPUPool) - targetCPUNum
logger.Warn("Insufficient CPUs in the target pool for required pinning: reducing required CPUs to match available CPUs", logger.Ctx{"available": len(targetCPUPool), "required": targetCPUNum, "difference": -diffCount})
@@ -308,7 +322,7 @@ func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst insta
// If the `targetCPUPool` has been manually specified (explicit CPU IDs/ranges specified with `limits.cpu`)
if len(targetCPUPool) == targetCPUNum && !loadBalancing {
for _, nr := range targetCPUPool {
if !shared.ValueInSlice(nr, effectiveCpus) {
if !shared.ValueInSlice(nr, usableCpus) {
continue
}

@@ -366,12 +380,12 @@ func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst insta
}
}

func getCPULists() (effectiveCpus string, cpus []int64, err error) {
func getCPULists() (effectiveCpus string, cpus []int64, isolCpus []int64, err error) {
// Get effective cpus list - those are all guaranteed to be online
cg, err := cgroup.NewFileReadWriter(1, true)
if err != nil {
logger.Error("Unable to load cgroup writer", logger.Ctx{"err": err})
return "", nil, err
return "", nil, nil, err
}

effectiveCpus, err = cg.GetEffectiveCpuset()
@@ -380,20 +394,20 @@ func getCPULists() (effectiveCpus string, cpus []int64, err error) {
effectiveCpus, err = cg.GetCpuset()
if err != nil {
logger.Error("Error reading host's cpuset.cpus", logger.Ctx{"err": err, "cpuset.cpus": effectiveCpus})
return "", nil, err
return "", nil, nil, err
}
}

effectiveCpusInt, err := resources.ParseCpuset(effectiveCpus)
if err != nil {
logger.Error("Error parsing effective CPU set", logger.Ctx{"err": err, "cpuset.cpus": effectiveCpus})
return "", nil, err
return "", nil, nil, err
}

isolatedCpusInt := resources.GetCPUIsolated()
isolCpus = resources.GetCPUIsolated()
effectiveCpusSlice := []string{}
for _, id := range effectiveCpusInt {
if shared.ValueInSlice(id, isolatedCpusInt) {
if shared.ValueInSlice(id, isolCpus) {
continue
}

@@ -404,10 +418,10 @@ func getCPULists() (effectiveCpus string, cpus []int64, err error) {
cpus, err = resources.ParseCpuset(effectiveCpus)
if err != nil {
logger.Error("Error parsing host's cpu set", logger.Ctx{"cpuset": effectiveCpus, "err": err})
return "", nil, err
return "", nil, nil, err
}

return effectiveCpus, cpus, nil
return effectiveCpus, cpus, isolCpus, nil
}

func getNumaNodeToCPUMap() (numaNodeToCPU map[int64][]int64, err error) {
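
A minimal sketch of how a caller consumes the new third return value (mirroring the deviceTaskBalance change further down; illustrative only):

effectiveCpus, cpus, isolCpus, err := getCPULists()
if err != nil {
	return
}

// cpus never contains isolated CPUs; callers that want to allow pinning on
// them (currently only VMs) append isolCpus explicitly, as deviceTaskBalance
// does below.
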
@@ -431,6 +445,32 @@ func getNumaNodeToCPUMap() (numaNodeToCPU map[int64][]int64, err error) {
return numaNodeToCPU, err
}

func makeCPUUsageMap(cpus []int64, isolCpus []int64) map[int64]deviceTaskCPU {
usage := map[int64]deviceTaskCPU{}
for _, id := range cpus {
cpu := deviceTaskCPU{}
cpu.id = id
cpu.strID = fmt.Sprintf("%d", id)
count := 0
cpu.count = &count

usage[id] = cpu
}

for _, id := range isolCpus {
cpu := deviceTaskCPU{}
cpu.id = id
cpu.strID = fmt.Sprintf("%d", id)
count := 0
cpu.count = &count
cpu.isolated = true

usage[id] = cpu
}

return usage
}

func getNumaCPUs(numaNodeToCPU map[int64][]int64, cpuNodes string) ([]int64, error) {
var numaCpus []int64
if cpuNodes != "" {
@@ -473,7 +513,7 @@ func deviceTaskBalance(s *state.State) {
return
}

effectiveCpus, cpus, err := getCPULists()
effectiveCpus, cpus, isolCpus, err := getCPULists()
if err != nil {
return
}
@@ -549,65 +589,56 @@
logger.Warn("The pinned CPUs override the NUMA configuration CPUs", logger.Ctx{"pinnedCPUs": instanceCpus, "numaCPUs": numaCpus})
}

fillFixedInstances(fixedInstances, c, cpus, instanceCpus, len(instanceCpus), false)
usableCpus := cpus
//
// For VM instances, historically, we allow explicit pinning to CPUs
// listed in the "isolcpus" kernel parameter. For containers this was
// never allowed, and we keep it that way.
//
if c.Type() == instancetype.VM {
usableCpus = append(usableCpus, isolCpus...)
}

fillFixedInstances(fixedInstances, c, usableCpus, instanceCpus, len(instanceCpus), false)
}
}

// Balance things
pinning := map[instance.Instance][]string{}
usage := map[int64]deviceTaskCPU{}

for _, id := range cpus {
cpu := deviceTaskCPU{}
cpu.id = id
cpu.strID = fmt.Sprintf("%d", id)
count := 0
cpu.count = &count

usage[id] = cpu
}
pinning := instanceCPUPinningMap{}
usage := makeCPUUsageMap(cpus, isolCpus)

for cpu, ctns := range fixedInstances {
// Handle instances with explicit CPU pinnings
for cpu, instances := range fixedInstances {
c, ok := usage[cpu]
if !ok {
logger.Error("Internal error: instance using unavailable cpu")
continue
}

id := c.strID
for _, ctn := range ctns {
_, ok := pinning[ctn]
if ok {
pinning[ctn] = append(pinning[ctn], id)
} else {
pinning[ctn] = []string{id}
}
*c.count += 1
for _, inst := range instances {
pinning.add(inst, c)
}
}

sortedUsage := make(deviceTaskCPUs, 0)
for _, value := range usage {
sortedUsage = append(sortedUsage, value)
for _, cpu := range usage {
if cpu.isolated {
continue
}

sortedUsage = append(sortedUsage, cpu)
}

for ctn, count := range balancedInstances {
// Handle instances where automatic CPU pinning is needed
for inst, cpusToPin := range balancedInstances {
sort.Sort(sortedUsage)
for _, cpu := range sortedUsage {
if count == 0 {
if cpusToPin == 0 {
break
}

count -= 1

id := cpu.strID
_, ok := pinning[ctn]
if ok {
pinning[ctn] = append(pinning[ctn], id)
} else {
pinning[ctn] = []string{id}
}
*cpu.count += 1
cpusToPin -= 1
pinning.add(inst, cpu)
}
}

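
To make the behavioural change concrete, a hypothetical scenario (not taken from the commit) worked through in comments:

// Host: effective CPUs 0-3, kernel booted with isolcpus=2,3
//   cpus     == []int64{0, 1}   (getCPULists filters the isolated CPUs out)
//   isolCpus == []int64{2, 3}
//
// VM with limits.cpu="2,3":
//   usableCpus = cpus + isolCpus, so the explicit pins on 2 and 3 are honoured.
//
// Container with limits.cpu="2,3":
//   usableCpus = cpus only, so pins on isolated CPUs are skipped (unchanged behaviour).
//
// Container with limits.cpu=2 (a count, not IDs):
//   load balanced over sortedUsage, which never includes isolated CPUs.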