Add Autoscaler State ConfigMap
nstogner committed Oct 9, 2024
1 parent 60bc02e commit ce91674
Showing 19 changed files with 274 additions and 32 deletions.
7 changes: 7 additions & 0 deletions charts/kubeai/templates/_helpers.tpl
@@ -88,4 +88,11 @@ Create the name of the huggingface secret to use
{{- end }}
{{- .Values.secrets.huggingface.name }}
{{- end }}
{{- end }}

{{/*
Set the name of the ConfigMap to use for storing model autoscaling state
*/}}
{{- define "models.autoscalerStateConfigMapName" -}}
{{- default (printf "%s-autoscaler-state" (include "kubeai.fullname" .)) .Values.modelAutoscaling.stateConfigMapName }}
{{- end }}
4 changes: 4 additions & 0 deletions charts/kubeai/templates/autoscalerstateconfigmap.yaml
@@ -0,0 +1,4 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "models.autoscalerStateConfigMapName" . }}
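
For reference, the helper defaults the ConfigMap name to "<fullname>-autoscaler-state" unless .Values.modelAutoscaling.stateConfigMapName is set. A minimal Go sketch of the same fallback logic (the fullname and override values below are illustrative, not taken from a real chart render):

package main

import "fmt"

// autoscalerStateConfigMapName mirrors the Helm helper's fallback:
// an explicit override wins; otherwise "<fullname>-autoscaler-state".
func autoscalerStateConfigMapName(fullname, override string) string {
	if override != "" {
		return override
	}
	return fmt.Sprintf("%s-autoscaler-state", fullname)
}

func main() {
	fmt.Println(autoscalerStateConfigMapName("kubeai", ""))         // kubeai-autoscaler-state
	fmt.Println(autoscalerStateConfigMapName("kubeai", "my-state")) // my-state
}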
4 changes: 3 additions & 1 deletion charts/kubeai/templates/configmap.yaml
@@ -27,6 +27,8 @@ data:
{{- end}}
serviceAccountName: {{ include "models.serviceAccountName" . }}
modelAutoscaling:
- {{- .Values.modelAutoscaling | toYaml | nindent 6 }}
+ interval: {{ .Values.modelAutoscaling.interval }}
+ timeWindow: {{ .Values.modelAutoscaling.timeWindow }}
+ stateConfigMapName: {{ include "models.autoscalerStateConfigMapName" . }}
messaging:
{{- .Values.messaging | toYaml | nindent 6 }}
3 changes: 3 additions & 0 deletions charts/kubeai/values.yaml
@@ -107,6 +107,9 @@ modelAutoscaling:
# Time window the autoscaling algorithm will consider when calculating
# the desired number of replicas.
timeWindow: 10m
# The name of the ConfigMap that stores the state of the autoscaler.
# Defaults to "{fullname}-autoscaler-state".
# stateConfigMapName: "kubeai-autoscaler-state"

messaging:
errorMaxBackoff: 30s
9 changes: 9 additions & 0 deletions internal/config/system.go
@@ -62,6 +62,9 @@ func (s *System) DefaultAndValidate() error {
if s.ModelAutoscaling.TimeWindow.Duration == 0 {
s.ModelAutoscaling.TimeWindow.Duration = 10 * time.Minute
}
if s.ModelAutoscaling.StateConfigMapName == "" {
s.ModelAutoscaling.StateConfigMapName = "kubeai-autoscaler-state"
}

if s.LeaderElection.LeaseDuration.Duration == 0 {
s.LeaderElection.LeaseDuration.Duration = 15 * time.Second
@@ -116,6 +119,12 @@ type ModelAutoscaling struct {
// calculating the average number of requests.
// Defaults to 10 minutes.
TimeWindow Duration `json:"timeWindow" validate:"required"`
// StateConfigMapName is the name of the ConfigMap that will be used
// to store the state of the autoscaler. This ConfigMap ensures that
// the autoscaler can recover from crashes and restarts without losing
// its state.
// Defaults to "kubeai-autoscaler-state".
StateConfigMapName string `json:"stateConfigMapName" validate:"required"`
}

// RequiredConsecutiveScaleDowns returns the number of consecutive scale down
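
A rough sketch of how the new default behaves, as it might be exercised from inside the module (internal packages are not importable externally, and a real config.System has other required fields, so this is illustrative only):

package main

import (
	"fmt"

	"github.com/substratusai/kubeai/internal/config"
)

func main() {
	// StateConfigMapName left empty: DefaultAndValidate fills in
	// "kubeai-autoscaler-state" as part of defaulting.
	cfg := config.System{}
	_ = cfg.DefaultAndValidate() // may still report other missing required fields
	fmt.Println(cfg.ModelAutoscaling.StateConfigMapName) // "kubeai-autoscaler-state"
}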
25 changes: 20 additions & 5 deletions internal/manager/run.go
@@ -20,9 +20,11 @@ import (

"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

@@ -171,6 +173,13 @@ func Run(ctx context.Context, k8sCfg *rest.Config, cfg config.System) error {
return fmt.Errorf("unable to create clientset: %w", err)
}

// Create a new client for the autoscaler to use. This client will not use
// a cache and will be ready to use immediately.
k8sClient, err := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme()})
if err != nil {
return fmt.Errorf("unable to create client: %w", err)
}

hostname, err := os.Hostname()
if err != nil {
return fmt.Errorf("unable to get hostname: %w", err)
@@ -181,7 +190,7 @@ func Run(ctx context.Context, k8sCfg *rest.Config, cfg config.System) error {
cfg.LeaderElection.RetryPeriod.Duration,
)

- endpointManager, err := endpoints.NewResolver(mgr)
+ endpointResolver, err := endpoints.NewResolver(mgr)
if err != nil {
return fmt.Errorf("unable to setup model resolver: %w", err)
}
@@ -216,16 +225,22 @@ func Run(ctx context.Context, k8sCfg *rest.Config, cfg config.System) error {
return fmt.Errorf("unable to parse metrics port: %w", err)
}

- modelAutoscaler := modelautoscaler.New(
+ modelAutoscaler, err := modelautoscaler.New(
+ ctx,
+ k8sClient,
leaderElection,
modelScaler,
- endpointManager,
+ endpointResolver,
cfg.ModelAutoscaling,
metricsPort,
+ types.NamespacedName{Name: cfg.ModelAutoscaling.StateConfigMapName, Namespace: namespace},
cfg.FixedSelfMetricAddrs,
)
+ if err != nil {
+ return fmt.Errorf("unable to create model autoscaler: %w", err)
+ }

- modelProxy := modelproxy.NewHandler(modelScaler, endpointManager, 3, nil)
+ modelProxy := modelproxy.NewHandler(modelScaler, endpointResolver, 3, nil)
openaiHandler := openaiserver.NewHandler(mgr.GetClient(), modelProxy)
mux := http.NewServeMux()
mux.Handle("/openai/", openaiHandler)
@@ -253,7 +268,7 @@ func Run(ctx context.Context, k8sCfg *rest.Config, cfg config.System) error {
stream.MaxHandlers,
cfg.Messaging.ErrorMaxBackoff.Duration,
modelScaler,
- endpointManager,
+ endpointResolver,
httpClient,
)
if err != nil {
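
The uncached client matters because mgr.GetClient() is served from an informer cache that only starts syncing once the manager runs, while the autoscaler needs to read its state ConfigMap during construction. A minimal sketch of the distinction, using standard controller-runtime APIs (the helper name is hypothetical):

package example

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// newDirectClient builds a client that talks straight to the API server.
// Unlike mgr.GetClient(), it does not depend on the manager's informer
// cache, so it is usable before mgr.Start(ctx) is called.
func newDirectClient(mgr ctrl.Manager) (client.Client, error) {
	return client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme()})
}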
60 changes: 51 additions & 9 deletions internal/modelautoscaler/autoscaler.go
@@ -13,32 +13,56 @@ import (
"github.com/substratusai/kubeai/internal/leader"
"github.com/substratusai/kubeai/internal/modelscaler"
"github.com/substratusai/kubeai/internal/movingaverage"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func New(
ctx context.Context,
k8sClient client.Client,
leaderElection *leader.Election,
scaler *modelscaler.ModelScaler,
resolver *endpoints.Resolver,
cfg config.ModelAutoscaling,
metricsPort int,
stateConfigMapRef types.NamespacedName,
fixedSelfMetricAddrs []string,
- ) *Autoscaler {
+ ) (*Autoscaler, error) {
a := &Autoscaler{
k8sClient: k8sClient,
leaderElection: leaderElection,
scaler: scaler,
resolver: resolver,
movingAvgByModel: map[string]*movingaverage.Simple{},
cfg: cfg,
metricsPort: metricsPort,
stateConfigMapRef: stateConfigMapRef,
fixedSelfMetricAddrs: fixedSelfMetricAddrs,
}
- return a
lastModelState, err := a.loadLastTotalModelState(ctx)
if err != nil {
return nil, fmt.Errorf("loading last state of models: %w", err)
}
log.Printf("Loaded last state of models: %d total, last calculated on %s", len(lastModelState.Models), lastModelState.LastCalculationTime)
for m, s := range lastModelState.Models {
// Preload moving averages with the last known state.
// If the last known state was 5.5, the preloaded moving average
// would look like [5.5, 5.5, 5.5, ...].
preloaded := newPrefilledFloat64Slice(a.cfg.AverageWindowCount(), s.AverageActiveRequests)
a.movingAvgByModel[m] = movingaverage.NewSimple(preloaded)
}

return a, nil
}

// Autoscaler is responsible for making continuous adjustments to
// the scale of the backend. It is not responsible for scale-from-zero.
// Each deployment has its own autoscaler.
type Autoscaler struct {
k8sClient client.Client

stateConfigMapRef types.NamespacedName

leaderElection *leader.Election

scaler *modelscaler.ModelScaler
@@ -76,6 +100,8 @@ func (a *Autoscaler) Start(ctx context.Context) {
continue
}

nextModelState := newTotalModelState()

var selfAddrs []string
if len(a.fixedSelfMetricAddrs) > 0 {
selfAddrs = a.fixedSelfMetricAddrs
@@ -121,17 +147,33 @@
log.Printf("Calculated target replicas for model %q: ceil(%v/%v) = %v, current requests: sum(%v) = %v, history: %v",
m.Name, avgActiveRequests, *m.Spec.TargetRequests, ceil, activeRequests, activeRequestSum, avg.History())
a.scaler.Scale(ctx, &m, int32(ceil), a.cfg.RequiredConsecutiveScaleDowns(*m.Spec.ScaleDownDelaySeconds))

nextModelState.Models[m.Name] = modelState{
AverageActiveRequests: avgActiveRequests,
}
}

if err := a.saveTotalModelState(ctx, nextModelState); err != nil {
log.Printf("Failed to save model state: %v", err)
}
}
}

- func (r *Autoscaler) getMovingAvgActiveReqPerModel(model string) *movingaverage.Simple {
- r.movingAvgByModelMtx.Lock()
- a, ok := r.movingAvgByModel[model]
+ func (a *Autoscaler) getMovingAvgActiveReqPerModel(model string) *movingaverage.Simple {
+ a.movingAvgByModelMtx.Lock()
+ avg, ok := a.movingAvgByModel[model]
if !ok {
- a = movingaverage.NewSimple(make([]float64, r.cfg.AverageWindowCount()))
- r.movingAvgByModel[model] = a
+ avg = movingaverage.NewSimple(make([]float64, a.cfg.AverageWindowCount()))
+ a.movingAvgByModel[model] = avg
}
- r.movingAvgByModelMtx.Unlock()
- return a
+ a.movingAvgByModelMtx.Unlock()
+ return avg
+ }
+
+ func newPrefilledFloat64Slice(length int, value float64) []float64 {
+ s := make([]float64, length)
+ for i := range s {
+ s[i] = value
+ }
+ return s
}
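
To see why the preloading in New avoids a cold start after a restart, here is a self-contained sketch with a stand-in for the internal movingaverage.Simple type (the stand-in's method names are illustrative, not the real API):

package main

import "fmt"

// simpleAvg stands in for movingaverage.Simple: a fixed-size window
// whose value is the mean over all slots.
type simpleAvg struct {
	window []float64
	next   int
}

func newSimpleAvg(seed []float64) *simpleAvg { return &simpleAvg{window: seed} }

func (s *simpleAvg) add(v float64) {
	s.window[s.next] = v
	s.next = (s.next + 1) % len(s.window)
}

func (s *simpleAvg) mean() float64 {
	var sum float64
	for _, v := range s.window {
		sum += v
	}
	return sum / float64(len(s.window))
}

func main() {
	// Preloaded with the last persisted average (5.5), the window reports
	// 5.5 immediately instead of climbing up from zero after a restart.
	seed := make([]float64, 8)
	for i := range seed {
		seed[i] = 5.5
	}
	avg := newSimpleAvg(seed)
	fmt.Println(avg.mean()) // 5.5
	avg.add(2.0)            // new samples then shift the window gradually
	fmt.Println(avg.mean()) // (7*5.5 + 2.0) / 8 = 5.0625
}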
65 changes: 65 additions & 0 deletions internal/modelautoscaler/state.go
@@ -0,0 +1,65 @@
package modelautoscaler

import (
"context"
"encoding/json"
"fmt"
"log"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func newTotalModelState() totalModelState {
return totalModelState{
Models: make(map[string]modelState),
LastCalculationTime: time.Now(),
}
}

type totalModelState struct {
Models map[string]modelState `json:"models"`
LastCalculationTime time.Time `json:"lastCalculationTime"`
}

type modelState struct {
AverageActiveRequests float64 `json:"averageActiveRequests"`
}

func (a *Autoscaler) loadLastTotalModelState(ctx context.Context) (totalModelState, error) {
cm := &corev1.ConfigMap{}
if err := a.k8sClient.Get(ctx, a.stateConfigMapRef, cm); err != nil {
return totalModelState{}, fmt.Errorf("get ConfigMap %q: %w", a.stateConfigMapRef, err)
}
const key = "models"
jsonState, ok := cm.Data[key]
if !ok {
log.Printf("Autoscaler state ConfigMap %q has no key %q, state not loaded", key, a.stateConfigMapRef)
return totalModelState{}, nil
}
tms := totalModelState{}
if err := json.Unmarshal([]byte(jsonState), &tms); err != nil {
return totalModelState{}, fmt.Errorf("unmarshalling state: %w", err)
}
return tms, nil
}

func (a *Autoscaler) saveTotalModelState(ctx context.Context, state totalModelState) error {
jsonState, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("marshalling state: %w", err)
}
patch := fmt.Sprintf(`{"data":{"models":%q}}`, string(jsonState))
if err := a.k8sClient.Patch(ctx, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: a.stateConfigMapRef.Namespace,
Name: a.stateConfigMapRef.Name,
},
}, client.RawPatch(types.StrategicMergePatchType, []byte(patch))); err != nil {
return fmt.Errorf("patching ConfigMap %q: %w", a.stateConfigMapRef, err)
}
return nil
}
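
The persisted document under the ConfigMap's "models" key is plain JSON. A self-contained round-trip sketch mirroring the totalModelState and modelState shapes above (the model name is hypothetical):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type modelState struct {
	AverageActiveRequests float64 `json:"averageActiveRequests"`
}

type totalModelState struct {
	Models              map[string]modelState `json:"models"`
	LastCalculationTime time.Time             `json:"lastCalculationTime"`
}

func main() {
	state := totalModelState{
		Models:              map[string]modelState{"my-model": {AverageActiveRequests: 5.5}},
		LastCalculationTime: time.Now(),
	}
	b, _ := json.Marshal(state)
	// This string is what saveTotalModelState patches into the ConfigMap,
	// as {"data":{"models":"<escaped JSON>"}} via a strategic merge patch.
	fmt.Println(string(b))

	var loaded totalModelState
	_ = json.Unmarshal(b, &loaded)
	fmt.Println(loaded.Models["my-model"].AverageActiveRequests) // 5.5
}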
71 changes: 71 additions & 0 deletions test/integration/autoscaler_state_test.go
@@ -0,0 +1,71 @@
package integration

import (
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/substratusai/kubeai/internal/config"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
)

// TestAutoscalerState tests the autoscaler's state management.
func TestAutoscalerState(t *testing.T) {
m := modelForTest(t)
m.Spec.MaxReplicas = ptr.To[int32](100)
m.Spec.TargetRequests = ptr.To[int32](1)
m.Spec.ScaleDownDelaySeconds = ptr.To[int64](2)

ms := newTestMetricsServer(t, m.Name)

sysCfg := baseSysCfg(t)
sysCfg.ModelAutoscaling.TimeWindow = config.Duration{Duration: 1 * time.Second}
sysCfg.ModelAutoscaling.Interval = config.Duration{Duration: time.Second / 4}
sysCfg.FixedSelfMetricAddrs = []string{
ms.addr(),
}
t.Logf("FixedSelfMetricAddrs: %v", sysCfg.FixedSelfMetricAddrs)
initTest(t, sysCfg)

ms.activeRequests.Store(2)

// Create the Model object in the Kubernetes cluster.
require.NoError(t, testK8sClient.Create(testCtx, m))

requireModelReplicas(t, m, 2, "Replicas should be autoscaled", 15*time.Second)

// Assert that state was saved.
cm := &corev1.ConfigMap{}
require.EventuallyWithT(t, func(t *assert.CollectT) {
err := testK8sClient.Get(testCtx, types.NamespacedName{Name: sysCfg.ModelAutoscaling.StateConfigMapName, Namespace: testNS}, cm)
if !assert.NoError(t, err) {
return
}
if !assert.NotNil(t, cm.Data) {
return
}
data, ok := cm.Data["models"]
if !assert.True(t, ok) {
return
}
type modelState struct {
AverageActiveRequests float64 `json:"averageActiveRequests"`
}
var state struct {
Models map[string]modelState `json:"models"`
}
if !assert.NoError(t, json.Unmarshal([]byte(data), &state)) {
return
}
if !assert.Len(t, state.Models, 1) {
return
}
assert.Equal(t, state.Models[m.Name], modelState{
AverageActiveRequests: 2.0,
})
}, 15*time.Second, 1*time.Second)
}
2 changes: 1 addition & 1 deletion test/integration/autoscaling_ha_test.go
@@ -25,7 +25,7 @@ func TestAutoscalingHA(t *testing.T) {
s2 := newTestMetricsServer(t, m.Name)
s3 := newTestMetricsServer(t, m.Name)

- sysCfg := baseSysCfg()
+ sysCfg := baseSysCfg(t)
sysCfg.ModelAutoscaling.TimeWindow = config.Duration{Duration: 1 * time.Second}
sysCfg.ModelAutoscaling.Interval = config.Duration{Duration: time.Second / 4}
sysCfg.FixedSelfMetricAddrs = []string{