Skip to content

Commit

Permalink
Added no_wait option for clusters to skip waiting to start on cluster…
Browse files Browse the repository at this point in the history
… creation
  • Loading branch information
andrewnester committed Aug 27, 2024
1 parent c735629 commit 512e055
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 6 deletions.
35 changes: 29 additions & 6 deletions clusters/resource_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,17 @@ func (ClusterSpec) CustomizeSchemaResourceSpecific(s *common.CustomizableSchema)
return old == new
},
})
s.AddNewField("no_wait", &schema.Schema{
Type: schema.TypeBool,
Optional: true,
Default: false,
DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool {
if old == "" && new == "false" {
return true
}
return old == new
},
})
s.AddNewField("state", &schema.Schema{
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -414,11 +425,8 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *commo
if err != nil {
return err
}
clusterInfo, err := clusterWaiter.GetWithTimeout(timeout)
if err != nil {
return err
}
d.SetId(clusterInfo.ClusterId)

d.SetId(clusterWaiter.ClusterId)
d.Set("cluster_id", d.Id())
isPinned, ok := d.GetOk("is_pinned")
if ok && isPinned.(bool) {
Expand All @@ -437,6 +445,20 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *commo
}); err != nil {
return err
}
}

// If there is a no_wait flag set to true, don't wait for the cluster to be created
noWait, ok := d.GetOk("no_wait")
if ok && noWait.(bool) {
return nil
}

clusterInfo, err := clusterWaiter.GetWithTimeout(timeout)
if err != nil {
return err
}

