Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: workload scaling policies #357

Merged
merged 10 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ examples/**/**/tf.vars
examples/localdev/*
!examples/localdev/main.tf.sample
!examples/localdev/versions.tf
e2e/**/.env
e2e/**/local.auto.tfvars
.env
terraform.tfstate
.DS_Store
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ format-tf:
.PHONY: generate-sdk
generate-sdk:
@echo "==> Generating castai sdk client"
@API_TAGS=ExternalClusterAPI,PoliciesAPI,NodeConfigurationAPI,NodeTemplatesAPI,AuthTokenAPI,ScheduledRebalancingAPI,InventoryAPI,UsersAPI,OperationsAPI,EvictorAPI,SSOAPI,CommitmentsAPI go generate castai/sdk/generate.go
@API_TAGS=ExternalClusterAPI,PoliciesAPI,NodeConfigurationAPI,NodeTemplatesAPI,AuthTokenAPI,ScheduledRebalancingAPI,InventoryAPI,UsersAPI,OperationsAPI,EvictorAPI,SSOAPI,CommitmentsAPI,WorkloadOptimizationAPI go generate castai/sdk/generate.go

# The following command also rewrites existing documentation
.PHONY: generate-docs
Expand Down
1 change: 1 addition & 0 deletions castai/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Provider(version string) *schema.Provider {
"castai_commitments": resourceCommitments(),
"castai_organization_members": resourceOrganizationMembers(),
"castai_sso_connection": resourceSSOConnection(),
"castai_workload_scaling_policy": resourceWorkloadScalingPolicy(),
},

DataSourcesMap: map[string]*schema.Resource{
Expand Down
341 changes: 341 additions & 0 deletions castai/resource_workload_scaling_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
package castai

import (
"context"
"fmt"
"log"
"net/http"
"regexp"
"strings"
"time"

"github.com/google/uuid"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"

"github.com/castai/terraform-provider-castai/castai/sdk"
)

var (
	// k8sNameRegex validates scaling policy names against the Kubernetes
	// label-value style syntax: 1 to 63 characters, starting and ending with
	// an alphanumeric character, with dots, underscores and dashes allowed in
	// between. The "middle + last character" group is optional so that
	// single-character names are accepted too.
	k8sNameRegex = regexp.MustCompile(`^[a-z0-9A-Z]([a-z0-9A-Z._-]{0,61}[a-z0-9A-Z])?$`)
)

func resourceWorkloadScalingPolicy() *schema.Resource {
return &schema.Resource{
CreateContext: resourceWorkloadScalingPolicyCreate,
ReadContext: resourceWorkloadScalingPolicyRead,
UpdateContext: resourceWorkloadScalingPolicyUpdate,
DeleteContext: resourceWorkloadScalingPolicyDelete,
CustomizeDiff: resourceWorkloadScalingPolicyDiff,
Importer: &schema.ResourceImporter{
StateContext: workloadScalingPolicyImporter,
},
Description: "Manage workload scaling policy. Scaling policy [reference](https://docs.cast.ai/docs/woop-scaling-policies)",
Schema: map[string]*schema.Schema{
FieldClusterID: {
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
Type: schema.TypeString,
Required: true,
ForceNew: true,
Description: "CAST AI cluster id",
ValidateDiagFunc: validation.ToDiagFunc(validation.IsUUID),
},
"name": {
Type: schema.TypeString,
Required: true,
Description: "Scaling policy name",
ValidateDiagFunc: validation.ToDiagFunc(validation.StringMatch(k8sNameRegex, "name must adhere to the format guidelines of Kubernetes labels/annotations")),
},
"apply_type": {
Type: schema.TypeString,
Required: true,
Description: `Recommendation apply type.
- IMMEDIATE - pods are restarted immediately when new recommendation is generated.
- DEFERRED - pods are not restarted and recommendation values are applied during natural restarts only (new deployment, etc.)`,
ValidateDiagFunc: validation.ToDiagFunc(validation.StringInSlice([]string{"IMMEDIATE", "DEFERRED"}, false)),
},
"management_option": {
Type: schema.TypeString,
Required: true,
Description: `Defines possible options for workload management.
- READ_ONLY - workload watched (metrics collected), but no actions performed by CAST AI.
- MANAGED - workload watched (metrics collected), CAST AI may perform actions on the workload.`,
ValidateDiagFunc: validation.ToDiagFunc(validation.StringInSlice([]string{"READ_ONLY", "MANAGED"}, false)),
},
"cpu": {
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: resourceSchema("QUANTILE", 0),
},
"memory": {
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: resourceSchema("MAX", 0.1),
},
},
Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(15 * time.Second),
Read: schema.DefaultTimeout(15 * time.Second),
Update: schema.DefaultTimeout(15 * time.Second),
Delete: schema.DefaultTimeout(15 * time.Second),
},
}
}

