Merge branch 'datahub-project:master' into master
anshbansal authored Jan 3, 2024
2 parents 1f00dcb + 29f2142 commit 08e5731
Showing 14 changed files with 604 additions and 107 deletions.
@@ -81,7 +81,7 @@ export const SecretBuilderModal = ({ initialState, visible, onSubmit, onCancel }
},
{ whitespace: false },
{ min: 1, max: 50 },
{ pattern: /^[^\s\t${}\\,'"]+$/, message: 'This secret name is not allowed.' },
{ pattern: /^[a-zA-Z_]+[a-zA-Z0-9_]*$/, message: 'Please start the secret name with a letter, followed by letters, digits, or underscores only.' },
]}
hasFeedback
>
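For reference, the new rule enforces identifier-style secret names instead of just rejecting a handful of characters. A minimal sketch (Python, with the regex copied from the diff above) of which names pass:

import re

# Pattern from the diff: a leading letter or underscore, then letters, digits, or underscores.
SECRET_NAME_PATTERN = re.compile(r"^[a-zA-Z_]+[a-zA-Z0-9_]*$")

for name in ["MY_SECRET", "_token", "db2_password", "my-secret", "1password", "has space"]:
    verdict = "valid" if SECRET_NAME_PATTERN.match(name) else "rejected"
    print(f"{name!r}: {verdict}")
# MY_SECRET, _token, and db2_password pass; the last three are rejected.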
3 changes: 3 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/constants.ts
@@ -103,6 +103,8 @@ export const CUSTOM = 'custom';
export const CUSTOM_URN = `urn:li:dataPlatform:${CUSTOM}`;
export const UNITY_CATALOG = 'unity-catalog';
export const UNITY_CATALOG_URN = `urn:li:dataPlatform:${UNITY_CATALOG}`;
export const DATABRICKS = 'databricks';
export const DATABRICKS_URN = `urn:li:dataPlatform:${DATABRICKS}`;
export const DBT_CLOUD = 'dbt-cloud';
export const DBT_CLOUD_URN = `urn:li:dataPlatform:dbt`;
export const VERTICA = 'vertica';
@@ -143,6 +145,7 @@ export const PLATFORM_URN_TO_LOGO = {
[TRINO_URN]: trinoLogo,
[SUPERSET_URN]: supersetLogo,
[UNITY_CATALOG_URN]: databricksLogo,
[DATABRICKS_URN]: databricksLogo,
[VERTICA_URN]: verticaLogo,
[FIVETRAN_URN]: fivetranLogo,
[CSV_URN]: csvLogo,
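Both new constants follow the existing urn:li:dataPlatform:<id> convention, so DATABRICKS_URN and UNITY_CATALOG_URN end up mapped to the same Databricks logo. A small illustrative sketch (hypothetical helper, not DataHub code) of how the template strings expand:

DATABRICKS = "databricks"
UNITY_CATALOG = "unity-catalog"

def platform_urn(platform_id: str) -> str:
    # Mirrors the TypeScript template literal `urn:li:dataPlatform:${...}` above.
    return f"urn:li:dataPlatform:{platform_id}"

assert platform_urn(DATABRICKS) == "urn:li:dataPlatform:databricks"
assert platform_urn(UNITY_CATALOG) == "urn:li:dataPlatform:unity-catalog"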
4 changes: 1 addition & 3 deletions metadata-ingestion/setup.py
@@ -251,9 +251,7 @@

databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
# 0.16.0 added py.typed support which caused mypy to fail. The databricks sdk is pinned until we resolve mypy issues.
# https://github.com/databricks/databricks-sdk-py/pull/483
"databricks-sdk>=0.9.0,<0.16.0",
"databricks-sdk>=0.9.0",
"pyspark~=3.3.0",
"requests",
# Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes
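Dropping the upper bound re-admits databricks-sdk 0.16.0 and later, which the old pin excluded because of the py.typed/mypy issue. A quick check of both specifiers, assuming standard PEP 440 semantics via the packaging library:

from packaging.specifiers import SpecifierSet
from packaging.version import Version

old_pin = SpecifierSet(">=0.9.0,<0.16.0")  # constraint before this change
new_pin = SpecifierSet(">=0.9.0")          # constraint after this change

for candidate in ["0.9.0", "0.15.0", "0.16.0"]:
    print(candidate, Version(candidate) in old_pin, Version(candidate) in new_pin)
# 0.16.0: False under the old pin, True under the new one.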
@@ -112,7 +112,13 @@ def ensure_profiling_pattern_is_passed_to_profiling(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
profiling: Optional[GEProfilingConfig] = values.get("profiling")
if profiling is not None and profiling.enabled:
# Note: isinstance() check is required here as unity-catalog source reuses
# SQLCommonConfig with different profiling config than GEProfilingConfig
if (
profiling is not None
and isinstance(profiling, GEProfilingConfig)
and profiling.enabled
):
profiling._allow_deny_patterns = values["profile_pattern"]
return values

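A minimal sketch of why the isinstance() guard matters, using hypothetical stand-in classes rather than the real DataHub configs: a source that reuses this validator with a non-GE profiling config would otherwise get GE-only state attached to it.

from typing import Any, Dict, Optional

class GEProfilingStub:  # stand-in for GEProfilingConfig
    enabled = True
    _allow_deny_patterns: Any = None

class UnityProfilingStub:  # stand-in for a unity-catalog style profiling config
    enabled = True  # note: no GE-specific attributes

def ensure_pattern_is_passed(values: Dict[str, Any]) -> Dict[str, Any]:
    profiling: Optional[Any] = values.get("profiling")
    # The isinstance() guard skips non-GE profiling configs entirely.
    if (
        profiling is not None
        and isinstance(profiling, GEProfilingStub)
        and profiling.enabled
    ):
        profiling._allow_deny_patterns = values["profile_pattern"]
    return values

ensure_pattern_is_passed({"profiling": UnityProfilingStub(), "profile_pattern": "*"})  # left untouched
ensure_pattern_is_passed({"profiling": GEProfilingStub(), "profile_pattern": "*"})     # pattern attached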
16 changes: 7 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
@@ -95,14 +95,6 @@ class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig):
description="Number of worker threads to use for profiling. Set to 1 to disable.",
)

@pydantic.root_validator(skip_on_failure=True)
def warehouse_id_required_for_profiling(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
if values.get("enabled") and not values.get("warehouse_id"):
raise ValueError("warehouse_id must be set when profiling is enabled.")
return values

@property
def include_columns(self):
return not self.profile_table_level_only
@@ -254,6 +246,7 @@ class UnityCatalogSourceConfig(
description="Generate usage statistics.",
)

# TODO: Remove `type:ignore` by refactoring config
profiling: Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig] = Field( # type: ignore
default=UnityCatalogGEProfilerConfig(),
description="Data profiling configuration",
@@ -316,7 +309,9 @@ def include_metastore_warning(cls, v: bool) -> bool:

@pydantic.root_validator(skip_on_failure=True)
def set_warehouse_id_from_profiling(cls, values: Dict[str, Any]) -> Dict[str, Any]:
profiling: Optional[UnityCatalogProfilerConfig] = values.get("profiling")
profiling: Optional[
Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig]
] = values.get("profiling")
if not values.get("warehouse_id") and profiling and profiling.warehouse_id:
values["warehouse_id"] = profiling.warehouse_id
if (
@@ -337,6 +332,9 @@ def set_warehouse_id_from_profiling(cls, values: Dict[str, Any]) -> Dict[str, An
if values.get("warehouse_id") and profiling and not profiling.warehouse_id:
profiling.warehouse_id = values["warehouse_id"]

if profiling and profiling.enabled and not profiling.warehouse_id:
raise ValueError("warehouse_id must be set when profiling is enabled.")

return values

@pydantic.validator("schema_pattern", always=True)
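The net effect of the reworked validator, sketched with a plain function and a hypothetical dataclass instead of the real pydantic models (the branch elided in the diff above is omitted): warehouse_id is copied in whichever direction it is missing, and enabling profiling without any warehouse_id now fails here instead of in the removed per-profiler validator.

from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class ProfilingStub:  # stand-in for the Unity Catalog profiling configs
    enabled: bool = True
    warehouse_id: Optional[str] = None

def set_warehouse_id_from_profiling(values: Dict[str, Any]) -> Dict[str, Any]:
    profiling: Optional[ProfilingStub] = values.get("profiling")
    # Promote a profiling-level warehouse_id to the source config...
    if not values.get("warehouse_id") and profiling and profiling.warehouse_id:
        values["warehouse_id"] = profiling.warehouse_id
    # ...and push a source-level warehouse_id down to profiling.
    if values.get("warehouse_id") and profiling and not profiling.warehouse_id:
        profiling.warehouse_id = values["warehouse_id"]
    # Folded-in requirement that used to live on the profiler config itself.
    if profiling and profiling.enabled and not profiling.warehouse_id:
        raise ValueError("warehouse_id must be set when profiling is enabled.")
    return values

values = set_warehouse_id_from_profiling({"profiling": ProfilingStub(warehouse_id="wh-123")})
assert values["warehouse_id"] == "wh-123"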
@@ -12,11 +12,14 @@
from datahub.ingestion.source.unity.proxy_types import (
Catalog,
Column,
ColumnProfile,
CustomCatalogType,
HiveTableType,
Metastore,
Schema,
Table,
TableProfile,
TableReference,
)

logger = logging.getLogger(__name__)
@@ -38,6 +41,18 @@
"binary": ColumnTypeName.BINARY,
}

NUM_NULLS = "num_nulls"
DISTINCT_COUNT = "distinct_count"
MIN = "min"
MAX = "max"
AVG_COL_LEN = "avg_col_len"
MAX_COL_LEN = "max_col_len"
VERSION = "version"

ROWS = "rows"
BYTES = "bytes"
TABLE_STAT_LIST = {ROWS, BYTES}


class HiveMetastoreProxy(Closeable):
# TODO: Support for view lineage using SQL parsing
@@ -67,7 +82,7 @@ def get_inspector(sqlalchemy_url: str, options: dict) -> Inspector:

def hive_metastore_catalog(self, metastore: Optional[Metastore]) -> Catalog:
return Catalog(
id=HIVE_METASTORE,
id=f"{metastore.id}.{HIVE_METASTORE}" if metastore else HIVE_METASTORE,
name=HIVE_METASTORE,
comment=None,
metastore=metastore,
@@ -95,9 +110,14 @@ def hive_metastore_tables(self, schema: Schema) -> Iterable[Table]:
continue
yield self._get_table(schema, table_name, False)

def _get_table(self, schema: Schema, table_name: str, is_view: bool) -> Table:
def _get_table(
self,
schema: Schema,
table_name: str,
is_view: bool = False,
) -> Table:
columns = self._get_columns(schema, table_name)
detailed_info = self._get_table_info(schema, table_name)
detailed_info = self._get_table_info(schema.name, table_name)

comment = detailed_info.pop("Comment", None)
storage_location = detailed_info.pop("Location", None)
@@ -129,6 +149,74 @@ def _get_table(self, schema: Schema, table_name: str, is_view: bool) -> Table:
comment=comment,
)

def get_table_profile(
self, ref: TableReference, include_column_stats: bool = False
) -> TableProfile:
columns = self._get_columns(
Schema(
id=ref.schema,
name=ref.schema,
# This is okay, as none of this is used in profiling
catalog=self.hive_metastore_catalog(None),
comment=None,
owner=None,
),
ref.table,
)
detailed_info = self._get_table_info(ref.schema, ref.table)

table_stats = (
self._get_cached_table_statistics(detailed_info["Statistics"])
if detailed_info.get("Statistics")
else {}
)

return TableProfile(
num_rows=int(table_stats[ROWS])
if table_stats.get(ROWS) is not None
else None,
total_size=int(table_stats[BYTES])
if table_stats.get(BYTES) is not None
else None,
num_columns=len(columns),
column_profiles=[
self._get_column_profile(column.name, ref) for column in columns
]
if include_column_stats
else [],
)

def _get_column_profile(self, column: str, ref: TableReference) -> ColumnProfile:

props = self._column_describe_extended(ref.schema, ref.table, column)
col_stats = {}
for prop in props:
col_stats[prop[0]] = prop[1]
return ColumnProfile(
name=column,
null_count=int(col_stats[NUM_NULLS])
if col_stats.get(NUM_NULLS) is not None
else None,
distinct_count=int(col_stats[DISTINCT_COUNT])
if col_stats.get(DISTINCT_COUNT) is not None
else None,
min=col_stats.get(MIN),
max=col_stats.get(MAX),
avg_len=col_stats.get(AVG_COL_LEN),
max_len=col_stats.get(MAX_COL_LEN),
version=col_stats.get(VERSION),
)

def _get_cached_table_statistics(self, statistics: str) -> dict:
# statistics is in format "xx bytes" OR "1382 bytes, 2 rows"
table_stats = dict()
for prop in statistics.split(","):
value_key_list = prop.strip().split(" ") # value_key_list -> [value, key]
if len(value_key_list) == 2 and value_key_list[1] in TABLE_STAT_LIST:
table_stats[value_key_list[1]] = value_key_list[0]

return table_stats

def _get_created_at(self, created_at: Optional[str]) -> Optional[datetime]:
return (
datetime.strptime(created_at, "%a %b %d %H:%M:%S %Z %Y")
@@ -171,8 +259,8 @@ def _get_table_type(self, type: Optional[str]) -> HiveTableType:
else:
return HiveTableType.UNKNOWN

def _get_table_info(self, schema: Schema, table_name: str) -> dict:
rows = self._describe_extended(schema.name, table_name)
def _get_table_info(self, schema_name: str, table_name: str) -> dict:
rows = self._describe_extended(schema_name, table_name)

index = rows.index(("# Detailed Table Information", "", ""))
rows = rows[index + 1 :]
@@ -235,6 +323,17 @@ def _describe_extended(self, schema_name: str, table_name: str) -> List[Row]:
"""
return self._execute_sql(f"DESCRIBE EXTENDED `{schema_name}`.`{table_name}`")

def _column_describe_extended(
self, schema_name: str, table_name: str, column_name: str
) -> List[Row]:
"""
Rows are structured as shown in examples here
https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-aux-describe-table.html#examples
"""
return self._execute_sql(
f"DESCRIBE EXTENDED `{schema_name}`.`{table_name}` {column_name}"
)

def _execute_sql(self, sql: str) -> List[Row]:
return self.inspector.bind.execute(sql).fetchall()

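To make the new table-statistics path concrete, here is a standalone re-implementation of the parsing logic, fed with the format described in the code comment above (a sketch mirroring _get_cached_table_statistics, not the DataHub class itself):

ROWS = "rows"
BYTES = "bytes"
TABLE_STAT_LIST = {ROWS, BYTES}

def parse_table_statistics(statistics: str) -> dict:
    # Input comes from DESCRIBE EXTENDED output, e.g. "1024 bytes" or "1382 bytes, 2 rows".
    table_stats = {}
    for prop in statistics.split(","):
        value_key = prop.strip().split(" ")  # -> [value, key]
        if len(value_key) == 2 and value_key[1] in TABLE_STAT_LIST:
            table_stats[value_key[1]] = value_key[0]
    return table_stats

assert parse_table_statistics("1382 bytes, 2 rows") == {"bytes": "1382", "rows": "2"}
assert parse_table_statistics("1024 bytes") == {"bytes": "1024"}
# TableProfile.num_rows and total_size are then int(...) of these string values.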
