From 14e3fab65ff8c71d662a4a192a38a665b4c7b172 Mon Sep 17 00:00:00 2001 From: Zhang Kang Date: Thu, 20 Jun 2024 09:04:38 +0800 Subject: [PATCH] add taskid in informer Signed-off-by: Zhang Kang --- pkg/koordlet/statesinformer/api.go | 27 +- pkg/koordlet/statesinformer/impl/config.go | 3 + .../statesinformer/impl/config_test.go | 5 + .../statesinformer/impl/states_pods.go | 73 ++++++ .../statesinformer/impl/states_pods_test.go | 243 ++++++++++++++++++ 5 files changed, 349 insertions(+), 2 deletions(-) diff --git a/pkg/koordlet/statesinformer/api.go b/pkg/koordlet/statesinformer/api.go index 656b281795..21e1edd6af 100644 --- a/pkg/koordlet/statesinformer/api.go +++ b/pkg/koordlet/statesinformer/api.go @@ -27,14 +27,37 @@ import ( ) type PodMeta struct { - Pod *corev1.Pod - CgroupDir string + Pod *corev1.Pod + CgroupDir string + ContainerTaskIds map[string][]int32 +} + +// DeepCopyContainerTaskIds creates a deep copy of ContainerTaskIds +func DeepCopyContainerTaskIds(in map[string][]int32) map[string][]int32 { + if in == nil { + return nil + } + + out := make(map[string][]int32, len(in)) + + for key, value := range in { + if value == nil { + out[key] = nil + } else { + copyValue := make([]int32, len(value)) + copy(copyValue, value) + out[key] = copyValue + } + } + + return out } func (in *PodMeta) DeepCopy() *PodMeta { out := new(PodMeta) out.Pod = in.Pod.DeepCopy() out.CgroupDir = in.CgroupDir + out.ContainerTaskIds = DeepCopyContainerTaskIds(in.ContainerTaskIds) return out } diff --git a/pkg/koordlet/statesinformer/impl/config.go b/pkg/koordlet/statesinformer/impl/config.go index 2a4d42b916..2f36508f5e 100644 --- a/pkg/koordlet/statesinformer/impl/config.go +++ b/pkg/koordlet/statesinformer/impl/config.go @@ -33,6 +33,7 @@ type Config struct { DisableQueryKubeletConfig bool EnableNodeMetricReport bool MetricReportInterval time.Duration // Deprecated + EnablePodTaskIds bool } func NewDefaultConfig() *Config { @@ -45,6 +46,7 @@ func NewDefaultConfig() *Config { 
NodeTopologySyncInterval: 3 * time.Second, DisableQueryKubeletConfig: false, EnableNodeMetricReport: true, + EnablePodTaskIds: false, } } @@ -58,4 +60,5 @@ func (c *Config) InitFlags(fs *flag.FlagSet) { fs.BoolVar(&c.DisableQueryKubeletConfig, "disable-query-kubelet-config", c.DisableQueryKubeletConfig, "Disables querying the kubelet configuration from kubelet. Flag must be set to true if kubelet-insecure-tls=true is configured") fs.DurationVar(&c.MetricReportInterval, "report-interval", c.MetricReportInterval, "Deprecated since v1.1, use ColocationStrategy.MetricReportIntervalSeconds in config map of slo-controller") fs.BoolVar(&c.EnableNodeMetricReport, "enable-node-metric-report", c.EnableNodeMetricReport, "Enable status update of node metric crd.") + fs.BoolVar(&c.EnablePodTaskIds, "enable-pod-taskids", c.EnablePodTaskIds, "Enable pod taskids in statesinformer.") } diff --git a/pkg/koordlet/statesinformer/impl/config_test.go b/pkg/koordlet/statesinformer/impl/config_test.go index 964a68c261..3fe060a7f7 100644 --- a/pkg/koordlet/statesinformer/impl/config_test.go +++ b/pkg/koordlet/statesinformer/impl/config_test.go @@ -42,6 +42,7 @@ func TestNewDefaultConfig(t *testing.T) { DisableQueryKubeletConfig: false, EnableNodeMetricReport: true, MetricReportInterval: 0, + EnablePodTaskIds: false, }, }, } @@ -64,6 +65,7 @@ func TestConfig_InitFlags(t *testing.T) { "--node-topology-sync-interval=10s", "--disable-query-kubelet-config=true", "--enable-node-metric-report=false", + "--enable-pod-taskids=true", } fs := flag.NewFlagSet(cmdArgs[0], flag.ExitOnError) @@ -76,6 +78,7 @@ func TestConfig_InitFlags(t *testing.T) { NodeTopologySyncInterval time.Duration DisableQueryKubeletConfig bool EnableNodeMetricReport bool + EnablePodTaskIds bool } type args struct { fs *flag.FlagSet @@ -96,6 +99,7 @@ func TestConfig_InitFlags(t *testing.T) { NodeTopologySyncInterval: 10 * time.Second, DisableQueryKubeletConfig: true, EnableNodeMetricReport: false, + EnablePodTaskIds: true, }, 
args: args{fs: fs}, }, @@ -111,6 +115,7 @@ func TestConfig_InitFlags(t *testing.T) { NodeTopologySyncInterval: tt.fields.NodeTopologySyncInterval, DisableQueryKubeletConfig: tt.fields.DisableQueryKubeletConfig, EnableNodeMetricReport: tt.fields.EnableNodeMetricReport, + EnablePodTaskIds: tt.fields.EnablePodTaskIds, } c := NewDefaultConfig() c.InitFlags(tt.args.fs) diff --git a/pkg/koordlet/statesinformer/impl/states_pods.go b/pkg/koordlet/statesinformer/impl/states_pods.go index 7da62dbf16..ef45486835 100644 --- a/pkg/koordlet/statesinformer/impl/states_pods.go +++ b/pkg/koordlet/statesinformer/impl/states_pods.go @@ -31,6 +31,7 @@ import ( apiext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" "github.com/koordinator-sh/koordinator/pkg/koordlet/pleg" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" @@ -57,6 +58,7 @@ type podsInformer struct { nodeInformer *nodeInformer callbackRunner *callbackRunner + cgroupReader resourceexecutor.CgroupReader } func NewPodsInformer() *podsInformer { @@ -85,6 +87,8 @@ func (s *podsInformer) Setup(ctx *PluginOption, states *PluginState) { s.nodeInformer = nodeInformer s.callbackRunner = states.callbackRunner + + s.cgroupReader = resourceexecutor.NewCgroupReader() } func (s *podsInformer) Start(stopCh <-chan struct{}) { @@ -141,6 +145,70 @@ func (s *podsInformer) GetAllPods() []*statesinformer.PodMeta { return pods } +func (s *podsInformer) getTaskIds(podMeta *statesinformer.PodMeta) { + pod := podMeta.Pod + containerMap := make(map[string]*corev1.Container, len(pod.Spec.Containers)) + for i := range pod.Spec.Containers { + container := &pod.Spec.Containers[i] + containerMap[container.Name] = container + } + + for _, containerStat := range 
pod.Status.ContainerStatuses { + container, exist := containerMap[containerStat.Name] + if !exist { + klog.Warningf("container %s/%s/%s lost during task ids collection", pod.Namespace, + pod.Name, containerStat.Name) + continue + } + + containerDir, err := koordletutil.GetContainerCgroupParentDir(podMeta.CgroupDir, &containerStat) + if err != nil { + klog.V(4).Infof("failed to get pod container cgroup path for container %s/%s/%s, err: %s", + pod.Namespace, pod.Name, container.Name, err) + continue + } + ids, err := s.cgroupReader.ReadCPUTasks(containerDir) + if err == nil && ids != nil && podMeta.ContainerTaskIds != nil { + podMeta.ContainerTaskIds[containerStat.ContainerID] = ids + } + if err != nil && resourceexecutor.IsCgroupDirErr(err) { + klog.V(5).Infof("failed to read container task ids whose cgroup path %s does not exist, err: %s", + containerDir, err) + continue + } else if err != nil { + klog.Warningf("failed to get pod container cgroup task ids for container %s/%s/%s, err: %s", + pod.Namespace, pod.Name, container.Name, err) + continue + } + } + + sandboxID, err := koordletutil.GetPodSandboxContainerID(pod) + if err != nil { + klog.V(4).Infof("failed to get sandbox container ID for pod %s/%s, err: %s", + pod.Namespace, pod.Name, err) + return + } + sandboxContainerDir, err := koordletutil.GetContainerCgroupParentDirByID(podMeta.CgroupDir, sandboxID) + if err != nil { + klog.V(4).Infof("failed to get pod container cgroup path for sandbox container %s/%s/%s, err: %s", + pod.Namespace, pod.Name, sandboxID, err) + return + } + ids, err := s.cgroupReader.ReadCPUTasks(sandboxContainerDir) + if err == nil && ids != nil && podMeta.ContainerTaskIds != nil { + podMeta.ContainerTaskIds[sandboxID] = ids + } + if err != nil && resourceexecutor.IsCgroupDirErr(err) { + klog.V(5).Infof("failed to read container task ids whose cgroup path %s does not exist, err: %s", + sandboxContainerDir, err) + return + } else if err != nil { + klog.Warningf("failed to get pod 
container cgroup task ids for sandbox container %s/%s/%s, err: %s", + pod.Namespace, pod.Name, sandboxID, err) + return + } +} + func (s *podsInformer) syncPods() error { podList, err := s.kubelet.GetAllPods() @@ -159,6 +227,11 @@ func (s *podsInformer) syncPods() error { CgroupDir: genPodCgroupParentDir(pod), } newPodMap[string(pod.UID)] = podMeta + if s.config.EnablePodTaskIds && !util.IsPodTerminated(pod) { + podMeta.ContainerTaskIds = make(map[string][]int32) + // record pod's containers taskids + s.getTaskIds(podMeta) + } // record pod container metrics recordPodResourceMetrics(podMeta) } diff --git a/pkg/koordlet/statesinformer/impl/states_pods_test.go b/pkg/koordlet/statesinformer/impl/states_pods_test.go index aaa6eebbe1..fa0af6726e 100644 --- a/pkg/koordlet/statesinformer/impl/states_pods_test.go +++ b/pkg/koordlet/statesinformer/impl/states_pods_test.go @@ -39,6 +39,7 @@ import ( apiext "github.com/koordinator-sh/koordinator/apis/extension" fakekoordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -223,6 +224,8 @@ func Test_statesInformer_syncPods(t *testing.T) { }}, podHasSynced: atomic.NewBool(false), callbackRunner: NewCallbackRunner(), + cgroupReader: resourceexecutor.NewCgroupReader(), + config: c, } err := m.syncPods() @@ -377,6 +380,8 @@ func Test_statesInformer_syncKubeletLoop(t *testing.T) { }}, callbackRunner: NewCallbackRunner(), podHasSynced: atomic.NewBool(false), + cgroupReader: resourceexecutor.NewCgroupReader(), + config: c, } go m.syncKubeletLoop(c.KubeletSyncInterval, stopCh) time.Sleep(5 * time.Second) @@ -531,3 +536,241 @@ func Test_recordPodResourceMetrics(t *testing.T) { }) } } + +func testingPrepareContainerCgroupCPUTasks(t 
*testing.T, helper *system.FileTestUtil, containerParentPath, tasksStr string) { + tasks, err := system.GetCgroupResource(system.CPUTasksName) + assert.NoError(t, err) + helper.WriteCgroupFileContents(containerParentPath, tasks, tasksStr) +} + +func Test_getTaskIds(t *testing.T) { + type args struct { + podMeta *statesinformer.PodMeta + } + type fields struct { + containerParentDir string + containerTasksStr string + invalidPath bool + useCgroupsV2 bool + } + tests := []struct { + name string + args args + fields fields + want map[string][]int32 + }{ + { + name: "do nothing for empty pod", + args: args{ + podMeta: &statesinformer.PodMeta{Pod: &corev1.Pod{}}, + }, + want: nil, + }, + { + name: "successfully get task ids for the pod", + fields: fields{ + containerParentDir: "kubepods.slice/p0/cri-containerd-c0.scope", + containerTasksStr: "122450\n122454\n123111\n128912", + }, + args: args{ + podMeta: &statesinformer.PodMeta{ + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod0", + UID: "p0", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "container0", + }, + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "container0", + ContainerID: "containerd://c0", + }, + }, + }, + }, + CgroupDir: "kubepods.slice/p0", + ContainerTaskIds: make(map[string][]int32), + }, + }, + want: map[string][]int32{"containerd://c0": {122450, 122454, 123111, 128912}}, + }, + { + name: "return empty for invalid path", + fields: fields{ + containerParentDir: "p0/cri-containerd-c0.scope", + containerTasksStr: "122454\n123111\n128912", + invalidPath: true, + }, + args: args{ + podMeta: &statesinformer.PodMeta{ + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod0", + UID: "p0", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "container0", + }, + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "container0", + ContainerID: 
"containerd://c0", + }, + }, + }, + }, + CgroupDir: "kubepods.slice/p0", + ContainerTaskIds: make(map[string][]int32), + }, + }, + want: make(map[string][]int32), + }, + { + name: "missing container's status", + fields: fields{ + containerParentDir: "p0/cri-containerd-c0.scope", + containerTasksStr: "122454\n123111\n128912", + invalidPath: true, + }, + args: args{ + podMeta: &statesinformer.PodMeta{ + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod0", + UID: "p0", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "container0", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{}, + }, + }, + CgroupDir: "kubepods.slice/p0", + ContainerTaskIds: make(map[string][]int32), + }, + }, + want: make(map[string][]int32), + }, + { + name: "successfully get task ids on cgroups v2", + fields: fields{ + containerParentDir: "kubepods.slice/p0/cri-containerd-c0.scope", + containerTasksStr: "122450\n122454\n123111\n128912", + useCgroupsV2: true, + }, + args: args{ + podMeta: &statesinformer.PodMeta{ + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod0", + UID: "p0", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "container0", + }, + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "container0", + ContainerID: "containerd://c0", + }, + }, + }, + }, + CgroupDir: "kubepods.slice/p0", + ContainerTaskIds: make(map[string][]int32), + }, + }, + want: map[string][]int32{"containerd://c0": {122450, 122454, 123111, 128912}}, + }, + { + name: "successfully get task ids from the sandbox container", + fields: fields{ + containerParentDir: "kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-podp0.slice/cri-containerd-abc.scope", + containerTasksStr: "122450\n122454\n123111\n128912", + useCgroupsV2: true, + }, + args: args{ + podMeta: &statesinformer.PodMeta{ + Pod: &corev1.Pod{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: "pod0", + UID: "p0", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "container0", + }, + }, + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "container0", + ContainerID: "containerd://c0", + }, + }, + }, + }, + CgroupDir: "kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-podp0.slice", + ContainerTaskIds: make(map[string][]int32), + }, + }, + want: map[string][]int32{"containerd://abc": {122450, 122454, 123111, 128912}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + helper := system.NewFileTestUtil(t) + helper.SetCgroupsV2(tt.fields.useCgroupsV2) + defer helper.Cleanup() + + testingPrepareContainerCgroupCPUTasks(t, helper, tt.fields.containerParentDir, tt.fields.containerTasksStr) + + system.CommonRootDir = "" + if tt.fields.invalidPath { + system.Conf.CgroupRootDir = "invalidPath" + } + + c := NewDefaultConfig() + + m := &podsInformer{ + kubelet: &testKubeletStub{pods: corev1.PodList{ + Items: []corev1.Pod{ + {}, + }, + }}, + callbackRunner: NewCallbackRunner(), + podHasSynced: atomic.NewBool(false), + cgroupReader: resourceexecutor.NewCgroupReader(), + config: c, + } + + m.getTaskIds(tt.args.podMeta) + assert.Equal(t, tt.want, tt.args.podMeta.ContainerTaskIds) + }) + } +}