func resourceSchema(function string, overhead float64) *schema.Resource {
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
return &schema.Resource{
Schema: map[string]*schema.Schema{
"function": {
Type: schema.TypeString,
Optional: true,
Description: "The function used to calculate the resource recommendation. Supported values: `QUANTILE`, `MAX`",
Default: function,
ValidateDiagFunc: validation.ToDiagFunc(validation.StringInSlice([]string{"QUANTILE", "MAX"}, false)),
},
"args": {
Type: schema.TypeList,
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
Optional: true,
MaxItems: 1,
Description: "The arguments for the function - i.e. for `QUANTILE` this should be a [0, 1] float. " +
"`MAX` doesn't accept any args",
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"overhead": {
Type: schema.TypeFloat,
Optional: true,
Description: "Overhead for the recommendation, e.g. `0.1` will result in 10% higher recommendation",
Default: overhead,
ValidateDiagFunc: validation.ToDiagFunc(validation.FloatBetween(0, 1)),
},
"apply_threshold": {
Type: schema.TypeFloat,
Optional: true,
Description: "The threshold of when to apply the recommendation. Recommendation will be applied when " +
"diff of current requests and new recommendation is greater than set value",
Default: 0.1,
ValidateDiagFunc: validation.ToDiagFunc(validation.FloatBetween(0.01, 1)),
},
},
}
}

func resourceWorkloadScalingPolicyCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(*ProviderConfig).api

clusterID := d.Get(FieldClusterID).(string)
req := sdk.WorkloadOptimizationAPICreateWorkloadScalingPolicyJSONRequestBody{
Name: d.Get("name").(string),
ApplyType: sdk.WorkloadoptimizationV1ApplyType(d.Get("apply_type").(string)),
RecommendationPolicies: sdk.WorkloadoptimizationV1RecommendationPolicies{
ManagementOption: sdk.WorkloadoptimizationV1ManagementOption(d.Get("management_option").(string)),
},
}

if v, ok := d.GetOk("cpu"); ok {
req.RecommendationPolicies.Cpu = toResourcePolicies(v.([]interface{})[0].(map[string]interface{}))
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
}

if v, ok := d.GetOk("memory"); ok {
req.RecommendationPolicies.Memory = toResourcePolicies(v.([]interface{})[0].(map[string]interface{}))
}

resp, err := client.WorkloadOptimizationAPICreateWorkloadScalingPolicyWithResponse(ctx, clusterID, req)
if checkErr := sdk.CheckOKResponse(resp, err); checkErr != nil {
return diag.FromErr(checkErr)
}

d.SetId(resp.JSON200.Id)

return resourceWorkloadScalingPolicyRead(ctx, d, meta)
}

// resourceWorkloadScalingPolicyRead fetches the scaling policy and copies its
// fields into the resource data. If an already-tracked policy is reported as
// not found, its ID is cleared so it is dropped from state.
func resourceWorkloadScalingPolicyRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
	api := meta.(*ProviderConfig).api

	resp, err := api.WorkloadOptimizationAPIGetWorkloadScalingPolicyWithResponse(ctx, d.Get(FieldClusterID).(string), d.Id())
	if err != nil {
		return diag.FromErr(err)
	}

	// A 404 for a resource that was just created falls through to the
	// generic response check below and is reported as an error.
	notFound := resp.StatusCode() == http.StatusNotFound
	if notFound && !d.IsNewResource() {
		log.Printf("[WARN] Scaling policy (%s) not found, removing from state", d.Id())
		d.SetId("")
		return nil
	}
	if respErr := sdk.CheckOKResponse(resp, err); respErr != nil {
		return diag.FromErr(respErr)
	}

	policy := resp.JSON200

	// Attributes are written in this order; the first failure is returned.
	attrs := []struct {
		key       string
		value     interface{}
		errFormat string
	}{
		{"name", policy.Name, "setting name: %w"},
		{"apply_type", policy.ApplyType, "setting apply type: %w"},
		{"management_option", policy.RecommendationPolicies.ManagementOption, "setting management option: %w"},
		{"cpu", toResourceMap(policy.RecommendationPolicies.Cpu), "setting cpu: %w"},
		{"memory", toResourceMap(policy.RecommendationPolicies.Memory), "setting memory: %w"},
	}
	for _, attr := range attrs {
		if setErr := d.Set(attr.key, attr.value); setErr != nil {
			return diag.FromErr(fmt.Errorf(attr.errFormat, setErr))
		}
	}

	return nil
}

func resourceWorkloadScalingPolicyUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
if !d.HasChanges(
"name",
"apply_type",
"management_option",
"cpu",
"memory",
) {
log.Printf("[INFO] scaling policy up to date")
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

client := meta.(*ProviderConfig).api
clusterID := d.Get(FieldClusterID).(string)
req := sdk.WorkloadOptimizationAPIUpdateWorkloadScalingPolicyJSONBody{
Name: d.Get("name").(string),
ApplyType: sdk.WorkloadoptimizationV1ApplyType(d.Get("apply_type").(string)),
RecommendationPolicies: sdk.WorkloadoptimizationV1RecommendationPolicies{
ManagementOption: sdk.WorkloadoptimizationV1ManagementOption(d.Get("management_option").(string)),
Cpu: toResourcePolicies(d.Get("cpu").([]interface{})[0].(map[string]interface{})),
Memory: toResourcePolicies(d.Get("memory").([]interface{})[0].(map[string]interface{})),
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
},
}

resp, err := client.WorkloadOptimizationAPIUpdateWorkloadScalingPolicyWithResponse(ctx, clusterID, d.Id(), req)
if checkErr := sdk.CheckOKResponse(resp, err); checkErr != nil {
return diag.FromErr(checkErr)
}

return resourceWorkloadScalingPolicyRead(ctx, d, meta)
}

func resourceWorkloadScalingPolicyDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(*ProviderConfig).api
clusterID := d.Get(FieldClusterID).(string)

resp, err := client.WorkloadOptimizationAPIGetWorkloadScalingPolicyWithResponse(ctx, clusterID, d.Id())
if err != nil {
return diag.FromErr(err)
}
if resp.StatusCode() == http.StatusNotFound {
log.Printf("[DEBUG] Scaling policy (%s) not found, skipping delete", d.Id())
return nil
}
if err := sdk.StatusOk(resp); err != nil {
return diag.FromErr(err)
}

delResp, err := client.WorkloadOptimizationAPIDeleteWorkloadScalingPolicyWithResponse(ctx, clusterID, d.Id())
if err != nil {
return diag.FromErr(err)
}
if delResp.StatusCode() == http.StatusBadRequest {
log.Printf("[WARN] Scaling policy has active workloads (%s) and can't be deleted, removing from state", d.Id())
return nil
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
}
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
if err := sdk.StatusOk(delResp); err != nil {
return diag.FromErr(err)
}

return nil
}

func resourceWorkloadScalingPolicyDiff(_ context.Context, d *schema.ResourceDiff, _ interface{}) error {
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
// Since tf doesn't support cross field validation, doing it here.
cpu := toResourcePolicies(d.Get("cpu").([]interface{})[0].(map[string]interface{}))
memory := toResourcePolicies(d.Get("memory").([]interface{})[0].(map[string]interface{}))

if err := validateArgs(cpu, "cpu"); err != nil {
return err
}
return validateArgs(memory, "memory")
}

func validateArgs(r sdk.WorkloadoptimizationV1ResourcePolicies, res string) error {
if r.Function == "QUANTILE" && len(r.Args) == 0 {
return fmt.Errorf("field %q: QUANTILE function requires args to be provided", res)
}
UndeadRat22 marked this conversation as resolved.
Show resolved Hide resolved
if r.Function == "MAX" && len(r.Args) > 0 {
return fmt.Errorf("field %q: MAX function doesn't accept any args", res)
}
return nil
}

func workloadScalingPolicyImporter(ctx context.Context, d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
ids := strings.Split(d.Id(), "/")
if len(ids) != 2 || ids[0] == "" || ids[1] == "" {
return nil, fmt.Errorf("expected import id with format: <cluster_id>/<scaling_policy name or id>, got: %q", d.Id())
}

clusterID, id := ids[0], ids[1]
if err := d.Set(FieldClusterID, clusterID); err != nil {
return nil, fmt.Errorf("setting cluster id: %w", err)
}
d.SetId(id)

// Return if scaling policy ID provided.
if _, err := uuid.Parse(id); err == nil {
return []*schema.ResourceData{d}, nil
UOndro marked this conversation as resolved.
Show resolved Hide resolved
}

// Find scaling policy ID by name.
client := meta.(*ProviderConfig).api
resp, err := client.WorkloadOptimizationAPIListWorkloadScalingPoliciesWithResponse(ctx, clusterID)
if err := sdk.CheckOKResponse(resp, err); err != nil {
return nil, err
}

for _, sp := range resp.JSON200.Items {
if sp.Name == id {
d.SetId(sp.Id)
return []*schema.ResourceData{d}, nil
}
}

return nil, fmt.Errorf("failed to find workload scaling policy with the following name: %v", id)
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
}

// toResourcePolicies converts the map form of a resource block into the SDK
// resource policies struct. Keys that are missing or hold a value of an
// unexpected type are ignored, leaving the corresponding field at its zero
// value; an empty args list is ignored as well.
func toResourcePolicies(obj map[string]interface{}) sdk.WorkloadoptimizationV1ResourcePolicies {
	var policies sdk.WorkloadoptimizationV1ResourcePolicies

	if fn, isString := obj["function"].(string); isString {
		policies.Function = sdk.WorkloadoptimizationV1ResourcePoliciesFunction(fn)
	}
	if args, isList := obj["args"].([]interface{}); isList && len(args) > 0 {
		policies.Args = toStringList(args)
	}
	if overhead, isFloat := obj["overhead"].(float64); isFloat {
		policies.Overhead = overhead
	}
	if threshold, isFloat := obj["apply_threshold"].(float64); isFloat {
		policies.ApplyThreshold = threshold
	}

	return policies
}

func toResourceMap(p sdk.WorkloadoptimizationV1ResourcePolicies) []map[string]interface{} {
varnastadeus marked this conversation as resolved.
Show resolved Hide resolved
m := map[string]interface{}{
"function": p.Function,
"args": p.Args,
"overhead": p.Overhead,
"apply_threshold": p.ApplyThreshold,
}

return []map[string]interface{}{m}
}
Loading
Loading