if len(cluster.Libraries) > 0 {
_, err := libraries.WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
ClusterID: d.Id(),
IsRunning: clusterInfo.IsRunningOrResizing(),
Expand Down Expand Up @@ -508,7 +530,7 @@ func resourceClusterRead(ctx context.Context, d *schema.ResourceData, c *common.
func hasClusterConfigChanged(d *schema.ResourceData) bool {
for k := range clusterSchema {
// TODO: create a map if we'll add more non-cluster config parameters in the future
if k == "library" || k == "is_pinned" {
if k == "library" || k == "is_pinned" || k == "no_wait" {
continue
}
if d.HasChange(k) {
Expand Down Expand Up @@ -551,6 +573,7 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo
for k := range clusterSchema {
if k == "library" ||
k == "is_pinned" ||
k == "no_wait" ||
k == "num_workers" ||
k == "autoscale" {
continue
Expand Down
155 changes: 155 additions & 0 deletions clusters/resource_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,161 @@ func TestResourceClusterCreatePhoton(t *testing.T) {
assert.Equal(t, "abc", d.Id())
}

func TestResourceClusterCreateNoWait_WithLibraries(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
{
Method: "POST",
Resource: "/api/2.1/clusters/create",
ExpectedRequest: compute.ClusterSpec{
NumWorkers: 100,
SparkVersion: "7.1-scala12",
NodeTypeId: "i3.xlarge",
AutoterminationMinutes: 60,
},
Response: compute.ClusterDetails{
ClusterId: "abc",
State: compute.StateUnknown,
},
},
{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.1/clusters/get?cluster_id=abc",
Response: compute.ClusterDetails{
ClusterId: "abc",
NumWorkers: 100,
SparkVersion: "7.1-scala12",
NodeTypeId: "i3.xlarge",
AutoterminationMinutes: 15,
State: compute.StateUnknown,
},
},
{
Method: "POST",
Resource: "/api/2.1/clusters/events",
ExpectedRequest: compute.GetEvents{
ClusterId: "abc",
Limit: 1,
Order: compute.GetEventsOrderDesc,
EventTypes: []compute.EventType{compute.EventTypePinned, compute.EventTypeUnpinned},
},
Response: compute.GetEventsResponse{
Events: []compute.ClusterEvent{},
TotalCount: 0,
},
},
{
Method: "POST",
Resource: "/api/2.0/libraries/install",
ExpectedRequest: compute.InstallLibraries{
ClusterId: "abc",
Libraries: []compute.Library{
{
Pypi: &compute.PythonPyPiLibrary{
Package: "seaborn==1.2.4",
},
},
},
},
},
{
Method: "GET",
Resource: "/api/2.0/libraries/cluster-status?cluster_id=abc",
Response: compute.ClusterLibraryStatuses{
LibraryStatuses: []compute.LibraryFullStatus{
{
Library: &compute.Library{
Pypi: &compute.PythonPyPiLibrary{
Package: "seaborn==1.2.4",
},
},
Status: compute.LibraryInstallStatusPending,
},
},
},
},
},
Create: true,
Resource: ResourceCluster(),
HCL: `num_workers = 100
spark_version = "7.1-scala12"
node_type_id = "i3.xlarge"
no_wait = true
library {
pypi {
package = "seaborn==1.2.4"
}
}`,
}.Apply(t)
assert.NoError(t, err)
assert.Equal(t, "abc", d.Id())
}

func TestResourceClusterCreateNoWait(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
{
Method: "POST",
Resource: "/api/2.1/clusters/create",
ExpectedRequest: compute.ClusterSpec{
NumWorkers: 100,
ClusterName: "Shared Autoscaling",
SparkVersion: "7.1-scala12",
NodeTypeId: "i3.xlarge",
AutoterminationMinutes: 15,
},
Response: compute.ClusterDetails{
ClusterId: "abc",
State: compute.StateUnknown,
},
},
{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.1/clusters/get?cluster_id=abc",
Response: compute.ClusterDetails{
ClusterId: "abc",
NumWorkers: 100,
ClusterName: "Shared Autoscaling",
SparkVersion: "7.1-scala12",
NodeTypeId: "i3.xlarge",
AutoterminationMinutes: 15,
State: compute.StateUnknown,
},
},
{
Method: "POST",
Resource: "/api/2.1/clusters/events",
ExpectedRequest: compute.GetEvents{
ClusterId: "abc",
Limit: 1,
Order: compute.GetEventsOrderDesc,
EventTypes: []compute.EventType{compute.EventTypePinned, compute.EventTypeUnpinned},
},
Response: compute.GetEventsResponse{
Events: []compute.ClusterEvent{},
TotalCount: 0,
},
},
},
Create: true,
Resource: ResourceCluster(),
State: map[string]any{
"autotermination_minutes": 15,
"cluster_name": "Shared Autoscaling",
"spark_version": "7.1-scala12",
"node_type_id": "i3.xlarge",
"num_workers": 100,
"is_pinned": false,
"no_wait": true,
},
}.Apply(t)
assert.NoError(t, err)
assert.Equal(t, "abc", d.Id())
}

func TestResourceClusterCreate_Error(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
Expand Down
1 change: 1 addition & 0 deletions docs/resources/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ resource "databricks_cluster" "shared_autoscaling" {
* `custom_tags` - (Optional) Additional tags for cluster resources. Databricks will tag all cluster resources (e.g., AWS EC2 instances and EBS volumes) with these tags in addition to `default_tags`. If a custom cluster tag has the same name as a default cluster tag, the custom tag is prefixed with an `x_` when it is propagated.
* `spark_conf` - (Optional) Map with key-value pairs to fine-tune Spark clusters, where you can provide custom [Spark configuration properties](https://spark.apache.org/docs/latest/configuration.html) in a cluster configuration.
* `is_pinned` - (Optional) boolean value specifying if the cluster is pinned (not pinned by default). You must be a Databricks administrator to use this. The pinned clusters' maximum number is [limited to 100](https://docs.databricks.com/clusters/clusters-manage.html#pin-a-cluster), so `apply` may fail if you have more than that (this number may change over time, so check Databricks documentation for actual number).
* `no_wait` - (Optional) boolean value specifying if the provider don't need to wait for cluster to be started when creating it. Default value is false.

The following example demonstrates how to create an autoscaling cluster with [Delta Cache](https://docs.databricks.com/delta/optimizations/delta-cache.html) enabled:

Expand Down

0 comments on commit 512e055

Please sign in to comment.