Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resctrl collector fixes #3326

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion resctrl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package resctrl

import (
"fmt"
"math/rand"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -62,12 +63,14 @@ func (c *collector) setup() error {
c.running = true
}
go func() {
time.Sleep(time.Duration(rand.Float64() * float64(1*c.interval)))
var tStart time.Time
for {
time.Sleep(c.interval)
c.mu.Lock()
if c.destroyed {
break
}
tStart = time.Now()
klog.V(5).Infof("Trying to check %q containers control group.", c.id)
if c.running {
err = c.checkMonitoringGroup()
Expand All @@ -80,9 +83,12 @@ func (c *collector) setup() error {
if err != nil {
c.running = false
klog.Errorf("Failed to setup container %q resctrl collector: %s \n Trying again in next intervals.", c.id, err)
} else {
c.running = true
}
}
c.mu.Unlock()
time.Sleep(c.interval - time.Since(tStart))
}
}()
} else {
Expand Down
109 changes: 64 additions & 45 deletions resctrl/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ var (
modeFileName: {},
sizeFileName: {},
}

errNotEnoughPIDs = fmt.Errorf("there should be all pids in group")
errTooManyPIDs = fmt.Errorf("group should have container pids only")
)

func Setup() error {
Expand Down Expand Up @@ -104,7 +107,7 @@ func prepareMonitoringGroup(containerName string, getContainerPids func() ([]str
return rootResctrl, nil
}

pids, err := getContainerPids()
pids, err := getPids(containerName)
Comment on lines -107 to +110
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is getContainerPids() used (it does not seem to be)? If not, would it be possible to drop the argument?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be, but it touches quite a few definitions. I'll follow up on this, if that works for you :)

if err != nil {
return "", err
}
Expand All @@ -113,60 +116,80 @@ func prepareMonitoringGroup(containerName string, getContainerPids func() ([]str
return "", fmt.Errorf("couldn't obtain %q container pids: there is no pids in cgroup", containerName)
}

if !inHostNamespace {
processPath = "/rootfs/proc"
}
var processThreads []string
for _, pid := range pids {
pt, err := getAllProcessThreads(filepath.Join(processPath, strconv.Itoa(pid), processTask))
if err != nil {
return "", err
}
processThreads = append(processThreads, pt...)
}

// Firstly, find the control group to which the container belongs.
// Consider the root group.
controlGroupPath, err := findGroup(rootResctrl, pids, true, false)
if err != nil {
controlGroupPath, err := findGroup(rootResctrl, processThreads, true, false)
if err != nil && err != errNotEnoughPIDs && err != errTooManyPIDs {
return "", fmt.Errorf("%q %q: %q", noControlGroupFoundError, containerName, err)
}
if controlGroupPath == "" {
return "", fmt.Errorf("%q %q", noControlGroupFoundError, containerName)
}

// Check if there is any monitoring group.
monGroupPath, err := findGroup(filepath.Join(controlGroupPath, monGroupsDirName), pids, false, true)
if err != nil {
return "", fmt.Errorf("couldn't find monitoring group matching %q container: %v", containerName, err)
// Remove leading prefix.
// e.g. /my/container -> my/container
if len(containerName) >= minContainerNameLen && containerName[0] == containerPrefix {
containerName = containerName[1:]
}
// Add own prefix and use `-` instead `/`.
// e.g. my/container -> cadvisor-my-container
properContainerName := fmt.Sprintf("%s-%s", monGroupPrefix, strings.Replace(containerName, "/", "-", -1))
monGroupPath := filepath.Join(controlGroupPath, monitoringGroupDir, properContainerName)

// Prepare new one if not exists.
if monGroupPath == "" {
// Remove leading prefix.
// e.g. /my/container -> my/container
if len(containerName) >= minContainerNameLen && containerName[0] == containerPrefix {
containerName = containerName[1:]
createNew := false

// Check if there is any monitoring group.
existingPath, err := findGroup(filepath.Join(controlGroupPath, monGroupsDirName), processThreads, false, true)
if err != nil {
if err != errNotEnoughPIDs && err != errTooManyPIDs {
return "", fmt.Errorf("couldn't find monitoring group matching %q container: %v", containerName, err)
}

// Add own prefix and use `-` instead `/`.
// e.g. my/container -> cadvisor-my-container
properContainerName := fmt.Sprintf("%s-%s", monGroupPrefix, strings.Replace(containerName, "/", "-", -1))
monGroupPath = filepath.Join(controlGroupPath, monitoringGroupDir, properContainerName)
rmErr := os.Remove(monGroupPath)
if rmErr != nil && !os.IsNotExist(rmErr) {
return "", fmt.Errorf("couldn't clean up monitoring group matching %q container: %v", containerName, rmErr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap the error, please.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Errorf not enough? Do you have a specific error type in mind? Please let me know how to improve this error.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "", fmt.Errorf("couldn't clean up monitoring group matching %q container: %v", containerName, rmErr)
return "", fmt.Errorf("couldn't clean up monitoring group matching %q container: %w", containerName, rmErr)

}
if existingPath != monGroupPath {
rmErr = os.Remove(existingPath)
if rmErr != nil && !os.IsNotExist(rmErr) {
return "", fmt.Errorf("couldn't clean up monitoring group matching %q container: %v", containerName, rmErr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap the error, please.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a specific error type in mind? Please let me know how to improve this error.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "", fmt.Errorf("couldn't clean up monitoring group matching %q container: %v", containerName, rmErr)
return "", fmt.Errorf("couldn't clean up monitoring group matching %q container: %w", containerName, rmErr)

}
}
createNew = true
}

// Prepare new one if not exists.
if createNew || existingPath == "" {
err = os.MkdirAll(monGroupPath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("couldn't create monitoring group directory for %q container: %w", containerName, err)
}

if !inHostNamespace {
processPath = "/rootfs/proc"
}

for _, pid := range pids {
processThreads, err := getAllProcessThreads(filepath.Join(processPath, pid, processTask))
for _, thread := range processThreads {
treadInt, err := strconv.Atoi(thread)
if err != nil {
return "", err
return "", fmt.Errorf("couldn't parse %q: %w", thread, err)
}
for _, thread := range processThreads {
err = intelrdt.WriteIntelRdtTasks(monGroupPath, thread)
if err != nil {
secondError := os.Remove(monGroupPath)
if secondError != nil {
return "", fmt.Errorf(
"coudn't assign pids to %q container monitoring group: %w \n couldn't clear %q monitoring group: %v",
containerName, err, containerName, secondError)
}
return "", fmt.Errorf("coudn't assign pids to %q container monitoring group: %w", containerName, err)
err = intelrdt.WriteIntelRdtTasks(monGroupPath, treadInt)
if err != nil {
secondError := os.Remove(monGroupPath)
if secondError != nil {
return "", fmt.Errorf(
"coudn't assign pids to %q container monitoring group: %w \n couldn't clear %q monitoring group: %v",
containerName, err, containerName, secondError)
}
return "", fmt.Errorf("coudn't assign pids to %q container monitoring group: %w", containerName, err)
Comment on lines -149 to +192
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I'm wrong but:

  • The Old Way:
    1. Iterate over all pids.
    2. Fetch all the tids for a pid
    3. Iterate over all tids
    4. Write the tid to monitoring group.
    5. Exit tids loop.
    6. Exit pids loop.
  • The New Way:
    1. Iterate over all pids.
    2. Save all the tids for a pid to a variable.
    3. Exit pids loop.
    4. Iterate over all tids.
    5. Write the tid to monitoring group.
    6. Exit tids loop.

Functionally these two approaches are identical. In the PR description you wrote:

Collect threads instead of pids. During collector startup we collect threads, so for the following checks we should also collect threads, not pids. The pid approach works on relatively simple processes, but for any complex one the pid list will never be the same as the original thread list, thus resulting in an error on every collector run.

, but threads (tids) have always been collected and written to monitoring group.

I might be missing something, but I can't understand where the bug that you are trying to fix is. I would appreciate a more detailed explanation that will help me understand your reasoning. A test case failing with The Old Way and passing with The New Way would be perfect!

}
}
}
Expand All @@ -189,20 +212,16 @@ func getPids(containerName string) ([]int, error) {
// getAllProcessThreads obtains all available processes from directory.
// e.g. ls /proc/4215/task/ -> 4215, 4216, 4217, 4218
// func will return [4215, 4216, 4217, 4218].
func getAllProcessThreads(path string) ([]int, error) {
processThreads := make([]int, 0)
func getAllProcessThreads(path string) ([]string, error) {
processThreads := make([]string, 0)

threadDirs, err := os.ReadDir(path)
if err != nil {
return processThreads, err
}

for _, dir := range threadDirs {
pid, err := strconv.Atoi(dir.Name())
if err != nil {
return nil, fmt.Errorf("couldn't parse %q dir: %v", dir.Name(), err)
}
processThreads = append(processThreads, pid)
processThreads = append(processThreads, dir.Name())
iwankgb marked this conversation as resolved.
Show resolved Hide resolved
JulSenko marked this conversation as resolved.
Show resolved Hide resolved
}

return processThreads, nil
Expand Down Expand Up @@ -232,7 +251,7 @@ func findGroup(group string, pids []string, includeGroup bool, exclusive bool) (
for _, path := range availablePaths {
groupFound, err := arePIDsInGroup(path, pids, exclusive)
if err != nil {
return "", err
return path, err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you want to return a real value instead of the zero value when you return an error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not ideal, but we need the path value for an additional check later if the error is errNotEnoughPIDs or errTooManyPIDs. E.g., the group could have changed completely and we need to recreate it, or the path is correct but some threads died or were newly spawned.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is supposed to be information about an error, then it should be part of the error struct rather than a value returned from the function. I would expect you to check if err == errNotEnoughPIDs or if err == errTooManyPIDs and act accordingly. The interface that you propose is difficult to grasp, in my opinion.

JulSenko marked this conversation as resolved.
Show resolved Hide resolved
}
if groupFound {
return path, nil
Expand All @@ -259,7 +278,7 @@ func arePIDsInGroup(path string, pids []string, exclusive bool) (bool, error) {
if !ok {
// There are missing pids within group.
if any {
return false, fmt.Errorf("there should be all pids in group")
return false, errNotEnoughPIDs
}
return false, nil
}
Expand All @@ -269,7 +288,7 @@ func arePIDsInGroup(path string, pids []string, exclusive bool) (bool, error) {
// Check if there should be only passed pids in group.
if exclusive {
if len(tasks) != len(pids) {
return false, fmt.Errorf("group should have container pids only")
return false, errTooManyPIDs
}
}

Expand Down
15 changes: 3 additions & 12 deletions resctrl/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,6 @@ func TestGetAllProcessThreads(t *testing.T) {
filepath.Join(path, "301", processTask, "301"),
touchDir,
},
{
filepath.Join(path, "301", processTask, "incorrect"),
touchDir,
},
}

for _, file := range files {
Expand All @@ -428,19 +424,14 @@ func TestGetAllProcessThreads(t *testing.T) {

var testCases = []struct {
path string
expected []int
expected []string
err string
}{
{
filepath.Join(mockedProcFs, "4215", processTask),
[]int{4215, 4216, 4217, 4218},
[]string{"4215", "4216", "4217", "4218"},
"",
},
{
filepath.Join(mockedProcFs, "301", processTask),
nil,
"couldn't parse \"incorrect\" dir: strconv.Atoi: parsing \"incorrect\": invalid syntax",
},
}

for _, test := range testCases {
Expand Down Expand Up @@ -515,7 +506,7 @@ func TestFindGroup(t *testing.T) {
[]string{"7"},
false,
true,
"",
filepath.Join(rootResctrl, "m1", monGroupsDirName, "test"),
"group should have container pids only",
},
}
Expand Down