feat: workload scaling policies (#357)
varnastadeus authored Aug 9, 2024
1 parent bbb5b3a commit 43426b4
Showing 11 changed files with 6,122 additions and 1,870 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -18,7 +18,7 @@ examples/**/**/tf.vars
examples/localdev/*
!examples/localdev/main.tf.sample
!examples/localdev/versions.tf
-e2e/**/.env
+e2e/**/local.auto.tfvars
.env
terraform.tfstate
.DS_Store
2 changes: 1 addition & 1 deletion Makefile
@@ -27,7 +27,7 @@ format-tf:
.PHONY: generate-sdk
generate-sdk:
@echo "==> Generating castai sdk client"
-@API_TAGS=ExternalClusterAPI,PoliciesAPI,NodeConfigurationAPI,NodeTemplatesAPI,AuthTokenAPI,ScheduledRebalancingAPI,InventoryAPI,UsersAPI,OperationsAPI,EvictorAPI,SSOAPI,CommitmentsAPI go generate castai/sdk/generate.go
+@API_TAGS=ExternalClusterAPI,PoliciesAPI,NodeConfigurationAPI,NodeTemplatesAPI,AuthTokenAPI,ScheduledRebalancingAPI,InventoryAPI,UsersAPI,OperationsAPI,EvictorAPI,SSOAPI,CommitmentsAPI,WorkloadOptimizationAPI go generate castai/sdk/generate.go

# The following command also rewrites existing documentation
.PHONY: generate-docs
1 change: 1 addition & 0 deletions castai/provider.go
@@ -51,6 +51,7 @@ func Provider(version string) *schema.Provider {
"castai_commitments": resourceCommitments(),
"castai_organization_members": resourceOrganizationMembers(),
"castai_sso_connection": resourceSSOConnection(),
"castai_workload_scaling_policy": resourceWorkloadScalingPolicy(),
},

DataSourcesMap: map[string]*schema.Resource{
344 changes: 344 additions & 0 deletions castai/resource_workload_scaling_policy.go
@@ -0,0 +1,344 @@
package castai

import (
"context"
"fmt"
"net/http"
"regexp"
"strings"
"time"

"github.com/google/uuid"
"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"

"github.com/castai/terraform-provider-castai/castai/sdk"
)

var (
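// k8sNameRegex enforces a Kubernetes-style object name: it must start and end with an alphanumeric character and may contain '.', '_' and '-' in between, up to 63 characters in total.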
k8sNameRegex = regexp.MustCompile("^[a-z0-9A-Z][a-z0-9A-Z._-]{0,61}[a-z0-9A-Z]$")
)

func resourceWorkloadScalingPolicy() *schema.Resource {
return &schema.Resource{
CreateContext: resourceWorkloadScalingPolicyCreate,
ReadContext: resourceWorkloadScalingPolicyRead,
UpdateContext: resourceWorkloadScalingPolicyUpdate,
DeleteContext: resourceWorkloadScalingPolicyDelete,
CustomizeDiff: resourceWorkloadScalingPolicyDiff,
Importer: &schema.ResourceImporter{
StateContext: workloadScalingPolicyImporter,
},
Description: "Manage workload scaling policy. Scaling policy [reference](https://docs.cast.ai/docs/woop-scaling-policies)",
Schema: map[string]*schema.Schema{
FieldClusterID: {
Type: schema.TypeString,
Required: true,
ForceNew: true,
Description: "CAST AI cluster id",
ValidateDiagFunc: validation.ToDiagFunc(validation.IsUUID),
},
"name": {
Type: schema.TypeString,
Required: true,
Description: "Scaling policy name",
ValidateDiagFunc: validation.ToDiagFunc(validation.StringMatch(k8sNameRegex, "name must adhere to the format guidelines of Kubernetes labels/annotations")),
},
"apply_type": {
Type: schema.TypeString,
Required: true,
Description: `Recommendation apply type.
- IMMEDIATE - pods are restarted immediately when a new recommendation is generated.
- DEFERRED - pods are not restarted; recommendation values are applied only during natural restarts (new deployment, etc.)`,
ValidateDiagFunc: validation.ToDiagFunc(validation.StringInSlice([]string{"IMMEDIATE", "DEFERRED"}, false)),
},
"management_option": {
Type: schema.TypeString,
Required: true,
Description: `Defines possible options for workload management.
- READ_ONLY - the workload is watched (metrics collected), but CAST AI performs no actions.
- MANAGED - the workload is watched (metrics collected) and CAST AI may perform actions on it.`,
ValidateDiagFunc: validation.ToDiagFunc(validation.StringInSlice([]string{"READ_ONLY", "MANAGED"}, false)),
},
"cpu": {
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: workloadScalingPolicyResourceSchema("QUANTILE", 0),
},
"memory": {
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: workloadScalingPolicyResourceSchema("MAX", 0.1),
},
},
Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(15 * time.Second),
Read: schema.DefaultTimeout(15 * time.Second),
Update: schema.DefaultTimeout(15 * time.Second),
Delete: schema.DefaultTimeout(15 * time.Second),
},
}
}

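// workloadScalingPolicyResourceSchema returns the nested schema shared by the "cpu" and "memory" blocks; the function and overhead parameters set the per-resource defaults.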
func workloadScalingPolicyResourceSchema(function string, overhead float64) *schema.Resource {
return &schema.Resource{
Schema: map[string]*schema.Schema{
"function": {
Type: schema.TypeString,
Optional: true,
Description: "The function used to calculate the resource recommendation. Supported values: `QUANTILE`, `MAX`",
Default: function,
ValidateDiagFunc: validation.ToDiagFunc(validation.StringInSlice([]string{"QUANTILE", "MAX"}, false)),
},
"args": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Description: "The arguments for the function - i.e. for `QUANTILE` this should be a [0, 1] float. " +
"`MAX` doesn't accept any args",
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"overhead": {
Type: schema.TypeFloat,
Optional: true,
Description: "Overhead for the recommendation, e.g. `0.1` will result in 10% higher recommendation",
Default: overhead,
ValidateDiagFunc: validation.ToDiagFunc(validation.FloatBetween(0, 1)),
},
"apply_threshold": {
Type: schema.TypeFloat,
Optional: true,
Description: "The threshold of when to apply the recommendation. Recommendation will be applied when " +
"diff of current requests and new recommendation is greater than set value",
Default: 0.1,
ValidateDiagFunc: validation.ToDiagFunc(validation.FloatBetween(0.01, 1)),
},
},
}
}

func resourceWorkloadScalingPolicyCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(*ProviderConfig).api

clusterID := d.Get(FieldClusterID).(string)
req := sdk.WorkloadOptimizationAPICreateWorkloadScalingPolicyJSONRequestBody{
Name: d.Get("name").(string),
ApplyType: sdk.WorkloadoptimizationV1ApplyType(d.Get("apply_type").(string)),
RecommendationPolicies: sdk.WorkloadoptimizationV1RecommendationPolicies{
ManagementOption: sdk.WorkloadoptimizationV1ManagementOption(d.Get("management_option").(string)),
},
}

if v, ok := d.GetOk("cpu"); ok {
req.RecommendationPolicies.Cpu = toWorkloadScalingPolicies(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("memory"); ok {
req.RecommendationPolicies.Memory = toWorkloadScalingPolicies(v.([]interface{})[0].(map[string]interface{}))
}

resp, err := client.WorkloadOptimizationAPICreateWorkloadScalingPolicyWithResponse(ctx, clusterID, req)
if checkErr := sdk.CheckOKResponse(resp, err); checkErr != nil {
return diag.FromErr(checkErr)
}

d.SetId(resp.JSON200.Id)

return resourceWorkloadScalingPolicyRead(ctx, d, meta)
}

func resourceWorkloadScalingPolicyRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(*ProviderConfig).api

clusterID := d.Get(FieldClusterID).(string)
resp, err := client.WorkloadOptimizationAPIGetWorkloadScalingPolicyWithResponse(ctx, clusterID, d.Id())
if err != nil {
return diag.FromErr(err)
}

if !d.IsNewResource() && resp.StatusCode() == http.StatusNotFound {
tflog.Warn(ctx, "Scaling policy not found, removing from state", map[string]interface{}{"id": d.Id()})
d.SetId("")
return nil
}
if err := sdk.CheckOKResponse(resp, err); err != nil {
return diag.FromErr(err)
}

sp := resp.JSON200

if err := d.Set("name", sp.Name); err != nil {
return diag.FromErr(fmt.Errorf("setting name: %w", err))
}
if err := d.Set("apply_type", sp.ApplyType); err != nil {
return diag.FromErr(fmt.Errorf("setting apply type: %w", err))
}
if err := d.Set("management_option", sp.RecommendationPolicies.ManagementOption); err != nil {
return diag.FromErr(fmt.Errorf("setting management option: %w", err))
}
if err := d.Set("cpu", toWorkloadScalingPoliciesMap(sp.RecommendationPolicies.Cpu)); err != nil {
return diag.FromErr(fmt.Errorf("setting cpu: %w", err))
}
if err := d.Set("memory", toWorkloadScalingPoliciesMap(sp.RecommendationPolicies.Memory)); err != nil {
return diag.FromErr(fmt.Errorf("setting memory: %w", err))
}

return nil
}

func resourceWorkloadScalingPolicyUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
if !d.HasChanges(
"name",
"apply_type",
"management_option",
"cpu",
"memory",
) {
tflog.Info(ctx, "scaling policy up to date")
return nil
}

client := meta.(*ProviderConfig).api
clusterID := d.Get(FieldClusterID).(string)
req := sdk.WorkloadOptimizationAPIUpdateWorkloadScalingPolicyJSONBody{
Name: d.Get("name").(string),
ApplyType: sdk.WorkloadoptimizationV1ApplyType(d.Get("apply_type").(string)),
RecommendationPolicies: sdk.WorkloadoptimizationV1RecommendationPolicies{
ManagementOption: sdk.WorkloadoptimizationV1ManagementOption(d.Get("management_option").(string)),
Cpu: toWorkloadScalingPolicies(d.Get("cpu").([]interface{})[0].(map[string]interface{})),
Memory: toWorkloadScalingPolicies(d.Get("memory").([]interface{})[0].(map[string]interface{})),
},
}

resp, err := client.WorkloadOptimizationAPIUpdateWorkloadScalingPolicyWithResponse(ctx, clusterID, d.Id(), req)
if checkErr := sdk.CheckOKResponse(resp, err); checkErr != nil {
return diag.FromErr(checkErr)
}

return resourceWorkloadScalingPolicyRead(ctx, d, meta)
}

func resourceWorkloadScalingPolicyDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(*ProviderConfig).api
clusterID := d.Get(FieldClusterID).(string)

resp, err := client.WorkloadOptimizationAPIGetWorkloadScalingPolicyWithResponse(ctx, clusterID, d.Id())
if err != nil {
return diag.FromErr(err)
}
if resp.StatusCode() == http.StatusNotFound {
tflog.Debug(ctx, "Scaling policy not found, skipping delete", map[string]interface{}{"id": d.Id()})
return nil
}
if err := sdk.StatusOk(resp); err != nil {
return diag.FromErr(err)
}

if resp.JSON200.IsReadonly || resp.JSON200.IsDefault {
tflog.Warn(ctx, "Default/readonly scaling policy can't be deleted, removing from state", map[string]interface{}{
"id": d.Id(),
})
return nil
}

delResp, err := client.WorkloadOptimizationAPIDeleteWorkloadScalingPolicyWithResponse(ctx, clusterID, d.Id())
if err != nil {
return diag.FromErr(err)
}
if err := sdk.StatusOk(delResp); err != nil {
return diag.FromErr(err)
}

return nil
}

func resourceWorkloadScalingPolicyDiff(_ context.Context, d *schema.ResourceDiff, _ interface{}) error {
// Since Terraform doesn't support cross-field validation, it is done here.
cpu := toWorkloadScalingPolicies(d.Get("cpu").([]interface{})[0].(map[string]interface{}))
memory := toWorkloadScalingPolicies(d.Get("memory").([]interface{})[0].(map[string]interface{}))

if err := validateArgs(cpu, "cpu"); err != nil {
return err
}
return validateArgs(memory, "memory")
}

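// validateArgs ensures the function and args fields are consistent: QUANTILE requires an argument, while MAX accepts none.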
func validateArgs(r sdk.WorkloadoptimizationV1ResourcePolicies, res string) error {
if r.Function == "QUANTILE" && len(r.Args) == 0 {
return fmt.Errorf("field %q: QUANTILE function requires args to be provided", res)
}
if r.Function == "MAX" && len(r.Args) > 0 {
return fmt.Errorf("field %q: MAX function doesn't accept any args", res)
}
return nil
}

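// workloadScalingPolicyImporter accepts an import ID of the form <cluster_id>/<scaling policy ID or name>; when a name is given, it is resolved to an ID via the list endpoint.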
func workloadScalingPolicyImporter(ctx context.Context, d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
ids := strings.Split(d.Id(), "/")
if len(ids) != 2 || ids[0] == "" || ids[1] == "" {
return nil, fmt.Errorf("expected import id with format: <cluster_id>/<scaling_policy name or id>, got: %q", d.Id())
}

clusterID, nameOrID := ids[0], ids[1]
if err := d.Set(FieldClusterID, clusterID); err != nil {
return nil, fmt.Errorf("setting cluster nameOrID: %w", err)
}
d.SetId(nameOrID)

// Return if scaling policy ID provided.
if _, err := uuid.Parse(nameOrID); err == nil {
return []*schema.ResourceData{d}, nil
}

// Find scaling policy ID by name.
client := meta.(*ProviderConfig).api
resp, err := client.WorkloadOptimizationAPIListWorkloadScalingPoliciesWithResponse(ctx, clusterID)
if err := sdk.CheckOKResponse(resp, err); err != nil {
return nil, err
}

for _, sp := range resp.JSON200.Items {
if sp.Name == nameOrID {
d.SetId(sp.Id)
return []*schema.ResourceData{d}, nil
}
}

return nil, fmt.Errorf("failed to find workload scaling policy with the following name: %v", nameOrID)
}

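// toWorkloadScalingPolicies converts a single cpu/memory schema block into the SDK resource policies struct.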
func toWorkloadScalingPolicies(obj map[string]interface{}) sdk.WorkloadoptimizationV1ResourcePolicies {
out := sdk.WorkloadoptimizationV1ResourcePolicies{}

if v, ok := obj["function"].(string); ok {
out.Function = sdk.WorkloadoptimizationV1ResourcePoliciesFunction(v)
}
if v, ok := obj["args"].([]interface{}); ok && len(v) > 0 {
out.Args = toStringList(v)
}
if v, ok := obj["overhead"].(float64); ok {
out.Overhead = v
}
if v, ok := obj["apply_threshold"].(float64); ok {
out.ApplyThreshold = v
}

return out
}

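// toWorkloadScalingPoliciesMap converts the SDK resource policies back into the single-element list expected by the schema.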
func toWorkloadScalingPoliciesMap(p sdk.WorkloadoptimizationV1ResourcePolicies) []map[string]interface{} {
m := map[string]interface{}{
"function": p.Function,
"args": p.Args,
"overhead": p.Overhead,
"apply_threshold": p.ApplyThreshold,
}

return []map[string]interface{}{m}
}
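
For reference, a minimal, illustrative configuration for the new resource could look like the sketch below. The attribute names come from the schema added in this commit; the cluster ID reference and all values are placeholders chosen for the example, not part of the change.

resource "castai_workload_scaling_policy" "example" {
  cluster_id        = var.cluster_id # placeholder; any valid CAST AI cluster ID
  name              = "example-policy"
  apply_type        = "IMMEDIATE"
  management_option = "READ_ONLY"

  cpu {
    function        = "QUANTILE"
    args            = ["0.9"] # QUANTILE requires an argument in the [0, 1] range
    overhead        = 0
    apply_threshold = 0.1
  }

  memory {
    function        = "MAX" # MAX accepts no args
    overhead        = 0.1
    apply_threshold = 0.1
  }
}

An existing policy can be imported using the documented ID format, e.g. terraform import castai_workload_scaling_policy.example <cluster_id>/<policy name or ID>.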