diff --git a/README.md b/README.md
index c82bcab..22d7c5b 100644
--- a/README.md
+++ b/README.md
@@ -64,6 +64,7 @@ The default telemd runs the following instruments:
 * `kubernetes_cgrp_cpu` the cpu usage time of individual Kubernetes Pod containers
 * `kubernetes_cgrp_blkio` the total block io usage in bytes for individual Kubernetes Pod containers
 * `kubernetes_cgrp_memory` the total memory (RAM) usage in bytes for individual Kubernetes Pod containers
+* `kubernetes_cgrp_net` the total network io usage in bytes for individual Kubernetes Pod containers
 
 ### Info keys
 
diff --git a/internal/telemd/cfg.go b/internal/telemd/cfg.go
index d192ea6..f98251a 100644
--- a/internal/telemd/cfg.go
+++ b/internal/telemd/cfg.go
@@ -77,6 +77,7 @@ func NewDefaultConfig() *Config {
 		"kubernetes_cgrp_cpu":    1 * time.Second,
 		"kubernetes_cgrp_blkio":  1 * time.Second,
 		"kubernetes_cgrp_memory": 1 * time.Second,
+		"kubernetes_cgrp_net":    1 * time.Second,
 	}
 
 	return cfg
diff --git a/internal/telemd/daemon.go b/internal/telemd/daemon.go
index 96631dd..b16f468 100644
--- a/internal/telemd/daemon.go
+++ b/internal/telemd/daemon.go
@@ -53,6 +53,7 @@ func (daemon *Daemon) initInstruments(factory InstrumentFactory) {
 		"kubernetes_cgrp_cpu":    factory.NewKubernetesCgroupCpuInstrument(),
 		"kubernetes_cgrp_blkio":  factory.NewKubernetesCgroupBlkioInstrument(),
 		"kubernetes_cgrp_memory": factory.NewKubernetesCgroupMemoryInstrument(),
+		"kubernetes_cgrp_net":    factory.NewKubernetesCgroupNetInstrument(),
 	}
 
 	activeNetDevice, err := findActiveNetDevice()
diff --git a/internal/telemd/instrument.go b/internal/telemd/instrument.go
index d1ab047..852ccbb 100644
--- a/internal/telemd/instrument.go
+++ b/internal/telemd/instrument.go
@@ -33,6 +33,7 @@ type InstrumentFactory interface {
 	NewDockerCgroupMemoryInstrument() Instrument
 	NewKubernetesCgroupBlkioInstrument() Instrument
 	NewKubernetesCgroupMemoryInstrument() Instrument
+	NewKubernetesCgroupNetInstrument() Instrument
 	NewPsiCpuInstrument() Instrument
 	NewPsiMemoryInstrument() Instrument
 	NewPsiIoInstrument() Instrument
@@ -75,12 +76,18 @@ type DockerCgroupv1NetworkInstrument struct {
 type DockerCgroupv2NetworkInstrument struct {
 	pids map[string]string
 }
+
 type DockerCgroupv1MemoryInstrument struct{}
 type DockerCgroupv2MemoryInstrument struct{}
 
 type KubernetesCgroupCpuInstrument struct{}
 type KubernetesCgroupBlkioInstrument struct{}
 type KuberenetesCgroupMemoryInstrument struct{}
+// KubernetesCgroupv1NetworkInstrument reports the network I/O of Kubernetes
+// pod containers; pids caches the container id -> process id lookup.
+type KubernetesCgroupv1NetworkInstrument struct {
+	pids map[string]string
+}
 
 func (CpuUtilInstrument) MeasureAndReport(channel telem.TelemetryChannel) {
 	then := readCpuUtil()
@@ -531,6 +538,59 @@ func (c *DockerCgroupv1NetworkInstrument) MeasureAndReport(channel telem.Telemet
 	}
 }
 
+// MeasureAndReport emits one "kubernetes_cgrp_net/<container id>" telemetry
+// value per Kubernetes pod container, holding the sum of the bytes received
+// and transmitted by the container's process.
+//
+// The QoS directories are walked sequentially on the calling goroutine: c.pids
+// is an unsynchronized map, so it must not be touched by several goroutines
+// at once.
+func (c *KubernetesCgroupv1NetworkInstrument) MeasureAndReport(channel telem.TelemetryChannel) {
+	const kubepodRootDir = "/sys/fs/cgroup/cpuacct/kubepods"
+
+	for _, qosClass := range [3]string{"besteffort", "burstable", "guaranteed"} {
+		kubepodDir := filepath.Join(kubepodRootDir, qosClass)
+
+		for _, containerDir := range fetchKubernetesContainerDirs(kubepodDir) {
+			containerId := filepath.Base(containerDir)
+
+			pid, ok := c.pids[containerId]
+			if !ok {
+				// unknown container: refresh the pid cache and retry once
+				pids, err := containerProcessIds()
+				if err != nil {
+					log.Println("unable to get container process ids", err)
+					continue
+				}
+				c.pids = pids
+
+				pid, ok = c.pids[containerId]
+				if !ok {
+					log.Println("could not get pid of container after refresh", containerId)
+					continue
+				}
+			}
+
+			rx, tx, err := readTotalProcessNetworkStats(pid)
+			if err != nil {
+				if os.IsNotExist(err) {
+					delete(c.pids, containerId) // delete now and wait for next iteration to refresh
+				} else {
+					log.Println("error parsing network stats of pid", pid, err)
+				}
+				continue
+			}
+
+			// container directory names are not guaranteed to be full-length ids
+			shortId := containerId
+			if len(shortId) > 12 {
+				shortId = shortId[:12]
+			}
+			channel.Put(telem.NewTelemetry("kubernetes_cgrp_net/"+shortId, float64(rx+tx)))
+		}
+	}
+}
+
 func (c *DockerCgroupv2NetworkInstrument) MeasureAndReport(channel telem.TelemetryChannel) {
 
 	prefix := "docker-"
@@ -932,6 +992,10 @@ func (d defaultInstrumentFactory) NewKubernetesCgroupBlkioInstrument() Instrumen
 func (d defaultInstrumentFactory) NewKubernetesCgroupMemoryInstrument() Instrument {
 	return KuberenetesCgroupMemoryInstrument{}
 }
+// returned as a pointer with an initialised map so the pid cache survives between measurements
+func (d defaultInstrumentFactory) NewKubernetesCgroupNetInstrument() Instrument {
+	return &KubernetesCgroupv1NetworkInstrument{pids: make(map[string]string)}
+}
 
 func (d defaultInstrumentFactory) NewDockerCgroupMemoryInstrument() Instrument {
 	cgroup := checkCgroup()
diff --git a/internal/telemd/sysparse.go b/internal/telemd/sysparse.go
index 7cec428..92c2125 100644
--- a/internal/telemd/sysparse.go
+++ b/internal/telemd/sysparse.go
@@ -266,6 +266,14 @@ func getContainerId(pid string) (string, error) {
 				// cgroup v1
 				return split, nil
 			}
+		} else if strings.Contains(line, "kubepods") {
+			// kubernetes, e.g.
+			// freezer:/kubepods/besteffort/podae778fdf-394c-4356-9625-ea50666783b1/2cc54a6877a50da0b6a2a5340dd1e8c5707a1d7d4b363e03b7cde76d2569f0c0
+			// the container id is the fifth "/"-separated field; lines with fewer
+			// fields are skipped instead of indexing out of range
+			if parts := strings.Split(line, "/"); len(parts) > 4 {
+				return parts[4], nil
+			}
 		}
 	}
 	return "", errors.New("Did not find container for PID " + pid)