Merge branch 'datahub-project:master' into master
treff7es authored Oct 18, 2024
2 parents 14dd3c0 + 011e5b9 commit f111894
Showing 26 changed files with 448 additions and 129 deletions.
6 changes: 5 additions & 1 deletion docs/lineage/airflow.md
@@ -132,7 +132,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
```

| Name | Default value | Description |
|----------------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| -------------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| enabled | true | Whether the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the DataHub connection you set in step 1. |
| cluster | prod | The name of the Airflow cluster. |
@@ -191,6 +191,10 @@ These operators are supported by OpenLineage, but we haven't tested them yet:
There are also a few operators (e.g. BashOperator, PythonOperator) that have custom extractors, but those extractors don't generate lineage.
-->

Known limitations:

- We do not fully support operators that run multiple SQL statements at once. In these cases, we'll only capture lineage from the first SQL statement.
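
For illustration, a hypothetical multi-statement task like the sketch below would only get lineage for its first `INSERT`; the operator choice, connection id, and table names are made up:

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Hypothetical multi-statement task: only the first INSERT's lineage
# (costs -> processed_costs) would be captured; the second statement is skipped.
transform_costs = SQLExecuteQueryOperator(
    task_id="transform_costs",
    conn_id="my_warehouse",  # illustrative connection id
    sql="""
        INSERT INTO processed_costs SELECT * FROM costs;
        INSERT INTO cost_summary SELECT month, SUM(total) AS total FROM processed_costs GROUP BY month;
    """,
)
```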

## Manual Lineage Annotation

### Using `inlets` and `outlets`
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -101,7 +101,7 @@
sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:main?expand=1
"acryl-sqlglot[rs]==25.20.2.dev6",
"acryl-sqlglot[rs]==25.25.2.dev9",
}

classification_lib = {
@@ -85,17 +85,34 @@ def scroll_urns_by_filter(
self,
entity_type: str,
extra_or_filters: List[Dict[str, str]],
extra_and_filters: List[Dict[str, str]] = [],
) -> Iterable[str]:
"""
Scroll through all urns that match the given filters
"""

key_aspect = self.ENTITY_KEY_ASPECT_MAP.get(entity_type)
assert key_aspect, f"No key aspect found for entity type {entity_type}"
if extra_or_filters and extra_and_filters:
raise ValueError(
"Only one of extra_or_filters and extra_and_filters should be provided"
)

count = 1000
query = " OR ".join(
[f"{filter['field']}:{filter['value']}" for filter in extra_or_filters]
query = (
" OR ".join(
[
f"{filter['field']}:\"{filter['value']}\""
for filter in extra_or_filters
]
)
if extra_or_filters
else " AND ".join(
[
f"{filter['field']}:\"{filter['value']}\""
for filter in extra_and_filters
]
)
)
scroll_id = None
while True:
@@ -252,3 +269,23 @@ def search_by_key(

def delete(self, graph_client: DataHubGraph, hard: bool = True) -> None:
graph_client.delete_entity(str(PlatformResourceUrn(self.id)), hard=hard)

@staticmethod
def search_by_filters(
graph_client: DataHubGraph,
and_filters: List[Dict[str, str]] = [],
or_filters: List[Dict[str, str]] = [],
) -> Iterable["PlatformResource"]:
if and_filters and or_filters:
raise ValueError(
"Only one of and_filters and or_filters should be provided"
)
openapi_client = OpenAPIGraphClient(graph_client)
for urn in openapi_client.scroll_urns_by_filter(
entity_type="platformResource",
extra_or_filters=or_filters if or_filters else [],
extra_and_filters=and_filters if and_filters else [],
):
platform_resource = PlatformResource.from_datahub(graph_client, urn)
if platform_resource:
yield platform_resource
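
A minimal usage sketch of the new search helper; the graph client setup, import path, and the filter field/values are illustrative assumptions rather than part of this change:

```python
from datahub.api.entities.platformresource.platform_resource import (  # import path assumed
    PlatformResource,
)
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# Hypothetical client; the server URL is a placeholder.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# OR semantics: return platform resources matching either key.
# Passing and_filters instead requires every filter to match;
# passing both raises ValueError.
for resource in PlatformResource.search_by_filters(
    graph,
    or_filters=[
        {"field": "primaryKey", "value": "orders_table"},
        {"field": "primaryKey", "value": "customers_table"},
    ],
):
    print(resource.id)
```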
@@ -272,7 +272,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

queries_extractor = BigQueryQueriesExtractor(
with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
@@ -288,9 +288,10 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
)
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()

else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
@@ -88,3 +88,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

def get_report(self) -> BigQueryQueriesSourceReport:
return self.report

def close(self) -> None:
self.queries_extractor.close()
self.connection.close()
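
These new `close()` hooks pair with the `Closeable` base class adopted throughout this commit. A rough sketch of that pattern, assuming it simply guarantees `close()` is called when the `with` block exits (not necessarily DataHub's exact implementation):

```python
from abc import abstractmethod
from contextlib import AbstractContextManager


class Closeable(AbstractContextManager):
    """Sketch: a resource that is cleaned up when its `with` block exits."""

    @abstractmethod
    def close(self) -> None:
        ...

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # AbstractContextManager's default __enter__ returns self, so
        # `with BigQueryQueriesExtractor(...) as extractor:` always closes it.
        self.close()
```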
@@ -13,6 +13,7 @@
BaseTimeWindowConfig,
get_time_bucket,
)
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -114,7 +115,7 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig):
)


class BigQueryQueriesExtractor:
class BigQueryQueriesExtractor(Closeable):
"""
Extracts query audit log and generates usage/lineage/operation workunits.
@@ -181,6 +182,7 @@ def __init__(
is_allowed_table=self.is_allowed_table,
format_queries=False,
)

self.report.sql_aggregator = self.aggregator.report
self.report.num_discovered_tables = (
len(self.discovered_tables) if self.discovered_tables else None
@@ -273,12 +275,14 @@ def get_workunits_internal(
self.report.num_unique_queries = len(queries_deduped)
logger.info(f"Found {self.report.num_unique_queries} unique queries")

with self.report.audit_log_load_timer:
with self.report.audit_log_load_timer, queries_deduped:
i = 0
for _, query_instances in queries_deduped.items():
for query in query_instances.values():
if i > 0 and i % 10000 == 0:
logger.info(f"Added {i} query log entries to SQL aggregator")
logger.info(
f"Added {i} query log equeries_dedupedntries to SQL aggregator"
)
if self.report.sql_aggregator:
logger.info(self.report.sql_aggregator.as_string())

@@ -287,6 +291,11 @@

yield from auto_workunit(self.aggregator.gen_metadata())

if not use_cached_audit_log:
queries.close()
shared_connection.close()
audit_log_file.unlink(missing_ok=True)

def deduplicate_queries(
self, queries: FileBackedList[ObservedQuery]
) -> FileBackedDict[Dict[int, ObservedQuery]]:
@@ -404,6 +413,9 @@ def _parse_audit_log_row(self, row: BigQueryJob) -> ObservedQuery:

return entry

def close(self) -> None:
self.aggregator.close()


def _extract_query_text(row: BigQueryJob) -> str:
# We wrap select statements in a CTE to make them parseable as DML statement.
@@ -1074,8 +1074,8 @@ def add_node_to_cll_list(dbt_name: str) -> None:
for upstream in all_nodes_map[dbt_name].upstream_nodes:
schema_nodes.add(upstream)

upstream_node = all_nodes_map[upstream]
if upstream_node.is_ephemeral_model():
upstream_node = all_nodes_map.get(upstream)
if upstream_node and upstream_node.is_ephemeral_model():
add_node_to_cll_list(upstream)

cll_nodes.add(dbt_name)
@@ -40,25 +40,25 @@ def report_redshift_failure(
error_message = str(e).lower()
if "permission denied" in error_message:
if "svv_table_info" in error_message:
report.report_failure(
report.failure(
title="Permission denied",
message="Failed to extract metadata due to insufficient permission to access 'svv_table_info' table. Please ensure the provided database user has access.",
exc=e,
)
elif "svl_user_info" in error_message:
report.report_failure(
report.failure(
title="Permission denied",
message="Failed to extract metadata due to insufficient permission to access 'svl_user_info' table. Please ensure the provided database user has access.",
exc=e,
)
else:
report.report_failure(
report.failure(
title="Permission denied",
message="Failed to extract metadata due to insufficient permissions.",
exc=e,
)
else:
report.report_failure(
report.failure(
title="Failed to extract some metadata",
message="Failed to extract some metadata from Redshift.",
exc=e,
@@ -133,7 +133,7 @@ def parse_alter_table_rename(default_schema: str, query: str) -> Tuple[str, str,
assert isinstance(parsed_query, sqlglot.exp.Alter)
prev_name = parsed_query.this.name
rename_clause = parsed_query.args["actions"][0]
assert isinstance(rename_clause, sqlglot.exp.RenameTable)
assert isinstance(rename_clause, sqlglot.exp.AlterRename)
new_name = rename_clause.this.name

schema = parsed_query.this.db or default_schema
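
A small illustration of the expression change, assuming the pinned acryl-sqlglot release where `ALTER ... RENAME TO` parses into an `AlterRename` action (older releases exposed `RenameTable`); the schema and table names are placeholders:

```python
import sqlglot
import sqlglot.expressions as exp

# Hypothetical statement for demonstration only.
parsed = sqlglot.parse_one("ALTER TABLE public.foo RENAME TO bar", dialect="redshift")

assert isinstance(parsed, exp.Alter)
rename = parsed.args["actions"][0]
assert isinstance(rename, exp.AlterRename)

prev_name, new_name = parsed.this.name, rename.this.name
schema = parsed.this.db or "public"  # fall back to a default schema
print(schema, prev_name, "->", new_name)  # public foo -> bar
```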
@@ -5,6 +5,7 @@
import redshift_connector

from datahub.emitter import mce_builder
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.redshift.config import LineageMode, RedshiftConfig
@@ -39,7 +40,7 @@
logger = logging.getLogger(__name__)


class RedshiftSqlLineageV2:
class RedshiftSqlLineageV2(Closeable):
# does lineage and usage based on SQL parsing.

def __init__(
@@ -56,6 +57,7 @@ def __init__(
self.context = context

self.database = database

self.aggregator = SqlParsingAggregator(
platform=self.platform,
platform_instance=self.config.platform_instance,
@@ -436,3 +438,6 @@ def generate(self) -> Iterable[MetadataWorkUnit]:
message="Unexpected error(s) while attempting to extract lineage from SQL queries. See the full logs for more details.",
context=f"Query Parsing Failures: {self.aggregator.report.observed_query_parse_failures}",
)

def close(self) -> None:
self.aggregator.close()
@@ -569,8 +569,7 @@ def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:

end_time_str: str = end_time.strftime(redshift_datetime_format)

return rf"""\
-- DataHub Redshift Source temp table DDL query
return rf"""-- DataHub Redshift Source temp table DDL query
select
*
from (
@@ -645,7 +644,7 @@ def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
)
where
rn = 1
"""
"""

# Add this join to the sql query for more metrics on completed queries
# LEFT JOIN svl_query_metrics_summary sqms ON ss.query = sqms.query
@@ -451,24 +451,23 @@ def _extract_metadata(
)

if self.config.use_lineage_v2:
lineage_extractor = RedshiftSqlLineageV2(
with RedshiftSqlLineageV2(
config=self.config,
report=self.report,
context=self.ctx,
database=database,
redundant_run_skip_handler=self.redundant_lineage_run_skip_handler,
)

yield from lineage_extractor.aggregator.register_schemas_from_stream(
self.process_schemas(connection, database)
)
) as lineage_extractor:
yield from lineage_extractor.aggregator.register_schemas_from_stream(
self.process_schemas(connection, database)
)

self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION)
yield from self.extract_lineage_v2(
connection=connection,
database=database,
lineage_extractor=lineage_extractor,
)
self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION)
yield from self.extract_lineage_v2(
connection=connection,
database=database,
lineage_extractor=lineage_extractor,
)

all_tables = self.get_all_tables()
else:
@@ -18,6 +18,7 @@
from datahub.configuration.connection_resolver import auto_connection_resolver
from datahub.configuration.oauth import OAuthConfiguration, OAuthIdentityProvider
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.source.snowflake.constants import (
CLIENT_PREFETCH_THREADS,
CLIENT_SESSION_KEEP_ALIVE,
@@ -364,7 +365,7 @@ def get_connection(self) -> "SnowflakeConnection":
) from e


class SnowflakeConnection:
class SnowflakeConnection(Closeable):
_connection: NativeSnowflakeConnection

def __init__(self, connection: NativeSnowflakeConnection):