Merge branch 'datahub-project:master' into master
hsheth2 authored Mar 12, 2024
2 parents c47471f + 4155a62 commit 8606a67
Showing 41 changed files with 1,825 additions and 1,307 deletions.
6 changes: 5 additions & 1 deletion docs/advanced/monitoring.md
@@ -94,4 +94,8 @@ docker-compose -p datahub \

We set up quickstart.sh, dev.sh, and dev-without-neo4j.sh to add the above docker-compose file when MONITORING=true. For
instance, `MONITORING=true ./docker/quickstart.sh` will add the correct env variables to start collecting traces and
metrics, and also deploy Jaeger, Prometheus, and Grafana. We will soon support this as a flag during quickstart.

## Health check endpoint

To monitor the health of your DataHub service, the `/admin` endpoint can be used.
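
As a rough illustration (not part of this change), a liveness probe against a local quickstart deployment might look like the following; the host, port, and use of the `requests` library are assumptions:

```python
import requests  # third-party: pip install requests

# Hypothetical probe against a local quickstart deployment;
# adjust the host/port for your environment.
ADMIN_URL = "http://localhost:8080/admin"

resp = requests.get(ADMIN_URL, timeout=5)
# Any 2xx response indicates the service is up.
print(resp.status_code, "healthy" if resp.ok else "unhealthy")
```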
10 changes: 7 additions & 3 deletions metadata-ingestion/setup.py
@@ -102,6 +102,10 @@
"acryl-sqlglot==22.3.1.dev3",
}

classification_lib = {
"acryl-datahub-classify==0.0.9",
}

sql_common = (
{
# Required for all SQL sources.
@@ -121,6 +125,7 @@
}
| usage_common
| sqlglot_lib
| classification_lib
)

sqllineage_lib = {
@@ -175,7 +180,7 @@
# Clickhouse 0.8.3 adds support for SQLAlchemy 1.4.x
"sqlalchemy-redshift>=0.8.3",
"GeoAlchemy2",
"redshift-connector",
"redshift-connector>=2.1.0",
*sqllineage_lib,
*path_spec_common,
}
@@ -190,8 +195,7 @@
"pandas",
"cryptography",
"msal",
"acryl-datahub-classify==0.0.9",
}
} | classification_lib

trino = {
"trino[sqlalchemy]>=0.308",
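The refactor above pulls the `acryl-datahub-classify` pin into a shared `classification_lib` set and composes it into other extras with Python's set-union operator. A minimal standalone sketch of that pattern (the sets below are illustrative, not the real setup.py contents):

```python
# Illustrative only: composing pip requirement sets via set union,
# mirroring how setup.py builds per-source extras.
classification_lib = {
    "acryl-datahub-classify==0.0.9",
}

usage_common = {"sqlparse"}

sql_common = (
    {
        "sqlalchemy>=1.4.39, <2",
    }
    | usage_common
    | classification_lib
)

# Every extra built on sql_common now inherits the single
# classification pin, so bumping the version is a one-line change.
print(sorted(sql_common))
```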
@@ -233,6 +233,10 @@ def get_columns_to_classify(
f"Skipping column {dataset_name}.{schema_field.fieldPath} from classification"
)
continue

# TODO: Let's auto-skip passing sample_data for complex (array/struct) columns
# for initial rollout

column_infos.append(
ColumnInfo(
metadata=Metadata(
@@ -243,9 +247,11 @@
"Dataset_Name": dataset_name,
}
),
- values=sample_data[schema_field.fieldPath]
- if schema_field.fieldPath in sample_data.keys()
- else [],
+ values=(
+ sample_data[schema_field.fieldPath]
+ if schema_field.fieldPath in sample_data.keys()
+ else []
+ ),
)
)

31 changes: 20 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -20,6 +20,7 @@
)
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
@@ -114,7 +115,7 @@
infer_output_schema,
sqlglot_lineage,
)
- from datahub.sql_parsing.sqlglot_utils import detach_ctes
+ from datahub.sql_parsing.sqlglot_utils import detach_ctes, try_format_query
from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.time import datetime_to_ts_millis
from datahub.utilities.topological_sort import topological_sort
@@ -297,10 +298,6 @@ class DBTCommonConfig(
description="When enabled, converts column URNs to lowercase to ensure cross-platform compatibility. "
"If `target_platform` is Snowflake, the default is True.",
)
- use_compiled_code: bool = Field(
- default=False,
- description="When enabled, uses the compiled dbt code instead of the raw dbt node definition.",
- )
test_warnings_are_errors: bool = Field(
default=False,
description="When enabled, dbt test warnings will be treated as failures.",
@@ -320,6 +317,15 @@
description="When enabled, emits incremental/patch lineage for non-dbt entities. When disabled, re-states lineage on each run.",
)

_remove_use_compiled_code = pydantic_removed_field("use_compiled_code")

include_compiled_code: bool = Field(
# TODO: Once the formattedViewLogic field model change is included in a server
# release, probably 0.13.1, we can flip the default to True.
default=False,
description="When enabled, includes the compiled code in the emitted metadata.",
)

@validator("target_platform")
def validate_target_platform_value(cls, target_platform: str) -> str:
if target_platform.lower() == DBT_PLATFORM:
@@ -1269,18 +1275,21 @@ def get_external_url(self, node: DBTNode) -> Optional[str]:
def _create_view_properties_aspect(
self, node: DBTNode
) -> Optional[ViewPropertiesClass]:
- view_logic = (
- node.compiled_code if self.config.use_compiled_code else node.raw_code
- )
-
- if node.language != "sql" or not view_logic:
+ if node.language != "sql" or not node.raw_code:
return None

compiled_code = None
if self.config.include_compiled_code and node.compiled_code:
compiled_code = try_format_query(
node.compiled_code, platform=self.config.target_platform
)

materialized = node.materialization in {"table", "incremental", "snapshot"}
view_properties = ViewPropertiesClass(
materialized=materialized,
viewLanguage="SQL",
- viewLogic=view_logic,
+ viewLogic=node.raw_code,
+ formattedViewLogic=compiled_code,
)
return view_properties

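A small sketch of the new formatting path: when `include_compiled_code` is set, the compiled SQL is pretty-printed with `try_format_query` before being emitted as `formattedViewLogic`. The sample SQL and platform value below are made up for illustration:

```python
from datahub.sql_parsing.sqlglot_utils import try_format_query

# Made-up compiled SQL standing in for node.compiled_code.
compiled = "select a, b from (select a, b, c from t) subq where c > 1"

# Mirrors the call site above; "snowflake" stands in for
# config.target_platform. try_format_query is expected to fall back
# to the raw query if formatting fails, rather than raising.
formatted = try_format_query(compiled, platform="snowflake")
print(formatted)
```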
@@ -94,6 +94,11 @@ class RedshiftConfig(
description="The default schema to use if the sql parser fails to parse the schema with `sql_based` lineage collector",
)

is_serverless: bool = Field(
default=False,
description="Whether target Redshift instance is serverless (alternative is provisioned cluster)",
)

use_lineage_v2: bool = Field(
default=False,
description="Whether to use the new SQL-based lineage collector.",
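A hedged sketch of wiring the new flag into an ingestion run through the Python pipeline API; the connection details are placeholders and the recipe is abbreviated, not prescriptive:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Placeholder connection details; is_serverless is the new flag.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "redshift",
            "config": {
                "host_port": "example-workgroup.redshift-serverless.example.com:5439",
                "database": "dev",
                "username": "ingest",
                "password": "example-password",
                "is_serverless": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```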
30 changes: 19 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py
@@ -18,7 +18,11 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.aws.s3_util import strip_s3_prefix
from datahub.ingestion.source.redshift.config import LineageMode, RedshiftConfig
- from datahub.ingestion.source.redshift.query import RedshiftQuery
+ from datahub.ingestion.source.redshift.query import (
+ RedshiftCommonQuery,
+ RedshiftProvisionedQuery,
+ RedshiftServerlessQuery,
+ )
from datahub.ingestion.source.redshift.redshift_schema import (
LineageRow,
RedshiftDataDictionary,
@@ -158,6 +162,10 @@ def __init__(
self.context = context
self._lineage_map: Dict[str, LineageItem] = defaultdict()

self.queries: RedshiftCommonQuery = RedshiftProvisionedQuery()
if self.config.is_serverless:
self.queries = RedshiftServerlessQuery()

self.redundant_run_skip_handler = redundant_run_skip_handler
self.start_time, self.end_time = (
self.report.lineage_start_time,
@@ -659,7 +667,7 @@ def populate_lineage(
LineageMode.MIXED,
}:
# Populate table level lineage by getting upstream tables from stl_scan redshift table
- query = RedshiftQuery.stl_scan_based_lineage_query(
+ query = self.queries.stl_scan_based_lineage_query(
self.config.database,
self.start_time,
self.end_time,
@@ -670,7 +678,7 @@
LineageMode.MIXED,
}:
# Populate table level lineage by parsing table creating sqls
- query = RedshiftQuery.list_insert_create_queries_sql(
+ query = self.queries.list_insert_create_queries_sql(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
@@ -679,23 +687,23 @@

if self.config.include_views and self.config.include_view_lineage:
# Populate table level lineage for views
- query = RedshiftQuery.view_lineage_query()
+ query = self.queries.view_lineage_query()
populate_calls.append((query, LineageCollectorType.VIEW))

# Populate table level lineage for late binding views
- query = RedshiftQuery.list_late_view_ddls_query()
+ query = self.queries.list_late_view_ddls_query()
populate_calls.append((query, LineageCollectorType.VIEW_DDL_SQL_PARSING))

if self.config.include_copy_lineage:
- query = RedshiftQuery.list_copy_commands_sql(
+ query = self.queries.list_copy_commands_sql(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
)
populate_calls.append((query, LineageCollectorType.COPY))

if self.config.include_unload_lineage:
- query = RedshiftQuery.list_unload_commands_sql(
+ query = self.queries.list_unload_commands_sql(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
@@ -831,7 +839,7 @@ def _process_table_renames(
# new urn -> prev urn
table_renames: Dict[str, str] = {}

- query = RedshiftQuery.alter_table_rename_query(
+ query = self.queries.alter_table_rename_query(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
@@ -869,7 +877,7 @@ def get_temp_tables(
def get_temp_tables(
self, connection: redshift_connector.Connection
) -> List[TempTableRow]:
- ddl_query: str = RedshiftQuery.temp_table_ddl_query(
+ ddl_query: str = self.queries.temp_table_ddl_query(
start_time=self.config.start_time,
end_time=self.config.end_time,
)
@@ -892,9 +900,9 @@ def find_temp_tables(
matched_temp_tables: List[TempTableRow] = []

for table_name in temp_table_names:
- prefixes = RedshiftQuery.get_temp_table_clause(table_name)
+ prefixes = self.queries.get_temp_table_clause(table_name)
prefixes.extend(
- RedshiftQuery.get_temp_table_clause(table_name.split(".")[-1])
+ self.queries.get_temp_table_clause(table_name.split(".")[-1])
)

for row in temp_table_rows:
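Both the v1 extractor above and the v2 extractor below now resolve their SQL through the common `RedshiftCommonQuery` interface, picking the serverless variant when `is_serverless` is set. A minimal sketch of that shape (method bodies are illustrative stand-ins, not the real queries):

```python
class RedshiftCommonQuery:
    """Shared query interface; subclasses target different system tables."""

    def view_lineage_query(self) -> str:
        raise NotImplementedError


class RedshiftProvisionedQuery(RedshiftCommonQuery):
    def view_lineage_query(self) -> str:
        # Illustrative stand-in; provisioned clusters expose stl_*/svv_* tables.
        return "SELECT 1 AS provisioned_placeholder"


class RedshiftServerlessQuery(RedshiftCommonQuery):
    def view_lineage_query(self) -> str:
        # Illustrative stand-in; serverless exposes sys_* monitoring views.
        return "SELECT 1 AS serverless_placeholder"


def make_queries(is_serverless: bool) -> RedshiftCommonQuery:
    # Mirrors the constructor logic in both lineage extractors.
    return RedshiftServerlessQuery() if is_serverless else RedshiftProvisionedQuery()
```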
@@ -13,7 +13,11 @@
LineageCollectorType,
RedshiftLineageExtractor,
)
- from datahub.ingestion.source.redshift.query import RedshiftQuery
+ from datahub.ingestion.source.redshift.query import (
+ RedshiftCommonQuery,
+ RedshiftProvisionedQuery,
+ RedshiftServerlessQuery,
+ )
from datahub.ingestion.source.redshift.redshift_schema import (
LineageRow,
RedshiftDataDictionary,
@@ -64,6 +68,10 @@ def __init__(
)
self.report.sql_aggregator = self.aggregator.report

self.queries: RedshiftCommonQuery = RedshiftProvisionedQuery()
if self.config.is_serverless:
self.queries = RedshiftServerlessQuery()

self._lineage_v1 = RedshiftLineageExtractor(
config=config,
report=report,
@@ -131,7 +139,7 @@ def build(
LineageMode.MIXED,
}:
# Populate lineage by parsing table creating sqls
- query = RedshiftQuery.list_insert_create_queries_sql(
+ query = self.queries.list_insert_create_queries_sql(
db_name=self.database,
start_time=self.start_time,
end_time=self.end_time,
@@ -148,7 +156,7 @@
LineageMode.MIXED,
}:
# Populate lineage by getting upstream tables from stl_scan redshift table
- query = RedshiftQuery.stl_scan_based_lineage_query(
+ query = self.queries.stl_scan_based_lineage_query(
self.database,
self.start_time,
self.end_time,
@@ -159,13 +167,13 @@

if self.config.include_views and self.config.include_view_lineage:
# Populate lineage for views
- query = RedshiftQuery.view_lineage_query()
+ query = self.queries.view_lineage_query()
populate_calls.append(
(LineageCollectorType.VIEW, query, self._process_view_lineage)
)

# Populate lineage for late binding views
- query = RedshiftQuery.list_late_view_ddls_query()
+ query = self.queries.list_late_view_ddls_query()
populate_calls.append(
(
LineageCollectorType.VIEW_DDL_SQL_PARSING,
@@ -176,7 +184,7 @@

if self.config.include_copy_lineage:
# Populate lineage for copy commands.
- query = RedshiftQuery.list_copy_commands_sql(
+ query = self.queries.list_copy_commands_sql(
db_name=self.database,
start_time=self.start_time,
end_time=self.end_time,
Expand All @@ -187,7 +195,7 @@ def build(

if self.config.include_unload_lineage:
# Populate lineage for unload commands.
- query = RedshiftQuery.list_unload_commands_sql(
+ query = self.queries.list_unload_commands_sql(
db_name=self.database,
start_time=self.start_time,
end_time=self.end_time,
