From 666de9e4e69ed8adb1a135dad895207e8281b797 Mon Sep 17 00:00:00 2001 From: sid-acryl <155424659+sid-acryl@users.noreply.github.com> Date: Wed, 22 May 2024 18:57:45 +0530 Subject: [PATCH] fix(ingestion/powerbi): Databricks support for table lineage (#10416) --- .../docs/sources/powerbi/powerbi_pre.md | 1 + .../ingestion/source/powerbi/config.py | 19 +++- .../source/powerbi/m_query/parser.py | 1 + .../source/powerbi/m_query/resolver.py | 100 +++++++++++++++--- .../powerbi/powerbi-lexical-grammar.rule | 5 +- .../integration/powerbi/test_m_parser.py | 89 +++++++++++++++- 6 files changed, 197 insertions(+), 18 deletions(-) diff --git a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md index fcfae6cd1e6d7..52fe1cd77e8fd 100644 --- a/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md +++ b/metadata-ingestion/docs/sources/powerbi/powerbi_pre.md @@ -39,6 +39,7 @@ PowerBI Source supports M-Query expression for below listed PowerBI Data Sources 3. PostgreSQL 4. Microsoft SQL Server 5. Google BigQuery +6. Databricks Native SQL query parsing is supported for `Snowflake` and `Amazon Redshift` data-sources. diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index 49998ffbce879..6e74bfda3743c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -206,6 +206,16 @@ class PlatformDetail(ConfigModel): ) +class DataBricksPlatformDetail(PlatformDetail): + """ + metastore is an additional field used in Databricks connector to generate the dataset urn + """ + + metastore: str = pydantic.Field( + description="Databricks Unity Catalog metastore name.", + ) + + class OwnershipMapping(ConfigModel): create_corp_user: bool = pydantic.Field( default=True, description="Whether ingest PowerBI user as Datahub Corpuser" @@ -268,11 +278,14 @@ class PowerBiDashboardSourceConfig( hidden_from_docs=True, ) # PowerBI datasource's server to platform instance mapping - server_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field( + server_to_platform_instance: Dict[ + str, Union[PlatformDetail, DataBricksPlatformDetail] + ] = pydantic.Field( default={}, description="A mapping of PowerBI datasource's server i.e host[:port] to Data platform instance." - " :port is optional and only needed if your datasource server is running on non-standard port." - "For Google BigQuery the datasource's server is google bigquery project name", + " :port is optional and only needed if your datasource server is running on non-standard port. " + "For Google BigQuery the datasource's server is google bigquery project name. 
" + "For Databricks Unity Catalog the datasource's server is workspace FQDN.", ) # deprecated warning _dataset_type_mapping = pydantic_field_deprecated( diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py index 9134932c39fe0..a31fe7b248c7e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py @@ -57,6 +57,7 @@ def get_upstream_tables( config: PowerBiDashboardSourceConfig, parameters: Dict[str, str] = {}, ) -> List[resolver.Lineage]: + if table.expression is None: logger.debug(f"Expression is none for table {table.full_name}") return [] diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index 49fbf926e49be..1e399047b0997 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -9,6 +9,7 @@ import datahub.emitter.mce_builder as builder from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.powerbi.config import ( + DataBricksPlatformDetail, DataPlatformPair, PlatformDetail, PowerBiDashboardSourceConfig, @@ -155,6 +156,17 @@ def get_db_detail_from_argument( return arguments[0], arguments[1] + @staticmethod + def get_tokens( + arg_list: Tree, + ) -> List[str]: + arguments: List[str] = tree_function.strip_char_from_list( + values=tree_function.remove_whitespaces_from_list( + tree_function.token_values(arg_list) + ), + ) + return arguments + def parse_custom_sql( self, query: str, server: str, database: Optional[str], schema: Optional[str] ) -> Lineage: @@ -760,19 +772,69 @@ def create_lineage( class DatabrickDataPlatformTableCreator(AbstractDataPlatformTableCreator): + def form_qualified_table_name( + self, + value_dict: Dict[Any, Any], + catalog_name: str, + data_platform_pair: DataPlatformPair, + server: str, + ) -> str: + # database and catalog names are same in M-Query + db_name: str = ( + catalog_name if "Database" not in value_dict else value_dict["Database"] + ) + + schema_name: str = value_dict["Schema"] + + table_name: str = value_dict["Table"] + + platform_detail: PlatformDetail = ( + self.platform_instance_resolver.get_platform_instance( + PowerBIPlatformDetail( + data_platform_pair=data_platform_pair, + data_platform_server=server, + ) + ) + ) + + metastore: Optional[str] = None + + qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}" + + if isinstance(platform_detail, DataBricksPlatformDetail): + metastore = platform_detail.metastore + + if metastore is not None: + return f"{metastore}.{qualified_table_name}" + + return qualified_table_name + def create_lineage( self, data_access_func_detail: DataAccessFunctionDetail ) -> Lineage: logger.debug( f"Processing Databrick data-access function detail {data_access_func_detail}" ) - value_dict = {} + value_dict: Dict[str, str] = {} temp_accessor: Optional[ Union[IdentifierAccessor, AbstractIdentifierAccessor] ] = data_access_func_detail.identifier_accessor + while temp_accessor: if isinstance(temp_accessor, IdentifierAccessor): - value_dict[temp_accessor.items["Kind"]] = temp_accessor.items["Name"] + # Condition to handle databricks M-query pattern where table, schema and database all are present in + # same invoke statement + if all( + element 
in temp_accessor.items + for element in ["Item", "Schema", "Catalog"] + ): + value_dict["Schema"] = temp_accessor.items["Schema"] + value_dict["Table"] = temp_accessor.items["Item"] + else: + value_dict[temp_accessor.items["Kind"]] = temp_accessor.items[ + "Name" + ] + if temp_accessor.next is not None: temp_accessor = temp_accessor.next else: @@ -783,24 +845,30 @@ def create_lineage( ) return Lineage.empty() - db_name: str = value_dict["Database"] - schema_name: str = value_dict["Schema"] - table_name: str = value_dict["Table"] - - qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}" - - server, _ = self.get_db_detail_from_argument(data_access_func_detail.arg_list) - if server is None: + arguments = self.get_tokens(data_access_func_detail.arg_list) + if len(arguments) < 4: logger.info( - f"server information is not available for {qualified_table_name}. Skipping upstream table" + f"Databricks workspace and catalog information in arguments({arguments}). " + f"Skipping upstream table" ) return Lineage.empty() + workspace_fqdn: str = arguments[0] + + catalog_name: str = arguments[3] + + qualified_table_name: str = self.form_qualified_table_name( + value_dict=value_dict, + catalog_name=catalog_name, + data_platform_pair=self.get_platform_pair(), + server=workspace_fqdn, + ) + urn = urn_creator( config=self.config, platform_instance_resolver=self.platform_instance_resolver, data_platform_pair=self.get_platform_pair(), - server=server, + server=workspace_fqdn, qualified_table_name=qualified_table_name, ) @@ -1063,14 +1131,20 @@ class FunctionName(Enum): DATABRICK_DATA_ACCESS = "Databricks.Catalogs" GOOGLE_BIGQUERY_DATA_ACCESS = "GoogleBigQuery.Database" AMAZON_REDSHIFT_DATA_ACCESS = "AmazonRedshift.Database" + DATABRICK_MULTI_CLOUD_DATA_ACCESS = "DatabricksMultiCloud.Catalogs" class SupportedResolver(Enum): - DATABRICK_QUERY = ( + DATABRICKS_QUERY = ( DatabrickDataPlatformTableCreator, FunctionName.DATABRICK_DATA_ACCESS, ) + DATABRICKS_MULTI_CLOUD = ( + DatabrickDataPlatformTableCreator, + FunctionName.DATABRICK_MULTI_CLOUD_DATA_ACCESS, + ) + POSTGRES_SQL = ( PostgresDataPlatformTableCreator, FunctionName.POSTGRESQL_DATA_ACCESS, diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule index 321ffe1d3a21b..3524f43b25af7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi-lexical-grammar.rule @@ -5,7 +5,7 @@ // - Tweak unary_expression to allow arbitrary operators within it. // This is necessary because unary_expression is the base for the // whole relational_expression parse tree. - +// - Added letter_character_and_decimal_digit phrase and updated keyword_or_identifier phrase lexical_unit: lexical_elements? 
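The remaining hunks in this grammar file add a letter_character_and_decimal_digit terminal and list it under keyword_or_identifier, so that identifiers containing decimal digits (for example time_exp_data_v3_Table in the DatabricksMultiCloud test queries further down) lex as a single token. A minimal sketch of the character-class change, using the third-party regex package (which understands the \p{...} Unicode categories used in the rule); the grammar itself is consumed by the module's M-Query parser, so this is an illustration only:

# Illustrative sketch, not part of the patch: shows why the new
# letter_character_and_decimal_digit terminal is needed. Requires the
# third-party "regex" package, which supports \p{...} Unicode categories.
import regex

OLD_LETTER_CHARACTER = r"[_\-\p{Lu}\p{Ll}\p{Lt}\p{Lm}\p{Lo}\p{Nl}]+"
NEW_LETTER_CHARACTER_AND_DECIMAL_DIGIT = (
    r"[_\-\p{Lu}\p{Ll}\p{Lt}\p{Lm}\p{Lo}\p{Nl}\p{Nd}]+"
)

identifier = "time_exp_data_v3_Table"  # taken from the test M-Queries below

# The digit "3" is outside the old character class, so the identifier cannot
# be matched as one token; the extended class accepts it.
print(bool(regex.fullmatch(OLD_LETTER_CHARACTER, identifier)))                    # False
print(bool(regex.fullmatch(NEW_LETTER_CHARACTER_AND_DECIMAL_DIGIT, identifier)))  # True
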
@@ -127,6 +127,7 @@ regular_identifier: available_identifier available_identifier: keyword_or_identifier keyword_or_identifier: letter_character + | letter_character_and_decimal_digit | underscore_character | identifier_start_character identifier_part_characters @@ -157,6 +158,8 @@ underscore_character: "_" letter_character: /[_\-\p{Lu}\p{Ll}\p{Lt}\p{Lm}\p{Lo}\p{Nl}]+/ +letter_character_and_decimal_digit: /[_\-\p{Lu}\p{Ll}\p{Lt}\p{Lm}\p{Lo}\p{Nl}\p{Nd}]+/ + combining_character: /[_\p{Mn}\p{Mc}]+/ decimal_digit_character: /[\p{Nd}]+/ diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py index d1b56c31d4cf6..0aa290cecd8fd 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py +++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py @@ -46,6 +46,9 @@ 'let\n Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) \n in Source', 'let\n Source = Databricks.Catalogs("adb-123.azuredatabricks.net", "/sql/1.0/endpoints/12345dc91aa25844", [Catalog=null, Database=null]),\n hive_metastore_Database = Source{[Name="hive_metastore",Kind="Database"]}[Data],\n sandbox_revenue_Schema = hive_metastore_Database{[Name="sandbox_revenue",Kind="Schema"]}[Data],\n public_consumer_price_index_Table = sandbox_revenue_Schema{[Name="public_consumer_price_index",Kind="Table"]}[Data],\n #"Renamed Columns" = Table.RenameColumns(public_consumer_price_index_Table,{{"Country", "country"}, {"Metric", "metric"}}),\n #"Inserted Year" = Table.AddColumn(#"Renamed Columns", "ID", each Date.Year([date_id]) + Date.Month([date_id]), Text.Type),\n #"Added Custom" = Table.AddColumn(#"Inserted Year", "Custom", each Text.Combine({Number.ToText(Date.Year([date_id])), Number.ToText(Date.Month([date_id])), [country]})),\n #"Removed Columns" = Table.RemoveColumns(#"Added Custom",{"ID"}),\n #"Renamed Columns1" = Table.RenameColumns(#"Removed Columns",{{"Custom", "ID"}}),\n #"Filtered Rows" = Table.SelectRows(#"Renamed Columns1", each ([metric] = "Consumer Price Index") and (not Number.IsNaN([value])))\nin\n #"Filtered Rows"', "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source", + 'let\n Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", [Catalog=null, Database=null, EnableAutomaticProxyDiscovery=null]),\n ml_prod_Database = Source{[Name="ml_prod",Kind="Database"]}[Data],\n membership_corn_Schema = ml_prod_Database{[Name="membership_corn",Kind="Schema"]}[Data],\n time_exp_data_v3_Table = membership_corn_Schema{[Name="time_exp_data_v3",Kind="Table"]}[Data],\n 
#"Renamed Columns" = Table.RenameColumns(time_exp_data_v3_Table)\nin\n #"Renamed Columns"', + 'let\n Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", [Catalog="data_analysis", Database="summary", EnableAutomaticProxyDiscovery=null]),\n committee_data_summary_dev = Source{[Item="committee_data",Schema="summary",Catalog="data_analysis"]}[Data],\n #"Added Index" = Table.AddIndexColumn(committee_data_summary_dev, "Index", 1, 1, Int64.Type),\n #"Renamed Columns" = Table.RenameColumns(#"Added Index",)\nin\n #"Renamed Columns"', + 'let\n Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", [Catalog="data_analysis", Database="summary", EnableAutomaticProxyDiscovery=null]),\n vips_data_summary_dev = Source{[Item="vips_data",Schema="summary",Catalog="data_analysis"]}[Data],\n #"Changed Type" = Table.TransformColumnTypes(vips_data_summary_dev,{{"vipstartDate", type date}, {"enteredDate", type datetime}, {"estDraftDate", type datetime}, {"estPublDate", type datetime}})\nin\n #"Changed Type"', 'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="ORDERING"]}[Data], "SELECT#(lf) DISTINCT#(lf) T5.PRESENTMENT_START_DATE#(lf),T5.PRESENTMENT_END_DATE#(lf),T5.DISPLAY_NAME#(lf),T5.NAME#(tab)#(lf),T5.PROMO_DISPLAY_NAME#(lf),T5.REGION#(lf),T5.ID#(lf),T5.WALKOUT#(lf),T6.DEAL_ID#(lf),T6.TYPE#(lf),T5.FREE_PERIOD#(lf),T6.PRICE_MODIFICATION#(lf)#(lf)FROM#(lf)#(lf)(#(lf) SELECT #(lf) T1.NAME#(lf),DATE(T1.CREATED_AT) as CREATED_AT#(lf),T1.PROMO_CODE#(lf),T1.STATUS#(lf),DATE(T1.UPDATED_AT) as UPDATED_AT#(lf),T1.ID#(lf),T1.DISPLAY_NAME as PROMO_DISPLAY_NAME#(lf),T4.*#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) NAME#(lf),CREATED_AT#(lf),PROMO_CODE#(lf),STATUS#(lf),UPDATED_AT#(lf),ID#(lf),DISPLAY_NAME#(lf) FROM RAW.PROMOTIONS#(lf)#(lf)) T1#(lf)INNER JOIN#(lf)#(lf) (#(lf) SELECT #(lf) T3.PRODUCT_STATUS#(lf),T3.CODE#(lf),T3.REGION#(lf),T3.DISPLAY_ORDER_SEQUENCE#(lf),T3.PRODUCT_LINE_ID#(lf),T3.DISPLAY_NAME#(lf),T3.PRODUCT_TYPE#(lf),T3.ID as PROD_TBL_ID#(lf),T3.NAME as PROD_TBL_NAME#(lf),DATE(T2.PRESENTMENT_END_DATE) as PRESENTMENT_END_DATE#(lf),T2.PRICE_COMMITMENT_PERIOD#(lf),T2.NAME as SEAL_TBL_NAME#(lf),DATE(T2.CREATED_AT) as SEAL_TBL_CREATED_AT#(lf),T2.DESCRIPTION#(lf),T2.FREE_PERIOD#(lf),T2.WALKOUT#(lf),T2.PRODUCT_CAT_ID#(lf),T2.PROMOTION_ID#(lf),DATE(T2.PRESENTMENT_START_DATE) as PRESENTMENT_START_DATE#(lf),YEAR(T2.PRESENTMENT_START_DATE) as DEAL_YEAR_START#(lf),MONTH(T2.PRESENTMENT_START_DATE) as DEAL_MONTH_START#(lf),T2.DEAL_TYPE#(lf),DATE(T2.UPDATED_AT) as SEAL_TBL_UPDATED_AT#(lf),T2.ID as SEAL_TBL_ID#(lf),T2.STATUS as SEAL_TBL_STATUS#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) PRODUCT_STATUS#(lf),CODE#(lf),REGION#(lf),DISPLAY_ORDER_SEQUENCE#(lf),PRODUCT_LINE_ID#(lf),DISPLAY_NAME#(lf),PRODUCT_TYPE#(lf),ID #(lf),NAME #(lf) FROM#(lf) RAW.PRODUCTS#(lf)#(lf)) T3#(lf)INNER JOIN#(lf)(#(lf) SELECT#(lf) DISTINCT#(lf) PRESENTMENT_END_DATE#(lf),PRICE_COMMITMENT_PERIOD#(lf),NAME#(lf),CREATED_AT#(lf),DESCRIPTION#(lf),FREE_PERIOD#(lf),WALKOUT#(lf),PRODUCT_CAT_ID#(lf),PROMOTION_ID#(lf),PRESENTMENT_START_DATE#(lf),DEAL_TYPE#(lf),UPDATED_AT#(lf),ID#(lf),STATUS#(lf) FROM#(lf) RAW.DEALS#(lf)#(lf)) T2#(lf)ON#(lf)T3.ID = T2.PRODUCT_CAT_ID #(lf)WHERE#(lf)T2.PRESENTMENT_START_DATE >= \'2015-01-01\'#(lf)AND#(lf)T2.STATUS = \'active\'#(lf)#(lf))T4#(lf)ON#(lf)T1.ID = T4.PROMOTION_ID#(lf))T5#(lf)INNER JOIN#(lf)RAW.PRICE_MODIFICATIONS 
T6#(lf)ON#(lf)T5.SEAL_TBL_ID = T6.DEAL_ID", null, [EnableFolding=true]) \n in \n Source', ] @@ -767,9 +770,93 @@ def test_sqlglot_parser(): assert lineage[0].column_lineage[i].upstreams == [] -def test_sqlglot_parser_2(): +def test_databricks_multi_cloud(): table: powerbi_data_classes.Table = powerbi_data_classes.Table( expression=M_QUERIES[25], + name="category", + full_name="dev.public.category", + ) + reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, + )[0].upstreams + + assert len(data_platform_tables) == 1 + + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:databricks,ml_prod.membership_corn.time_exp_data_v3,PROD)" + ) + + +def test_databricks_catalog_pattern_1(): + table: powerbi_data_classes.Table = powerbi_data_classes.Table( + expression=M_QUERIES[26], + name="category", + full_name="dev.public.category", + ) + reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances() + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, + )[0].upstreams + + assert len(data_platform_tables) == 1 + + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:databricks,data_analysis.summary.committee_data,PROD)" + ) + + +def test_databricks_catalog_pattern_2(): + table: powerbi_data_classes.Table = powerbi_data_classes.Table( + expression=M_QUERIES[27], + name="category", + full_name="dev.public.category", + ) + reporter = PowerBiDashboardSourceReport() + + ctx, config, platform_instance_resolver = get_default_instances( + override_config={ + "server_to_platform_instance": { + "abc.cloud.databricks.com": { + "metastore": "central_metastore", + "platform_instance": "abc", + } + } + } + ) + data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( + table, + reporter, + ctx=ctx, + config=config, + platform_instance_resolver=platform_instance_resolver, + )[0].upstreams + + assert len(data_platform_tables) == 1 + + assert ( + data_platform_tables[0].urn + == "urn:li:dataset:(urn:li:dataPlatform:databricks,abc.central_metastore.data_analysis.summary.vips_data,PROD)" + ) + + +def test_sqlglot_parser_2(): + table: powerbi_data_classes.Table = powerbi_data_classes.Table( + expression=M_QUERIES[28], name="SALES_TARGET", full_name="dev.public.sales", )
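
For readers wiring this up in a recipe, a minimal sketch of how the new metastore field in server_to_platform_instance changes the qualified table name that ends up in the dataset URN. The mapping and values are borrowed from test_databricks_catalog_pattern_2 above (the dict literal mirrors its override_config); the inline logic is a simplified paraphrase of form_qualified_table_name, not the resolver itself:

# Illustrative sketch, not part of the patch.
server_to_platform_instance = {
    "abc.cloud.databricks.com": {          # workspace FQDN from the M-Query
        "metastore": "central_metastore",  # Unity Catalog metastore (DataBricksPlatformDetail)
        "platform_instance": "abc",
    }
}

catalog_name, schema_name, table_name = "data_analysis", "summary", "vips_data"
detail = server_to_platform_instance["abc.cloud.databricks.com"]

qualified_table_name = f"{catalog_name}.{schema_name}.{table_name}"
if detail.get("metastore"):
    # form_qualified_table_name prepends the metastore when the server maps
    # to a DataBricksPlatformDetail entry.
    qualified_table_name = f"{detail['metastore']}.{qualified_table_name}"

print(qualified_table_name)
# central_metastore.data_analysis.summary.vips_data
#
# The URN builder then applies platform_instance, giving the URN asserted in
# the test:
# urn:li:dataset:(urn:li:dataPlatform:databricks,abc.central_metastore.data_analysis.summary.vips_data,PROD)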