fix(ingestion/powerbi): Databricks support for table lineage (datahub…
sid-acryl authored May 22, 2024
1 parent ad3b8f9 commit 666de9e
Showing 6 changed files with 197 additions and 18 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
@@ -39,6 +39,7 @@ PowerBI Source supports M-Query expression for below listed PowerBI Data Sources
3. PostgreSQL
4. Microsoft SQL Server
5. Google BigQuery
6. Databricks

Native SQL query parsing is supported for `Snowflake` and `Amazon Redshift` data-sources.

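For illustration, a Databricks M-Query expression of the kind this source can now parse looks roughly like the fixture added to the integration tests in this commit. The snippet below is a condensed sketch of that fixture, written as a Python string the way the tests store it; the workspace host and SQL endpoint path are placeholders.

# Condensed sketch of the Databricks.Catalogs fixture from the tests below (placeholder values).
DATABRICKS_M_EXPRESSION = (
    'let\n'
    '    Source = Databricks.Catalogs("adb-123.azuredatabricks.net", '
    '"/sql/1.0/endpoints/12345dc91aa25844", [Catalog=null, Database=null]),\n'
    '    hive_metastore_Database = Source{[Name="hive_metastore",Kind="Database"]}[Data],\n'
    '    sandbox_revenue_Schema = hive_metastore_Database{[Name="sandbox_revenue",Kind="Schema"]}[Data],\n'
    '    public_consumer_price_index_Table = sandbox_revenue_Schema{[Name="public_consumer_price_index",Kind="Table"]}[Data]\n'
    'in\n'
    '    public_consumer_price_index_Table'
)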
19 changes: 16 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -206,6 +206,16 @@ class PlatformDetail(ConfigModel):
)


class DataBricksPlatformDetail(PlatformDetail):
"""
metastore is an additional field used by the Databricks connector to generate the dataset urn
"""

metastore: str = pydantic.Field(
description="Databricks Unity Catalog metastore name.",
)


class OwnershipMapping(ConfigModel):
create_corp_user: bool = pydantic.Field(
default=True, description="Whether to ingest the PowerBI user as a DataHub CorpUser"
@@ -268,11 +278,14 @@ class PowerBiDashboardSourceConfig(
hidden_from_docs=True,
)
# PowerBI datasource's server to platform instance mapping
server_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
server_to_platform_instance: Dict[
str, Union[PlatformDetail, DataBricksPlatformDetail]
] = pydantic.Field(
default={},
description="A mapping of PowerBI datasource's server i.e host[:port] to Data platform instance."
" :port is optional and only needed if your datasource server is running on non-standard port."
"For Google BigQuery the datasource's server is google bigquery project name",
" :port is optional and only needed if your datasource server is running on a non-standard port. "
"For Google BigQuery, the datasource's server is the Google BigQuery project name. "
"For Databricks Unity Catalog, the datasource's server is the workspace FQDN.",
)
# deprecated warning
_dataset_type_mapping = pydantic_field_deprecated(
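A minimal sketch of how the new mapping might be configured for a Databricks workspace, written in the override_config dict form used by the tests later in this commit; the workspace FQDN, metastore name, and platform instance are hypothetical. Supplying metastore is what opts a server entry into the Databricks-specific DataBricksPlatformDetail shape.

# Hypothetical server_to_platform_instance mapping for a Databricks Unity Catalog workspace.
config_overrides = {
    "server_to_platform_instance": {
        "abc.cloud.databricks.com": {          # datasource server == workspace FQDN (hypothetical)
            "metastore": "central_metastore",  # Unity Catalog metastore name (hypothetical)
            "platform_instance": "abc",
        }
    }
}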
@@ -57,6 +57,7 @@ def get_upstream_tables(
config: PowerBiDashboardSourceConfig,
parameters: Dict[str, str] = {},
) -> List[resolver.Lineage]:

if table.expression is None:
logger.debug(f"Expression is none for table {table.full_name}")
return []
@@ -9,6 +9,7 @@
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import (
DataBricksPlatformDetail,
DataPlatformPair,
PlatformDetail,
PowerBiDashboardSourceConfig,
@@ -155,6 +156,17 @@ def get_db_detail_from_argument(

return arguments[0], arguments[1]

@staticmethod
def get_tokens(
arg_list: Tree,
) -> List[str]:
arguments: List[str] = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(arg_list)
),
)
return arguments

def parse_custom_sql(
self, query: str, server: str, database: Optional[str], schema: Optional[str]
) -> Lineage:
@@ -760,19 +772,69 @@ def create_lineage(


class DatabrickDataPlatformTableCreator(AbstractDataPlatformTableCreator):
def form_qualified_table_name(
self,
value_dict: Dict[Any, Any],
catalog_name: str,
data_platform_pair: DataPlatformPair,
server: str,
) -> str:
# database and catalog names are the same in M-Query
db_name: str = (
catalog_name if "Database" not in value_dict else value_dict["Database"]
)

schema_name: str = value_dict["Schema"]

table_name: str = value_dict["Table"]

platform_detail: PlatformDetail = (
self.platform_instance_resolver.get_platform_instance(
PowerBIPlatformDetail(
data_platform_pair=data_platform_pair,
data_platform_server=server,
)
)
)

metastore: Optional[str] = None

qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"

if isinstance(platform_detail, DataBricksPlatformDetail):
metastore = platform_detail.metastore

if metastore is not None:
return f"{metastore}.{qualified_table_name}"

return qualified_table_name
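As a rough sketch of what form_qualified_table_name produces: without a metastore the name is catalog.schema.table, and with a DataBricksPlatformDetail mapping the configured metastore is prepended. The names below are taken from the test fixtures added in this commit and are otherwise hypothetical.

# Illustrative only, not part of the diff.
value_dict = {"Schema": "summary", "Table": "vips_data"}   # parsed from the M expression
catalog_name = "data_analysis"                             # catalog taken from the invoke arguments

base = f"{catalog_name}.{value_dict['Schema']}.{value_dict['Table']}"
# base == "data_analysis.summary.vips_data"                (plain PlatformDetail mapping)

metastore = "central_metastore"                            # only when DataBricksPlatformDetail is configured
qualified = f"{metastore}.{base}"
# qualified == "central_metastore.data_analysis.summary.vips_data"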

def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
logger.debug(
f"Processing Databrick data-access function detail {data_access_func_detail}"
)
value_dict = {}
value_dict: Dict[str, str] = {}
temp_accessor: Optional[
Union[IdentifierAccessor, AbstractIdentifierAccessor]
] = data_access_func_detail.identifier_accessor

while temp_accessor:
if isinstance(temp_accessor, IdentifierAccessor):
value_dict[temp_accessor.items["Kind"]] = temp_accessor.items["Name"]
# Condition to handle the Databricks M-query pattern where table, schema and database are all
# present in the same invoke statement
if all(
element in temp_accessor.items
for element in ["Item", "Schema", "Catalog"]
):
value_dict["Schema"] = temp_accessor.items["Schema"]
value_dict["Table"] = temp_accessor.items["Item"]
else:
value_dict[temp_accessor.items["Kind"]] = temp_accessor.items[
"Name"
]

if temp_accessor.next is not None:
temp_accessor = temp_accessor.next
else:
@@ -783,24 +845,30 @@ def create_lineage(
)
return Lineage.empty()

db_name: str = value_dict["Database"]
schema_name: str = value_dict["Schema"]
table_name: str = value_dict["Table"]

qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"

server, _ = self.get_db_detail_from_argument(data_access_func_detail.arg_list)
if server is None:
arguments = self.get_tokens(data_access_func_detail.arg_list)
if len(arguments) < 4:
logger.info(
f"server information is not available for {qualified_table_name}. Skipping upstream table"
f"Databricks workspace and catalog information in arguments({arguments}). "
f"Skipping upstream table"
)
return Lineage.empty()

workspace_fqdn: str = arguments[0]

catalog_name: str = arguments[3]

qualified_table_name: str = self.form_qualified_table_name(
value_dict=value_dict,
catalog_name=catalog_name,
data_platform_pair=self.get_platform_pair(),
server=workspace_fqdn,
)

urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
server=workspace_fqdn,
qualified_table_name=qualified_table_name,
)
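For context, get_tokens appears to return the quote- and whitespace-stripped tokens of the data-access call in source order, which is why index 0 is taken as the workspace FQDN and index 3 as the catalog. A sketch of the assumed layout for the DatabricksMultiCloud.Catalogs fixture used in the tests (values belong to that fixture and are otherwise hypothetical):

# Assumed token layout, illustrative only:
# DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd",
#                               [Catalog="data_analysis", Database="summary", ...])
arguments = [
    "abc.cloud.databricks.com",  # [0] workspace FQDN
    "/sql/gh2cfe3fe1d4c7cd",     # [1] SQL warehouse / endpoint path
    "Catalog",                   # [2] record key
    "data_analysis",             # [3] catalog name
    "Database",                  # [4] record key
    "summary",                   # [5] database (schema) name
]
workspace_fqdn, catalog_name = arguments[0], arguments[3]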

@@ -1063,14 +1131,20 @@ class FunctionName(Enum):
DATABRICK_DATA_ACCESS = "Databricks.Catalogs"
GOOGLE_BIGQUERY_DATA_ACCESS = "GoogleBigQuery.Database"
AMAZON_REDSHIFT_DATA_ACCESS = "AmazonRedshift.Database"
DATABRICK_MULTI_CLOUD_DATA_ACCESS = "DatabricksMultiCloud.Catalogs"


class SupportedResolver(Enum):
DATABRICK_QUERY = (
DATABRICKS_QUERY = (
DatabrickDataPlatformTableCreator,
FunctionName.DATABRICK_DATA_ACCESS,
)

DATABRICKS_MULTI_CLOUD = (
DatabrickDataPlatformTableCreator,
FunctionName.DATABRICK_MULTI_CLOUD_DATA_ACCESS,
)

POSTGRES_SQL = (
PostgresDataPlatformTableCreator,
FunctionName.POSTGRESQL_DATA_ACCESS,
@@ -5,7 +5,7 @@
// - Tweak unary_expression to allow arbitrary operators within it.
// This is necessary because unary_expression is the base for the
// whole relational_expression parse tree.

// - Added letter_character_and_decimal_digit phrase and updated keyword_or_identifier phrase
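//   (presumably to accept identifiers that mix letters and digits, e.g. time_exp_data_v3 in the new test fixtures)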

lexical_unit: lexical_elements?

@@ -127,6 +127,7 @@ regular_identifier: available_identifier
available_identifier: keyword_or_identifier

keyword_or_identifier: letter_character
| letter_character_and_decimal_digit
| underscore_character
| identifier_start_character identifier_part_characters

@@ -157,6 +158,8 @@ underscore_character: "_"

letter_character: /[_\-\p{Lu}\p{Ll}\p{Lt}\p{Lm}\p{Lo}\p{Nl}]+/

letter_character_and_decimal_digit: /[_\-\p{Lu}\p{Ll}\p{Lt}\p{Lm}\p{Lo}\p{Nl}\p{Nd}]+/

combining_character: /[_\p{Mn}\p{Mc}]+/

decimal_digit_character: /[\p{Nd}]+/
89 changes: 88 additions & 1 deletion metadata-ingestion/tests/integration/powerbi/test_m_parser.py
@@ -46,6 +46,9 @@
'let\n Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) \n in Source',
'let\n Source = Databricks.Catalogs("adb-123.azuredatabricks.net", "/sql/1.0/endpoints/12345dc91aa25844", [Catalog=null, Database=null]),\n hive_metastore_Database = Source{[Name="hive_metastore",Kind="Database"]}[Data],\n sandbox_revenue_Schema = hive_metastore_Database{[Name="sandbox_revenue",Kind="Schema"]}[Data],\n public_consumer_price_index_Table = sandbox_revenue_Schema{[Name="public_consumer_price_index",Kind="Table"]}[Data],\n #"Renamed Columns" = Table.RenameColumns(public_consumer_price_index_Table,{{"Country", "country"}, {"Metric", "metric"}}),\n #"Inserted Year" = Table.AddColumn(#"Renamed Columns", "ID", each Date.Year([date_id]) + Date.Month([date_id]), Text.Type),\n #"Added Custom" = Table.AddColumn(#"Inserted Year", "Custom", each Text.Combine({Number.ToText(Date.Year([date_id])), Number.ToText(Date.Month([date_id])), [country]})),\n #"Removed Columns" = Table.RemoveColumns(#"Added Custom",{"ID"}),\n #"Renamed Columns1" = Table.RenameColumns(#"Removed Columns",{{"Custom", "ID"}}),\n #"Filtered Rows" = Table.SelectRows(#"Renamed Columns1", each ([metric] = "Consumer Price Index") and (not Number.IsNaN([value])))\nin\n #"Filtered Rows"',
"let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source",
'let\n Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", [Catalog=null, Database=null, EnableAutomaticProxyDiscovery=null]),\n ml_prod_Database = Source{[Name="ml_prod",Kind="Database"]}[Data],\n membership_corn_Schema = ml_prod_Database{[Name="membership_corn",Kind="Schema"]}[Data],\n time_exp_data_v3_Table = membership_corn_Schema{[Name="time_exp_data_v3",Kind="Table"]}[Data],\n #"Renamed Columns" = Table.RenameColumns(time_exp_data_v3_Table)\nin\n #"Renamed Columns"',
'let\n Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", [Catalog="data_analysis", Database="summary", EnableAutomaticProxyDiscovery=null]),\n committee_data_summary_dev = Source{[Item="committee_data",Schema="summary",Catalog="data_analysis"]}[Data],\n #"Added Index" = Table.AddIndexColumn(committee_data_summary_dev, "Index", 1, 1, Int64.Type),\n #"Renamed Columns" = Table.RenameColumns(#"Added Index",)\nin\n #"Renamed Columns"',
'let\n Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", [Catalog="data_analysis", Database="summary", EnableAutomaticProxyDiscovery=null]),\n vips_data_summary_dev = Source{[Item="vips_data",Schema="summary",Catalog="data_analysis"]}[Data],\n #"Changed Type" = Table.TransformColumnTypes(vips_data_summary_dev,{{"vipstartDate", type date}, {"enteredDate", type datetime}, {"estDraftDate", type datetime}, {"estPublDate", type datetime}})\nin\n #"Changed Type"',
'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="ORDERING"]}[Data], "SELECT#(lf) DISTINCT#(lf) T5.PRESENTMENT_START_DATE#(lf),T5.PRESENTMENT_END_DATE#(lf),T5.DISPLAY_NAME#(lf),T5.NAME#(tab)#(lf),T5.PROMO_DISPLAY_NAME#(lf),T5.REGION#(lf),T5.ID#(lf),T5.WALKOUT#(lf),T6.DEAL_ID#(lf),T6.TYPE#(lf),T5.FREE_PERIOD#(lf),T6.PRICE_MODIFICATION#(lf)#(lf)FROM#(lf)#(lf)(#(lf) SELECT #(lf) T1.NAME#(lf),DATE(T1.CREATED_AT) as CREATED_AT#(lf),T1.PROMO_CODE#(lf),T1.STATUS#(lf),DATE(T1.UPDATED_AT) as UPDATED_AT#(lf),T1.ID#(lf),T1.DISPLAY_NAME as PROMO_DISPLAY_NAME#(lf),T4.*#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) NAME#(lf),CREATED_AT#(lf),PROMO_CODE#(lf),STATUS#(lf),UPDATED_AT#(lf),ID#(lf),DISPLAY_NAME#(lf) FROM RAW.PROMOTIONS#(lf)#(lf)) T1#(lf)INNER JOIN#(lf)#(lf) (#(lf) SELECT #(lf) T3.PRODUCT_STATUS#(lf),T3.CODE#(lf),T3.REGION#(lf),T3.DISPLAY_ORDER_SEQUENCE#(lf),T3.PRODUCT_LINE_ID#(lf),T3.DISPLAY_NAME#(lf),T3.PRODUCT_TYPE#(lf),T3.ID as PROD_TBL_ID#(lf),T3.NAME as PROD_TBL_NAME#(lf),DATE(T2.PRESENTMENT_END_DATE) as PRESENTMENT_END_DATE#(lf),T2.PRICE_COMMITMENT_PERIOD#(lf),T2.NAME as SEAL_TBL_NAME#(lf),DATE(T2.CREATED_AT) as SEAL_TBL_CREATED_AT#(lf),T2.DESCRIPTION#(lf),T2.FREE_PERIOD#(lf),T2.WALKOUT#(lf),T2.PRODUCT_CAT_ID#(lf),T2.PROMOTION_ID#(lf),DATE(T2.PRESENTMENT_START_DATE) as PRESENTMENT_START_DATE#(lf),YEAR(T2.PRESENTMENT_START_DATE) as DEAL_YEAR_START#(lf),MONTH(T2.PRESENTMENT_START_DATE) as DEAL_MONTH_START#(lf),T2.DEAL_TYPE#(lf),DATE(T2.UPDATED_AT) as SEAL_TBL_UPDATED_AT#(lf),T2.ID as SEAL_TBL_ID#(lf),T2.STATUS as SEAL_TBL_STATUS#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) PRODUCT_STATUS#(lf),CODE#(lf),REGION#(lf),DISPLAY_ORDER_SEQUENCE#(lf),PRODUCT_LINE_ID#(lf),DISPLAY_NAME#(lf),PRODUCT_TYPE#(lf),ID #(lf),NAME #(lf) FROM#(lf) RAW.PRODUCTS#(lf)#(lf)) T3#(lf)INNER JOIN#(lf)(#(lf) SELECT#(lf) DISTINCT#(lf) PRESENTMENT_END_DATE#(lf),PRICE_COMMITMENT_PERIOD#(lf),NAME#(lf),CREATED_AT#(lf),DESCRIPTION#(lf),FREE_PERIOD#(lf),WALKOUT#(lf),PRODUCT_CAT_ID#(lf),PROMOTION_ID#(lf),PRESENTMENT_START_DATE#(lf),DEAL_TYPE#(lf),UPDATED_AT#(lf),ID#(lf),STATUS#(lf) FROM#(lf) RAW.DEALS#(lf)#(lf)) T2#(lf)ON#(lf)T3.ID = T2.PRODUCT_CAT_ID #(lf)WHERE#(lf)T2.PRESENTMENT_START_DATE >= \'2015-01-01\'#(lf)AND#(lf)T2.STATUS = \'active\'#(lf)#(lf))T4#(lf)ON#(lf)T1.ID = T4.PROMOTION_ID#(lf))T5#(lf)INNER JOIN#(lf)RAW.PRICE_MODIFICATIONS T6#(lf)ON#(lf)T5.SEAL_TBL_ID = T6.DEAL_ID", null, [EnableFolding=true]) \n in \n Source',
]
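The committee_data fixture above (the one test_databricks_catalog_pattern_1 resolves) exercises the single-invoke pattern handled by the create_lineage change earlier, where Item, Schema and Catalog arrive together. It is re-wrapped below purely for readability; the content matches the one-line fixture.

# The single-invoke Databricks fixture, re-wrapped for readability only.
M_QUERY_SINGLE_INVOKE = (
    'let\n'
    '    Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", '
    '[Catalog="data_analysis", Database="summary", EnableAutomaticProxyDiscovery=null]),\n'
    '    committee_data_summary_dev = Source{[Item="committee_data",Schema="summary",Catalog="data_analysis"]}[Data],\n'
    '    #"Added Index" = Table.AddIndexColumn(committee_data_summary_dev, "Index", 1, 1, Int64.Type),\n'
    '    #"Renamed Columns" = Table.RenameColumns(#"Added Index",)\n'
    'in\n'
    '    #"Renamed Columns"'
)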

@@ -767,9 +770,93 @@ def test_sqlglot_parser():
assert lineage[0].column_lineage[i].upstreams == []


def test_sqlglot_parser_2():
def test_databricks_multi_cloud():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
expression=M_QUERIES[25],
name="category",
full_name="dev.public.category",
)
reporter = PowerBiDashboardSourceReport()

ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)[0].upstreams

assert len(data_platform_tables) == 1

assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:databricks,ml_prod.membership_corn.time_exp_data_v3,PROD)"
)


def test_databricks_catalog_pattern_1():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
expression=M_QUERIES[26],
name="category",
full_name="dev.public.category",
)
reporter = PowerBiDashboardSourceReport()

ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)[0].upstreams

assert len(data_platform_tables) == 1

assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:databricks,data_analysis.summary.committee_data,PROD)"
)


def test_databricks_catalog_pattern_2():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
expression=M_QUERIES[27],
name="category",
full_name="dev.public.category",
)
reporter = PowerBiDashboardSourceReport()

ctx, config, platform_instance_resolver = get_default_instances(
override_config={
"server_to_platform_instance": {
"abc.cloud.databricks.com": {
"metastore": "central_metastore",
"platform_instance": "abc",
}
}
}
)
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)[0].upstreams

assert len(data_platform_tables) == 1

assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:databricks,abc.central_metastore.data_analysis.summary.vips_data,PROD)"
)


def test_sqlglot_parser_2():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
expression=M_QUERIES[28],
name="SALES_TARGET",
full_name="dev.public.sales",
)
