Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tanmay-db committed Aug 28, 2024
1 parent c5bbe00 commit 4a61953
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 93 deletions.
9 changes: 9 additions & 0 deletions common/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/service/iam"
"github.com/golang-jwt/jwt/v4"
"github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

Expand Down Expand Up @@ -56,6 +57,14 @@ type DatabricksClient struct {
mu sync.Mutex
}

func (c *DatabricksClient) GetWorkspaceClient() (*databricks.WorkspaceClient, diag.Diagnostics) {
w, err := c.WorkspaceClient()
if err != nil {
return nil, diag.Diagnostics{diag.NewErrorDiagnostic("Failed to get workspace client", err.Error())}
}
return w, nil
}

func (c *DatabricksClient) WorkspaceClient() (*databricks.WorkspaceClient, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
33 changes: 33 additions & 0 deletions internal/providers/pluginfw/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package common

import (
"fmt"

"github.com/databricks/terraform-provider-databricks/common"
"github.com/hashicorp/terraform-plugin-framework/datasource"
"github.com/hashicorp/terraform-plugin-framework/resource"
)

func ConfigureDataSource(req datasource.ConfigureRequest, resp *datasource.ConfigureResponse) *common.DatabricksClient {
client, ok := req.ProviderData.(*common.DatabricksClient)
if !ok {
resp.Diagnostics.AddError(
"Unexpected Data Source Configure Type",
fmt.Sprintf("Expected *common.DatabricksClient, got: %T. Please report this issue to the provider developers.", req.ProviderData),
)
return nil
}
return client
}

func ConfigureResource(req resource.ConfigureRequest, resp *resource.ConfigureResponse) *common.DatabricksClient {
client, ok := req.ProviderData.(*common.DatabricksClient)
if !ok {
resp.Diagnostics.AddError(
"Unexpected Resource Configure Type",
fmt.Sprintf("Expected *common.DatabricksClient, got: %T. Please report this issue to the provider developers.", req.ProviderData),
)
return nil
}
return client
}
10 changes: 8 additions & 2 deletions internal/providers/pluginfw/pluginfw.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/databricks/terraform-provider-databricks/commands"
"github.com/databricks/terraform-provider-databricks/common"
providercommon "github.com/databricks/terraform-provider-databricks/internal/providers/common"
"github.com/databricks/terraform-provider-databricks/internal/providers/pluginfw/resources/qualitymonitor"
"github.com/databricks/terraform-provider-databricks/internal/providers/pluginfw/resources/volume"

"github.com/hashicorp/terraform-plugin-framework/datasource"
"github.com/hashicorp/terraform-plugin-framework/diag"
Expand All @@ -38,11 +40,15 @@ type DatabricksProviderPluginFramework struct {
var _ provider.Provider = (*DatabricksProviderPluginFramework)(nil)

func (p *DatabricksProviderPluginFramework) Resources(ctx context.Context) []func() resource.Resource {
return []func() resource.Resource{}
return []func() resource.Resource{
qualitymonitor.ResourceQualityMonitor,
}
}

func (p *DatabricksProviderPluginFramework) DataSources(ctx context.Context) []func() datasource.DataSource {
return []func() datasource.DataSource{}
return []func() datasource.DataSource{
volume.DataSourceVolumes,
}
}

func (p *DatabricksProviderPluginFramework) Schema(ctx context.Context, req provider.SchemaRequest, resp *provider.SchemaResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/terraform-provider-databricks/common"
pluginfwcommon "github.com/databricks/terraform-provider-databricks/internal/providers/pluginfw/common"
"github.com/databricks/terraform-provider-databricks/internal/providers/pluginfw/converters"
"github.com/databricks/terraform-provider-databricks/internal/providers/pluginfw/tfschema"
"github.com/databricks/terraform-provider-databricks/internal/service/catalog_tf"
"github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
Expand All @@ -20,29 +22,32 @@ import (

const qualityMonitorDefaultProvisionTimeout = 15 * time.Minute

func WaitForMonitor(w *databricks.WorkspaceClient, ctx context.Context, monitorName string) error {
return retry.RetryContext(ctx, qualityMonitorDefaultProvisionTimeout, func() *retry.RetryError {
endpoint, err := w.QualityMonitors.GetByTableName(ctx, monitorName)
var _ resource.Resource = &QualityMonitorResource{}

func ResourceQualityMonitor() resource.Resource {
return &QualityMonitorResource{}
}

func waitForMonitor(ctx context.Context, w *databricks.WorkspaceClient, monitor *catalog.MonitorInfo) diag.Diagnostics {
monitorName := monitor.TableName
err := retry.RetryContext(ctx, qualityMonitorDefaultProvisionTimeout, func() *retry.RetryError {
monitor, err := w.QualityMonitors.GetByTableName(ctx, monitorName)
if err != nil {
return retry.NonRetryableError(err)
}

switch endpoint.Status {
switch monitor.Status {
case catalog.MonitorInfoStatusMonitorStatusActive:
return nil
case catalog.MonitorInfoStatusMonitorStatusError, catalog.MonitorInfoStatusMonitorStatusFailed:
return retry.NonRetryableError(fmt.Errorf("monitor status retrund %s for monitor: %s", endpoint.Status, monitorName))
return retry.NonRetryableError(fmt.Errorf("monitor status retrund %s for monitor: %s", monitor.Status, monitorName))
}
return retry.RetryableError(fmt.Errorf("monitor %s is still pending", monitorName))
})
}

var _ resource.Resource = &QualityMonitorResource{}

func ResourceQualityMonitor() func() resource.Resource {
return func() resource.Resource {
return &QualityMonitorResource{}
if err != nil {
return diag.Diagnostics{diag.NewErrorDiagnostic("Failed to get Monitor", err.Error())}
}
return nil
}

type MonitorInfoExtended struct {
Expand All @@ -61,7 +66,7 @@ func (r *QualityMonitorResource) Metadata(ctx context.Context, req resource.Meta

func (r *QualityMonitorResource) Schema(ctx context.Context, req resource.SchemaRequest, resp *resource.SchemaResponse) {
resp.Schema = schema.Schema{
Description: "Terraform schema for Databricks Lakehouse Monitor. MonitorInfo struct is used to create the schema",
Description: "Terraform schema for Databricks Quality Monitor",
Attributes: tfschema.ResourceStructToSchemaMap(MonitorInfoExtended{}, func(c tfschema.CustomizableSchema) tfschema.CustomizableSchema {
c.SetRequired("assets_dir")
c.SetRequired("output_schema_name")
Expand All @@ -78,28 +83,16 @@ func (r *QualityMonitorResource) Schema(ctx context.Context, req resource.Schema
}

func (d *QualityMonitorResource) Configure(ctx context.Context, req resource.ConfigureRequest, resp *resource.ConfigureResponse) {
if req.ProviderData == nil {
return
}
client, ok := req.ProviderData.(*common.DatabricksClient)
if !ok {
resp.Diagnostics.AddError(
"Unexpected Data Source Configure Type",
fmt.Sprintf("Expected *common.DatabricksClient, got: %T. Please report this issue to the provider developers.", req.ProviderData),
)
return
}
d.Client = client
d.Client = pluginfwcommon.ConfigureResource(req, resp)
}

func (r *QualityMonitorResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) {
w, err := r.Client.WorkspaceClient()
if err != nil {
resp.Diagnostics.AddError("Failed to get workspace client", err.Error())
w, diags := r.Client.GetWorkspaceClient()
if diags.HasError() {
return
}
var monitorInfoTfSDK MonitorInfoExtended
diags := req.Plan.Get(ctx, &monitorInfoTfSDK)
diags = req.Plan.Get(ctx, &monitorInfoTfSDK)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
Expand All @@ -110,40 +103,33 @@ func (r *QualityMonitorResource) Create(ctx context.Context, req resource.Create
if diags.HasError() {
return
}
endpoint, err := w.QualityMonitors.Create(ctx, createMonitorGoSDK)
monitor, err := w.QualityMonitors.Create(ctx, createMonitorGoSDK)
if err != nil {
resp.Diagnostics.AddError("Failed to get created monitor", err.Error())
return
}
err = WaitForMonitor(w, ctx, endpoint.TableName)
if err != nil {
resp.Diagnostics.AddError("Failed to wait for newly created monitor", err.Error())
return
}

new_endpoint, err := w.QualityMonitors.GetByTableName(ctx, createMonitorGoSDK.TableName)
if err != nil {
resp.Diagnostics.AddError("Failed to get newly created monitor", err.Error())
resp.Diagnostics.Append(waitForMonitor(ctx, w, monitor)...)
if resp.Diagnostics.HasError() {
return
}

var newMonitorInfoTfSDK MonitorInfoExtended
diags = converters.GoSdkToTfSdkStruct(ctx, new_endpoint, &newMonitorInfoTfSDK)
if diags.HasError() {
resp.Diagnostics.Append(converters.GoSdkToTfSdkStruct(ctx, monitor, &newMonitorInfoTfSDK)...)
if resp.Diagnostics.HasError() {
return
}

resp.Diagnostics.Append(resp.State.Set(ctx, newMonitorInfoTfSDK)...)
}

func (r *QualityMonitorResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) {
w, err := r.Client.WorkspaceClient()
if err != nil {
resp.Diagnostics.AddError("Failed to get workspace client", err.Error())
w, diags := r.Client.GetWorkspaceClient()
if diags.HasError() {
return
}

var getMonitor catalog_tf.GetQualityMonitorRequest
diags := req.State.GetAttribute(ctx, path.Root("table_name"), &getMonitor.TableName)
diags = req.State.GetAttribute(ctx, path.Root("table_name"), &getMonitor.TableName)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
Expand All @@ -166,13 +152,13 @@ func (r *QualityMonitorResource) Read(ctx context.Context, req resource.ReadRequ
}

func (r *QualityMonitorResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) {
w, err := r.Client.WorkspaceClient()
if err != nil {
resp.Diagnostics.AddError("Failed to get workspace client", err.Error())
w, diags := r.Client.GetWorkspaceClient()
if diags.HasError() {
return
}

var monitorInfoTfSDK MonitorInfoExtended
diags := req.Plan.Get(ctx, &monitorInfoTfSDK)
diags = req.Plan.Get(ctx, &monitorInfoTfSDK)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
Expand All @@ -189,26 +175,18 @@ func (r *QualityMonitorResource) Update(ctx context.Context, req resource.Update
if diags.HasError() {
return
}
_, err = w.QualityMonitors.Update(ctx, updateMonitorGoSDK)
monitor, err := w.QualityMonitors.Update(ctx, updateMonitorGoSDK)
if err != nil {
resp.Diagnostics.AddError("Failed to update monitor", err.Error())
return
}
err = WaitForMonitor(w, ctx, updateMonitorGoSDK.TableName)
if err != nil {
resp.Diagnostics.AddError("Failed to wait for updated monitor", err.Error())
return
}

// Get the created monitor.
new_endpoint, err := w.QualityMonitors.GetByTableName(ctx, updateMonitorGoSDK.TableName)
if err != nil {
resp.Diagnostics.AddError("Failed to get newly created monitor", err.Error())
resp.Diagnostics.Append(waitForMonitor(ctx, w, monitor)...)
if resp.Diagnostics.HasError() {
return
}

var newMonitorInfoTfSDK MonitorInfoExtended
diags = converters.GoSdkToTfSdkStruct(ctx, new_endpoint, &newMonitorInfoTfSDK)
diags = converters.GoSdkToTfSdkStruct(ctx, monitor, &newMonitorInfoTfSDK)
if diags.HasError() {
return
}
Expand All @@ -217,18 +195,18 @@ func (r *QualityMonitorResource) Update(ctx context.Context, req resource.Update
}

func (r *QualityMonitorResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) {
w, err := r.Client.WorkspaceClient()
if err != nil {
resp.Diagnostics.AddError("Failed to get workspace client", err.Error())
w, diags := r.Client.GetWorkspaceClient()
if diags.HasError() {
return
}

var deleteRequest catalog_tf.DeleteQualityMonitorRequest
diags := req.State.GetAttribute(ctx, path.Root("table_name"), &deleteRequest.TableName)
diags = req.State.GetAttribute(ctx, path.Root("table_name"), &deleteRequest.TableName)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}
err = w.QualityMonitors.DeleteByTableName(ctx, deleteRequest.TableName.ValueString())
err := w.QualityMonitors.DeleteByTableName(ctx, deleteRequest.TableName.ValueString())
if err != nil {
resp.Diagnostics.AddError("Failed to delete monitor", err.Error())
return
Expand Down
35 changes: 10 additions & 25 deletions internal/providers/pluginfw/resources/volume/data_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package volume

import (
"context"
"fmt"

"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/terraform-provider-databricks/common"
pluginfwcommon "github.com/databricks/terraform-provider-databricks/internal/providers/pluginfw/common"
"github.com/databricks/terraform-provider-databricks/internal/providers/pluginfw/converters"
"github.com/databricks/terraform-provider-databricks/internal/providers/pluginfw/tfschema"
"github.com/hashicorp/terraform-plugin-framework/datasource"
"github.com/hashicorp/terraform-plugin-framework/datasource/schema"
"github.com/hashicorp/terraform-plugin-framework/types"
)

func DataSourceVolumes() func() datasource.DataSource {
return func() datasource.DataSource {
return &VolumesDataSource{}
}
func DataSourceVolumes() datasource.DataSource {
return &VolumesDataSource{}
}

var _ datasource.DataSource = &VolumesDataSource{}
Expand All @@ -36,25 +35,12 @@ func (d *VolumesDataSource) Metadata(ctx context.Context, req datasource.Metadat

func (d *VolumesDataSource) Schema(ctx context.Context, req datasource.SchemaRequest, resp *datasource.SchemaResponse) {
resp.Schema = schema.Schema{
Attributes: tfschema.DataSourceStructToSchemaMap(VolumesList{}, func(c tfschema.CustomizableSchema) tfschema.CustomizableSchema {
return c
}),
Attributes: tfschema.DataSourceStructToSchemaMap(VolumesList{}, nil),
}
}

func (d *VolumesDataSource) Configure(ctx context.Context, req datasource.ConfigureRequest, resp *datasource.ConfigureResponse) {
if req.ProviderData == nil {
return
}
client, ok := req.ProviderData.(*common.DatabricksClient)
if !ok {
resp.Diagnostics.AddError(
"Unexpected Data Source Configure Type",
fmt.Sprintf("Expected *common.DatabricksClient, got: %T. Please report this issue to the provider developers.", req.ProviderData),
)
return
}
d.Client = client
func (d *VolumesDataSource) Configure(_ context.Context, req datasource.ConfigureRequest, resp *datasource.ConfigureResponse) {
d.Client = pluginfwcommon.ConfigureDataSource(req, resp)
}

func (d *VolumesDataSource) Read(ctx context.Context, req datasource.ReadRequest, resp *datasource.ReadResponse) {
Expand All @@ -70,10 +56,9 @@ func (d *VolumesDataSource) Read(ctx context.Context, req datasource.ReadRequest
if resp.Diagnostics.HasError() {
return
}
volumes, err := w.Volumes.ListAll(ctx, catalog.ListVolumesRequest{
CatalogName: volumesList.CatalogName.ValueString(),
SchemaName: volumesList.SchemaName.ValueString(),
})
var listVolumesRequest catalog.ListVolumesRequest
converters.TfSdkToGoSdkStruct(ctx, volumesList, &listVolumesRequest)
volumes, err := w.Volumes.ListAll(ctx, listVolumesRequest)
if err != nil {
resp.Diagnostics.AddError("Failed to get volumes for the catalog and schema", err.Error())
return
Expand Down

0 comments on commit 4a61953

Please sign in to comment.