From 8d628d198523beb0beb379a94661d31ac8c3b3b7 Mon Sep 17 00:00:00 2001
From: Divyansh Vijayvergia <171924202+Divyansh-db@users.noreply.github.com>
Date: Wed, 31 Jul 2024 15:21:41 +0200
Subject: [PATCH] [Internal] Rewrite DLT pipelines using SDK (#3839)

## Changes
Rewrite the DLT pipelines resource using the Go SDK.

Two new fields are now part of the DLT pipeline schema:
`gateway_definition` - The definition of a gateway pipeline to support CDC.
`ingestion_definition` - The configuration for a managed ingestion pipeline. These settings cannot be used with the `library`, `target` or `catalog` settings.

## Tests
All existing unit tests and integration tests are passing.

- [x] relevant change in `docs/` folder
- [x] covered with integration tests in `internal/acceptance`
- [x] relevant acceptance tests are passing
- [x] using Go SDK
---
 docs/resources/pipeline.md           |  26 +-
 exporter/exporter_test.go            |  36 +-
 exporter/importables.go              |  31 +-
 exporter/importables_test.go         |  24 +-
 internal/acceptance/pipeline_test.go | 138 +++-
 pipelines/data_pipelines_test.go     |  23 +-
 pipelines/resource_pipeline.go       | 514 ++++++---------
 pipelines/resource_pipeline_test.go  | 939 ++++++++++++---------
 8 files changed, 829 insertions(+), 902 deletions(-)

diff --git a/docs/resources/pipeline.md b/docs/resources/pipeline.md
index 8517845acd..6e610aa435 100644
--- a/docs/resources/pipeline.md
+++ b/docs/resources/pipeline.md
@@ -76,13 +76,26 @@ The following arguments are supported:
 * `library` blocks - Specifies pipeline code and required artifacts. Syntax resembles the [library](cluster.md#library-configuration-block) configuration block, with the addition of special `notebook` & `file` library types that should have the `path` attribute. *Right now only the `notebook` & `file` types are supported.*
 * `cluster` blocks - [Clusters](cluster.md) to run the pipeline. If none is specified, pipelines will automatically select a default cluster configuration. *Please note that DLT pipeline clusters support only a subset of cluster attributes, as described in the [documentation](https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-api-guide.html#pipelinesnewcluster).* Also note that the `autoscale` block is extended with the `mode` parameter, which controls the autoscaling algorithm (possible values are `ENHANCED` for the new, enhanced autoscaling algorithm, or `LEGACY` for the old algorithm).
 * `continuous` - A flag indicating whether to run the pipeline continuously. The default value is `false`.
-* `development` - A flag indicating whether to run the pipeline in development mode. The default value is `true`.
+* `development` - A flag indicating whether to run the pipeline in development mode. The default value is `false`.
 * `photon` - A flag indicating whether to use the Photon engine. The default value is `false`.
 * `serverless` - An optional flag indicating if serverless compute should be used for this DLT pipeline. Requires `catalog` to be set, as it can be used only with Unity Catalog.
 * `catalog` - The name of the catalog in Unity Catalog. *Change of this parameter forces recreation of the pipeline.* (Conflicts with `storage`).
 * `target` - The name of a database (in either the Hive metastore or in a UC catalog) for persisting pipeline output data. Configuring the target setting allows you to view and query the pipeline output data from the Databricks UI.
 * `edition` - Optional name of the [product edition](https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-concepts.html#editions). Supported values are: `CORE`, `PRO`, `ADVANCED` (default). Not required when `serverless` is set to `true`.
 * `channel` - Optional name of the release channel for the Spark version used by the DLT pipeline. Supported values are: `CURRENT` (default) and `PREVIEW`.
+* `allow_duplicate_names` - Optional boolean flag. If `false`, deployment will fail if the name conflicts with that of another pipeline. The default value is `false`.
+* `deployment` - Deployment type of this pipeline. Supports the following attributes:
+  * `kind` - The deployment method that manages the pipeline.
+  * `metadata_file_path` - The path to the file containing metadata about the deployment.
+* `filters` - Filters on which Pipeline packages to include in the deployed graph. This block consists of the following attributes:
+  * `include` - Paths to include.
+  * `exclude` - Paths to exclude.
+* `gateway_definition` - The definition of a gateway pipeline to support CDC. Consists of the following attributes:
+  * `connection_id` - Immutable. The Unity Catalog connection this gateway pipeline uses to communicate with the source.
+  * `gateway_storage_catalog` - Required, Immutable. The name of the catalog for the gateway pipeline's storage location.
+  * `gateway_storage_name` - Required. The Unity Catalog-compatible name for the gateway storage location. This is the destination for the data extracted by the gateway; the Delta Live Tables system will automatically create the storage location under the catalog and schema.
+  * `gateway_storage_schema` - Required, Immutable. The name of the schema for the gateway pipeline's storage location.
+

 ### notification block

@@ -95,6 +108,17 @@ DLT allows to specify one or more notification blocks to get notifications about
 * `on-update-fatal-failure` - a pipeline update fails with a non-retryable (fatal) error.
 * `on-flow-failure` - a single data flow fails.

+### ingestion_definition block
+
+The configuration for a managed ingestion pipeline. These settings cannot be used with the `library`, `target` or `catalog` settings. This block consists of the following attributes:
+
+* `connection_name` - Immutable. The Unity Catalog connection this ingestion pipeline uses to communicate with the source. Specify either `ingestion_gateway_id` or `connection_name`.
+* `ingestion_gateway_id` - Immutable. Identifier for the ingestion gateway used by this ingestion pipeline to communicate with the source. Specify either `ingestion_gateway_id` or `connection_name`.
+* `objects` - Required. Settings specifying tables to replicate and the destination for the replicated tables.
+* `table_configuration` - Configuration settings to control the ingestion of tables. These settings are applied to all tables in the pipeline.
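+As an illustrative sketch only (the connection ID, catalog, schema, and table names below are placeholders, and the nested `objects` attribute names follow the Go SDK's ingestion definition types), a CDC gateway plus managed ingestion pipeline pair might look like this:
+
+```hcl
+resource "databricks_pipeline" "gateway" {
+  name = "cdc-gateway"
+
+  gateway_definition {
+    connection_id           = "<uc-connection-id>" # placeholder
+    gateway_storage_catalog = "main"               # placeholder
+    gateway_storage_schema  = "cdc_staging"        # placeholder
+    gateway_storage_name    = "cdc-gateway"        # placeholder
+  }
+}
+
+resource "databricks_pipeline" "ingestion" {
+  name = "managed-ingestion"
+
+  ingestion_definition {
+    # Reference the gateway pipeline created above.
+    ingestion_gateway_id = databricks_pipeline.gateway.id
+
+    objects {
+      table {
+        source_catalog      = "source_db" # placeholder
+        source_schema       = "public"    # placeholder
+        source_table        = "orders"    # placeholder
+        destination_catalog = "main"      # placeholder
+        destination_schema  = "bronze"    # placeholder
+      }
+    }
+  }
+}
+```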
+ + + ## Attribute Reference In addition to all arguments above, the following attributes are exported: diff --git a/exporter/exporter_test.go b/exporter/exporter_test.go index ca07f3c6db..8cc973c4e1 100644 --- a/exporter/exporter_test.go +++ b/exporter/exporter_test.go @@ -19,6 +19,7 @@ import ( "github.com/databricks/databricks-sdk-go/service/iam" sdk_jobs "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/ml" + "github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/databricks/databricks-sdk-go/service/serving" "github.com/databricks/databricks-sdk-go/service/settings" "github.com/databricks/databricks-sdk-go/service/sharing" @@ -29,7 +30,6 @@ import ( "github.com/databricks/terraform-provider-databricks/commands" "github.com/databricks/terraform-provider-databricks/common" "github.com/databricks/terraform-provider-databricks/jobs" - "github.com/databricks/terraform-provider-databricks/pipelines" "github.com/databricks/terraform-provider-databricks/qa" "github.com/databricks/terraform-provider-databricks/repos" "github.com/databricks/terraform-provider-databricks/scim" @@ -252,7 +252,7 @@ var emptyPipelines = qa.HTTPFixture{ Method: "GET", ReuseRequest: true, Resource: "/api/2.0/pipelines?max_results=50", - Response: pipelines.PipelineListResponse{}, + Response: pipelines.ListPipelinesResponse{}, } var emptyClusterPolicies = qa.HTTPFixture{ @@ -1951,10 +1951,10 @@ func TestImportingDLTPipelines(t *testing.T) { { Method: "GET", Resource: "/api/2.0/pipelines?max_results=50", - Response: pipelines.PipelineListResponse{ + Response: pipelines.ListPipelinesResponse{ Statuses: []pipelines.PipelineStateInfo{ { - PipelineID: "123", + PipelineId: "123", Name: "Pipeline1", }, }, @@ -2009,7 +2009,7 @@ func TestImportingDLTPipelines(t *testing.T) { }, { Method: "GET", - Resource: "/api/2.0/pipelines/123", + Resource: "/api/2.0/pipelines/123?", Response: getJSONObject("test-data/get-dlt-pipeline.json"), }, { @@ -2130,14 +2130,14 @@ func TestImportingDLTPipelinesMatchingOnly(t *testing.T) { { Method: "GET", Resource: "/api/2.0/pipelines?max_results=50", - Response: pipelines.PipelineListResponse{ + Response: pipelines.ListPipelinesResponse{ Statuses: []pipelines.PipelineStateInfo{ { - PipelineID: "123", + PipelineId: "123", Name: "Pipeline1 test", }, { - PipelineID: "124", + PipelineId: "124", Name: "Pipeline1", }, }, @@ -2145,7 +2145,7 @@ func TestImportingDLTPipelinesMatchingOnly(t *testing.T) { }, { Method: "GET", - Resource: "/api/2.0/pipelines/123", + Resource: "/api/2.0/pipelines/123?", Response: getJSONObject("test-data/get-dlt-pipeline.json"), }, { @@ -2494,14 +2494,14 @@ func TestIncrementalDLTAndMLflowWebhooks(t *testing.T) { { Method: "GET", Resource: "/api/2.0/pipelines?max_results=50", - Response: pipelines.PipelineListResponse{ + Response: pipelines.ListPipelinesResponse{ Statuses: []pipelines.PipelineStateInfo{ { - PipelineID: "abc", + PipelineId: "abc", Name: "abc", }, { - PipelineID: "def", + PipelineId: "def", Name: "def", }, }, @@ -2509,18 +2509,18 @@ func TestIncrementalDLTAndMLflowWebhooks(t *testing.T) { }, { Method: "GET", - Resource: "/api/2.0/pipelines/abc", - Response: pipelines.PipelineInfo{ - PipelineID: "abc", + Resource: "/api/2.0/pipelines/abc?", + Response: pipelines.GetPipelineResponse{ + PipelineId: "abc", Name: "abc", LastModified: 1681466931226, }, }, { Method: "GET", - Resource: "/api/2.0/pipelines/def", - Response: pipelines.PipelineInfo{ - PipelineID: "def", + Resource: 
"/api/2.0/pipelines/def?", + Response: pipelines.GetPipelineResponse{ + PipelineId: "def", Name: "def", LastModified: 1690156900000, Spec: &pipelines.PipelineSpec{ diff --git a/exporter/importables.go b/exporter/importables.go index a32c84e9d3..b154541bf4 100644 --- a/exporter/importables.go +++ b/exporter/importables.go @@ -20,6 +20,7 @@ import ( "github.com/databricks/databricks-sdk-go/service/iam" sdk_jobs "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/ml" + "github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/databricks/databricks-sdk-go/service/serving" "github.com/databricks/databricks-sdk-go/service/settings" "github.com/databricks/databricks-sdk-go/service/sharing" @@ -31,7 +32,6 @@ import ( "github.com/databricks/terraform-provider-databricks/jobs" "github.com/databricks/terraform-provider-databricks/mws" "github.com/databricks/terraform-provider-databricks/permissions" - "github.com/databricks/terraform-provider-databricks/pipelines" "github.com/databricks/terraform-provider-databricks/repos" tfsharing "github.com/databricks/terraform-provider-databricks/sharing" tfsql "github.com/databricks/terraform-provider-databricks/sql" @@ -1943,8 +1943,13 @@ var resourcesMap map[string]importable = map[string]importable{ return name + "_" + d.Id() }, List: func(ic *importContext) error { - api := pipelines.NewPipelinesAPI(ic.Context, ic.Client) - pipelinesList, err := api.List(50, "") + w, err := ic.Client.WorkspaceClient() + if err != nil { + return err + } + pipelinesList, err := w.Pipelines.ListPipelinesAll(ic.Context, pipelines.ListPipelinesRequest{ + MaxResults: 50, + }) if err != nil { return err } @@ -1954,7 +1959,9 @@ var resourcesMap map[string]importable = map[string]importable{ } var modifiedAt int64 if ic.incremental { - pipeline, err := api.Read(q.PipelineID) + pipeline, err := w.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{ + PipelineId: q.PipelineId, + }) if err != nil { return err } @@ -1962,7 +1969,7 @@ var resourcesMap map[string]importable = map[string]importable{ } ic.EmitIfUpdatedAfterMillis(&resource{ Resource: "databricks_pipeline", - ID: q.PipelineID, + ID: q.PipelineId, }, modifiedAt, fmt.Sprintf("DLT Pipeline '%s'", q.Name)) log.Printf("[INFO] Imported %d of %d DLT Pipelines", i+1, len(pipelinesList)) } @@ -1996,25 +2003,25 @@ var resourcesMap map[string]importable = map[string]importable{ ID: cluster.AwsAttributes.InstanceProfileArn, }) } - if cluster.InstancePoolID != "" { + if cluster.InstancePoolId != "" { ic.Emit(&resource{ Resource: "databricks_instance_pool", - ID: cluster.InstancePoolID, + ID: cluster.InstancePoolId, }) } - if cluster.DriverInstancePoolID != "" { + if cluster.DriverInstancePoolId != "" { ic.Emit(&resource{ Resource: "databricks_instance_pool", - ID: cluster.DriverInstancePoolID, + ID: cluster.DriverInstancePoolId, }) } - if cluster.PolicyID != "" { + if cluster.PolicyId != "" { ic.Emit(&resource{ Resource: "databricks_cluster_policy", - ID: cluster.PolicyID, + ID: cluster.PolicyId, }) } - ic.emitInitScriptsLegacy(cluster.InitScripts) + ic.emitInitScripts(cluster.InitScripts) ic.emitSecretsFromSecretsPathMap(cluster.SparkConf) ic.emitSecretsFromSecretsPathMap(cluster.SparkEnvVars) } diff --git a/exporter/importables_test.go b/exporter/importables_test.go index fbcf94adcf..cd4e27846f 100644 --- a/exporter/importables_test.go +++ b/exporter/importables_test.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/databricks-sdk-go/service/compute" 
"github.com/databricks/databricks-sdk-go/service/iam" sdk_jobs "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/databricks/databricks-sdk-go/service/sharing" sdk_workspace "github.com/databricks/databricks-sdk-go/service/workspace" tfcatalog "github.com/databricks/terraform-provider-databricks/catalog" @@ -23,7 +24,8 @@ import ( "github.com/databricks/terraform-provider-databricks/common" "github.com/databricks/terraform-provider-databricks/jobs" "github.com/databricks/terraform-provider-databricks/permissions" - "github.com/databricks/terraform-provider-databricks/pipelines" + + dlt_pipelines "github.com/databricks/terraform-provider-databricks/pipelines" "github.com/databricks/terraform-provider-databricks/policies" "github.com/databricks/terraform-provider-databricks/pools" "github.com/databricks/terraform-provider-databricks/provider" @@ -291,7 +293,7 @@ func TestRepoIgnore(t *testing.T) { func TestDLTIgnore(t *testing.T) { ic := importContextForTest() - d := pipelines.ResourcePipeline().ToResource().TestResourceData() + d := dlt_pipelines.ResourcePipeline().ToResource().TestResourceData() d.SetId("12345") r := &resource{ID: "12345", Data: d} // job without libraries @@ -1340,14 +1342,14 @@ func TestIncrementalListDLT(t *testing.T) { { Method: "GET", Resource: "/api/2.0/pipelines?max_results=50", - Response: pipelines.PipelineListResponse{ + Response: pipelines.ListPipelinesResponse{ Statuses: []pipelines.PipelineStateInfo{ { - PipelineID: "abc", + PipelineId: "abc", Name: "abc", }, { - PipelineID: "def", + PipelineId: "def", Name: "def", }, }, @@ -1355,18 +1357,18 @@ func TestIncrementalListDLT(t *testing.T) { }, { Method: "GET", - Resource: "/api/2.0/pipelines/abc", - Response: pipelines.PipelineInfo{ - PipelineID: "abc", + Resource: "/api/2.0/pipelines/abc?", + Response: pipelines.GetPipelineResponse{ + PipelineId: "abc", Name: "abc", LastModified: 1681466931226, }, }, { Method: "GET", - Resource: "/api/2.0/pipelines/def", - Response: pipelines.PipelineInfo{ - PipelineID: "def", + Resource: "/api/2.0/pipelines/def?", + Response: pipelines.GetPipelineResponse{ + PipelineId: "def", Name: "def", LastModified: 1690156900000, }, diff --git a/internal/acceptance/pipeline_test.go b/internal/acceptance/pipeline_test.go index 263ef24b99..7b78e2c23c 100644 --- a/internal/acceptance/pipeline_test.go +++ b/internal/acceptance/pipeline_test.go @@ -2,6 +2,7 @@ package acceptance import ( "context" + "fmt" "testing" "github.com/databricks/databricks-sdk-go/service/pipelines" @@ -13,11 +14,11 @@ var ( dltNotebookResource = ` resource "databricks_notebook" "this" { content_base64 = base64encode(<<-EOT - CREATE LIVE TABLE clickstream_raw AS + CREATE LIVE TABLE clickstream_raw AS SELECT * FROM json.` + "`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`" + ` - + -- COMMAND ---------- - + CREATE LIVE TABLE clickstream_clean( CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL), CONSTRAINT valid_count EXPECT (click_count > 0) ON VIOLATION FAIL UPDATE @@ -29,9 +30,9 @@ var ( CAST (prev_id AS INT) AS previous_page_id, prev_title AS previous_page_title FROM live.clickstream_raw - + -- COMMAND ---------- - + CREATE LIVE TABLE top_spark_referers TBLPROPERTIES ("quality" = "gold") AS SELECT previous_page_title as referrer, @@ -39,7 +40,7 @@ var ( FROM live.clickstream_clean WHERE current_page_title = 'Apache_Spark' ORDER BY 
click_count DESC - LIMIT 10 + LIMIT 10 EOT ) path = "/Shared/${local.name}" @@ -57,7 +58,7 @@ func TestAccPipelineResource_CreatePipeline(t *testing.T) { resource "databricks_pipeline" "this" { name = local.name storage = "/test/${local.name}" - + configuration = { key1 = "value1" key2 = "value2" @@ -97,11 +98,11 @@ func TestAccAwsPipelineResource_CreatePipeline(t *testing.T) { workspaceLevel(t, step{ Template: ` locals { - name = "pipeline-acceptance-aws-{var.RANDOM}" + name = "pipeline-acceptance-aws-{var.STICKY_RANDOM}" } resource "databricks_pipeline" "this" { name = local.name - storage = "/test/${local.name}" + storage = "/test/${local.name}" configuration = { key1 = "value1" key2 = "value2" @@ -137,11 +138,11 @@ func TestAccAwsPipelineResource_CreatePipeline(t *testing.T) { }, step{ Template: ` locals { - name = "pipeline-acceptance-aws-{var.RANDOM}" + name = "pipeline-acceptance-aws-{var.STICKY_RANDOM}" } resource "databricks_pipeline" "this" { name = local.name - storage = "/test/${local.name}" + storage = "/test/${local.name}" configuration = { key1 = "value1" key2 = "value2" @@ -186,7 +187,7 @@ func TestAccPipelineResource_CreatePipelineWithoutWorkers(t *testing.T) { resource "databricks_pipeline" "this" { name = local.name storage = "/test/${local.name}" - + configuration = { key1 = "value1" key2 = "value2" @@ -203,7 +204,7 @@ func TestAccPipelineResource_CreatePipelineWithoutWorkers(t *testing.T) { label = "default" num_workers = 0 spark_conf = { - "spark.databricks.cluster.profile" = "singleNode" + "spark.databricks.cluster.profile" = "singleNode" } } @@ -228,3 +229,114 @@ func TestAccPipelineResource_CreatePipelineWithoutWorkers(t *testing.T) { ), }) } + +func TestAccPipelineResourcLastModified(t *testing.T) { + var lastModified int64 + workspaceLevel(t, step{ + Template: ` + locals { + name = "pipeline-acceptance-{var.STICKY_RANDOM}" + } + resource "databricks_pipeline" "this" { + name = local.name + storage = "/test/${local.name}" + + configuration = { + key1 = "value1" + key2 = "value2" + } + + library { + notebook { + path = databricks_notebook.this.path + } + } + + cluster { + instance_pool_id = "{env.TEST_INSTANCE_POOL_ID}" + label = "default" + num_workers = 2 + custom_tags = { + cluster_type = "default" + } + } + + cluster { + instance_pool_id = "{env.TEST_INSTANCE_POOL_ID}" + label = "maintenance" + num_workers = 1 + custom_tags = { + cluster_type = "maintenance" + } + } + continuous = false + } + ` + dltNotebookResource, + Check: resourceCheck("databricks_pipeline.this", func(ctx context.Context, client *common.DatabricksClient, id string) error { + ctx = context.WithValue(ctx, common.Api, common.API_2_1) + w, err := client.WorkspaceClient() + assert.NoError(t, err) + pipeline, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{ + PipelineId: id, + }) + assert.NoError(t, err) + assert.Equal(t, pipeline.CreatorUserName, pipeline.RunAsUserName) + lastModified = pipeline.LastModified + return nil + }), + }, step{ + Template: ` + locals { + name = "pipeline-acceptance-{var.STICKY_RANDOM}" + } + resource "databricks_pipeline" "this" { + name = local.name + storage = "/test/${local.name}" + + configuration = { + key1 = "value1" + key2 = "value2" + key3 = "value3" + } + + library { + notebook { + path = databricks_notebook.this.path + } + } + + cluster { + instance_pool_id = "{env.TEST_INSTANCE_POOL_ID}" + label = "default" + num_workers = 2 + custom_tags = { + cluster_type = "default" + } + } + + cluster { + instance_pool_id = "{env.TEST_INSTANCE_POOL_ID}" + label 
= "maintenance" + num_workers = 1 + custom_tags = { + cluster_type = "maintenance" + } + } + continuous = false + expected_last_modified = ` + fmt.Sprintf("%d", lastModified) + ` + } + ` + dltNotebookResource, + Check: resourceCheck("databricks_pipeline.this", func(ctx context.Context, client *common.DatabricksClient, id string) error { + ctx = context.WithValue(ctx, common.Api, common.API_2_1) + w, err := client.WorkspaceClient() + assert.NoError(t, err) + pipeline, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{ + PipelineId: id, + }) + assert.NoError(t, err) + assert.NotEqual(t, pipeline.LastModified, lastModified) + return nil + }), + }) + +} diff --git a/pipelines/data_pipelines_test.go b/pipelines/data_pipelines_test.go index b08b2fb2a2..84b0840e40 100755 --- a/pipelines/data_pipelines_test.go +++ b/pipelines/data_pipelines_test.go @@ -3,6 +3,7 @@ package pipelines import ( "testing" + "github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/databricks/terraform-provider-databricks/qa" ) @@ -22,10 +23,10 @@ func TestDataSourcePipelines(t *testing.T) { { Method: "GET", Resource: "/api/2.0/pipelines?max_results=100", - Response: PipelineListResponse{ - Statuses: []PipelineStateInfo{ + Response: pipelines.ListPipelinesResponse{ + Statuses: []pipelines.PipelineStateInfo{ { - PipelineID: "123", + PipelineId: "123", Name: "Pipeline1", CreatorUserName: "user1", }, @@ -35,10 +36,10 @@ func TestDataSourcePipelines(t *testing.T) { { Method: "GET", Resource: "/api/2.0/pipelines?max_results=100&page_token=token1", - Response: PipelineListResponse{ - Statuses: []PipelineStateInfo{ + Response: pipelines.ListPipelinesResponse{ + Statuses: []pipelines.PipelineStateInfo{ { - PipelineID: "123", + PipelineId: "123", Name: "Pipeline1", CreatorUserName: "user1", }, @@ -64,10 +65,10 @@ func TestDataSourcePipelines_Search(t *testing.T) { { Method: "GET", Resource: "/api/2.0/pipelines?filter=name+LIKE+%27Pipeline1%27&max_results=100", - Response: PipelineListResponse{ - Statuses: []PipelineStateInfo{ + Response: pipelines.ListPipelinesResponse{ + Statuses: []pipelines.PipelineStateInfo{ { - PipelineID: "123", + PipelineId: "123", Name: "Pipeline1", CreatorUserName: "user1", }, @@ -95,7 +96,7 @@ func TestDataSourcePipelines_SearchError(t *testing.T) { { Method: "GET", Resource: "/api/2.0/pipelines?filter=name+LIKE+%27Pipeline2%27&max_results=100", - Response: PipelineListResponse{}, + Response: pipelines.ListPipelinesResponse{}, }, }, Resource: DataSourcePipelines(), @@ -114,7 +115,7 @@ func TestDataSourcePipelines_NoneFound(t *testing.T) { { Method: "GET", Resource: "/api/2.0/pipelines?max_results=100", - Response: PipelineListResponse{}, + Response: pipelines.ListPipelinesResponse{}, }, }, Resource: DataSourcePipelines(), diff --git a/pipelines/resource_pipeline.go b/pipelines/resource_pipeline.go index 457bf95744..2b827e7e3b 100644 --- a/pipelines/resource_pipeline.go +++ b/pipelines/resource_pipeline.go @@ -7,308 +7,112 @@ import ( "regexp" "time" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" - + "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/marshal" - "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/databricks/terraform-provider-databricks/clusters" 
"github.com/databricks/terraform-provider-databricks/common" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" ) // DefaultTimeout is the default amount of time that Terraform will wait when creating, updating and deleting pipelines. const DefaultTimeout = 20 * time.Minute -// dltAutoScale is a struct the describes auto scaling for DLT clusters -type dltAutoScale struct { - MinWorkers int32 `json:"min_workers,omitempty"` - MaxWorkers int32 `json:"max_workers,omitempty"` - Mode string `json:"mode,omitempty"` -} - -// We separate this struct from Cluster for two reasons: -// 1. Pipeline clusters include a `Label` field. -// 2. Spark version is not required (and shouldn't be specified) for pipeline clusters. -// 3. num_workers is optional, and there is no single-node support for pipelines clusters. -type pipelineCluster struct { - Label string `json:"label,omitempty"` // used only by pipelines - - NumWorkers int32 `json:"num_workers,omitempty" tf:"group:size"` - Autoscale *dltAutoScale `json:"autoscale,omitempty" tf:"group:size"` - - NodeTypeID string `json:"node_type_id,omitempty" tf:"group:node_type,computed"` - DriverNodeTypeID string `json:"driver_node_type_id,omitempty" tf:"computed"` - InstancePoolID string `json:"instance_pool_id,omitempty" tf:"group:node_type"` - DriverInstancePoolID string `json:"driver_instance_pool_id,omitempty"` - AwsAttributes *clusters.AwsAttributes `json:"aws_attributes,omitempty"` - GcpAttributes *clusters.GcpAttributes `json:"gcp_attributes,omitempty"` - AzureAttributes *clusters.AzureAttributes `json:"azure_attributes,omitempty"` - - EnableLocalDiskEncryption bool `json:"enable_local_disk_encryption,omitempty" tf:"computed"` - - PolicyID string `json:"policy_id,omitempty"` - ApplyPolicyDefaultValues bool `json:"apply_policy_default_values,omitempty"` - - SparkConf map[string]string `json:"spark_conf,omitempty"` - SparkEnvVars map[string]string `json:"spark_env_vars,omitempty"` - CustomTags map[string]string `json:"custom_tags,omitempty"` - - SSHPublicKeys []string `json:"ssh_public_keys,omitempty" tf:"max_items:10"` - InitScripts []clusters.InitScriptStorageInfo `json:"init_scripts,omitempty" tf:"max_items:10"` // TODO: tf:alias - ClusterLogConf *clusters.StorageInfo `json:"cluster_log_conf,omitempty"` - - ForceSendFields []string `json:"-"` -} - -func (s *pipelineCluster) UnmarshalJSON(b []byte) error { - return marshal.Unmarshal(b, s) -} - -func (s pipelineCluster) MarshalJSON() ([]byte, error) { - return marshal.Marshal(s) -} - -type NotebookLibrary struct { - Path string `json:"path"` -} - -type FileLibrary struct { - Path string `json:"path"` -} - -type PipelineLibrary struct { - Jar string `json:"jar,omitempty"` - Maven *compute.MavenLibrary `json:"maven,omitempty"` - Whl string `json:"whl,omitempty"` - Notebook *NotebookLibrary `json:"notebook,omitempty"` - File *FileLibrary `json:"file,omitempty"` -} - -type filters struct { - Include []string `json:"include,omitempty"` - Exclude []string `json:"exclude,omitempty"` -} - -type Notification struct { - EmailRecipients []string `json:"email_recipients" tf:"min_items:1"` - Alerts []string `json:"alerts" tf:"min_items:1"` -} - -type PipelineSpec struct { - ID string `json:"id,omitempty" tf:"computed"` - Name string `json:"name,omitempty"` - Storage string `json:"storage,omitempty" tf:"force_new"` - Catalog string `json:"catalog,omitempty" tf:"force_new"` - 
Configuration map[string]string `json:"configuration,omitempty"` - Clusters []pipelineCluster `json:"clusters,omitempty" tf:"alias:cluster"` - Libraries []PipelineLibrary `json:"libraries,omitempty" tf:"slice_set,alias:library"` - Filters *filters `json:"filters,omitempty"` - Continuous bool `json:"continuous,omitempty"` - Development bool `json:"development,omitempty"` - AllowDuplicateNames bool `json:"allow_duplicate_names,omitempty"` - Target string `json:"target,omitempty"` - Photon bool `json:"photon,omitempty"` - Edition string `json:"edition,omitempty" tf:"suppress_diff,default:ADVANCED"` - Channel string `json:"channel,omitempty" tf:"suppress_diff,default:CURRENT"` - Notifications []Notification `json:"notifications,omitempty" tf:"alias:notification"` - Serverless bool `json:"serverless" tf:"optional"` - Deployment *PipelineDeployment `json:"deployment,omitempty"` -} - -type createPipelineResponse struct { - PipelineID string `json:"pipeline_id"` -} - -// PipelineState ... -type PipelineState string - -// Constants for PipelineStates -const ( - StateDeploying PipelineState = "DEPLOYING" - StateStarting PipelineState = "STARTING" - StateRunning PipelineState = "RUNNING" - StateStopping PipelineState = "STOPPPING" - StateDeleted PipelineState = "DELETED" - StateRecovering PipelineState = "RECOVERING" - StateFailed PipelineState = "FAILED" - StateResetting PipelineState = "RESETTING" - StateIdle PipelineState = "IDLE" -) - -// PipelineHealthStatus ... -type PipelineHealthStatus string - -// Constants for PipelineHealthStatus -const ( - HealthStatusHealthy PipelineHealthStatus = "HEALTHY" - HealthStatusUnhealthy PipelineHealthStatus = "UNHEALTHY" -) - -type PipelineInfo struct { - PipelineID string `json:"pipeline_id"` - Spec *PipelineSpec `json:"spec"` - State *PipelineState `json:"state"` - Cause string `json:"cause"` - ClusterID string `json:"cluster_id"` - Name string `json:"name"` - Health *PipelineHealthStatus `json:"health"` - CreatorUserName string `json:"creator_user_name"` - LastModified int64 `json:"last_modified"` -} - -type PipelineUpdateStateInfo struct { - UpdateID string `json:"update_id"` - State *PipelineState `json:"state"` - CreationTime string `json:"creation_time"` -} - -type PipelineStateInfo struct { - PipelineID string `json:"pipeline_id"` - State *PipelineState `json:"state"` - ClusterID string `json:"cluster_id"` - Name string `json:"name"` - Health *PipelineHealthStatus `json:"health"` - CreatorUserName string `json:"creator_user_name"` - RunAsUserName string `json:"run_as_user_name"` - LatestUpdates []PipelineUpdateStateInfo `json:"latest_updates,omitempty"` -} - -type PipelineListResponse struct { - Statuses []PipelineStateInfo `json:"statuses"` - NextPageToken string `json:"next_page_token,omitempty"` - PrevPageToken string `json:"prev_page_token,omitempty"` -} - -type PipelinesAPI struct { - client *common.DatabricksClient - ctx context.Context -} - -type DeploymentKind string - -const ( - DeploymentKindBundle DeploymentKind = "BUNDLE" -) - -type PipelineDeployment struct { - Kind DeploymentKind `json:"kind,omitempty"` - MetadataFilePath string `json:"metadata_file_path,omitempty"` -} - -func NewPipelinesAPI(ctx context.Context, m any) PipelinesAPI { - return PipelinesAPI{m.(*common.DatabricksClient), ctx} +func adjustForceSendFields(clusterList *[]pipelines.PipelineCluster) { + for i := range *clusterList { + cluster := &((*clusterList)[i]) + // TF Go SDK doesn't differentiate between the default and not set values. 
+ // If nothing is specified, DLT creates a cluster with enhanced autoscaling + // from 1 to 5 nodes, which is different than sending a request for zero workers. + // The solution here is to look for the Spark configuration to determine + // if the user only wants a single node cluster (only master, no workers). + if cluster.SparkConf["spark.databricks.cluster.profile"] == "singleNode" { + cluster.ForceSendFields = append(cluster.ForceSendFields, "NumWorkers") + } + } } -func (a PipelinesAPI) Create(s PipelineSpec, timeout time.Duration) (string, error) { - adjustForceSendFields(&s) +func Create(w *databricks.WorkspaceClient, ctx context.Context, d *schema.ResourceData, timeout time.Duration) error { + var createPipelineRequest createPipelineRequestStruct + common.DataToStructPointer(d, pipelineSchema, &createPipelineRequest) + adjustForceSendFields(&createPipelineRequest.Clusters) - var resp createPipelineResponse - err := a.client.Post(a.ctx, "/pipelines", s, &resp) + createdPipeline, err := w.Pipelines.Create(ctx, createPipelineRequest.CreatePipeline) if err != nil { - return "", err + return err } - id := resp.PipelineID - err = a.waitForState(id, timeout, StateRunning) + id := createdPipeline.PipelineId + err = waitForState(w, ctx, id, timeout, pipelines.PipelineStateRunning) if err != nil { log.Printf("[INFO] Pipeline creation failed, attempting to clean up pipeline %s", id) - err2 := a.Delete(id, timeout) + err2 := Delete(w, ctx, id, timeout) if err2 != nil { log.Printf("[WARN] Unable to delete pipeline %s; this resource needs to be manually cleaned up", id) - return "", fmt.Errorf("multiple errors occurred when creating pipeline. Error while waiting for creation: \"%v\"; error while attempting to clean up failed pipeline: \"%v\"", err, err2) + return fmt.Errorf("multiple errors occurred when creating pipeline. Error while waiting for creation: \"%v\"; error while attempting to clean up failed pipeline: \"%v\"", err, err2) } log.Printf("[INFO] Successfully cleaned up pipeline %s", id) - return "", err + return err } - return id, nil + d.SetId(id) + return nil } -func adjustForceSendFields(s *PipelineSpec) { - for i := range s.Clusters { - cluster := &s.Clusters[i] - // TF Go SDK doesn't differentiate between the default and not set values. - // If nothing is specified, DLT creates a cluster with enhanced autoscaling - // from 1 to 5 nodes, which is different than sending a request for zero workers. - // The solution here is to look for the Spark configuration to determine - // if the user only wants a single node cluster (only master, no workers). 
- if cluster.SparkConf["spark.databricks.cluster.profile"] == "singleNode" { - cluster.ForceSendFields = append(cluster.ForceSendFields, "NumWorkers") - } - } +func Read(w *databricks.WorkspaceClient, ctx context.Context, id string) (*pipelines.GetPipelineResponse, error) { + return w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{ + PipelineId: id, + }) } -func (a PipelinesAPI) Read(id string) (p PipelineInfo, err error) { - err = a.client.Get(a.ctx, "/pipelines/"+id, nil, &p) - return -} +func Update(w *databricks.WorkspaceClient, ctx context.Context, d *schema.ResourceData, timeout time.Duration) error { + var updatePipelineRequest updatePipelineRequestStruct + common.DataToStructPointer(d, pipelineSchema, &updatePipelineRequest) + updatePipelineRequest.EditPipeline.PipelineId = d.Id() + adjustForceSendFields(&updatePipelineRequest.Clusters) -func (a PipelinesAPI) Update(id string, s PipelineSpec, timeout time.Duration) error { - adjustForceSendFields(&s) - err := a.client.Put(a.ctx, "/pipelines/"+id, s) + err := w.Pipelines.Update(ctx, updatePipelineRequest.EditPipeline) if err != nil { return err } - return a.waitForState(id, timeout, StateRunning) + return waitForState(w, ctx, d.Id(), timeout, pipelines.PipelineStateRunning) } -func (a PipelinesAPI) Delete(id string, timeout time.Duration) error { - err := a.client.Delete(a.ctx, "/pipelines/"+id, map[string]string{}) +func Delete(w *databricks.WorkspaceClient, ctx context.Context, id string, timeout time.Duration) error { + err := w.Pipelines.Delete(ctx, pipelines.DeletePipelineRequest{ + PipelineId: id, + }) if err != nil { return err } - return resource.RetryContext(a.ctx, timeout, - func() *resource.RetryError { - i, err := a.Read(id) + return retry.RetryContext(ctx, timeout, + func() *retry.RetryError { + i, err := Read(w, ctx, id) if err != nil { if apierr.IsMissing(err) { return nil } - return resource.NonRetryableError(err) + return retry.NonRetryableError(err) } - message := fmt.Sprintf("Pipeline %s is in state %s, not yet deleted", id, *i.State) + message := fmt.Sprintf("Pipeline %s is in state %s, not yet deleted", id, i.State) log.Printf("[DEBUG] %s", message) - return resource.RetryableError(fmt.Errorf(message)) + return retry.RetryableError(fmt.Errorf(message)) }) } -// List returns a list of the DLT pipelines. List could be filtered by name -func (a PipelinesAPI) List(pageSize int, filter string) ([]PipelineStateInfo, error) { - payload := map[string]any{"max_results": pageSize} - if filter != "" { - payload["filter"] = filter - } - result := []PipelineStateInfo{} - - for { - var resp PipelineListResponse - err := a.client.Get(a.ctx, "/pipelines", payload, &resp) - if err != nil { - return []PipelineStateInfo{}, err - } - result = append(result, resp.Statuses...) 
- if resp.NextPageToken == "" { - break - } - payload["page_token"] = resp.NextPageToken - } - - return result, nil -} - -func (a PipelinesAPI) waitForState(id string, timeout time.Duration, desiredState PipelineState) error { - return resource.RetryContext(a.ctx, timeout, - func() *resource.RetryError { - i, err := a.Read(id) +func waitForState(w *databricks.WorkspaceClient, ctx context.Context, id string, timeout time.Duration, desiredState pipelines.PipelineState) error { + return retry.RetryContext(ctx, timeout, + func() *retry.RetryError { + i, err := Read(w, ctx, id) if err != nil { - return resource.NonRetryableError(err) + return retry.NonRetryableError(err) } - state := *i.State + state := i.State if state == desiredState { return nil } - if state == StateFailed { - return resource.NonRetryableError(fmt.Errorf("pipeline %s has failed", id)) + if state == pipelines.PipelineStateFailed { + return retry.NonRetryableError(fmt.Errorf("pipeline %s has failed", id)) } if !i.Spec.Continuous { // continuous pipelines just need a non-FAILED check @@ -316,10 +120,67 @@ func (a PipelinesAPI) waitForState(id string, timeout time.Duration, desiredStat } message := fmt.Sprintf("Pipeline %s is in state %s, not yet in state %s", id, state, desiredState) log.Printf("[DEBUG] %s", message) - return resource.RetryableError(fmt.Errorf(message)) + return retry.RetryableError(fmt.Errorf(message)) }) } +type createPipelineRequestStruct struct { + pipelines.CreatePipeline +} + +var aliasMap = map[string]string{ + "clusters": "cluster", + "libraries": "library", + "notifications": "notification", +} + +func (createPipelineRequestStruct) Aliases() map[string]map[string]string { + return map[string]map[string]string{ + "pipelines.createPipelineRequestStruct": aliasMap, + } +} + +func (createPipelineRequestStruct) CustomizeSchema(s *common.CustomizableSchema) *common.CustomizableSchema { + return s +} + +type updatePipelineRequestStruct struct { + pipelines.EditPipeline +} + +func (updatePipelineRequestStruct) Aliases() map[string]map[string]string { + return map[string]map[string]string{ + "pipelines.updatePipelineRequestStruct": aliasMap, + } +} + +func (updatePipelineRequestStruct) CustomizeSchema(s *common.CustomizableSchema) *common.CustomizableSchema { + return s +} + +type Pipeline struct { + pipelines.PipelineSpec + AllowDuplicateNames bool `json:"allow_duplicate_names,omitempty"` + Cause string `json:"cause,omitempty"` + ClusterId string `json:"cluster_id,omitempty"` + CreatorUserName string `json:"creator_user_name,omitempty"` + Health pipelines.GetPipelineResponseHealth `json:"health,omitempty"` + LastModified int64 `json:"last_modified,omitempty"` + LatestUpdates []pipelines.UpdateStateInfo `json:"latest_updates,omitempty"` + RunAsUserName string `json:"run_as_user_name,omitempty"` + ExpectedLastModified int64 `json:"expected_last_modified,omitempty"` + State pipelines.PipelineState `json:"state,omitempty"` + // Provides the URL to the pipeline in the Databricks UI. 
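+	// Not returned by the pipelines API itself; the provider populates it in
+	// Create via c.FormatURL (see ResourcePipeline below).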
+ URL string `json:"url,omitempty"` +} + +func (Pipeline) Aliases() map[string]map[string]string { + return map[string]map[string]string{ + "pipelines.Pipeline": aliasMap, + } + +} + func suppressStorageDiff(k, old, new string, d *schema.ResourceData) bool { defaultStorageRegex := regexp.MustCompile( `^dbfs:/pipelines/[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`) @@ -331,70 +192,125 @@ func suppressStorageDiff(k, old, new string, d *schema.ResourceData) bool { return false } -func adjustPipelineResourceSchema(m map[string]*schema.Schema) map[string]*schema.Schema { - clustersSchema := common.MustSchemaMap(m, "cluster") - clustersSchema["spark_conf"].DiffSuppressFunc = clusters.SparkConfDiffSuppressFunc - common.MustSchemaPath(clustersSchema, - "aws_attributes", "zone_id").DiffSuppressFunc = clusters.ZoneDiffSuppress - common.MustSchemaPath(clustersSchema, "autoscale", "mode").DiffSuppressFunc = common.EqualFoldDiffSuppress - - common.MustSchemaPath(clustersSchema, "init_scripts", "dbfs").Deprecated = clusters.DbfsDeprecationWarning - - gcpAttributesSchema := common.MustSchemaMap(clustersSchema, "gcp_attributes") - delete(gcpAttributesSchema, "use_preemptible_executors") - delete(gcpAttributesSchema, "boot_disk_size") - - m["library"].MinItems = 1 - m["url"] = &schema.Schema{ - Type: schema.TypeString, - Computed: true, - } - m["channel"].ValidateFunc = validation.StringInSlice([]string{"current", "preview"}, true) - m["edition"].ValidateFunc = validation.StringInSlice([]string{"pro", "core", "advanced"}, true) - m["edition"].DiffSuppressFunc = common.EqualFoldDiffSuppress - - m["storage"].DiffSuppressFunc = suppressStorageDiff - m["storage"].ConflictsWith = []string{"catalog"} - m["catalog"].ConflictsWith = []string{"storage"} - - return m +func (Pipeline) CustomizeSchema(s *common.CustomizableSchema) *common.CustomizableSchema { + + // ForceNew fields + s.SchemaPath("storage").SetForceNew() + s.SchemaPath("catalog").SetForceNew() + s.SchemaPath("gateway_definition", "connection_id").SetForceNew() + s.SchemaPath("gateway_definition", "gateway_storage_catalog").SetForceNew() + s.SchemaPath("gateway_definition", "gateway_storage_schema").SetForceNew() + s.SchemaPath("ingestion_definition", "connection_name").SetForceNew() + s.SchemaPath("ingestion_definition", "ingestion_gateway_id").SetForceNew() + + // Computed fields + s.SchemaPath("id").SetComputed() + s.SchemaPath("cluster", "node_type_id").SetComputed() + s.SchemaPath("cluster", "driver_node_type_id").SetComputed() + s.SchemaPath("cluster", "enable_local_disk_encryption").SetComputed() + s.SchemaPath("url").SetComputed() + + s.SchemaPath("state").SetComputed() + s.SchemaPath("latest_updates").SetComputed() + s.SchemaPath("last_modified").SetComputed() + s.SchemaPath("health").SetComputed() + s.SchemaPath("cause").SetComputed() + s.SchemaPath("cluster_id").SetComputed() + s.SchemaPath("creator_user_name").SetComputed() + + // SuppressDiff fields + s.SchemaPath("edition").SetSuppressDiff() + s.SchemaPath("channel").SetSuppressDiff() + s.SchemaPath("cluster", "spark_conf").SetCustomSuppressDiff(clusters.SparkConfDiffSuppressFunc) + s.SchemaPath("cluster", "aws_attributes", "zone_id").SetCustomSuppressDiff(clusters.ZoneDiffSuppress) + s.SchemaPath("cluster", "autoscale", "mode").SetCustomSuppressDiff(common.EqualFoldDiffSuppress) + s.SchemaPath("edition").SetCustomSuppressDiff(common.EqualFoldDiffSuppress) + s.SchemaPath("storage").SetCustomSuppressDiff(suppressStorageDiff) + + // Deprecated fields + 
s.SchemaPath("cluster", "init_scripts", "dbfs").SetDeprecated(clusters.DbfsDeprecationWarning) + s.SchemaPath("library", "whl").SetDeprecated("The 'whl' field is deprecated") + + // Delete fields + s.SchemaPath("cluster", "gcp_attributes").RemoveField("use_preemptible_executors") + s.SchemaPath("cluster", "gcp_attributes").RemoveField("boot_disk_size") + + // Default values + s.SchemaPath("edition").SetDefault("ADVANCED") + s.SchemaPath("channel").SetDefault("CURRENT") + + // ConflictsWith fields + s.SchemaPath("storage").SetConflictsWith([]string{"catalog"}) + s.SchemaPath("catalog").SetConflictsWith([]string{"storage"}) + s.SchemaPath("ingestion_definition", "connection_name").SetConflictsWith([]string{"ingestion_definition.0.ingestion_gateway_id"}) + + // MinItems fields + s.SchemaPath("library").SetMinItems(1) + s.SchemaPath("notification", "email_recipients").SetMinItems(1) + s.SchemaPath("notification", "alerts").SetMinItems(1) + + // MaxItems fields + s.SchemaPath("cluster", "ssh_public_keys").SetMaxItems(10) + s.SchemaPath("cluster", "init_scripts").SetMaxItems(10) + + // ValidateFunc fields + s.SchemaPath("channel").SetValidateFunc(validation.StringInSlice([]string{"current", "preview"}, true)) + s.SchemaPath("edition").SetValidateFunc(validation.StringInSlice([]string{"pro", "core", "advanced"}, true)) + + // RequiredWith fields + s.SchemaPath("gateway_definition").SetRequiredWith([]string{"gateway_definition.0.gateway_storage_name", "gateway_definition.0.gateway_storage_catalog", "gateway_definition.0.gateway_storage_schema"}) + s.SchemaPath("ingestion_definition").SetRequiredWith([]string{"ingestion_definition.0.objects"}) + + return s } -// ResourcePipeline defines the Terraform resource for pipelines. +var pipelineSchema = common.StructToSchema(Pipeline{}, nil) + func ResourcePipeline() common.Resource { - var pipelineSchema = common.StructToSchema(PipelineSpec{}, adjustPipelineResourceSchema) return common.Resource{ Schema: pipelineSchema, Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { - var s PipelineSpec - common.DataToStructPointer(d, pipelineSchema, &s) - api := NewPipelinesAPI(ctx, c) - id, err := api.Create(s, d.Timeout(schema.TimeoutCreate)) + w, err := c.WorkspaceClient() + if err != nil { + return err + } + err = Create(w, ctx, d, d.Timeout(schema.TimeoutCreate)) if err != nil { return err } - d.SetId(id) d.Set("url", c.FormatURL("#joblist/pipelines/", d.Id())) return nil }, Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { - i, err := NewPipelinesAPI(ctx, c).Read(d.Id()) + w, err := c.WorkspaceClient() + if err != nil { + return err + } + readPipeline, err := Read(w, ctx, d.Id()) + if err != nil { return err } - if i.Spec == nil { - return fmt.Errorf("pipeline spec is nil for '%v'", i.PipelineID) + if readPipeline.Spec == nil { + return fmt.Errorf("pipeline spec is nil for '%v'", readPipeline.PipelineId) } - return common.StructToData(*i.Spec, pipelineSchema, d) + return common.StructToData(readPipeline.Spec, pipelineSchema, d) }, Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { - var s PipelineSpec - common.DataToStructPointer(d, pipelineSchema, &s) - return NewPipelinesAPI(ctx, c).Update(d.Id(), s, d.Timeout(schema.TimeoutUpdate)) + w, err := c.WorkspaceClient() + if err != nil { + return err + } + return Update(w, ctx, d, d.Timeout(schema.TimeoutUpdate)) + }, Delete: func(ctx context.Context, d *schema.ResourceData, c 
*common.DatabricksClient) error { - api := NewPipelinesAPI(ctx, c) - return api.Delete(d.Id(), d.Timeout(schema.TimeoutDelete)) + w, err := c.WorkspaceClient() + if err != nil { + return err + } + return Delete(w, ctx, d.Id(), d.Timeout(schema.TimeoutDelete)) + }, Timeouts: &schema.ResourceTimeout{ Default: schema.DefaultTimeout(DefaultTimeout), diff --git a/pipelines/resource_pipeline_test.go b/pipelines/resource_pipeline_test.go index 4fed6c1404..c05a3f7e3c 100644 --- a/pipelines/resource_pipeline_test.go +++ b/pipelines/resource_pipeline_test.go @@ -1,23 +1,28 @@ package pipelines import ( - "context" + "errors" "testing" - "github.com/databricks/terraform-provider-databricks/common" "github.com/databricks/terraform-provider-databricks/qa" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/mock" ) -var basicPipelineSpec = PipelineSpec{ +var createRequest = pipelines.CreatePipeline{ Name: "test-pipeline", Storage: "/test/storage", Configuration: map[string]string{ "key1": "value1", "key2": "value2", }, - Clusters: []pipelineCluster{ + Clusters: []pipelines.PipelineCluster{ { Label: "default", CustomTags: map[string]string{ @@ -25,18 +30,18 @@ var basicPipelineSpec = PipelineSpec{ }, }, }, - Libraries: []PipelineLibrary{ + Libraries: []pipelines.PipelineLibrary{ { - Notebook: &NotebookLibrary{ + Notebook: &pipelines.NotebookLibrary{ Path: "/Test", }, }, }, - Filters: &filters{ + Filters: &pipelines.Filters{ Include: []string{"com.databricks.include"}, Exclude: []string{"com.databricks.exclude"}, }, - Deployment: &PipelineDeployment{ + Deployment: &pipelines.PipelineDeployment{ Kind: "BUNDLE", MetadataFilePath: "/foo/bar", }, @@ -44,94 +49,124 @@ var basicPipelineSpec = PipelineSpec{ Channel: "CURRENT", } -func TestResourcePipelineCreate(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/pipelines", - ExpectedRequest: basicPipelineSpec, - Response: createPipelineResponse{ - PipelineID: "abcd", - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "DEPLOYING", - "spec": basicPipelineSpec, - }, +var updateRequest = pipelines.EditPipeline{ + Id: "abcd", + PipelineId: "abcd", + Name: "test", + Storage: "/test/storage", + Libraries: []pipelines.PipelineLibrary{ + { + Notebook: &pipelines.NotebookLibrary{ + Path: "/Test", }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "RUNNING", - "spec": basicPipelineSpec, - }, + }, + }, + Filters: &pipelines.Filters{ + Include: []string{"com.databricks.include"}, + }, + Channel: "CURRENT", + Edition: "ADVANCED", +} + +var basicPipelineSpec = pipelines.PipelineSpec{ + Name: "test-pipeline", + Storage: "/test/storage", + Configuration: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + Clusters: []pipelines.PipelineCluster{ + { + Label: "default", + CustomTags: map[string]string{ + "cluster_tag1": "cluster_value1", }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "RUNNING", - "spec": basicPipelineSpec, - }, + }, + }, + Libraries: []pipelines.PipelineLibrary{ + 
{ + Notebook: &pipelines.NotebookLibrary{ + Path: "/Test", }, }, - Create: true, + }, + Filters: &pipelines.Filters{ + Include: []string{"com.databricks.include"}, + Exclude: []string{"com.databricks.exclude"}, + }, + Deployment: &pipelines.PipelineDeployment{ + Kind: "BUNDLE", + MetadataFilePath: "/foo/bar", + }, + Edition: "ADVANCED", + Channel: "CURRENT", +} + +func TestResourcePipelineCreate(t *testing.T) { + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Create(mock.Anything, createRequest).Return(&pipelines.CreatePipelineResponse{ + PipelineId: "abcd", + }, nil) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateDeploying, + Spec: &basicPipelineSpec, + }, nil).Once() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateRunning, + Spec: &basicPipelineSpec, + }, nil).Once() + + }, Resource: ResourcePipeline(), - HCL: `name = "test-pipeline" - storage = "/test/storage" - configuration = { - key1 = "value1" - key2 = "value2" - } - cluster { - label = "default" - custom_tags = { - "cluster_tag1" = "cluster_value1" - } - } - library { - notebook { - path = "/Test" - } - } - filters { - include = ["com.databricks.include"] - exclude = ["com.databricks.exclude"] - } - continuous = false - deployment { - kind = "BUNDLE" - metadata_file_path = "/foo/bar" - } + Create: true, + HCL: ` + name = "test-pipeline" + storage = "/test/storage" + configuration = { + key1 = "value1" + key2 = "value2" + } + cluster { + label = "default" + custom_tags = { + "cluster_tag1" = "cluster_value1" + } + } + library { + notebook { + path = "/Test" + } + } + filters { + include = ["com.databricks.include"] + exclude = ["com.databricks.exclude"] + } + continuous = false + deployment { + kind = "BUNDLE" + metadata_file_path = "/foo/bar" + } `, - }.Apply(t) - assert.NoError(t, err) - assert.Equal(t, "abcd", d.Id()) + }.ApplyAndExpectData(t, map[string]any{ + "id": "abcd", + }) } func TestResourcePipelineCreate_Error(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/pipelines", - Response: common.APIErrorBody{ - ErrorCode: "INVALID_REQUEST", - Message: "Internal error happened", - }, - Status: 400, - }, + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Create(mock.Anything, mock.Anything).Return(nil, errors.New("Internal error happened")) }, Resource: ResourcePipeline(), HCL: `name = "test" @@ -146,43 +181,29 @@ func TestResourcePipelineCreate_Error(t *testing.T) { } `, Create: true, - }.Apply(t) - qa.AssertErrorStartsWith(t, err, "Internal error happened") - assert.Equal(t, "", d.Id(), "Id should be empty for error creates") + }.ExpectError(t, "Internal error happened") } func TestResourcePipelineCreate_ErrorWhenWaitingFailedCleanup(t *testing.T) { qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/pipelines", - Response: createPipelineResponse{ - PipelineID: "abcd", - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "FAILED", - }, - }, - { - Method: "DELETE", - 
Resource: "/api/2.0/pipelines/abcd?", - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: common.APIErrorBody{ - ErrorCode: "INTERNAL_ERROR", - Message: "Internal error", - }, - Status: 500, - }, + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Create(mock.Anything, mock.Anything).Return(&pipelines.CreatePipelineResponse{ + PipelineId: "abcd", + }, nil) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateFailed, + }, nil).Once() + e.Delete(mock.Anything, pipelines.DeletePipelineRequest{ + PipelineId: "abcd", + }).Return(errors.New("Internal error")) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(nil, errors.New("Internal error")) }, Resource: ResourcePipeline(), HCL: `name = "test" @@ -203,37 +224,28 @@ func TestResourcePipelineCreate_ErrorWhenWaitingFailedCleanup(t *testing.T) { } func TestResourcePipelineCreate_ErrorWhenWaitingSuccessfulCleanup(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/pipelines", - Response: createPipelineResponse{ - PipelineID: "abcd", - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "FAILED", - }, - }, - { - Method: "DELETE", - Resource: "/api/2.0/pipelines/abcd?", - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: common.APIErrorBody{ - ErrorCode: "RESOURCE_DOES_NOT_EXIST", - Message: "No such resource", - }, - Status: 404, - }, + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Create(mock.Anything, mock.Anything).Return(&pipelines.CreatePipelineResponse{ + PipelineId: "abcd", + }, nil) + + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateFailed, + }, nil).Once() + + e.Delete(mock.Anything, pipelines.DeletePipelineRequest{ + PipelineId: "abcd", + }).Return(nil) + + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(nil, apierr.ErrNotFound) }, Resource: ResourcePipeline(), HCL: `name = "test" @@ -248,48 +260,43 @@ func TestResourcePipelineCreate_ErrorWhenWaitingSuccessfulCleanup(t *testing.T) } `, Create: true, - }.Apply(t) - qa.AssertErrorStartsWith(t, err, "pipeline abcd has failed") - assert.Equal(t, "", d.Id(), "Id should be empty for error creates") + }.ExpectError(t, "pipeline abcd has failed") } func TestResourcePipelineRead(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: PipelineInfo{ - PipelineID: "abcd", - Spec: &basicPipelineSpec, - }, - }, + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Spec: &basicPipelineSpec, + }, nil) }, Resource: ResourcePipeline(), Read: true, New: true, ID: "abcd", - }.Apply(t) - assert.NoError(t, err) - assert.Equal(t, "abcd", d.Id(), "Id should not be empty") - assert.Equal(t, 
"/test/storage", d.Get("storage")) - assert.Equal(t, "value1", d.Get("configuration.key1")) - assert.Equal(t, "com.databricks.include", d.Get("filters.0.include.0")) - assert.Equal(t, false, d.Get("continuous")) + }.ApplyAndExpectData(t, map[string]any{ + "id": "abcd", + "storage": "/test/storage", + "configuration": map[string]any{ + "key1": "value1", + "key2": "value2", + }, + "filters.0.include.0": "com.databricks.include", + "continuous": false, + }) } func TestResourcePipelineRead_NotFound(t *testing.T) { qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: common.APIErrorBody{ - ErrorCode: "NOT_FOUND", - Message: "Item not found", - }, - Status: 404, - }, + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(nil, apierr.ErrNotFound) }, Resource: ResourcePipeline(), Read: true, @@ -300,16 +307,11 @@ func TestResourcePipelineRead_NotFound(t *testing.T) { func TestResourcePipelineRead_Error(t *testing.T) { d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: common.APIErrorBody{ - ErrorCode: "INVALID_REQUEST", - Message: "Internal error happened", - }, - Status: 400, - }, + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(nil, errors.New("Internal error happened")) }, Resource: ResourcePipeline(), Read: true, @@ -320,49 +322,42 @@ func TestResourcePipelineRead_Error(t *testing.T) { } func TestResourcePipelineUpdate(t *testing.T) { - state := StateRunning - spec := PipelineSpec{ - ID: "abcd", + state := pipelines.PipelineStateRunning + spec := pipelines.PipelineSpec{ + Id: "abcd", Name: "test", Storage: "/test/storage", - Libraries: []PipelineLibrary{ + Libraries: []pipelines.PipelineLibrary{ { - Notebook: &NotebookLibrary{ + Notebook: &pipelines.NotebookLibrary{ Path: "/Test", }, }, }, - Filters: &filters{ + Filters: &pipelines.Filters{ Include: []string{"com.databricks.include"}, }, Channel: "CURRENT", Edition: "ADVANCED", } - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "PUT", - Resource: "/api/2.0/pipelines/abcd", - ExpectedRequest: spec, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: PipelineInfo{ - PipelineID: "abcd", - Spec: &spec, - State: &state, - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: PipelineInfo{ - PipelineID: "abcd", - Spec: &spec, - State: &state, - }, - }, + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Update(mock.Anything, updateRequest).Return(nil) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Spec: &spec, + State: state, + }, nil).Once() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Spec: &spec, + State: state, + }, nil) }, Resource: ResourcePipeline(), HCL: `name = "test" @@ -381,23 +376,16 @@ func TestResourcePipelineUpdate(t *testing.T) { }, Update: true, ID: "abcd", - }.Apply(t) - assert.NoError(t, err) - assert.Equal(t, "abcd", d.Id(), "Id should be the same as in reading") + 
}.ApplyAndExpectData(t, map[string]any{ + "id": "abcd", + }) } func TestResourcePipelineUpdate_Error(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { // read log output for better stub url... - Method: "PUT", - Resource: "/api/2.0/pipelines/abcd", - Response: common.APIErrorBody{ - ErrorCode: "INVALID_REQUEST", - Message: "Internal error happened", - }, - Status: 400, - }, + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Update(mock.Anything, mock.Anything).Return(errors.New("Internal error happened")) }, Resource: ResourcePipeline(), HCL: `name = "test" @@ -416,46 +404,39 @@ func TestResourcePipelineUpdate_Error(t *testing.T) { "storage": "/test/storage", }, ID: "abcd", - }.Apply(t) - qa.AssertErrorStartsWith(t, err, "Internal error happened") - assert.Equal(t, "abcd", d.Id()) + }.ExpectError(t, "Internal error happened") } func TestResourcePipelineUpdate_FailsAfterUpdate(t *testing.T) { - state := StateFailed - spec := PipelineSpec{ - ID: "abcd", + state := pipelines.PipelineStateFailed + spec := pipelines.PipelineSpec{ + Id: "abcd", Name: "test", Storage: "/test/storage", - Libraries: []PipelineLibrary{ + Libraries: []pipelines.PipelineLibrary{ { - Notebook: &NotebookLibrary{ + Notebook: &pipelines.NotebookLibrary{ Path: "/Test", }, }, }, - Filters: &filters{ + Filters: &pipelines.Filters{ Include: []string{"com.databricks.include"}, }, Channel: "CURRENT", Edition: "ADVANCED", } d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "PUT", - Resource: "/api/2.0/pipelines/abcd", - ExpectedRequest: spec, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: PipelineInfo{ - PipelineID: "abcd", - Spec: &spec, - State: &state, - }, - }, + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Update(mock.Anything, updateRequest).Return(nil) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Spec: &spec, + State: state, + }, nil) }, Resource: ResourcePipeline(), HCL: `name = "test" @@ -480,52 +461,37 @@ func TestResourcePipelineUpdate_FailsAfterUpdate(t *testing.T) { } func TestResourcePipelineDelete(t *testing.T) { - state := StateRunning - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "DELETE", - Resource: "/api/2.0/pipelines/abcd?", - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: PipelineInfo{ - PipelineID: "abcd", - Spec: &basicPipelineSpec, - State: &state, - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: common.APIErrorBody{ - ErrorCode: "RESOURCE_DOES_NOT_EXIST", - Message: "No such resource", - }, - Status: 404, - }, + state := pipelines.PipelineStateRunning + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Delete(mock.Anything, pipelines.DeletePipelineRequest{ + PipelineId: "abcd", + }).Return(nil) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Spec: &basicPipelineSpec, + State: state, + }, nil).Once() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(nil, apierr.ErrNotFound) }, Resource: ResourcePipeline(), Delete: true, ID: "abcd", - }.Apply(t) - assert.NoError(t, err) - assert.Equal(t, 
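+
+// NOTE (illustrative): the delete mock above encodes the wait loop — Delete
+// returns nil, one poll still reports RUNNING via .Once(), and the final
+// Get returns apierr.ErrNotFound, which the waiter treats as confirmation
+// that the pipeline is gone. That last expectation intentionally omits
+// .Once() so any number of trailing polls match.
+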
"abcd", d.Id()) + }.ApplyAndExpectData(t, map[string]any{ + "id": "abcd", + }) } func TestResourcePipelineDelete_Error(t *testing.T) { d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "DELETE", - Resource: "/api/2.0/pipelines/abcd?", - Response: common.APIErrorBody{ - ErrorCode: "INVALID_REQUEST", - Message: "Internal error happened", - }, - Status: 500, - }, + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Delete(mock.Anything, mock.Anything).Return(errors.New("Internal error happened")) }, Resource: ResourcePipeline(), Delete: true, @@ -535,73 +501,6 @@ func TestResourcePipelineDelete_Error(t *testing.T) { assert.Equal(t, "abcd", d.Id()) } -func TestListPipelines(t *testing.T) { - client, server, err := qa.HttpFixtureClient(t, []qa.HTTPFixture{ - { - Method: "GET", - Resource: "/api/2.0/pipelines?max_results=1", - Response: PipelineListResponse{ - Statuses: []PipelineStateInfo{ - { - PipelineID: "123", - Name: "Pipeline1", - CreatorUserName: "user1", - }, - }, - NextPageToken: "token1", - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines?max_results=1&page_token=token1", - Response: PipelineListResponse{ - Statuses: []PipelineStateInfo{ - { - PipelineID: "456", - Name: "Pipeline2", - CreatorUserName: "user2", - }, - }, - PrevPageToken: "token0", - }, - }, - }) - defer server.Close() - require.NoError(t, err) - - ctx := context.Background() - data, err := NewPipelinesAPI(ctx, client).List(1, "") - require.NoError(t, err) - require.Equal(t, 2, len(data)) - require.Equal(t, "Pipeline1", data[0].Name) - require.Equal(t, "456", data[1].PipelineID) -} - -func TestListPipelinesWithFilter(t *testing.T) { - client, server, err := qa.HttpFixtureClient(t, []qa.HTTPFixture{ - { - Method: "GET", - Resource: "/api/2.0/pipelines?filter=name%20LIKE%20%27Pipeline1%27&max_results=1", - Response: PipelineListResponse{ - Statuses: []PipelineStateInfo{ - { - PipelineID: "123", - Name: "Pipeline1", - CreatorUserName: "user1", - }, - }, - }, - }, - }) - defer server.Close() - require.NoError(t, err) - - ctx := context.Background() - data, err := NewPipelinesAPI(ctx, client).List(1, "name LIKE 'Pipeline1'") - require.NoError(t, err) - require.Equal(t, 1, len(data)) -} - func TestStorageSuppressDiff(t *testing.T) { k := "storage" generated := "dbfs:/pipelines/c609bbb0-2e42-4bc8-bb4e-a1c26d6e9403" @@ -611,14 +510,14 @@ func TestStorageSuppressDiff(t *testing.T) { } func TestResourcePipelineCreateServerless(t *testing.T) { - var serverlessPipelineSpec = PipelineSpec{ + var serverlessPipelineSpec = pipelines.PipelineSpec{ Name: "test-pipeline-serverless", Storage: "/test/storage", Configuration: map[string]string{ "key1": "value1", "key2": "value2", }, - Clusters: []pipelineCluster{ + Clusters: []pipelines.PipelineCluster{ { Label: "default", CustomTags: map[string]string{ @@ -626,58 +525,49 @@ func TestResourcePipelineCreateServerless(t *testing.T) { }, }, }, - Libraries: []PipelineLibrary{ + Libraries: []pipelines.PipelineLibrary{ { - Notebook: &NotebookLibrary{ + Notebook: &pipelines.NotebookLibrary{ Path: "/TestServerless", }, }, }, - Filters: &filters{ + Filters: &pipelines.Filters{ Include: []string{"com.databricks.include"}, Exclude: []string{"com.databricks.exclude"}, }, Serverless: true, } - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/pipelines", - Response: createPipelineResponse{ - PipelineID: "serverless", - }, - }, - { - Method: "GET", - Resource: 
"/api/2.0/pipelines/serverless", - Response: map[string]any{ - "id": "serverless", - "name": "test-pipeline-serverless", - "state": "DEPLOYING", - "spec": serverlessPipelineSpec, - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/serverless", - Response: map[string]any{ - "id": "serverless", - "name": "test-pipeline-serverless", - "state": "RUNNING", - "spec": serverlessPipelineSpec, - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/serverless", - Response: map[string]any{ - "id": "serverless", - "name": "test-pipeline-serverless", - "state": "RUNNING", - "spec": serverlessPipelineSpec, - }, - }, + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Create(mock.Anything, mock.Anything).Return(&pipelines.CreatePipelineResponse{ + PipelineId: "serverless", + }, nil) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "serverless", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "serverless", + Name: "test-pipeline-serverless", + State: pipelines.PipelineStateDeploying, + Spec: &serverlessPipelineSpec, + }, nil).Once() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "serverless", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "serverless", + Name: "test-pipeline-serverless", + State: pipelines.PipelineStateRunning, + Spec: &serverlessPipelineSpec, + }, nil).Once() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "serverless", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "serverless", + Name: "test-pipeline-serverless", + State: pipelines.PipelineStateRunning, + Spec: &serverlessPipelineSpec, + }, nil) }, Create: true, Resource: ResourcePipeline(), @@ -705,56 +595,48 @@ func TestResourcePipelineCreateServerless(t *testing.T) { continuous = false serverless = true `, - }.Apply(t) - assert.NoError(t, err) - assert.Equal(t, "serverless", d.Id()) + }.ApplyAndExpectData(t, map[string]any{ + "id": "serverless", + }) } func TestZeroWorkers(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/pipelines", - ExpectedRequest: PipelineSpec{ - Name: "test-pipeline", - Channel: "CURRENT", - Edition: "ADVANCED", - Clusters: []pipelineCluster{ - { - Label: "default", - NumWorkers: 0, - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - }, - ForceSendFields: []string{"NumWorkers"}, + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Create(mock.Anything, pipelines.CreatePipeline{ + Name: "test-pipeline", + Channel: "CURRENT", + Edition: "ADVANCED", + Clusters: []pipelines.PipelineCluster{ + { + Label: "default", + NumWorkers: 0, + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", }, + ForceSendFields: []string{"NumWorkers"}, }, }, - Response: createPipelineResponse{ - PipelineID: "abcd", - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "RUNNING", - "spec": basicPipelineSpec, - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "RUNNING", - "spec": basicPipelineSpec, - }, - }, + }).Return(&pipelines.CreatePipelineResponse{ + PipelineId: "abcd", + }, nil) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: 
"abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateRunning, + Spec: &basicPipelineSpec, + }, nil).Once() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateRunning, + Spec: &basicPipelineSpec, + }, nil).Once() }, Create: true, Resource: ResourcePipeline(), @@ -763,60 +645,51 @@ func TestZeroWorkers(t *testing.T) { label = "default" num_workers = 0 spark_conf = { - spark.databricks.cluster.profile = "singleNode" + spark.databricks.cluster.profile = "singleNode" } } `, - }.Apply(t) - assert.NoError(t, err) - assert.Equal(t, "abcd", d.Id()) + }.ApplyAndExpectData(t, map[string]any{ + "id": "abcd", + }) } func TestAutoscaling(t *testing.T) { - d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/pipelines", - ExpectedRequest: PipelineSpec{ - Name: "test-pipeline", - Channel: "CURRENT", - Edition: "ADVANCED", - Clusters: []pipelineCluster{ - { - Label: "default", - - Autoscale: &dltAutoScale{ - MinWorkers: 2, - MaxWorkers: 10, - }, + qa.ResourceFixture{ + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Create(mock.Anything, pipelines.CreatePipeline{ + Name: "test-pipeline", + Channel: "CURRENT", + Edition: "ADVANCED", + Clusters: []pipelines.PipelineCluster{ + { + Label: "default", + Autoscale: &pipelines.PipelineClusterAutoscale{ + MinWorkers: 2, + MaxWorkers: 10, }, }, }, - Response: createPipelineResponse{ - PipelineID: "abcd", - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "RUNNING", - "spec": basicPipelineSpec, - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "RUNNING", - "spec": basicPipelineSpec, - }, - }, + }).Return(&pipelines.CreatePipelineResponse{ + PipelineId: "abcd", + }, nil) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateRunning, + Spec: &basicPipelineSpec, + }, nil).Once() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateRunning, + Spec: &basicPipelineSpec, + }, nil).Once() }, Create: true, Resource: ResourcePipeline(), @@ -829,51 +702,43 @@ func TestAutoscaling(t *testing.T) { } } `, - }.Apply(t) - assert.NoError(t, err) - assert.Equal(t, "abcd", d.Id()) + }.ApplyAndExpectData(t, map[string]any{ + "id": "abcd", + }) } func TestDefault(t *testing.T) { d, err := qa.ResourceFixture{ - Fixtures: []qa.HTTPFixture{ - { - Method: "POST", - Resource: "/api/2.0/pipelines", - ExpectedRequest: PipelineSpec{ - Name: "test-pipeline", - Channel: "CURRENT", - Edition: "ADVANCED", - Clusters: []pipelineCluster{ - { - Label: "default", - }, + MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) { + e := w.GetMockPipelinesAPI().EXPECT() + e.Create(mock.Anything, pipelines.CreatePipeline{ + Name: "test-pipeline", + Channel: "CURRENT", + Edition: "ADVANCED", + Clusters: []pipelines.PipelineCluster{ + { + Label: "default", }, }, - Response: 
createPipelineResponse{ - PipelineID: "abcd", - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "RUNNING", - "spec": basicPipelineSpec, - }, - }, - { - Method: "GET", - Resource: "/api/2.0/pipelines/abcd", - Response: map[string]any{ - "id": "abcd", - "name": "test-pipeline", - "state": "RUNNING", - "spec": basicPipelineSpec, - }, - }, + }).Return(&pipelines.CreatePipelineResponse{ + PipelineId: "abcd", + }, nil) + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateRunning, + Spec: &basicPipelineSpec, + }, nil).Once() + e.Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "abcd", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "abcd", + Name: "test-pipeline", + State: pipelines.PipelineStateRunning, + Spec: &basicPipelineSpec, + }, nil).Once() }, Create: true, Resource: ResourcePipeline(),
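+
+// NOTE (illustrative): TestDefault pins the schema-level defaults — even a
+// minimal pipeline definition is expected to send channel "CURRENT",
+// edition "ADVANCED", and a single cluster labeled "default" in the
+// CreatePipeline payload; the HCL under test (below) presumably declares
+// little more than the pipeline name.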