Skip to content

Commit

Permalink
fix(ingest/redshift): fix unload lineage in lineage_v2 (#11620)
Browse files Browse the repository at this point in the history
Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
  • Loading branch information
hsheth2 and treff7es authored Nov 5, 2024
1 parent e706d15 commit a67981b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(
self.context = context

self.database = database
self.known_urns: Set[str] = set() # will be set later

self.aggregator = SqlParsingAggregator(
platform=self.platform,
Expand All @@ -68,6 +69,7 @@ def __init__(
generate_operations=False,
usage_config=self.config,
graph=self.context.graph,
is_temp_table=self._is_temp_table,
)
self.report.sql_aggregator = self.aggregator.report

Expand All @@ -87,7 +89,16 @@ def __init__(
self.report.lineage_end_time,
) = self._lineage_v1.get_time_window()

self.known_urns: Set[str] = set() # will be set later
def _is_temp_table(self, name: str) -> bool:
return (
DatasetUrn.create_from_ids(
self.platform,
name,
env=self.config.env,
platform_instance=self.config.platform_instance,
).urn()
not in self.known_urns
)

def build(
self,
Expand All @@ -107,15 +118,6 @@ def build(
for schema, tables in schemas.items()
for table in tables
}
self.aggregator._is_temp_table = (
lambda name: DatasetUrn.create_from_ids(
self.platform,
name,
env=self.config.env,
platform_instance=self.config.platform_instance,
).urn()
not in self.known_urns
)

# Handle all the temp tables up front.
if self.config.resolve_temp_table_in_lineage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,13 @@ def _maybe_format_query(self, query: str) -> str:
return query

@functools.lru_cache(maxsize=128)
def _name_from_urn(self, urn: UrnStr) -> str:
name = DatasetUrn.from_string(urn).name
def _name_from_urn(self, urn: UrnStr) -> Optional[str]:
urn_obj = DatasetUrn.from_string(urn)
if urn_obj.platform != self.platform.urn():
# If this is external (e.g. s3), we don't know the name.
return None

name = urn_obj.name
if (
platform_instance := self._schema_resolver.platform_instance
) and name.startswith(platform_instance):
Expand All @@ -549,14 +554,22 @@ def _name_from_urn(self, urn: UrnStr) -> str:
def is_temp_table(self, urn: UrnStr) -> bool:
if self._is_temp_table is None:
return False
return self._is_temp_table(self._name_from_urn(urn))
name = self._name_from_urn(urn)
if name is None:
# External tables are not temp tables.
return False
return self._is_temp_table(name)

def is_allowed_table(self, urn: UrnStr) -> bool:
def is_allowed_table(self, urn: UrnStr, allow_external: bool = True) -> bool:
if self.is_temp_table(urn):
return False
if self._is_allowed_table is None:
return True
return self._is_allowed_table(self._name_from_urn(urn))
name = self._name_from_urn(urn)
if name is None:
# Treat external tables specially.
return allow_external
return self._is_allowed_table(name)

def add(
self,
Expand Down Expand Up @@ -852,7 +865,7 @@ def add_preparsed_query(
upstream_fields = parsed.column_usage or {}
for upstream_urn in parsed.upstreams:
# If the upstream table is a temp table or otherwise denied by filters, don't log usage for it.
if not self.is_allowed_table(upstream_urn) or (
if not self.is_allowed_table(upstream_urn, allow_external=False) or (
require_out_table_schema
and not self._schema_resolver.has_urn(upstream_urn)
):
Expand Down

0 comments on commit a67981b

Please sign in to comment.