diff --git a/charts/kubeai/templates/configmap.yaml b/charts/kubeai/templates/configmap.yaml index 39d3e75f..c4b8f482 100644 --- a/charts/kubeai/templates/configmap.yaml +++ b/charts/kubeai/templates/configmap.yaml @@ -12,6 +12,8 @@ data: {{- .Values.resourceProfiles | toYaml | nindent 6 }} modelServers: {{- .Values.modelServers | toYaml | nindent 6 }} + modelRollouts: + {{- .Values.modelRollouts | toYaml | nindent 6 }} modelServerPods: {{- if .Values.modelServerPods }} {{- if .Values.modelServerPods.podSecurityContext }} diff --git a/charts/kubeai/values.yaml b/charts/kubeai/values.yaml index 3f09a332..22b6327d 100644 --- a/charts/kubeai/values.yaml +++ b/charts/kubeai/values.yaml @@ -44,6 +44,10 @@ modelServerPods: drop: - ALL +modelRollouts: + # The number of replicas to add when rolling out a new model. + surge: 1 + resourceProfiles: cpu: imageName: "cpu" diff --git a/hack/dev-config.yaml b/hack/dev-config.yaml index e4c92298..98dcc94f 100644 --- a/hack/dev-config.yaml +++ b/hack/dev-config.yaml @@ -9,6 +9,8 @@ modelServers: images: default: "ollama/ollama:latest" cpu: "ollama/ollama:0.3.8" +modelRollouts: + surge: 0 messaging: errorMaxBackoff: 30s streams: [] @@ -18,8 +20,8 @@ messaging: resourceProfiles: cpu: requests: - cpu: 1 - memory: 2Gi + cpu: 0.5 + memory: 1Gi nvidia-gpu-l4: limits: nvidia.com/gpu: "1" diff --git a/internal/config/system.go b/internal/config/system.go index 7fdda7ca..4f550744 100644 --- a/internal/config/system.go +++ b/internal/config/system.go @@ -33,6 +33,8 @@ type System struct { ModelAutoscaling ModelAutoscaling `json:"modelAutoscaling" validate:"required"` ModelServerPods ModelServerPods `json:"modelServerPods,omitempty"` + + ModelRollouts ModelRollouts `json:"modelRollouts"` } func (s *System) DefaultAndValidate() error { @@ -59,6 +61,11 @@ func (s *System) DefaultAndValidate() error { return validator.New(validator.WithRequiredStructEnabled()).Struct(s) } +type ModelRollouts struct { + // Surge is the number of additional Pods to create when rolling out an update. + Surge int32 `json:"surge"` +} + type ModelAutoscaling struct { // Interval is the time between each autoscaling check. // Defaults to 10 seconds. 
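Note on the new `modelRollouts.surge` setting introduced above: during a rollout the controller targets the Model's desired replicas plus the surge count, so extra capacity comes up before out-of-date Pods are recreated. Below is a minimal sketch of that arithmetic, distilled from `calculatePodPlan` later in this diff; the helper name and example values are illustrative only, not part of the patch.

```go
package main

import "fmt"

// desiredReplicas mirrors, in simplified form, the arithmetic in
// calculatePodPlan (internal/modelcontroller/pod_plan.go in this diff):
// during a rollout the controller targets spec replicas plus the configured
// surge, so an extra Pod can come up before old Pods are recreated.
func desiredReplicas(specReplicas *int32, surge int32, outOfDatePods int) int32 {
	var desired int32
	if specReplicas != nil { // Replicas may be nil when autoscaling is disabled.
		desired = *specReplicas
	}
	if outOfDatePods > 0 {
		desired += surge
	}
	return desired
}

func main() {
	three := int32(3)
	// Steady state: no out-of-date Pods, target stays at 3.
	fmt.Println(desiredReplicas(&three, 1, 0)) // 3
	// Mid-rollout with surge=1: target becomes 4 until all Pods are up to date.
	fmt.Println(desiredReplicas(&three, 1, 3)) // 4
}
```

With the default `surge: 1` from values.yaml, a 3-replica Model briefly runs 4 Pods mid-rollout; the dev config sets `surge: 0`, which keeps the Pod count flat but means updated Pods only come up as old ones are deleted.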
diff --git a/internal/k8sutils/pods.go b/internal/k8sutils/pods.go index 6de4b050..96d2dcf6 100644 --- a/internal/k8sutils/pods.go +++ b/internal/k8sutils/pods.go @@ -10,6 +10,10 @@ import ( "k8s.io/apimachinery/pkg/util/rand" ) +func PodIsScheduled(pod *corev1.Pod) bool { + return pod.Spec.NodeName != "" +} + func PodIsReady(pod *corev1.Pod) bool { for _, cond := range pod.Status.Conditions { if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue { diff --git a/internal/manager/run.go b/internal/manager/run.go index 7c2e2ab3..57f1e43b 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -169,6 +169,7 @@ func Run(ctx context.Context, k8sCfg *rest.Config, cfg config.System) error { ResourceProfiles: cfg.ResourceProfiles, ModelServers: cfg.ModelServers, ModelServerPods: cfg.ModelServerPods, + ModelRollouts: cfg.ModelRollouts, } if err = modelReconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to create Model controller: %w", err) diff --git a/internal/modelcontroller/model_controller.go b/internal/modelcontroller/model_controller.go index 88257bc3..0c1658b0 100644 --- a/internal/modelcontroller/model_controller.go +++ b/internal/modelcontroller/model_controller.go @@ -23,6 +23,7 @@ import ( "sort" "strconv" "strings" + "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" @@ -51,6 +52,7 @@ type ModelReconciler struct { ResourceProfiles map[string]config.ResourceProfile ModelServers config.ModelServers ModelServerPods config.ModelServerPods + ModelRollouts config.ModelRollouts } // +kubebuilder:rbac:groups=kubeai.org,resources=models,verbs=get;list;watch;create;update;patch;delete @@ -99,8 +101,17 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } plan := r.calculatePodPlan(allPods, model, modelConfig) - if err := plan.execute(ctx, r.Client, r.Scheme); err != nil { - return ctrl.Result{}, fmt.Errorf("executing pod plan: %w", err) + if plan.containsActions() { + changed, err := plan.execute(ctx, r.Client, r.Scheme) + if changed { + // Slow things down to wait for caches to sync. + // This is important because the pod plan has some calculations that + // assume the cache is up to date. + time.Sleep(3 * time.Second) + } + if err != nil { + return ctrl.Result{}, fmt.Errorf("executing pod plan: %w", err) + } } // Summarize all pods. @@ -125,6 +136,7 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // SetupWithManager sets up the controller with the Manager. func (r *ModelReconciler) SetupWithManager(mgr ctrl.Manager) error { + // TODO: Set Model concurrency. Pod rollouts can be slow. return ctrl.NewControllerManagedBy(mgr). For(&kubeaiv1.Model{}). Owns(&corev1.Pod{}). 
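The hunks below drop the fixed `name` parameter from the per-engine Pod builders: Pods are no longer named `model-<name>-<index>`, they instead get a `GenerateName` prefix plus a label carrying a hash of the rendered Pod spec, which is how `calculatePodPlan` detects out-of-date Pods. A rough sketch of that pattern follows; the FNV hashing and the `pod-hash` label key are stand-ins, since the real `k8sutils.PodHash` and `kubeaiv1.PodHashLabel` definitions are not part of this diff.

```go
package main

import (
	"fmt"
	"hash/fnv"

	corev1 "k8s.io/api/core/v1"
)

// hashPodSpec is an illustrative stand-in for k8sutils.PodHash, whose
// implementation is not shown in this diff. The idea is the same: derive a
// stable identifier from the desired Pod spec.
func hashPodSpec(spec corev1.PodSpec) string {
	h := fnv.New32a()
	fmt.Fprintf(h, "%v", spec)
	return fmt.Sprintf("%x", h.Sum32())
}

func main() {
	pod := &corev1.Pod{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{Name: "server", Image: "example/server:latest"}},
		},
	}
	hash := hashPodSpec(pod.Spec)

	// Instead of fixed names like "model-<name>-0", the controller sets
	// GenerateName plus a hash label; out-of-date Pods are detected by
	// comparing this label against the hash of the freshly rendered spec.
	pod.GenerateName = fmt.Sprintf("model-%s-%s-", "my-model", hash)
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	pod.Labels["pod-hash"] = hash // kubeaiv1.PodHashLabel in the real code.

	fmt.Println(pod.GenerateName, pod.Labels)
}
```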
@@ -140,7 +152,7 @@ func (r *ModelReconciler) apply(ctx context.Context, model *kubeaiv1.Model, obj } */ -func (r *ModelReconciler) vLLMPodForModel(m *kubeaiv1.Model, profile ModelConfig, name string) *corev1.Pod { +func (r *ModelReconciler) vLLMPodForModel(m *kubeaiv1.Model, profile ModelConfig) *corev1.Pod { lbs := labelsForModel(m) ann := r.annotationsForModel(m) if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { @@ -187,7 +199,6 @@ func (r *ModelReconciler) vLLMPodForModel(m *kubeaiv1.Model, profile ModelConfig pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: name, Namespace: m.Namespace, Labels: lbs, Annotations: ann, @@ -279,7 +290,7 @@ func (r *ModelReconciler) vLLMPodForModel(m *kubeaiv1.Model, profile ModelConfig return pod } -func (r *ModelReconciler) oLlamaPodForModel(m *kubeaiv1.Model, profile ModelConfig, name string) *corev1.Pod { +func (r *ModelReconciler) oLlamaPodForModel(m *kubeaiv1.Model, profile ModelConfig) *corev1.Pod { lbs := labelsForModel(m) ann := r.annotationsForModel(m) @@ -340,7 +351,6 @@ func (r *ModelReconciler) oLlamaPodForModel(m *kubeaiv1.Model, profile ModelConf pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: name, Namespace: m.Namespace, Labels: lbs, Annotations: ann, @@ -441,7 +451,7 @@ func (r *ModelReconciler) oLlamaPodForModel(m *kubeaiv1.Model, profile ModelConf } -func (r *ModelReconciler) fasterWhisperPodForModel(m *kubeaiv1.Model, profile ModelConfig, name string) *corev1.Pod { +func (r *ModelReconciler) fasterWhisperPodForModel(m *kubeaiv1.Model, profile ModelConfig) *corev1.Pod { lbs := labelsForModel(m) ann := r.annotationsForModel(m) if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { @@ -489,7 +499,6 @@ func (r *ModelReconciler) fasterWhisperPodForModel(m *kubeaiv1.Model, profile Mo pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: name, Namespace: m.Namespace, Labels: lbs, Annotations: ann, @@ -579,7 +588,7 @@ func (r *ModelReconciler) fasterWhisperPodForModel(m *kubeaiv1.Model, profile Mo return pod } -func (r *ModelReconciler) infinityPodForModel(m *kubeaiv1.Model, profile ModelConfig, name string) *corev1.Pod { +func (r *ModelReconciler) infinityPodForModel(m *kubeaiv1.Model, profile ModelConfig) *corev1.Pod { lbs := labelsForModel(m) ann := r.annotationsForModel(m) @@ -644,7 +653,6 @@ func (r *ModelReconciler) infinityPodForModel(m *kubeaiv1.Model, profile ModelCo pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: name, Namespace: m.Namespace, Labels: lbs, Annotations: ann, diff --git a/internal/modelcontroller/pod_plan.go b/internal/modelcontroller/pod_plan.go new file mode 100644 index 00000000..12951f5d --- /dev/null +++ b/internal/modelcontroller/pod_plan.go @@ -0,0 +1,219 @@ +package modelcontroller + +import ( + "context" + "fmt" + "math" + "sort" + "strings" + + kubeaiv1 "github.com/substratusai/kubeai/api/v1" + "github.com/substratusai/kubeai/internal/k8sutils" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// calculatePodPlan calculates the Pod plan for the given Model. +// It assumes the list of Pods represents an accurate snapshot of the current state. +// It returns a Pod plan that contains Pods to create and delete. 
+// If a rollout is required, it will return a Pod plan that: +// - Adds a surge Pod +// - Recreates any out-of-date Pod that is not Ready immediately +// - Waits for all Pods to be Ready before recreating any out-of-date Pods that are Ready +func (r *ModelReconciler) calculatePodPlan(allPods *corev1.PodList, model *kubeaiv1.Model, modelConfig ModelConfig) *podPlan { + var podForModel *corev1.Pod + switch model.Spec.Engine { + case kubeaiv1.OLlamaEngine: + podForModel = r.oLlamaPodForModel(model, modelConfig) + case kubeaiv1.FasterWhisperEngine: + podForModel = r.fasterWhisperPodForModel(model, modelConfig) + case kubeaiv1.InfinityEngine: + podForModel = r.infinityPodForModel(model, modelConfig) + default: + podForModel = r.vLLMPodForModel(model, modelConfig) + } + expectedHash := k8sutils.PodHash(podForModel.Spec) + podForModel.GenerateName = fmt.Sprintf("model-%s-%s-", model.Name, expectedHash) + k8sutils.SetLabel(podForModel, kubeaiv1.PodHashLabel, expectedHash) + + var ( + readyAll int + outOfDate []corev1.Pod + ) + + sortPodsByDeletionOrder(allPods.Items, expectedHash) + + for _, p := range allPods.Items { + upToDate := k8sutils.GetLabel(&p, kubeaiv1.PodHashLabel) == expectedHash + + if k8sutils.PodIsReady(&p) { + readyAll++ + } + + if !upToDate { + outOfDate = append(outOfDate, p) + } + } + + var ( + details []string + toCreate []*corev1.Pod + toDelete []*corev1.Pod + ) + + var desiredReplicas int32 + // NOTE: Replicas could be nil if autoscaling is disabled. + if model.Spec.Replicas != nil { + desiredReplicas = *model.Spec.Replicas + } + if len(outOfDate) > 0 { + desiredReplicas += r.ModelRollouts.Surge + } + observedReplicas := int32(len(allPods.Items)) + replicaDiff := observedReplicas - desiredReplicas + replicaDiffAbs := int32(math.Abs(float64(replicaDiff))) + + switch { + case replicaDiff == 0: + // At correct scale. + case replicaDiff < 0: + // Create Pods. + details = append(details, fmt.Sprintf("Creating %d Pods", replicaDiffAbs)) + for i := int32(0); i < replicaDiffAbs; i++ { + toCreate = append(toCreate, podForModel.DeepCopy()) + } + case replicaDiff > 0: + // Delete Pods. + details = append(details, fmt.Sprintf("Deleting %d Pods", replicaDiffAbs)) + toDeleteCount := replicaDiffAbs + for _, pod := range allPods.Items { + if toDeleteCount == 0 { + break + } + toDelete = append(toDelete, &pod) + toDeleteCount-- + } + } + + var recreated int + for _, pod := range outOfDate { + if !k8sutils.PodIsReady(&pod) { + details = append(details, fmt.Sprintf("Out-of-date Pod %q is not ready, immediately recreating", pod.Name)) + toDelete = append(toDelete, &pod) + // Avoid recreating the surge Pod when rollout is complete. + if recreated < len(outOfDate)-int(r.ModelRollouts.Surge) { + toCreate = append(toCreate, podForModel.DeepCopy()) + recreated++ + } + continue + } + if readyAll == int(desiredReplicas) { + details = append(details, fmt.Sprintf("All Pods ready, recreating out-of-date Pod %q", pod.Name)) + toDelete = append(toDelete, &pod) + // Avoid recreating the surge Pod when rollout is complete. 
+ if recreated < len(outOfDate)-int(r.ModelRollouts.Surge) { + toCreate = append(toCreate, podForModel.DeepCopy()) + recreated++ + } + break + } + } + + return &podPlan{ + model: model, + toCreate: toCreate, + toDelete: toDelete, + details: details, + } +} + +type podPlan struct { + model *kubeaiv1.Model + toCreate []*corev1.Pod + toDelete []*corev1.Pod + details []string +} + +func (pp *podPlan) containsActions() bool { + return len(pp.toCreate) > 0 || len(pp.toDelete) > 0 +} + +// execute returns true if a Pod was created or deleted. +func (pp *podPlan) execute(ctx context.Context, client client.Client, scheme *runtime.Scheme) (bool, error) { + log := log.FromContext(ctx) + + detailsCSV := strings.Join(pp.details, ", ") + log.Info("Executing Pod plan", "modelName", pp.model.Name, "details", detailsCSV) + + var changed bool + + // Delete before create to avoid unnecessary Node scale-ups. + for _, pod := range pp.toDelete { + if err := client.Delete(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod.Namespace, + Name: pod.Name, + }, + }); err != nil { + if apierrors.IsNotFound(err) { + log.Info("Pod already deleted", "podName", pod.Name) + } else { + return changed, fmt.Errorf("deleting pod: %w", err) + } + } + changed = true + } + + for _, pod := range pp.toCreate { + if err := ctrl.SetControllerReference(pp.model, pod, scheme); err != nil { + return changed, fmt.Errorf("setting controller reference: %w", err) + } + if err := client.Create(ctx, pod, k8sutils.DefaultCreateOptions()); err != nil { + if apierrors.IsAlreadyExists(err) { + log.Info("Pod already exists", "podName", pod.Name) + } else { + return changed, fmt.Errorf("creating pod: %w", err) + } + } + changed = true + } + + return changed, nil +} + +// sortPodsByDeletionOrder ensures Pods that are to be deleted/recreated +// first are lower index. +func sortPodsByDeletionOrder(pods []corev1.Pod, expectedHash string) { + sort.SliceStable(pods, func(i, j int) bool { + // Not ready Pods should be deleted first. + iReady := k8sutils.PodIsReady(&pods[i]) + jReady := k8sutils.PodIsReady(&pods[j]) + if iReady != jReady { + return !iReady + } + + // Unscheduled Pods should be deleted first. + iScheduled := k8sutils.PodIsScheduled(&pods[i]) + jScheduled := k8sutils.PodIsScheduled(&pods[j]) + if iScheduled != jScheduled { + return !iScheduled + } + + // Delete Pods that are from older hash first + iHash := k8sutils.GetLabel(&pods[i], kubeaiv1.PodHashLabel) + jHash := k8sutils.GetLabel(&pods[j], kubeaiv1.PodHashLabel) + if iHash != jHash { + return iHash != expectedHash + } + + // Younger Pods should be deleted first. 
+ iCreationTime := pods[i].CreationTimestamp.Time + jCreationTime := pods[j].CreationTimestamp.Time + return iCreationTime.After(jCreationTime) + }) +} diff --git a/internal/modelcontroller/pod_plan_test.go b/internal/modelcontroller/pod_plan_test.go new file mode 100644 index 00000000..8bc717fc --- /dev/null +++ b/internal/modelcontroller/pod_plan_test.go @@ -0,0 +1,437 @@ +package modelcontroller + +import ( + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + v1 "github.com/substratusai/kubeai/api/v1" + "github.com/substratusai/kubeai/internal/config" + "github.com/substratusai/kubeai/internal/k8sutils" + "golang.org/x/exp/rand" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" +) + +const ( + testNewHash = "expected-hash" +) + +var ( + testYoungTS = metav1.NewTime(time.Now()) + testOldTS = metav1.NewTime(testYoungTS.Add(-time.Hour)) +) + +func Test_calculatePodPlan(t *testing.T) { + r := &ModelReconciler{ + ModelRollouts: config.ModelRollouts{ + Surge: 1, + }, + } + + model := &v1.Model{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-mdl", + Namespace: "test-ns", + }, + Spec: v1.ModelSpec{ + Engine: v1.VLLMEngine, + Replicas: ptr.To[int32](3), + }, + } + + modelConfig := ModelConfig{ + ResourceProfile: config.ResourceProfile{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + NodeSelector: map[string]string{ + "node": "selector", + }, + }, + } + + expectedHash := k8sutils.PodHash(r.vLLMPodForModel(model, modelConfig).Spec) + + type readiness bool + const ready = readiness(true) + const unready = readiness(false) + + testPod := func(name string, hash string, rdy readiness) corev1.Pod { + p := corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + v1.PodHashLabel: hash, + }, + }, + } + if rdy == ready { + p.Status.Conditions = []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + } + } + return p + } + + cases := []struct { + name string + replicas int32 + pods []corev1.Pod + wantNCreations int + wantDeletions []string + }{ + { + name: "do nothing", + pods: []corev1.Pod{ + testPod("up-to-date-ready-1", expectedHash, ready), + testPod("up-to-date-ready-2", expectedHash, ready), + testPod("up-to-date-unready-3", expectedHash, unready), + }, + }, + { + name: "scale up", + pods: []corev1.Pod{ + testPod("up-to-date-1", expectedHash, ready), + }, + wantNCreations: 2, + }, + { + name: "scale down", + pods: []corev1.Pod{ + testPod("ready-up-to-date-1", expectedHash, ready), + testPod("ready-up-to-date-2", expectedHash, ready), + testPod("unready-up-to-date", expectedHash, unready), + testPod("ready-up-to-date-3", expectedHash, ready), + }, + wantDeletions: []string{"unready-up-to-date"}, + }, + { + name: "rollout add surge and delete unreadies", + pods: []corev1.Pod{ + testPod("unready-out-of-date-1", "old-hash", unready), + testPod("unready-out-of-date-2", "old-hash", unready), + testPod("ready-out-of-date-3", "old-hash", ready), + }, + wantNCreations: 1 + 2, // Expect surge Pod + 2 recreations. + wantDeletions: []string{ + // Expect unready Pods to be deleted immediately even though all Pods are not ready. 
+ "unready-out-of-date-1", "unready-out-of-date-2", + }, + }, + { + name: "rollout wait for readiness before deleting last out of date pod", + pods: []corev1.Pod{ + testPod("surge-pod", expectedHash, ready), + testPod("unready-up-to-date-1", expectedHash, unready), + testPod("unready-up-to-date-2", expectedHash, unready), + testPod("ready-out-of-date-3", "old-hash", ready), + }, + }, + { + name: "rollout delete ready out of date pod", + pods: []corev1.Pod{ + testPod("surge-pod", expectedHash, ready), + testPod("unready-up-to-date-1", expectedHash, ready), + testPod("unready-up-to-date-2", expectedHash, ready), + testPod("ready-out-of-date-3", "old-hash", ready), + }, + wantNCreations: 0, + wantDeletions: []string{"ready-out-of-date-3"}, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + plan := r.calculatePodPlan(&corev1.PodList{Items: c.pods}, model, modelConfig) + detailsCSV := strings.Join(plan.details, ", ") + require.Lenf(t, plan.toCreate, c.wantNCreations, "Unexpected creation count, details: %v", detailsCSV) + var deletionNames []string + for _, p := range plan.toDelete { + deletionNames = append(deletionNames, p.Name) + } + require.Lenf(t, deletionNames, len(c.wantDeletions), "Unexpected deletion count, details: %v", detailsCSV) + require.Equalf(t, c.wantDeletions, deletionNames, "Unexpected deleteion names, details: %v", detailsCSV) + }) + } +} + +func Test_sortPodsByDeletionOrder(t *testing.T) { + cases := []struct { + name string + pods []corev1.Pod + want []string + }{ + { + name: "empty", + pods: []corev1.Pod{}, + want: nil, + }, + { + name: "hash comparison", + pods: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "expected-hash-pod", + Labels: map[string]string{ + v1.PodHashLabel: testNewHash, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "old-hash-pod", + Labels: map[string]string{ + v1.PodHashLabel: "old-hash", + }, + }, + }, + }, + want: []string{ + "old-hash-pod", + "expected-hash-pod", + }, + }, + { + name: "ready comparison", + pods: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ready-pod", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "not-ready-pod", + }, + }, + }, + want: []string{ + "not-ready-pod", + "ready-pod", + }, + }, + { + name: "scheduled comparison", + pods: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "scheduled-pod", + }, + Spec: corev1.PodSpec{ + NodeName: "node", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "unscheduled-pod", + }, + }, + }, + want: []string{ + "unscheduled-pod", + "scheduled-pod", + }, + }, + { + name: "creation time comparison", + pods: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "old-pod", + CreationTimestamp: testOldTS, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "young-pod", + CreationTimestamp: testYoungTS, + }, + }, + }, + want: []string{ + "young-pod", + "old-pod", + }, + }, + { + name: "all", + pods: []corev1.Pod{ + youngReadyScheduledOldHashPod(), + youngUnreadyScheduledNewHashPod(), + youngUnreadyUnscheduledOldHashPod(), + oldUnreadyUnscheduledOldHashPod(), + youngUnreadyScheduledOldHashPod(), + youngReadyScheduledNewHashPod(), + oldUnreadyScheduledNewHashPod(), + }, + want: []string{ + youngUnreadyUnscheduledOldHashPod().Name, + oldUnreadyUnscheduledOldHashPod().Name, + youngUnreadyScheduledOldHashPod().Name, + youngUnreadyScheduledNewHashPod().Name, + 
oldUnreadyScheduledNewHashPod().Name, + youngReadyScheduledOldHashPod().Name, + youngReadyScheduledNewHashPod().Name, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Run a lot of times with random input ordering. + for i := 0; i < 10000; i++ { + // Copy the slice to avoid modifying the original slice. + pods := append([]corev1.Pod(nil), c.pods...) + + randomizePodOrder(pods) + + sortPodsByDeletionOrder(pods, testNewHash) + + var namesAfter []string + for _, p := range pods { + namesAfter = append(namesAfter, p.Name) + } + require.Equal(t, c.want, namesAfter) + } + }) + } +} + +func randomizePodOrder(pods []corev1.Pod) { + for i := range pods { + j := rand.Intn(len(pods)) + pods[i], pods[j] = pods[j], pods[i] + } +} + +func youngReadyScheduledNewHashPod() corev1.Pod { + return corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "young-ready-scheduled-new-hash-pod", + Labels: map[string]string{ + v1.PodHashLabel: testNewHash, + }, + CreationTimestamp: testYoungTS, + }, + Spec: corev1.PodSpec{ + NodeName: "node", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } +} + +func youngReadyScheduledOldHashPod() corev1.Pod { + + return corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "young-ready-scheduled-old-hash-pod", + Labels: map[string]string{ + v1.PodHashLabel: "old-hash", + }, + CreationTimestamp: testYoungTS, + }, + Spec: corev1.PodSpec{ + NodeName: "node", + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } +} + +func youngUnreadyScheduledOldHashPod() corev1.Pod { + return corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "young-unready-scheduled-old-hash-pod", + Labels: map[string]string{ + v1.PodHashLabel: "old-hash", + }, + CreationTimestamp: testYoungTS, + }, + Spec: corev1.PodSpec{ + NodeName: "node", + }, + } +} + +func oldUnreadyScheduledNewHashPod() corev1.Pod { + return corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "old-unready-scheduled-new-hash-pod", + Labels: map[string]string{ + v1.PodHashLabel: testNewHash, + }, + CreationTimestamp: testOldTS, + }, + Spec: corev1.PodSpec{ + NodeName: "node", + }, + } +} + +func youngUnreadyScheduledNewHashPod() corev1.Pod { + return corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "young-unready-scheduled-new-hash-pod", + Labels: map[string]string{ + v1.PodHashLabel: testNewHash, + }, + CreationTimestamp: testYoungTS, + }, + Spec: corev1.PodSpec{ + NodeName: "node", + }, + } +} + +func youngUnreadyUnscheduledOldHashPod() corev1.Pod { + return corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "young-unready-unscheduled-old-hash-pod", + Labels: map[string]string{ + v1.PodHashLabel: "old-hash", + }, + CreationTimestamp: testYoungTS, + }, + } +} + +func oldUnreadyUnscheduledOldHashPod() corev1.Pod { + return corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "old-unready-unscheduled-old-hash-pod", + Labels: map[string]string{ + v1.PodHashLabel: "old-hash", + }, + CreationTimestamp: testOldTS, + }, + } +} diff --git a/internal/modelcontroller/pods.go b/internal/modelcontroller/pods.go deleted file mode 100644 index 2c7f3e53..00000000 --- a/internal/modelcontroller/pods.go +++ /dev/null @@ -1,133 +0,0 @@ -package modelcontroller - -import ( - "context" - "fmt" - - kubeaiv1 "github.com/substratusai/kubeai/api/v1" - "github.com/substratusai/kubeai/internal/k8sutils" - corev1 "k8s.io/api/core/v1" 
- apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" -) - -func (r *ModelReconciler) calculatePodPlan(allPods *corev1.PodList, model *kubeaiv1.Model, modelConfig ModelConfig) *podPlan { - var desiredReplicas int32 - // NOTE: Replicas could be nil if autoscaling is disabled. - if model.Spec.Replicas != nil { - desiredReplicas = *model.Spec.Replicas - } - - var podForModel func(*kubeaiv1.Model, ModelConfig, string) *corev1.Pod - switch model.Spec.Engine { - case kubeaiv1.OLlamaEngine: - podForModel = r.oLlamaPodForModel - case kubeaiv1.FasterWhisperEngine: - podForModel = r.fasterWhisperPodForModel - case kubeaiv1.InfinityEngine: - podForModel = r.infinityPodForModel - default: - podForModel = r.vLLMPodForModel - } - - var ( - toCreate []*corev1.Pod - toDelete []*corev1.Pod - ) - - podMap := make(map[string]corev1.Pod, len(allPods.Items)) - for _, pod := range allPods.Items { - podMap[pod.Name] = pod - } - - // Loop through a deterministic list of Pod names and create or delete Pods as needed. - // Because the client used to list Pods is a cache client, the exact number of Pods - // returned may not be accurate which is why we don't just compare total count of Pods - // to desired replicas. - for i := int32(0); i < desiredReplicas; i++ { - name := fmt.Sprintf("model-%s-%d", model.Name, i) - - expectedPod := podForModel(model, modelConfig, name) - // TODO: If collisions become an issue, we can add a Model.Status.CollisionCount (new field) - // to the PodHash call. - expectedPodHash := k8sutils.PodHash(expectedPod.Spec) - k8sutils.SetLabel(expectedPod, kubeaiv1.PodHashLabel, expectedPodHash) - - currentPod, ok := podMap[name] - if ok { - // Already exists, compare to see if it needs to be recreated. - // TODO: Consider implementing a rollout strategy. - // Right now, we just delete and recreate all Pods. - // This is probably OK for now since the proxy will handle - // the queuing of incoming requests until the new Pods are ready. - // (albeit with some failures). - if k8sutils.GetLabel(¤tPod, kubeaiv1.PodHashLabel) != expectedPodHash { - toDelete = append(toDelete, ¤tPod) - toCreate = append(toCreate, expectedPod) - } - } else { - toCreate = append(toCreate, expectedPod) - } - - // Remove the Pod from the map so we can delete any remaining Pods. - delete(podMap, name) - } - - // Delete the remaining pods. - for _, pod := range podMap { - toDelete = append(toDelete, &pod) - } - - return &podPlan{ - model: model, - toCreate: toCreate, - toDelete: toDelete, - } -} - -type podPlan struct { - model *kubeaiv1.Model - toCreate []*corev1.Pod - toDelete []*corev1.Pod -} - -func (pp *podPlan) execute(ctx context.Context, client client.Client, scheme *runtime.Scheme) error { - log := log.FromContext(ctx) - - // Delete before create to avoid unnecessary Node scale-ups. 
- for _, pod := range pp.toDelete { - log.Info("Deleting Pod", "podName", pod.Name) - if err := client.Delete(ctx, &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: pod.Namespace, - Name: pod.Name, - }, - }); err != nil { - if apierrors.IsNotFound(err) { - log.Info("Pod already deleted", "podName", pod.Name) - } else { - return fmt.Errorf("deleting pod: %w", err) - } - } - } - - for _, pod := range pp.toCreate { - if err := ctrl.SetControllerReference(pp.model, pod, scheme); err != nil { - return fmt.Errorf("setting controller reference: %w", err) - } - log.Info("Creating Pod", "podName", pod.Name) - if err := client.Create(ctx, pod, k8sutils.DefaultCreateOptions()); err != nil { - if apierrors.IsAlreadyExists(err) { - log.Info("Pod already exists", "podName", pod.Name) - } else { - return fmt.Errorf("creating pod: %w", err) - } - } - } - - return nil -} diff --git a/test/integration/messenger_test.go b/test/integration/messenger_test.go index d27f78a2..1a814526 100644 --- a/test/integration/messenger_test.go +++ b/test/integration/messenger_test.go @@ -53,8 +53,8 @@ func TestMessenger(t *testing.T) { // Assert on replicas before completing the request - otherwise there is a race condition // with the autoscaler. - requireModelReplicas(t, m, 1, "Replicas should be scaled up to 1 to process messaging request", 3*time.Second) - requireModelPods(t, m, 1, "Pod should be created for the messaging request", 3*time.Second) + requireModelReplicas(t, m, 1, "Replicas should be scaled up to 1 to process messaging request", 5*time.Second) + requireModelPods(t, m, 1, "Pod should be created for the messaging request", 5*time.Second) markAllModelPodsReady(t, m) completeBackendRequests(backendComplete, 1) diff --git a/test/integration/model_pod_recovery_test.go b/test/integration/model_pod_recovery_test.go index e15e3974..b2c0554c 100644 --- a/test/integration/model_pod_recovery_test.go +++ b/test/integration/model_pod_recovery_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" "k8s.io/utils/ptr" @@ -22,21 +21,17 @@ func TestModelPodRecovery(t *testing.T) { require.NoError(t, testK8sClient.Create(testCtx, m)) // Expect 3 Pods to be created. - requireModelPods(t, m, 3, "3 Pods should be created", 2*time.Second) + requireModelPods(t, m, 3, "3 Pods should be created", 5*time.Second) - // Delete pod-1. + podList := &corev1.PodList{} + require.NoError(t, testK8sClient.List(testCtx, podList, client.InNamespace(testNS), client.MatchingLabels{"model": m.Name})) + + // Delete pod[1]. pod1 := &corev1.Pod{} - require.NoError(t, testK8sClient.Get(testCtx, client.ObjectKey{Namespace: testNS, Name: "model-" + m.Name + "-1"}, pod1)) + require.NoError(t, testK8sClient.Get(testCtx, client.ObjectKeyFromObject(&podList.Items[1]), pod1)) require.NoError(t, testK8sClient.Delete(testCtx, pod1)) - origUID := pod1.UID - - require.Eventually(t, func() bool { - newPod1 := &corev1.Pod{} - if !assert.NoError(t, testK8sClient.Get(testCtx, client.ObjectKey{Namespace: testNS, Name: "model-" + m.Name + "-1"}, newPod1)) { - return false - } - return newPod1.UID != origUID - }, time.Second, 100*time.Millisecond, "Pod-1 should be recreated") - requireModelPods(t, m, 3, "3 Pods should be exist again", 2*time.Second) + // NOTE: Wait time needs to be long enough to take into account + // the rate limiting the Model Reconciler has for Pod updates. 
+ requireModelPods(t, m, 3, "3 Pods should be exist again", 5*time.Second) } diff --git a/test/integration/model_pod_update_rollout_test.go b/test/integration/model_pod_update_rollout_test.go index 268ac39a..d08991bf 100644 --- a/test/integration/model_pod_update_rollout_test.go +++ b/test/integration/model_pod_update_rollout_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/substratusai/kubeai/internal/k8sutils" corev1 "k8s.io/api/core/v1" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -22,27 +23,39 @@ func TestModelPodUpdateRollout(t *testing.T) { require.NoError(t, testK8sClient.Create(testCtx, m)) // Expect 3 Pods to be created. - requireModelPods(t, m, 3, "3 Pods should be created", 2*time.Second) + // NOTE: Wait time needs to be long enough to take into account + // the rate limiting the Model Reconciler has for Pod updates. + requireModelPods(t, m, 3, "3 Pods should be created", 5*time.Second) // Update the Model object. const newArg = "--my-new-arg-added-in-testcase" updateModel(t, m, func() { m.Spec.Args = []string{newArg} }, "Adding a new arg to the Model") // Expect 3 Pods to be created with the new arg. - require.Eventually(t, func() bool { + require.EventuallyWithT(t, func(t *assert.CollectT) { podList := &corev1.PodList{} if !assert.NoError(t, testK8sClient.List(testCtx, podList, client.InNamespace(testNS), client.MatchingLabels{"model": m.Name})) { - return false + return } for _, pod := range podList.Items { - if !assert.Contains(t, pod.Spec.Containers[0].Args, newArg, "Pod should have the new arg") { - return false + if !k8sutils.PodIsReady(&pod) { + // Update Pod to be Ready so Rollout can proceed. + pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{ + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }) + assert.NoError(t, testK8sClient.Status().Update(testCtx, &pod)) + return + } + if !assert.Containsf(t, pod.Spec.Containers[0].Args, newArg, "Pod %q should have the new arg", pod.Name) { + return } } assert.Equal(t, 3, len(podList.Items), "Exactly 3 Pods should exist") - return true - }, time.Second, 100*time.Millisecond, "Exactly 3 Pods should exist (with the new arg)") + // NOTE: Wait time needs to be long enough to take into account + // the rate limiting the Model Reconciler has for Pod updates. + }, 15*time.Second, 1*time.Second, "Exactly 3 Pods should exist (with the new arg)") } diff --git a/test/integration/model_profiles_test.go b/test/integration/model_profiles_test.go index 3e39dc9e..97083ce4 100644 --- a/test/integration/model_profiles_test.go +++ b/test/integration/model_profiles_test.go @@ -33,8 +33,12 @@ func TestModelProfiles(t *testing.T) { var pod *corev1.Pod require.EventuallyWithT(t, func(t *assert.CollectT) { podList := &corev1.PodList{} - assert.NoError(t, testK8sClient.List(testCtx, podList, client.InNamespace(testNS), client.MatchingLabels{"model": m.Name})) - assert.Len(t, podList.Items, 1) + if !assert.NoError(t, testK8sClient.List(testCtx, podList, client.InNamespace(testNS), client.MatchingLabels{"model": m.Name})) { + return + } + if !assert.Len(t, podList.Items, 1) { + return + } pod = &podList.Items[0] // The Pod should have a single container named "server". 
@@ -43,11 +47,13 @@ func TestModelProfiles(t *testing.T) { assert.Contains(t, pod.Spec.Tolerations, sysCfg().ResourceProfiles[resourceProfileCPU].Tolerations[0]) assert.Equal(t, sysCfg().ResourceProfiles[resourceProfileCPU].Affinity, pod.Spec.Affinity) assert.Equal(t, sysCfg().ResourceProfiles[resourceProfileCPU].NodeSelector, pod.Spec.NodeSelector) - }, 2*time.Second, time.Second/10, "Resource profile should be applied to the model Pod object") + }, 5*time.Second, time.Second/10, "Resource profile should be applied to the model Pod object") const userImage = "my-repo.com/my-repo/my-image:latest" require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.NoError(t, testK8sClient.Get(testCtx, client.ObjectKeyFromObject(m), m)) + if !assert.NoError(t, testK8sClient.Get(testCtx, client.ObjectKeyFromObject(m), m)) { + return + } m.Spec.Image = userImage require.NoError(t, testK8sClient.Update(testCtx, m)) }, 2*time.Second, time.Second/10, "Update model with user specified image") @@ -55,12 +61,16 @@ func TestModelProfiles(t *testing.T) { require.NoError(t, testK8sClient.Delete(testCtx, pod)) require.EventuallyWithT(t, func(t *assert.CollectT) { podList := &corev1.PodList{} - assert.NoError(t, testK8sClient.List(testCtx, podList, client.InNamespace(testNS), client.MatchingLabels{"model": m.Name})) - assert.Len(t, podList.Items, 1) + if !assert.NoError(t, testK8sClient.List(testCtx, podList, client.InNamespace(testNS), client.MatchingLabels{"model": m.Name})) { + return + } + if !assert.Len(t, podList.Items, 1) { + return + } pod = &podList.Items[0] // The Pod should have a single container named "server". container := mustFindPodContainerByName(t, pod, "server") assert.Equal(t, userImage, container.Image) - }, 2*time.Second, time.Second/10, "User specified image should be respected") + }, 15*time.Second, time.Second/10, "User specified image should be respected") } diff --git a/test/integration/model_scaling_bounds_test.go b/test/integration/model_scaling_bounds_test.go index 7e6e059f..311da330 100644 --- a/test/integration/model_scaling_bounds_test.go +++ b/test/integration/model_scaling_bounds_test.go @@ -46,7 +46,7 @@ func TestModelScalingBounds(t *testing.T) { // Update the Model object to set MinReplicas to 1. updateModel(t, m, func() { m.Spec.MinReplicas = 1 }, "MinReplicas=1") - requireModelReplicas(t, m, 1, "Replicas should be scaled up to MinReplicas after update", time.Second) + requireModelReplicas(t, m, 1, "Replicas should be scaled up to MinReplicas after update", 5*time.Second) // Check that a Pod was created for the Model. require.EventuallyWithT(t, func(t *assert.CollectT) { @@ -60,5 +60,5 @@ func TestModelScalingBounds(t *testing.T) { updateModel(t, m, func() { m.Spec.Replicas = ptr.To(*m.Spec.MaxReplicas + 1) }, "Replicas > MaxReplicas") // Model should scale down to MaxReplicas. 
- requireModelReplicas(t, m, *m.Spec.MaxReplicas, "Replicas should be scaled down to MaxReplicas", time.Second) + requireModelReplicas(t, m, *m.Spec.MaxReplicas, "Replicas should be scaled down to MaxReplicas", 5*time.Second) } diff --git a/test/integration/proxy_test.go b/test/integration/proxy_test.go index 8dcc8300..05a1e51c 100644 --- a/test/integration/proxy_test.go +++ b/test/integration/proxy_test.go @@ -69,18 +69,18 @@ vllm:num_requests_waiting{model_name="%s"} %d.0 var wg sync.WaitGroup sendRequests(t, &wg, m.Name, 1, http.StatusOK) - requireModelReplicas(t, m, 1, "Replicas should be scaled up to 1 to process messaging request", time.Second) - requireModelPods(t, m, 1, "Pod should be created for the messaging request", time.Second) + requireModelReplicas(t, m, 1, "Replicas should be scaled up to 1 to process messaging request", 5*time.Second) + requireModelPods(t, m, 1, "Pod should be created for the messaging request", 5*time.Second) markAllModelPodsReady(t, m) completeRequests(backendComplete, 1) require.Equal(t, int32(1), totalBackendRequests.Load(), "ensure the request made its way to the backend") - const autoscaleUpWait = 10 * time.Second + const autoscaleUpWait = 15 * time.Second // Ensure the deployment is autoscaled past 1. // Simulate the backend processing the request. sendRequests(t, &wg, m.Name, 2, http.StatusOK) requireModelReplicas(t, m, 2, "Replicas should be scaled up to 2 to process pending messaging request", autoscaleUpWait) - requireModelPods(t, m, 2, "2 Pods should be created for the messaging requests", time.Second) + requireModelPods(t, m, 2, "2 Pods should be created for the messaging requests", 5*time.Second) markAllModelPodsReady(t, m) // Make sure deployment will not be scaled past max (3). @@ -94,9 +94,9 @@ vllm:num_requests_waiting{model_name="%s"} %d.0 require.Equal(t, int32(5), totalBackendRequests.Load(), "ensure all the requests made their way to the backend") // Ensure the deployment is autoscaled back down to MinReplicas. - const autoscaleDownWait = 10 * time.Second + const autoscaleDownWait = 15 * time.Second requireModelReplicas(t, m, m.Spec.MinReplicas, "Replicas should scale back to MinReplicas", autoscaleDownWait) - requireModelPods(t, m, int(m.Spec.MinReplicas), "Pods should be removed", time.Second) + requireModelPods(t, m, int(m.Spec.MinReplicas), "Pods should be removed", 5*time.Second) t.Log("Waiting for all requests to complete") wg.Wait()
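For reference, the deletion-priority rules in `sortPodsByDeletionOrder` (internal/modelcontroller/pod_plan.go above) boil down to a four-key comparison: unready before ready, unscheduled before scheduled, old hash before the expected hash, and younger before older as the final tie-breaker. The following self-contained sketch restates that ordering over a simplified Pod stand-in; the types and example data are illustrative only.

```go
package main

import (
	"fmt"
	"sort"
)

// podInfo is a pared-down stand-in for corev1.Pod, carrying only the fields
// that sortPodsByDeletionOrder actually inspects.
type podInfo struct {
	name      string
	ready     bool
	scheduled bool
	hash      string
	ageHours  int
}

// sortByDeletionOrder mirrors the precedence used in the diff: unready before
// ready, unscheduled before scheduled, old hash before the expected hash, and
// youngest first as the final tie-breaker.
func sortByDeletionOrder(pods []podInfo, expectedHash string) {
	sort.SliceStable(pods, func(i, j int) bool {
		if pods[i].ready != pods[j].ready {
			return !pods[i].ready
		}
		if pods[i].scheduled != pods[j].scheduled {
			return !pods[i].scheduled
		}
		if pods[i].hash != pods[j].hash {
			return pods[i].hash != expectedHash
		}
		return pods[i].ageHours < pods[j].ageHours // smaller age means younger, deleted first
	})
}

func main() {
	pods := []podInfo{
		{name: "ready-new", ready: true, scheduled: true, hash: "new", ageHours: 2},
		{name: "ready-old-hash", ready: true, scheduled: true, hash: "old", ageHours: 2},
		{name: "unready-unscheduled", ready: false, scheduled: false, hash: "old", ageHours: 1},
		{name: "unready-scheduled", ready: false, scheduled: true, hash: "new", ageHours: 1},
	}
	sortByDeletionOrder(pods, "new")
	for _, p := range pods {
		fmt.Println(p.name)
	}
	// Order: unready-unscheduled, unready-scheduled, ready-old-hash, ready-new.
}
```

The effect is that scale-downs and rollouts prefer to remove Pods that are not yet serving traffic, and only touch ready, up-to-date Pods last.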