diff --git a/pkg/tracker/generic/resource_state_json_paths.go b/pkg/tracker/generic/resource_state_json_paths.go index 5283ab9..9cf867a 100644 --- a/pkg/tracker/generic/resource_state_json_paths.go +++ b/pkg/tracker/generic/resource_state_json_paths.go @@ -186,20 +186,23 @@ func casify(in ...string) []string { var result []string for _, value := range in { - result = append(result, value) - result = append(result, strings.ReplaceAll(value, " ", "")) - result = append(result, caps.ToUpper(strings.ReplaceAll(value, " ", ""))) - result = append(result, caps.ToCamel(value)) - result = append(result, caps.ToKebab(value)) - result = append(result, caps.ToDotNotation(value)) - result = append(result, caps.ToSnake(value)) - result = append(result, caps.ToTitle(value)) - result = append(result, caps.ToUpper(value)) - result = append(result, caps.ToLower(value)) - result = append(result, caps.ToLowerCamel(value)) - result = append(result, caps.ToScreamingDotNotation(value)) - result = append(result, caps.ToScreamingKebab(value)) - result = append(result, caps.ToScreamingSnake(value)) + result = append( + result, + value, + strings.ReplaceAll(value, " ", ""), + caps.ToUpper(strings.ReplaceAll(value, " ", "")), + caps.ToCamel(value), + caps.ToKebab(value), + caps.ToDotNotation(value), + caps.ToSnake(value), + caps.ToTitle(value), + caps.ToUpper(value), + caps.ToLower(value), + caps.ToLowerCamel(value), + caps.ToScreamingDotNotation(value), + caps.ToScreamingKebab(value), + caps.ToScreamingSnake(value), + ) } result = lo.Uniq(result) diff --git a/pkg/trackers/dyntracker/dynamic_readiness_tracker.go b/pkg/trackers/dyntracker/dynamic_readiness_tracker.go index 6fc7d4d..98e9a89 100644 --- a/pkg/trackers/dyntracker/dynamic_readiness_tracker.go +++ b/pkg/trackers/dyntracker/dynamic_readiness_tracker.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "regexp" "time" "github.com/samber/lo" @@ -15,6 +16,7 @@ import ( "k8s.io/client-go/kubernetes" watchtools "k8s.io/client-go/tools/watch" + "github.com/werf/kubedog/pkg/display" commontracker "github.com/werf/kubedog/pkg/tracker" "github.com/werf/kubedog/pkg/tracker/canary" "github.com/werf/kubedog/pkg/tracker/daemonset" @@ -37,6 +39,13 @@ type DynamicReadinessTracker struct { timeout time.Duration noActivityTimeout time.Duration + + saveLogsOnlyForContainers []string + saveLogsByRegex *regexp.Regexp + saveLogsByRegexForContainers map[string]*regexp.Regexp + ignoreLogs bool + ignoreLogsForContainers []string + saveEvents bool } func NewDynamicReadinessTracker( @@ -50,7 +59,7 @@ func NewDynamicReadinessTracker( opts DynamicReadinessTrackerOptions, ) *DynamicReadinessTracker { timeout := opts.Timeout - logsFromTime := opts.LogsFromTime + captureLogsFromTime := opts.CaptureLogsFromTime var noActivityTimeout time.Duration if opts.NoActivityTimeout != 0 { @@ -81,35 +90,35 @@ func NewDynamicReadinessTracker( tracker = deployment.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ ParentContext: ctx, Timeout: timeout, - LogsFromTime: logsFromTime, + LogsFromTime: captureLogsFromTime, IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, }) case schema.GroupKind{Group: "apps", Kind: "DaemonSet"}, schema.GroupKind{Group: "extensions", Kind: "DaemonSet"}: tracker = daemonset.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ ParentContext: ctx, Timeout: timeout, - LogsFromTime: logsFromTime, + LogsFromTime: captureLogsFromTime, IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, }) case schema.GroupKind{Group: "flagger.app", Kind: "Canary"}: tracker = canary.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ ParentContext: ctx, Timeout: timeout, - LogsFromTime: logsFromTime, + LogsFromTime: captureLogsFromTime, IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, }) case schema.GroupKind{Group: "apps", Kind: "StatefulSet"}: tracker = statefulset.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ ParentContext: ctx, Timeout: timeout, - LogsFromTime: logsFromTime, + LogsFromTime: captureLogsFromTime, IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, }) case schema.GroupKind{Group: "batch", Kind: "Job"}: tracker = job.NewTracker(resourceName, resourceNamespace, staticClient, commontracker.Options{ ParentContext: ctx, Timeout: timeout, - LogsFromTime: logsFromTime, + LogsFromTime: captureLogsFromTime, IgnoreReadinessProbeFailsByContainerName: ignoreReadinessProbeFailsByContainerName, }) default: @@ -121,19 +130,31 @@ func NewDynamicReadinessTracker( } return &DynamicReadinessTracker{ - taskState: taskState, - logStore: logStore, - tracker: tracker, - timeout: timeout, - noActivityTimeout: noActivityTimeout, + taskState: taskState, + logStore: logStore, + tracker: tracker, + timeout: timeout, + noActivityTimeout: noActivityTimeout, + saveLogsOnlyForContainers: opts.SaveLogsOnlyForContainers, + saveLogsByRegex: opts.SaveLogsByRegex, + saveLogsByRegexForContainers: opts.SaveLogsByRegexForContainers, + ignoreLogs: opts.IgnoreLogs, + ignoreLogsForContainers: opts.IgnoreLogsForContainers, + saveEvents: opts.SaveEvents, } } type DynamicReadinessTrackerOptions struct { Timeout time.Duration NoActivityTimeout time.Duration - LogsFromTime time.Time IgnoreReadinessProbeFailsByContainerName map[string]time.Duration + CaptureLogsFromTime time.Time + SaveLogsOnlyForContainers []string + SaveLogsByRegex *regexp.Regexp + SaveLogsByRegexForContainers map[string]*regexp.Regexp + IgnoreLogs bool + IgnoreLogsForContainers []string + SaveEvents bool } func (t *DynamicReadinessTracker) Track(ctx context.Context) error { @@ -192,7 +213,7 @@ func (t *DynamicReadinessTracker) trackDeployment(ctx context.Context, tracker * abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDeploymentStatus(&status, ts) + t.handlePodsFromDeploymentStatus(&status, ts) t.handleDeploymentStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -206,7 +227,7 @@ func (t *DynamicReadinessTracker) trackDeployment(ctx context.Context, tracker * abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDeploymentStatus(&status, ts) + t.handlePodsFromDeploymentStatus(&status, ts) t.handleDeploymentStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -220,7 +241,7 @@ func (t *DynamicReadinessTracker) trackDeployment(ctx context.Context, tracker * abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDeploymentStatus(&report.DeploymentStatus, ts) + t.handlePodsFromDeploymentStatus(&report.DeploymentStatus, ts) t.handleDeploymentStatus(&report.DeploymentStatus, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -234,8 +255,8 @@ func (t *DynamicReadinessTracker) trackDeployment(ctx context.Context, tracker * abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDeploymentStatus(&report.DeploymentStatus, ts) - t.addMissingPodsStatesFromDeploymentPodAddedReport(&report, ts) + t.handlePodsFromDeploymentStatus(&report.DeploymentStatus, ts) + t.handlePodsFromDeploymentPodAddedReport(&report, ts) t.handleDeploymentStatus(&report.DeploymentStatus, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -249,7 +270,7 @@ func (t *DynamicReadinessTracker) trackDeployment(ctx context.Context, tracker * abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDeploymentStatus(&status, ts) + t.handlePodsFromDeploymentStatus(&status, ts) t.handleDeploymentStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -263,7 +284,7 @@ func (t *DynamicReadinessTracker) trackDeployment(ctx context.Context, tracker * abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDeploymentStatus(&status, ts) + t.handlePodsFromDeploymentStatus(&status, ts) t.handleDeploymentStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -287,7 +308,7 @@ func (t *DynamicReadinessTracker) trackDeployment(ctx context.Context, tracker * abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDeploymentStatus(&report.DeploymentStatus, ts) + t.handlePodsFromDeploymentStatus(&report.DeploymentStatus, ts) t.handleDeploymentStatus(&report.DeploymentStatus, ts) t.handleReplicaSetPodError(&report.ReplicaSetPodError, ts) abort, abortErr = t.handleTaskStateStatus(ts) @@ -323,7 +344,7 @@ func (t *DynamicReadinessTracker) trackStatefulSet(ctx context.Context, tracker abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromStatefulSetStatus(&status, ts) + t.handlePodsFromStatefulSetStatus(&status, ts) t.handleStatefulSetStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -337,7 +358,7 @@ func (t *DynamicReadinessTracker) trackStatefulSet(ctx context.Context, tracker abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromStatefulSetStatus(&status, ts) + t.handlePodsFromStatefulSetStatus(&status, ts) t.handleStatefulSetStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -351,8 +372,8 @@ func (t *DynamicReadinessTracker) trackStatefulSet(ctx context.Context, tracker abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromStatefulSetStatus(&report.StatefulSetStatus, ts) - t.addMissingPodsStatesFromStatefulSetPodAddedReport(&report, ts) + t.handlePodsFromStatefulSetStatus(&report.StatefulSetStatus, ts) + t.handlePodsFromStatefulSetPodAddedReport(&report, ts) t.handleStatefulSetStatus(&report.StatefulSetStatus, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -366,7 +387,7 @@ func (t *DynamicReadinessTracker) trackStatefulSet(ctx context.Context, tracker abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromStatefulSetStatus(&status, ts) + t.handlePodsFromStatefulSetStatus(&status, ts) t.handleStatefulSetStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -380,7 +401,7 @@ func (t *DynamicReadinessTracker) trackStatefulSet(ctx context.Context, tracker abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromStatefulSetStatus(&status, ts) + t.handlePodsFromStatefulSetStatus(&status, ts) t.handleStatefulSetStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -404,7 +425,7 @@ func (t *DynamicReadinessTracker) trackStatefulSet(ctx context.Context, tracker abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromStatefulSetStatus(&report.StatefulSetStatus, ts) + t.handlePodsFromStatefulSetStatus(&report.StatefulSetStatus, ts) t.handleStatefulSetStatus(&report.StatefulSetStatus, ts) t.handleReplicaSetPodError(&report.ReplicaSetPodError, ts) abort, abortErr = t.handleTaskStateStatus(ts) @@ -440,7 +461,7 @@ func (t *DynamicReadinessTracker) trackDaemonSet(ctx context.Context, tracker *d abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDaemonSetStatus(&status, ts) + t.handlePodsFromDaemonSetStatus(&status, ts) t.handleDaemonSetStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -454,7 +475,7 @@ func (t *DynamicReadinessTracker) trackDaemonSet(ctx context.Context, tracker *d abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDaemonSetStatus(&status, ts) + t.handlePodsFromDaemonSetStatus(&status, ts) t.handleDaemonSetStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -468,8 +489,8 @@ func (t *DynamicReadinessTracker) trackDaemonSet(ctx context.Context, tracker *d abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDaemonSetStatus(&report.DaemonSetStatus, ts) - t.addMissingPodsStatesFromDaemonSetPodAddedReport(&report, ts) + t.handlePodsFromDaemonSetStatus(&report.DaemonSetStatus, ts) + t.handlePodsFromDaemonSetPodAddedReport(&report, ts) t.handleDaemonSetStatus(&report.DaemonSetStatus, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -483,7 +504,7 @@ func (t *DynamicReadinessTracker) trackDaemonSet(ctx context.Context, tracker *d abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDaemonSetStatus(&status, ts) + t.handlePodsFromDaemonSetStatus(&status, ts) t.handleDaemonSetStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -497,7 +518,7 @@ func (t *DynamicReadinessTracker) trackDaemonSet(ctx context.Context, tracker *d abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDaemonSetStatus(&status, ts) + t.handlePodsFromDaemonSetStatus(&status, ts) t.handleDaemonSetStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -521,7 +542,7 @@ func (t *DynamicReadinessTracker) trackDaemonSet(ctx context.Context, tracker *d abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromDaemonSetStatus(&report.DaemonSetStatus, ts) + t.handlePodsFromDaemonSetStatus(&report.DaemonSetStatus, ts) t.handleDaemonSetStatus(&report.DaemonSetStatus, ts) t.handleReplicaSetPodError(&report.PodError, ts) abort, abortErr = t.handleTaskStateStatus(ts) @@ -557,7 +578,7 @@ func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tra abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromJobStatus(&status, ts) + t.handlePodsFromJobStatus(&status, ts) t.handleJobStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -571,7 +592,7 @@ func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tra abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromJobStatus(&status, ts) + t.handlePodsFromJobStatus(&status, ts) t.handleJobStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -585,8 +606,8 @@ func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tra abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromJobStatus(&report.JobStatus, ts) - t.addMissingPodsStatesFromJobPodAddedReport(&report, ts) + t.handlePodsFromJobStatus(&report.JobStatus, ts) + t.handlePodsFromJobPodAddedReport(&report, ts) t.handleJobStatus(&report.JobStatus, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -600,7 +621,7 @@ func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tra abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromJobStatus(&status, ts) + t.handlePodsFromJobStatus(&status, ts) t.handleJobStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -614,7 +635,7 @@ func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tra abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromJobStatus(&status, ts) + t.handlePodsFromJobStatus(&status, ts) t.handleJobStatus(&status, ts) abort, abortErr = t.handleTaskStateStatus(ts) }) @@ -638,7 +659,7 @@ func (t *DynamicReadinessTracker) trackJob(ctx context.Context, tracker *job.Tra abortErr error ) t.taskState.RWTransaction(func(ts *statestore.ReadinessTaskState) { - t.addMissingPodsStatesFromJobStatus(&report.JobStatus, ts) + t.handlePodsFromJobStatus(&report.JobStatus, ts) t.handleJobStatus(&report.JobStatus, ts) t.handlePodError(&report.PodError, ts) abort, abortErr = t.handleTaskStateStatus(ts) @@ -811,95 +832,122 @@ func (t *DynamicReadinessTracker) trackGeneric(ctx context.Context, tracker *gen } } -func (t *DynamicReadinessTracker) addMissingPodsStatesFromDeploymentStatus(status *deployment.DeploymentStatus, taskState *statestore.ReadinessTaskState) { +func (t *DynamicReadinessTracker) handlePodsFromDeploymentStatus(status *deployment.DeploymentStatus, taskState *statestore.ReadinessTaskState) { pods := lo.PickBy(status.Pods, func(_ string, podStatus pod.PodStatus) bool { return lo.Contains(status.NewPodsNames, podStatus.Name) }) for _, pod := range pods { taskState.AddResourceState(pod.Name, taskState.Namespace(), podGvk) - taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk) + + taskState.ResourceState(pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) { + setPodStatusAttribute(rs, pod.PodStatus.Phase) + }) } } -func (t *DynamicReadinessTracker) addMissingPodsStatesFromStatefulSetStatus(status *statefulset.StatefulSetStatus, taskState *statestore.ReadinessTaskState) { +func (t *DynamicReadinessTracker) handlePodsFromStatefulSetStatus(status *statefulset.StatefulSetStatus, taskState *statestore.ReadinessTaskState) { pods := lo.PickBy(status.Pods, func(_ string, podStatus pod.PodStatus) bool { return lo.Contains(status.NewPodsNames, podStatus.Name) }) for _, pod := range pods { taskState.AddResourceState(pod.Name, taskState.Namespace(), podGvk) - taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk) + + taskState.ResourceState(pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) { + setPodStatusAttribute(rs, pod.PodStatus.Phase) + }) } } -func (t *DynamicReadinessTracker) addMissingPodsStatesFromDaemonSetStatus(status *daemonset.DaemonSetStatus, taskState *statestore.ReadinessTaskState) { +func (t *DynamicReadinessTracker) handlePodsFromDaemonSetStatus(status *daemonset.DaemonSetStatus, taskState *statestore.ReadinessTaskState) { pods := lo.PickBy(status.Pods, func(_ string, podStatus pod.PodStatus) bool { return lo.Contains(status.NewPodsNames, podStatus.Name) }) for _, pod := range pods { taskState.AddResourceState(pod.Name, taskState.Namespace(), podGvk) - taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk) + + taskState.ResourceState(pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) { + setPodStatusAttribute(rs, pod.PodStatus.Phase) + }) } } -func (t *DynamicReadinessTracker) addMissingPodsStatesFromJobStatus(status *job.JobStatus, taskState *statestore.ReadinessTaskState) { +func (t *DynamicReadinessTracker) handlePodsFromJobStatus(status *job.JobStatus, taskState *statestore.ReadinessTaskState) { for _, pod := range status.Pods { taskState.AddResourceState(pod.Name, taskState.Namespace(), podGvk) - taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), pod.Name, taskState.Namespace(), podGvk) + + taskState.ResourceState(pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) { + setPodStatusAttribute(rs, pod.PodStatus.Phase) + }) } } -func (t *DynamicReadinessTracker) addMissingPodsStatesFromDeploymentPodAddedReport(report *deployment.PodAddedReport, taskState *statestore.ReadinessTaskState) { +func (t *DynamicReadinessTracker) handlePodsFromDeploymentPodAddedReport(report *deployment.PodAddedReport, taskState *statestore.ReadinessTaskState) { if !report.ReplicaSetPod.ReplicaSet.IsNew { return } taskState.AddResourceState(report.ReplicaSetPod.Name, taskState.Namespace(), podGvk) - taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), report.ReplicaSetPod.Name, taskState.Namespace(), podGvk) + + for _, pod := range report.DeploymentStatus.Pods { + taskState.ResourceState(report.ReplicaSetPod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) { + setPodStatusAttribute(rs, pod.PodStatus.Phase) + }) + } } -func (t *DynamicReadinessTracker) addMissingPodsStatesFromStatefulSetPodAddedReport(report *statefulset.PodAddedReport, taskState *statestore.ReadinessTaskState) { +func (t *DynamicReadinessTracker) handlePodsFromStatefulSetPodAddedReport(report *statefulset.PodAddedReport, taskState *statestore.ReadinessTaskState) { if !report.ReplicaSetPod.ReplicaSet.IsNew { return } taskState.AddResourceState(report.ReplicaSetPod.Name, taskState.Namespace(), podGvk) - taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), report.ReplicaSetPod.Name, taskState.Namespace(), podGvk) + + for _, pod := range report.StatefulSetStatus.Pods { + taskState.ResourceState(report.ReplicaSetPod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) { + setPodStatusAttribute(rs, pod.PodStatus.Phase) + }) + } } -func (t *DynamicReadinessTracker) addMissingPodsStatesFromDaemonSetPodAddedReport(report *daemonset.PodAddedReport, taskState *statestore.ReadinessTaskState) { +func (t *DynamicReadinessTracker) handlePodsFromDaemonSetPodAddedReport(report *daemonset.PodAddedReport, taskState *statestore.ReadinessTaskState) { if !report.Pod.ReplicaSet.IsNew { return } taskState.AddResourceState(report.Pod.Name, taskState.Namespace(), podGvk) - taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), report.Pod.Name, taskState.Namespace(), podGvk) + + for _, pod := range report.DaemonSetStatus.Pods { + taskState.ResourceState(report.Pod.Name, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) { + setPodStatusAttribute(rs, pod.PodStatus.Phase) + }) + } } -func (t *DynamicReadinessTracker) addMissingPodsStatesFromJobPodAddedReport(report *job.PodAddedReport, taskState *statestore.ReadinessTaskState) { +func (t *DynamicReadinessTracker) handlePodsFromJobPodAddedReport(report *job.PodAddedReport, taskState *statestore.ReadinessTaskState) { taskState.AddResourceState(report.PodName, taskState.Namespace(), podGvk) - taskState.AddDependency(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind(), report.PodName, taskState.Namespace(), podGvk) + + for _, pod := range report.JobStatus.Pods { + taskState.ResourceState(report.PodName, taskState.Namespace(), podGvk).RWTransaction(func(rs *statestore.ResourceState) { + setPodStatusAttribute(rs, pod.PodStatus.Phase) + }) + } } func (t *DynamicReadinessTracker) handleDeploymentStatus(status *deployment.DeploymentStatus, taskState *statestore.ReadinessTaskState) { if status.ReplicasIndicator != nil { - replicasAttr := statestore.Attribute[int]{ - Value: int(status.ReplicasIndicator.TargetValue), - Internal: true, - } - taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { - rs.SetAttribute(statestore.AttributeNameRequiredReplicas, replicasAttr) + setReplicasAttribute(rs, int(status.ReplicasIndicator.TargetValue)) }) } @@ -922,13 +970,8 @@ func (t *DynamicReadinessTracker) handleDeploymentStatus(status *deployment.Depl func (t *DynamicReadinessTracker) handleStatefulSetStatus(status *statefulset.StatefulSetStatus, taskState *statestore.ReadinessTaskState) { if status.ReplicasIndicator != nil { - replicasAttr := statestore.Attribute[int]{ - Value: int(status.ReplicasIndicator.TargetValue), - Internal: true, - } - taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { - rs.SetAttribute(statestore.AttributeNameRequiredReplicas, replicasAttr) + setReplicasAttribute(rs, int(status.ReplicasIndicator.TargetValue)) }) } @@ -951,13 +994,8 @@ func (t *DynamicReadinessTracker) handleStatefulSetStatus(status *statefulset.St func (t *DynamicReadinessTracker) handleDaemonSetStatus(status *daemonset.DaemonSetStatus, taskState *statestore.ReadinessTaskState) { if status.ReplicasIndicator != nil { - replicasAttr := statestore.Attribute[int]{ - Value: int(status.ReplicasIndicator.TargetValue), - Internal: true, - } - taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { - rs.SetAttribute(statestore.AttributeNameRequiredReplicas, replicasAttr) + setReplicasAttribute(rs, int(status.ReplicasIndicator.TargetValue)) }) } @@ -980,7 +1018,7 @@ func (t *DynamicReadinessTracker) handleDaemonSetStatus(status *daemonset.Daemon func (t *DynamicReadinessTracker) handleJobStatus(status *job.JobStatus, taskState *statestore.ReadinessTaskState) { taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { - rs.SetAttribute(statestore.AttributeNameRequiredReplicas, 1) + setReplicasAttribute(rs, 1) }) if status.IsFailed { @@ -1019,6 +1057,12 @@ func (t *DynamicReadinessTracker) handleCanaryStatus(status *canary.CanaryStatus } func (t *DynamicReadinessTracker) handleGenericResourceStatus(status *generic.ResourceStatus, taskState *statestore.ReadinessTaskState) { + if status.Indicator != nil { + taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { + setGenericConditionAttributes(rs, status) + }) + } + if status.IsFailed() { taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()).RWTransaction(func(rs *statestore.ResourceState) { rs.AddError(errors.New(status.FailureReason()), "", time.Now()) @@ -1071,6 +1115,56 @@ func (t *DynamicReadinessTracker) handleReplicaSetPodLogChunk(logChunk *replicas } func (t *DynamicReadinessTracker) handlePodLogChunk(logChunk *pod.PodLogChunk, logStore *logstore.LogStore, taskState *statestore.ReadinessTaskState) { + if t.ignoreLogs { + return + } + + for _, ignoreLogForContainer := range t.ignoreLogsForContainers { + if ignoreLogForContainer == logChunk.ContainerName { + return + } + } + + if len(t.saveLogsOnlyForContainers) > 0 { + var save bool + for _, saveLogsOnlyForContainer := range t.saveLogsOnlyForContainers { + if saveLogsOnlyForContainer == logChunk.ContainerName { + save = true + break + } + } + if !save { + return + } + } + + logLines := logChunk.LogLines + if t.saveLogsByRegex != nil { + var filteredLogLines []display.LogLine + for _, logLine := range logLines { + if t.saveLogsByRegex.MatchString(logLine.Message) { + filteredLogLines = append(filteredLogLines, logLine) + } + } + logLines = filteredLogLines + } + + if len(t.saveLogsByRegexForContainers) > 0 { + if regex, ok := t.saveLogsByRegexForContainers[logChunk.ContainerName]; ok { + var filteredLogLines []display.LogLine + for _, logLine := range logLines { + if regex.MatchString(logLine.Message) { + filteredLogLines = append(filteredLogLines, logLine) + } + } + logLines = filteredLogLines + } + } + + if len(logLines) == 0 { + return + } + namespace := taskState.Namespace() var resourceLogs *util.Concurrent[*logstore.ResourceLogs] @@ -1090,7 +1184,7 @@ func (t *DynamicReadinessTracker) handlePodLogChunk(logChunk *pod.PodLogChunk, l logStore.AddResourceLogs(resourceLogs) } - for _, line := range logChunk.LogLines { + for _, line := range logLines { resourceLogs.RWTransaction(func(rl *logstore.ResourceLogs) { rl.AddLogLine(line.Message, "container/"+logChunk.ContainerName, lo.Must(time.Parse(time.RFC3339, line.Timestamp))) }) @@ -1098,6 +1192,10 @@ func (t *DynamicReadinessTracker) handlePodLogChunk(logChunk *pod.PodLogChunk, l } func (t *DynamicReadinessTracker) handleEventMessage(msg string, taskState *statestore.ReadinessTaskState, timestamp time.Time) { + if !t.saveEvents { + return + } + resourceState := taskState.ResourceState(taskState.Name(), taskState.Namespace(), taskState.GroupVersionKind()) resourceState.RWTransaction(func(rs *statestore.ResourceState) { @@ -1123,6 +1221,56 @@ func (t *DynamicReadinessTracker) handleTaskStateStatus(taskState *statestore.Re return abort, abortErr } +func setReplicasAttribute(resourceState *statestore.ResourceState, replicas int) { + attributes := resourceState.Attributes() + + if replicasAttr, found := lo.Find(attributes, func(attr statestore.Attributer) bool { + return attr.Name() == statestore.AttributeNameRequiredReplicas + }); found { + replicasAttr.(*statestore.Attribute[int]).Value = replicas + } else { + replicasAttr = statestore.NewAttribute(statestore.AttributeNameRequiredReplicas, replicas) + resourceState.AddAttribute(replicasAttr) + } +} + +func setPodStatusAttribute(resourceState *statestore.ResourceState, phase corev1.PodPhase) { + attributes := resourceState.Attributes() + + if statusAttr, found := lo.Find(attributes, func(attr statestore.Attributer) bool { + return attr.Name() == statestore.AttributeNameStatus + }); found { + statusAttr.(*statestore.Attribute[string]).Value = string(phase) + } else { + statusAttr = statestore.NewAttribute(statestore.AttributeNameStatus, string(phase)) + resourceState.AddAttribute(statusAttr) + } +} + +func setGenericConditionAttributes(resourceState *statestore.ResourceState, resourceStatus *generic.ResourceStatus) { + attributes := resourceState.Attributes() + + if conditionTargetAttr, found := lo.Find(attributes, func(attr statestore.Attributer) bool { + return attr.Name() == statestore.AttributeNameConditionTarget + }); found { + conditionTargetAttr.(*statestore.Attribute[string]).Value = resourceStatus.HumanConditionPath() + } else { + conditionTargetAttr = statestore.NewAttribute(statestore.AttributeNameConditionTarget, resourceStatus.HumanConditionPath()) + resourceState.AddAttribute(conditionTargetAttr) + } + + if resourceStatus.Indicator.Value != "" { + if conditionCurrentValueAttr, found := lo.Find(attributes, func(attr statestore.Attributer) bool { + return attr.Name() == statestore.AttributeNameConditionCurrentValue + }); found { + conditionCurrentValueAttr.(*statestore.Attribute[string]).Value = resourceStatus.Indicator.Value + } else { + conditionCurrentValueAttr = statestore.NewAttribute(statestore.AttributeNameConditionCurrentValue, resourceStatus.Indicator.Value) + resourceState.AddAttribute(conditionCurrentValueAttr) + } + } +} + var podGvk = schema.GroupVersionKind{ Group: "", Version: "v1", diff --git a/pkg/trackers/dyntracker/statestore/attribute.go b/pkg/trackers/dyntracker/statestore/attribute.go index 799a76b..75a239c 100644 --- a/pkg/trackers/dyntracker/statestore/attribute.go +++ b/pkg/trackers/dyntracker/statestore/attribute.go @@ -1,12 +1,29 @@ package statestore const ( - AttributeNameRequiredReplicas = "RequiredReplicas" + AttributeNameRequiredReplicas = "required replicas" + AttributeNameStatus = "status" + AttributeNameConditionTarget = "condition target" + AttributeNameConditionCurrentValue = "condition current value" ) -type Attributer interface{} +type Attributer interface { + Name() string +} + +func NewAttribute[T int | string](name string, value T) *Attribute[T] { + return &Attribute[T]{ + Value: value, + name: name, + } +} + +type Attribute[T int | string] struct { + Value T + + name string +} -type Attribute[T any] struct { - Value T - Internal bool +func (a *Attribute[T]) Name() string { + return a.name } diff --git a/pkg/trackers/dyntracker/statestore/readiness_task_state.go b/pkg/trackers/dyntracker/statestore/readiness_task_state.go index 7f36ca5..32851a2 100644 --- a/pkg/trackers/dyntracker/statestore/readiness_task_state.go +++ b/pkg/trackers/dyntracker/statestore/readiness_task_state.go @@ -144,9 +144,10 @@ func initReadinessTaskStateReadyConditions() []ReadinessTaskConditionFn { readyConditions = append(readyConditions, func(taskState *ReadinessTaskState) bool { resourcesReadyRequired := 1 taskState.ResourceState(taskState.name, taskState.namespace, taskState.groupVersionKind).RTransaction(func(s *ResourceState) { - attrs := s.Attributes() - if attr, found := attrs[AttributeNameRequiredReplicas]; found { - resourcesReadyRequired += attr.(Attribute[int]).Value + if replicasAttr, found := lo.Find(s.Attributes(), func(attr Attributer) bool { + return attr.Name() == AttributeNameRequiredReplicas + }); found { + resourcesReadyRequired += replicasAttr.(*Attribute[int]).Value } }) diff --git a/pkg/trackers/dyntracker/statestore/resource_state.go b/pkg/trackers/dyntracker/statestore/resource_state.go index 40ecdf6..c5b88c4 100644 --- a/pkg/trackers/dyntracker/statestore/resource_state.go +++ b/pkg/trackers/dyntracker/statestore/resource_state.go @@ -14,7 +14,7 @@ type ResourceState struct { groupVersionKind schema.GroupVersionKind status ResourceStatus - attributes map[string]Attributer + attributes []Attributer events []*Event errors map[string][]*Error } @@ -25,7 +25,6 @@ func NewResourceState(name, namespace string, groupVersionKind schema.GroupVersi namespace: namespace, groupVersionKind: groupVersionKind, status: ResourceStatusCreated, - attributes: make(map[string]Attributer), errors: make(map[string][]*Error), } } @@ -86,18 +85,12 @@ func (s *ResourceState) Events() []*Event { return append([]*Event{}, s.events...) } -func (s *ResourceState) SetAttribute(name string, attr Attributer) { - s.attributes[name] = attr +func (s *ResourceState) AddAttribute(attr Attributer) { + s.attributes = append(s.attributes, attr) } -func (s *ResourceState) Attributes() map[string]Attributer { - result := make(map[string]Attributer) - - for name, attr := range s.attributes { - result[name] = attr - } - - return result +func (s *ResourceState) Attributes() []Attributer { + return append([]Attributer{}, s.attributes...) } func (s *ResourceState) ID() string {