Merge branch 'datahub-project:master' into master
anshbansal authored Jun 6, 2024
2 parents f01846a + 3b8cda6 commit 3d02c26
Showing 12 changed files with 277 additions and 78 deletions.
2 changes: 1 addition & 1 deletion docs/lineage/airflow.md
@@ -69,7 +69,7 @@ enabled = True # default
| -------------------------- | -------------------- | ---------------------------------------------------------------------------------------- |
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub rest connection. |
| cluster | prod | name of the airflow cluster |
| cluster | prod | Name of the Airflow cluster; this is equivalent to the `env` of the instance. |
| capture_ownership_info | true | Extract DAG ownership. |
| capture_tags_info | true | Extract DAG tags. |
| capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. |
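For reference, the table above documents the plugin's settings in the `[datahub]` section of `airflow.cfg` (the `enabled = True # default` context line above comes from that snippet). A minimal sketch of setting the same options through Airflow's generic `AIRFLOW__<SECTION>__<KEY>` environment-variable overrides follows; the variable names assume that convention applies to this section and are illustrative rather than authoritative.

```python
import os

# Illustrative only: assumes the plugin reads the [datahub] section of
# airflow.cfg, so Airflow's AIRFLOW__<SECTION>__<KEY> override applies.
os.environ["AIRFLOW__DATAHUB__ENABLED"] = "True"
os.environ["AIRFLOW__DATAHUB__CONN_ID"] = "datahub_rest_default"
# `cluster` is equivalent to the DataHub `env` of the instance (per the row above).
os.environ["AIRFLOW__DATAHUB__CLUSTER"] = "prod"
os.environ["AIRFLOW__DATAHUB__CAPTURE_EXECUTIONS"] = "True"
```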
33 changes: 33 additions & 0 deletions docs/managed-datahub/subscription-and-notification.md
@@ -93,6 +93,39 @@ You can view and manage the group’s subscriptions on the group’s page on DataHub
<img width="70%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/subscription-and-notification/s_n-group-subscriptions.png"/>
</p>

### Subscribing to Assertions
You can always subscribe to _all assertion status changes_ on a table using the steps outlined in the earlier sections. However, in some cases you may want to be notified only about specific assertions on a table. For instance, a table may contain several subsets of information, segmented by a category column, so there may be a separate check for each category. As a consumer, you may only care about the freshness check that runs on one specific category of this larger table.

You can subscribe to individual assertions by clicking the bell button on the assertion itself, either in the list view:
<p align="center">
<img width="70%" alt="1" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/subscription-and-notification/s_n-assertion-sub-1.jpg" />
</p>

Or on the assertion's profile page:
<p align="center">
<img width="70%" alt="2" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/subscription-and-notification/s_n-assertion-sub-2.jpg" />
</p>


Note: if you are subscribed to all assertions at the dataset level, you will not be able to **Unsubscribe** from an individual assertion.
<p align="center">
<img width="70%" alt="3" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/subscription-and-notification/s_n-assertion-sub-unsub-1.jpg" />
</p>

You must first remove your dataset-level subscription:
<p align="center">
<img width="70%" alt="4" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/subscription-and-notification/s_n-assertion-sub-unsub-2.jpg" />
<img width="70%" alt="5" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/subscription-and-notification/s_n-assertion-sub-unsub-3.jpg" />
</p>


Then select individual assertions you'd like to subscribe to:
<p align="center">
<img width="70%" alt="7" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/subscription-and-notification/s_n-assertion-sub-resub-1.jpg" />
</p>



## FAQ

<details>
23 changes: 23 additions & 0 deletions metadata-ingestion/developing.md
@@ -34,7 +34,30 @@ cd metadata-ingestion-modules/airflow-plugin
../../gradlew :metadata-ingestion-modules:airflow-plugin:installDev
source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"

# start the airflow web server
export AIRFLOW_HOME=~/airflow
airflow webserver --port 8090 -d

# start the airflow scheduler
airflow scheduler

# access the airflow UI and run any of the DAGs
# open http://localhost:8090/
# select any DAG and click on the `play arrow` button to trigger the DAG

# add debug lines in the codebase, e.g. in ./src/datahub_airflow_plugin/datahub_listener.py
logger.debug("this is the sample debug line")

# run the DAG again and you can see the debug lines in the task run log:
# 1. click on the `timestamp` in the `Last Run` column
# 2. select the task
# 3. click on the `log` option
```


> **P.S. If you are not able to see the log lines, restart the `airflow scheduler` and rerun the DAG.**
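The sample debug line above is an ordinary Python `logging` call. A minimal, self-contained sketch of the pattern is shown below; the logger name and message are illustrative and not the plugin's actual code.

```python
import logging

# Stand-alone illustration of the debug line referenced above; inside the
# plugin, the listener module already defines its own logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("datahub_airflow_plugin.datahub_listener")

logger.debug("this is the sample debug line")
```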
### (Optional) Set up your Python environment for developing on Dagster Plugin

From the repository root:
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/entrypoints.py
@@ -66,7 +66,7 @@
"--log-file",
type=click.Path(dir_okay=False),
default=None,
help="Enable debug logging.",
help="Write debug-level logs to a file.",
)
@click.version_option(
version=datahub_package.nice_version_name(),
@@ -300,6 +300,8 @@ def classification_workunit_processor(
table_name = ".".join(table_id)
if not classification_handler.is_classification_enabled_for_table(table_name):
yield from table_wu_generator
return

for wu in table_wu_generator:
maybe_schema_metadata = wu.get_aspect_of_type(SchemaMetadata)
if (
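The added `return` matters because this function is a generator: once the non-classification path has forwarded the work units, falling through to the loop below would iterate the source a second time. A minimal sketch of the pattern, with illustrative names rather than the DataHub code itself:

```python
from typing import Iterable, Iterator


def passthrough_or_process(items: Iterable[str], enabled: bool) -> Iterator[str]:
    # Simplified model of the classification_workunit_processor control flow.
    if not enabled:
        # Forward the stream untouched and stop. Without the explicit `return`,
        # control would fall through to the loop below and iterate `items`
        # again, duplicating output for re-iterable inputs such as lists.
        yield from items
        return
    for item in items:
        yield item.upper()  # stand-in for classification enrichment


print(list(passthrough_or_process(["a", "b"], enabled=False)))  # ['a', 'b']
```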
@@ -1180,6 +1180,7 @@ def _generate_single_profile(
if custom_sql is not None:
ge_config["query"] = custom_sql

batch = None
with self._ge_context() as ge_context, PerfTimer() as timer:
try:
logger.info(f"Profiling {pretty_name}")
@@ -1219,7 +1220,7 @@
self.report.report_warning(pretty_name, f"Profiling exception {e}")
return None
finally:
if self.base_engine.engine.name == TRINO:
if batch is not None and self.base_engine.engine.name == TRINO:
self._drop_trino_temp_table(batch)

def _get_ge_dataset(
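Pre-initializing `batch = None` and guarding the cleanup with `batch is not None` is the usual pattern for `finally` blocks that release a resource which may never have been created; previously, a failure before `batch` was assigned would have made the Trino temp-table cleanup itself raise. A small self-contained sketch of the same pattern, with illustrative names rather than the profiler's actual classes:

```python
from typing import Optional


class FakeBatch:
    def drop_temp_table(self) -> None:
        print("temp table dropped")


def create_batch() -> FakeBatch:
    raise RuntimeError("profiling setup failed")  # simulate an early failure


def run_profile() -> Optional[str]:
    batch: Optional[FakeBatch] = None  # pre-initialize so `finally` can check it
    try:
        batch = create_batch()
        return "profile"
    except RuntimeError as e:
        print(f"Profiling exception {e}")
        return None
    finally:
        # Without the pre-initialization and the `is not None` guard, an early
        # failure would leave `batch` unbound here.
        if batch is not None:
            batch.drop_temp_table()


run_profile()
```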
@@ -257,6 +257,7 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
graph=self.ctx.graph,
generate_usage_statistics=False,
generate_operations=False,
format_queries=self.config.format_sql_queries,
)
self.report.sql_aggregator = self.aggregator.report

@@ -160,6 +160,8 @@ class SqlAggregatorReport(Report):
# SQL parsing (over all invocations).
num_sql_parsed: int = 0
sql_parsing_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
sql_fingerprinting_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
sql_formatting_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)

# Other lineage loading metrics.
num_known_query_lineage: int = 0
@@ -381,7 +383,8 @@ def _initialize_schema_resolver_from_graph(self, graph: DataHubGraph) -> None:

def _maybe_format_query(self, query: str) -> str:
if self.format_queries:
return try_format_query(query, self.platform.platform_name)
with self.report.sql_formatting_timer:
return try_format_query(query, self.platform.platform_name)
return query

def add_known_query_lineage(
@@ -405,9 +408,12 @@ def add_known_query_lineage(
self.report.num_known_query_lineage += 1

# Generate a fingerprint for the query.
query_fingerprint = get_query_fingerprint(
known_query_lineage.query_text, platform=self.platform.platform_name
)
with self.report.sql_fingerprinting_timer:
query_fingerprint = get_query_fingerprint(
known_query_lineage.query_text,
platform=self.platform.platform_name,
fast=True,
)
formatted_query = self._maybe_format_query(known_query_lineage.query_text)

# Register the query.
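The new report fields assume `PerfTimer` can be used as a context manager that accumulates elapsed time across many invocations, which is how the `with self.report.sql_formatting_timer:` block above uses it. The stand-in below illustrates that accumulation pattern only; it is not DataHub's actual `PerfTimer` implementation.

```python
import time
from dataclasses import dataclass, field


@dataclass
class StandInTimer:
    """Accumulates wall-clock time across every `with` block it wraps."""

    elapsed: float = 0.0

    def __enter__(self) -> "StandInTimer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc) -> None:
        self.elapsed += time.perf_counter() - self._start


@dataclass
class Report:
    sql_fingerprinting_timer: StandInTimer = field(default_factory=StandInTimer)
    sql_formatting_timer: StandInTimer = field(default_factory=StandInTimer)


report = Report()
for query in ["select 1", "select 2"]:
    with report.sql_fingerprinting_timer:
        _ = hash(query)  # stand-in for get_query_fingerprint(..., fast=True)
    with report.sql_formatting_timer:
        _ = query.upper()  # stand-in for try_format_query(...)

print(report.sql_fingerprinting_timer.elapsed, report.sql_formatting_timer.elapsed)
```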
88 changes: 83 additions & 5 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
@@ -1,6 +1,7 @@
import functools
import hashlib
import logging
import re
from typing import Dict, Iterable, Optional, Tuple, Union

import sqlglot
@@ -109,6 +110,80 @@ def _expression_to_string(
return expression.sql(dialect=get_dialect(platform))


_BASIC_NORMALIZATION_RULES = {
# Remove /* */ comments.
re.compile(r"/\*.*?\*/", re.DOTALL): "",
# Remove -- comments.
re.compile(r"--.*$"): "",
# Replace all runs of whitespace with a single space.
re.compile(r"\s+"): " ",
# Remove leading and trailing whitespace and trailing semicolons.
re.compile(r"^\s+|[\s;]+$"): "",
# Replace anything that looks like a number with a placeholder.
re.compile(r"\b\d+\b"): "?",
# Replace anything that looks like a string with a placeholder.
re.compile(r"'[^']*'"): "?",
# Replace sequences of IN/VALUES with a single placeholder.
re.compile(r"\b(IN|VALUES)\s*\(\?(?:, \?)*\)", re.IGNORECASE): r"\1 (?)",
# Normalize parenthesis spacing.
re.compile(r"\( "): "(",
re.compile(r" \)"): ")",
}
_TABLE_NAME_NORMALIZATION_RULES = {
# Replace UUID-like strings with a placeholder (both - and _ variants).
re.compile(
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
re.IGNORECASE,
): "00000000-0000-0000-0000-000000000000",
re.compile(
r"[0-9a-f]{8}_[0-9a-f]{4}_[0-9a-f]{4}_[0-9a-f]{4}_[0-9a-f]{12}",
re.IGNORECASE,
): "00000000_0000_0000_0000_000000000000",
# GE temporary table names (prefix + 8 digits of a UUIDv4)
re.compile(
r"\b(ge_tmp_|ge_temp_|gx_temp_)[0-9a-f]{8}\b", re.IGNORECASE
): r"\1abcdefgh",
# Date-suffixed table names (e.g. _20210101)
re.compile(r"\b(\w+)(19|20)\d{4}\b"): r"\1YYYYMM",
re.compile(r"\b(\w+)(19|20)\d{6}\b"): r"\1YYYYMMDD",
re.compile(r"\b(\w+)(19|20)\d{8}\b"): r"\1YYYYMMDDHH",
re.compile(r"\b(\w+)(19|20)\d{10}\b"): r"\1YYYYMMDDHHMM",
}


def generalize_query_fast(
expression: sqlglot.exp.ExpOrStr,
dialect: DialectOrStr,
change_table_names: bool = False,
) -> str:
"""Variant of `generalize_query` that only does basic normalization.
Args:
expression: The SQL query to generalize.
dialect: The SQL dialect to use.
change_table_names: If True, replace table names with placeholders. Note
that this should only be used for query filtering purposes, as it
violates the general assumption that the queries with the same fingerprint
have the same lineage/usage/etc.
Returns:
The generalized SQL query.
"""

if isinstance(expression, sqlglot.exp.Expression):
expression = expression.sql(dialect=get_dialect(dialect))
query_text = expression

REGEX_REPLACEMENTS = {
**_BASIC_NORMALIZATION_RULES,
**(_TABLE_NAME_NORMALIZATION_RULES if change_table_names else {}),
}

for pattern, replacement in REGEX_REPLACEMENTS.items():
query_text = pattern.sub(replacement, query_text)
return query_text


def generalize_query(expression: sqlglot.exp.ExpOrStr, dialect: DialectOrStr) -> str:
"""
Generalize/normalize a SQL query.
@@ -172,11 +247,14 @@ def generate_hash(text: str) -> str:


def get_query_fingerprint_debug(
expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr
expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr, fast: bool = False
) -> Tuple[str, Optional[str]]:
try:
dialect = get_dialect(platform)
expression_sql = generalize_query(expression, dialect=dialect)
if not fast:
dialect = get_dialect(platform)
expression_sql = generalize_query(expression, dialect=dialect)
else:
expression_sql = generalize_query_fast(expression, dialect=platform)
except (ValueError, sqlglot.errors.SqlglotError) as e:
if not isinstance(expression, str):
raise
@@ -193,7 +271,7 @@ def get_query_fingerprint_debug(


def get_query_fingerprint(
expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr
expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr, fast: bool = False
) -> str:
"""Get a fingerprint for a SQL query.
@@ -215,7 +293,7 @@
The fingerprint for the SQL query.
"""

return get_query_fingerprint_debug(expression, platform)[0]
return get_query_fingerprint_debug(expression, platform, fast=fast)[0]


@functools.lru_cache(maxsize=FORMAT_QUERY_CACHE_SIZE)
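Taken together, these changes let callers opt into a cheaper, regex-based fingerprint instead of a full sqlglot parse. A usage sketch follows; the import path matches the file above, and the normalized output shown in the comments is an approximation of what the regex rules produce.

```python
from datahub.sql_parsing.sqlglot_utils import (
    generalize_query_fast,
    get_query_fingerprint,
)

query = "SELECT id, name FROM users WHERE id IN (1, 2, 3) -- ad hoc lookup"

# Regex-only normalization: strips the comment, collapses whitespace, and
# replaces literals, so the result is roughly
# "SELECT id, name FROM users WHERE id IN (?)".
print(generalize_query_fast(query, dialect="snowflake"))

# fast=True fingerprints the regex-normalized text instead of parsing with sqlglot.
print(get_query_fingerprint(query, platform="snowflake", fast=True))

# Default behaviour is unchanged: a parse-based generalization feeds the fingerprint.
print(get_query_fingerprint(query, platform="snowflake"))
```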
@@ -119,6 +119,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
include_table_lineage=True,
include_view_lineage=True,
include_usage_stats=True,
format_sql_queries=True,
validate_upstreams_against_patterns=False,
include_operational_stats=True,
email_as_user_identifier=True,
@@ -213,6 +214,7 @@ def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
include_views=True,
include_view_lineage=True,
include_usage_stats=False,
format_sql_queries=True,
incremental_lineage=False,
include_operational_stats=False,
platform_instance="instance1",
