feat: multi-query lineage for temp upstreams (#11708)
mayurinehate authored Oct 25, 2024
1 parent ffefb9d commit 87fa5b8
Showing 13 changed files with 493 additions and 140 deletions.
@@ -894,7 +894,9 @@ def _process_table_renames(
             env=self.config.env,
         )
 
-        table_renames[new_urn] = TableRename(prev_urn, new_urn, query_text)
+        table_renames[new_urn] = TableRename(
+            prev_urn, new_urn, query_text, timestamp=rename_row.start_time
+        )
 
         # We want to generate lineage for the previous name too.
         all_tables[database][schema].add(prev_name)
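
The rename record now captures when the rename actually ran. A minimal sketch of the value being picked up (rename_row's shape is not shown in this diff, so a stand-in object is used; in the source it comes from Snowflake's query history):

    from datetime import datetime, timezone
    from types import SimpleNamespace

    # Stand-in for the query-history row; only start_time is read here.
    rename_row = SimpleNamespace(
        start_time=datetime(2024, 10, 25, 11, 58, tzinfo=timezone.utc)
    )
    timestamp = rename_row.start_time  # stored on the TableRename record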
@@ -354,15 +354,23 @@ def _parse_audit_log_row(
         object_modified_by_ddl = res["object_modified_by_ddl"]
 
         if object_modified_by_ddl and not objects_modified:
-            ddl_entry: Optional[Union[TableRename, TableSwap]] = None
+            known_ddl_entry: Optional[Union[TableRename, TableSwap]] = None
             with self.structured_reporter.report_exc(
                 "Error fetching ddl lineage from Snowflake"
             ):
-                ddl_entry = self.parse_ddl_query(
-                    res["query_text"], object_modified_by_ddl
+                known_ddl_entry = self.parse_ddl_query(
+                    res["query_text"],
+                    res["session_id"],
+                    res["query_start_time"],
+                    object_modified_by_ddl,
                 )
-            return ddl_entry
 
+            if known_ddl_entry:
+                return known_ddl_entry
+            elif direct_objects_accessed:
+                # Unknown ddl relevant for usage. We want to continue execution here
+                pass
+            else:
+                return None
         upstreams = []
         column_usage = {}
 
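The row handed to this branch now supplies the session id and query start time alongside the DDL payload; a recognized rename/swap short-circuits, while unrecognized DDL that still accessed objects falls through to the usage/lineage path below. A rough sketch of the row shape assumed here (illustrative values only):

    from datetime import datetime, timezone

    # Illustrative access-history row; only the keys read by the branch above are shown.
    res = {
        "query_text": "ALTER TABLE db.sch.foo RENAME TO db.sch.bar",
        "session_id": "5893018971973628",
        "query_start_time": datetime(2024, 10, 25, 12, 0, tzinfo=timezone.utc),
        # Parsed OBJECT_MODIFIED_BY_DDL column; abbreviated, the real layout is richer.
        "object_modified_by_ddl": {"operationType": "RENAME_TABLE", "properties": {}},
        # If this is non-empty, an unrecognized DDL row still feeds usage parsing.
        "direct_objects_accessed": [],
    }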
@@ -459,8 +467,13 @@ def _parse_audit_log_row
         return entry
 
     def parse_ddl_query(
-        self, query: str, object_modified_by_ddl: dict
+        self,
+        query: str,
+        session_id: str,
+        timestamp: datetime,
+        object_modified_by_ddl: dict,
     ) -> Optional[Union[TableRename, TableSwap]]:
+        timestamp = timestamp.astimezone(timezone.utc)
         if object_modified_by_ddl[
             "operationType"
         ] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
@@ -476,7 +489,7 @@ def parse_ddl_query(
                 )
             )
 
-            return TableSwap(urn1, urn2, query)
+            return TableSwap(urn1, urn2, query, session_id, timestamp)
         elif object_modified_by_ddl[
             "operationType"
         ] == "RENAME_TABLE" and object_modified_by_ddl["properties"].get("objectName"):
@@ -492,7 +505,7 @@ def parse_ddl_query(
                 )
             )
 
-            return TableRename(original_un, new_urn, query)
+            return TableRename(original_un, new_urn, query, session_id, timestamp)
         else:
             self.report.num_ddl_queries_dropped += 1
             return None
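
Two DDL shapes are recognized: a swap (operationType "ALTER" with a swapTargetName property) and a rename (operationType "RENAME_TABLE" with a new objectName property); anything else is counted as dropped. A simplified illustration of that dispatch check (the dict below is an abbreviated stand-in, not Snowflake's full OBJECT_MODIFIED_BY_DDL layout):

    object_modified_by_ddl = {
        "operationType": "ALTER",
        "objectName": "DB.SCH.PERSON_INFO",
        # Abbreviated; Snowflake nests property values more deeply.
        "properties": {"swapTargetName": "DB.SCH.PERSON_INFO_SWAP"},
    }

    is_swap = (
        object_modified_by_ddl["operationType"] == "ALTER"
        and object_modified_by_ddl["properties"].get("swapTargetName")
    )
    is_rename = (
        object_modified_by_ddl["operationType"] == "RENAME_TABLE"
        and object_modified_by_ddl["properties"].get("objectName")
    )
    print(bool(is_swap), bool(is_rename))  # True False -> this row becomes a TableSwap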
@@ -180,6 +180,7 @@ class TableRename:
     new_urn: UrnStr
     query: Optional[str] = None
     session_id: str = _MISSING_SESSION_ID
+    timestamp: Optional[datetime] = None
 
 
 @dataclasses.dataclass
@@ -188,6 +189,7 @@ class TableSwap:
     urn2: UrnStr
     query: Optional[str] = None
     session_id: str = _MISSING_SESSION_ID
+    timestamp: Optional[datetime] = None
 
     def id(self) -> str:
         # TableSwap(A,B) is same as TableSwap(B,A)
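
Both records can now be pinned to a session and a point in time. A minimal construction sketch (URNs, session id, and times are placeholders; the import path is assumed from the aggregator module this hunk edits):

    from datetime import datetime, timezone

    # Import path assumed.
    from datahub.sql_parsing.sql_parsing_aggregator import TableRename, TableSwap

    ts = datetime(2024, 10, 25, 12, 0, tzinfo=timezone.utc)  # placeholder time
    swap = TableSwap(
        urn1="urn:li:dataset:(urn:li:dataPlatform:snowflake,db.sch.person_info,PROD)",
        urn2="urn:li:dataset:(urn:li:dataPlatform:snowflake,db.sch.person_info_swap,PROD)",
        query="ALTER TABLE person_info SWAP WITH person_info_swap",
        session_id="5893018971973628",
        timestamp=ts,
    )
    rename = TableRename(
        original_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,db.sch.person_info_backup,PROD)",
        new_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,db.sch.person_info,PROD)",
        query="ALTER TABLE person_info_backup RENAME TO person_info",
        session_id="5893018971973628",
        timestamp=ts,
    )
    # TableSwap(A, B) and TableSwap(B, A) share the same id(), so the pair is
    # deduplicated regardless of which side the audit log lists first.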
@@ -444,8 +446,7 @@ def __init__(
 
         # Map of session ID -> {temp table name -> query id}
         # Needs to use the query_map to find the info about the query.
-        # This assumes that a temp table is created at most once per session.
-        self._temp_lineage_map = FileBackedDict[Dict[UrnStr, QueryId]](
+        self._temp_lineage_map = FileBackedDict[Dict[UrnStr, OrderedSet[QueryId]]](
             shared_connection=self._shared_connection, tablename="temp_lineage_map"
         )
         self._exit_stack.push(self._temp_lineage_map)
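
With OrderedSet values, a temp table that is written several times in one session keeps every query fingerprint, in order, instead of only the last one. Roughly (plain dict and list shown for illustration; the real structure is a FileBackedDict of OrderedSets):

    # session id -> {temp table urn -> ordered query fingerprints}
    temp_lineage_map = {
        "5893018971973628": {
            "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.sch.staging_tmp,PROD)": [
                "fingerprint-create",   # CREATE TEMP TABLE staging_tmp AS ...
                "fingerprint-insert",   # INSERT INTO staging_tmp SELECT ...
            ],
        },
    }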
@@ -903,22 +904,20 @@ def add_preparsed_query(
                 parsed.query_type.is_create()
                 and parsed.query_type_props.get("temporary")
             )
+            or self.is_temp_table(out_table)
             or (
-                self.is_temp_table(out_table)
-                or (
-                    require_out_table_schema
-                    and not self._schema_resolver.has_urn(out_table)
-                )
+                require_out_table_schema
+                and not self._schema_resolver.has_urn(out_table)
             )
         ):
             # Infer the schema of the output table and track it for later.
             if parsed.inferred_schema is not None:
                 self._inferred_temp_schemas[query_fingerprint] = parsed.inferred_schema
 
             # Also track the lineage for the temp table, for merging purposes later.
-            self._temp_lineage_map.for_mutation(parsed.session_id, {})[
-                out_table
-            ] = query_fingerprint
+            self._temp_lineage_map.for_mutation(parsed.session_id, {}).setdefault(
+                out_table, OrderedSet()
+            ).add(query_fingerprint)
 
             # Also update schema resolver for missing session id
             if parsed.session_id == _MISSING_SESSION_ID and parsed.inferred_schema:
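
Where the old code overwrote the stored fingerprint on a second write to the same temp table, the new code accumulates every fingerprint. A stand-alone illustration of the setdefault-then-add pattern (the OrderedSet import path is assumed):

    from datahub.utilities.ordered_set import OrderedSet  # import path assumed

    session_entry: dict = {}
    for fingerprint in ["fp_create", "fp_insert", "fp_create"]:
        session_entry.setdefault("urn:li:dataset:tmp", OrderedSet()).add(fingerprint)

    print(list(session_entry["urn:li:dataset:tmp"]))  # ['fp_create', 'fp_insert']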
@@ -962,6 +961,8 @@ def add_table_rename(
                     downstream_urn=table_rename.new_urn,
                     upstream_urn=table_rename.original_urn,
                 ),
+                session_id=table_rename.session_id,
+                timestamp=table_rename.timestamp,
             )
         )

@@ -996,6 +997,11 @@ def add_table_swap(self, table_swap: TableSwap) -> None:
                 f"\nalter table {table1} swap with {table2}",
                 upstreams=[table_swap.urn1],
                 downstream=table_swap.urn2,
+                column_lineage=self._generate_identity_column_lineage(
+                    upstream_urn=table_swap.urn1, downstream_urn=table_swap.urn2
+                ),
+                session_id=table_swap.session_id,
+                timestamp=table_swap.timestamp,
             )
         )
 
@@ -1007,6 +1013,11 @@ def add_table_swap(self, table_swap: TableSwap) -> None:
                 f"alter table {table2} swap with {table1}",
                 upstreams=[table_swap.urn2],
                 downstream=table_swap.urn1,
+                column_lineage=self._generate_identity_column_lineage(
+                    upstream_urn=table_swap.urn2, downstream_urn=table_swap.urn1
+                ),
+                session_id=table_swap.session_id,
+                timestamp=table_swap.timestamp,
            )
         )
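
Each direction of the swap now also carries identity column lineage. `_generate_identity_column_lineage` is not shown in this diff; presumably it maps every known upstream column to the same-named downstream column, roughly along these lines (a sketch only, with simplified types standing in for the real ColumnLineageInfo objects):

    from typing import Dict, List


    def identity_column_lineage_sketch(
        columns: List[str], upstream_urn: str, downstream_urn: str
    ) -> List[Dict[str, object]]:
        # One downstream column fed by the identically named upstream column.
        return [
            {
                "downstream": {"table": downstream_urn, "column": col},
                "upstreams": [{"table": upstream_urn, "column": col}],
            }
            for col in columns
        ]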

@@ -1018,10 +1029,13 @@ def _make_schema_resolver_for_session(
             schema_resolver = self._missing_session_schema_resolver
         elif session_id in self._temp_lineage_map:
             temp_table_schemas: Dict[str, Optional[List[models.SchemaFieldClass]]] = {}
-            for temp_table_urn, query_id in self._temp_lineage_map[session_id].items():
-                temp_table_schemas[temp_table_urn] = self._inferred_temp_schemas.get(
-                    query_id
-                )
+            for temp_table_urn, query_ids in self._temp_lineage_map[session_id].items():
+                for query_id in query_ids:
+                    temp_table_schemas[
+                        temp_table_urn
+                    ] = self._inferred_temp_schemas.get(query_id)
+                    if temp_table_schemas:
+                        break
 
             if temp_table_schemas:
                 schema_resolver = self._schema_resolver.with_temp_tables(
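
The session's schema resolver is then seeded with whatever schemas were inferred for that session's temp tables. Shape-wise, the mapping handed to with_temp_tables looks roughly like this (plain field-name lists stand in for the real SchemaFieldClass objects):

    from typing import Dict, List, Optional

    # temp table urn -> inferred schema (None when no query in the session inferred one)
    temp_table_schemas: Dict[str, Optional[List[str]]] = {
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.sch.staging_tmp,PROD)": ["id", "name"],
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.sch.other_tmp,PROD)": None,
    }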
@@ -1515,6 +1529,13 @@ class QueryLineageInfo:
             column_lineage: List[ColumnLineageInfo]
             confidence_score: float
 
+            def _merge_lineage_from(self, other_query: "QueryLineageInfo") -> None:
+                self.upstreams += other_query.upstreams
+                self.column_lineage += other_query.column_lineage
+                self.confidence_score = min(
+                    self.confidence_score, other_query.confidence_score
+                )
+
         def _recurse_into_query(
             query: QueryMetadata, recursion_path: List[QueryId]
         ) -> QueryLineageInfo:
@@ -1531,15 +1552,24 @@ def _recurse_into_query(
             # Find all the temp tables that this query depends on.
             temp_upstream_queries: Dict[UrnStr, QueryLineageInfo] = {}
             for upstream in query.upstreams:
-                upstream_query_id = self._temp_lineage_map.get(session_id, {}).get(
+                upstream_query_ids = self._temp_lineage_map.get(session_id, {}).get(
                     upstream
                 )
-                if upstream_query_id:
-                    upstream_query = self._query_map.get(upstream_query_id)
-                    if upstream_query:
-                        temp_upstream_queries[upstream] = _recurse_into_query(
-                            upstream_query, recursion_path
-                        )
+                if upstream_query_ids:
+                    for upstream_query_id in upstream_query_ids:
+                        upstream_query = self._query_map.get(upstream_query_id)
+                        if upstream_query:
+                            temp_query_lineage_info = _recurse_into_query(
+                                upstream_query, recursion_path
+                            )
+                            if upstream in temp_upstream_queries:
+                                temp_upstream_queries[upstream]._merge_lineage_from(
+                                    temp_query_lineage_info
+                                )
+                            else:
+                                temp_upstream_queries[
+                                    upstream
+                                ] = temp_query_lineage_info
 
             # Compute merged upstreams.
             new_upstreams = OrderedSet[UrnStr]()
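
When a temp table was written by more than one query in the session, the recursion now merges the lineage of all of them instead of keeping a single one: upstreams and column lineage are concatenated and the confidence score drops to the minimum of the merged parts. A stand-alone sketch of that merge rule (simplified stand-in for QueryLineageInfo):

    import dataclasses
    from typing import List


    @dataclasses.dataclass
    class LineageSketch:
        upstreams: List[str]
        column_lineage: List[str]
        confidence_score: float

        def _merge_lineage_from(self, other: "LineageSketch") -> None:
            self.upstreams += other.upstreams
            self.column_lineage += other.column_lineage
            self.confidence_score = min(self.confidence_score, other.confidence_score)


    a = LineageSketch(["urn:upstream1"], ["a <- a"], 0.9)
    b = LineageSketch(["urn:upstream2"], ["b <- b"], 0.2)
    a._merge_lineage_from(b)
    print(a.upstreams, a.confidence_score)  # ['urn:upstream1', 'urn:upstream2'] 0.2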
@@ -1605,7 +1635,17 @@ def _recurse_into_query(
         # - Update the lineage info
         # - Update the query text to combine the queries
 
-        composite_query_id = self._composite_query_id(composed_of_queries)
+        ordered_queries = [
+            self._query_map[query_id] for query_id in reversed(composed_of_queries)
+        ]
+        if all(q.latest_timestamp for q in ordered_queries):
+            ordered_queries = sorted(
+                ordered_queries,
+                key=lambda query: make_ts_millis(query.latest_timestamp) or 0,
+            )
+        composite_query_id = self._composite_query_id(
+            [q.query_id for q in ordered_queries]
+        )
         composed_of_queries_truncated: LossyList[str] = LossyList()
         for query_id in composed_of_queries:
             composed_of_queries_truncated.append(query_id)
@@ -1614,10 +1654,7 @@
         ] = composed_of_queries_truncated
 
         merged_query_text = ";\n\n".join(
-            [
-                self._query_map[query_id].formatted_query_string
-                for query_id in reversed(composed_of_queries)
-            ]
+            [q.formatted_query_string for q in ordered_queries]
        )
 
         resolved_query = dataclasses.replace(
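
The queries making up a composite are now ordered chronologically by latest_timestamp when every member has one, and both the composite query id and the merged query text are built from that order, so the stitched SQL reads in execution order; this is presumably why the composite query URNs change in the golden files below. A small illustration of the ordering step (make_ts_millis is assumed to convert a datetime to epoch milliseconds, as its name suggests):

    from dataclasses import dataclass
    from datetime import datetime, timezone
    from typing import Optional


    @dataclass
    class Q:  # simplified stand-in for QueryMetadata
        query_id: str
        formatted_query_string: str
        latest_timestamp: Optional[datetime]


    def make_ts_millis(ts: Optional[datetime]) -> Optional[int]:
        return None if ts is None else int(ts.timestamp() * 1000)


    queries = [
        Q("q3", "INSERT INTO final SELECT * FROM tmp", datetime(2024, 10, 25, 12, 2, tzinfo=timezone.utc)),
        Q("q1", "CREATE TEMP TABLE tmp AS SELECT 1 AS a", datetime(2024, 10, 25, 12, 0, tzinfo=timezone.utc)),
        Q("q2", "UPDATE tmp SET a = 2", datetime(2024, 10, 25, 12, 1, tzinfo=timezone.utc)),
    ]
    if all(q.latest_timestamp for q in queries):
        queries = sorted(queries, key=lambda q: make_ts_millis(q.latest_timestamp) or 0)

    merged_query_text = ";\n\n".join(q.formatted_query_string for q in queries)
    print([q.query_id for q in queries])  # ['q1', 'q2', 'q3']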
@@ -193,7 +193,7 @@
       },
       "dataset": "urn:li:dataset:(urn:li:dataPlatform:bigquery,gcp-staging.smoke_test_db.base_table,PROD)",
       "type": "TRANSFORMED",
-      "query": "urn:li:query:composite_0db44e02f671b69df68565346e9d2b68c7166fccf75bd494f34560bfa16c381b"
+      "query": "urn:li:query:composite_29c38b444a8740d9cc549168e2e0e3657fc00430520f615119bfc3e9fb94112d"
     }
   ]
 }
@@ -206,7 +206,7 @@
 },
 {
   "entityType": "query",
-  "entityUrn": "urn:li:query:composite_0db44e02f671b69df68565346e9d2b68c7166fccf75bd494f34560bfa16c381b",
+  "entityUrn": "urn:li:query:composite_29c38b444a8740d9cc549168e2e0e3657fc00430520f615119bfc3e9fb94112d",
   "changeType": "UPSERT",
   "aspectName": "queryProperties",
   "aspect": {
@@ -425,7 +425,7 @@
 },
 {
   "entityType": "query",
-  "entityUrn": "urn:li:query:composite_0db44e02f671b69df68565346e9d2b68c7166fccf75bd494f34560bfa16c381b",
+  "entityUrn": "urn:li:query:composite_29c38b444a8740d9cc549168e2e0e3657fc00430520f615119bfc3e9fb94112d",
   "changeType": "UPSERT",
   "aspectName": "querySubjects",
   "aspect": {
@@ -508,7 +508,7 @@
 },
 {
   "entityType": "query",
-  "entityUrn": "urn:li:query:composite_0db44e02f671b69df68565346e9d2b68c7166fccf75bd494f34560bfa16c381b",
+  "entityUrn": "urn:li:query:composite_29c38b444a8740d9cc549168e2e0e3657fc00430520f615119bfc3e9fb94112d",
   "changeType": "UPSERT",
   "aspectName": "dataPlatformInstance",
   "aspect": {
@@ -7890,7 +7890,7 @@
 },
 {
   "entityType": "query",
-  "entityUrn": "urn:li:query:composite_0db44e02f671b69df68565346e9d2b68c7166fccf75bd494f34560bfa16c381b",
+  "entityUrn": "urn:li:query:composite_29c38b444a8740d9cc549168e2e0e3657fc00430520f615119bfc3e9fb94112d",
   "changeType": "UPSERT",
   "aspectName": "status",
   "aspect": {
@@ -18,7 +18,7 @@
       },
       "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD)",
       "type": "TRANSFORMED",
-      "query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
+      "query": "urn:li:query:composite_48c238412066895ccad5d27f9425ce969b2c0633203627eb476d0c9e5357825a"
     },
     {
       "auditStamp": {
@@ -31,7 +31,7 @@
       },
       "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD)",
       "type": "TRANSFORMED",
-      "query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
+      "query": "urn:li:query:composite_48c238412066895ccad5d27f9425ce969b2c0633203627eb476d0c9e5357825a"
     }
   ],
   "fineGrainedLineages": [
@@ -45,7 +45,7 @@
         "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.prod_foo,PROD),a)"
       ],
       "confidenceScore": 0.2,
-      "query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
+      "query": "urn:li:query:composite_48c238412066895ccad5d27f9425ce969b2c0633203627eb476d0c9e5357825a"
     },
     {
       "upstreamType": "FIELD_SET",
@@ -57,7 +57,7 @@
         "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.prod_foo,PROD),b)"
       ],
       "confidenceScore": 0.2,
-      "query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
+      "query": "urn:li:query:composite_48c238412066895ccad5d27f9425ce969b2c0633203627eb476d0c9e5357825a"
     },
     {
       "upstreamType": "FIELD_SET",
@@ -69,15 +69,15 @@
         "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.prod_foo,PROD),c)"
       ],
       "confidenceScore": 0.2,
-      "query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
+      "query": "urn:li:query:composite_48c238412066895ccad5d27f9425ce969b2c0633203627eb476d0c9e5357825a"
     }
   ]
 }
 }
 },
 {
   "entityType": "query",
-  "entityUrn": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45",
+  "entityUrn": "urn:li:query:composite_48c238412066895ccad5d27f9425ce969b2c0633203627eb476d0c9e5357825a",
   "changeType": "UPSERT",
   "aspectName": "queryProperties",
   "aspect": {
@@ -100,7 +100,7 @@
 },
 {
   "entityType": "query",
-  "entityUrn": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45",
+  "entityUrn": "urn:li:query:composite_48c238412066895ccad5d27f9425ce969b2c0633203627eb476d0c9e5357825a",
   "changeType": "UPSERT",
   "aspectName": "querySubjects",
   "aspect": {
@@ -130,7 +130,7 @@
 },
 {
   "entityType": "query",
-  "entityUrn": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45",
+  "entityUrn": "urn:li:query:composite_48c238412066895ccad5d27f9425ce969b2c0633203627eb476d0c9e5357825a",
   "changeType": "UPSERT",
   "aspectName": "dataPlatformInstance",
   "aspect": {