diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py
index e7cc5fc50cf6e..4b84c25965a99 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py
@@ -57,6 +57,7 @@ def __init__(
         self.context = context
 
         self.database = database
+        self.known_urns: Set[str] = set()  # will be set later
 
         self.aggregator = SqlParsingAggregator(
             platform=self.platform,
@@ -68,6 +69,7 @@ def __init__(
             generate_operations=False,
             usage_config=self.config,
             graph=self.context.graph,
+            is_temp_table=self._is_temp_table,
         )
         self.report.sql_aggregator = self.aggregator.report
 
@@ -87,7 +89,16 @@ def __init__(
             self.report.lineage_end_time,
         ) = self._lineage_v1.get_time_window()
 
-        self.known_urns: Set[str] = set()  # will be set later
+    def _is_temp_table(self, name: str) -> bool:
+        return (
+            DatasetUrn.create_from_ids(
+                self.platform,
+                name,
+                env=self.config.env,
+                platform_instance=self.config.platform_instance,
+            ).urn()
+            not in self.known_urns
+        )
 
     def build(
         self,
@@ -107,15 +118,6 @@ def build(
             for schema, tables in schemas.items()
             for table in tables
         }
-        self.aggregator._is_temp_table = (
-            lambda name: DatasetUrn.create_from_ids(
-                self.platform,
-                name,
-                env=self.config.env,
-                platform_instance=self.config.platform_instance,
-            ).urn()
-            not in self.known_urns
-        )
 
         # Handle all the temp tables up front.
         if self.config.resolve_temp_table_in_lineage:
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index 5c3a6b5b533a0..f0496379c45b8 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -537,8 +537,13 @@ def _maybe_format_query(self, query: str) -> str:
         return query
 
     @functools.lru_cache(maxsize=128)
-    def _name_from_urn(self, urn: UrnStr) -> str:
-        name = DatasetUrn.from_string(urn).name
+    def _name_from_urn(self, urn: UrnStr) -> Optional[str]:
+        urn_obj = DatasetUrn.from_string(urn)
+        if urn_obj.platform != self.platform.urn():
+            # If this is external (e.g. s3), we don't know the name.
+            return None
+
+        name = urn_obj.name
         if (
             platform_instance := self._schema_resolver.platform_instance
         ) and name.startswith(platform_instance):
@@ -549,14 +554,22 @@
     def is_temp_table(self, urn: UrnStr) -> bool:
         if self._is_temp_table is None:
             return False
-        return self._is_temp_table(self._name_from_urn(urn))
+        name = self._name_from_urn(urn)
+        if name is None:
+            # External tables are not temp tables.
+            return False
+        return self._is_temp_table(name)
 
-    def is_allowed_table(self, urn: UrnStr) -> bool:
+    def is_allowed_table(self, urn: UrnStr, allow_external: bool = True) -> bool:
         if self.is_temp_table(urn):
             return False
         if self._is_allowed_table is None:
             return True
-        return self._is_allowed_table(self._name_from_urn(urn))
+        name = self._name_from_urn(urn)
+        if name is None:
+            # Treat external tables specially.
+            return allow_external
+        return self._is_allowed_table(name)
 
     def add(
         self,
@@ -852,7 +865,7 @@ def add_preparsed_query(
         upstream_fields = parsed.column_usage or {}
         for upstream_urn in parsed.upstreams:
             # If the upstream table is a temp table or otherwise denied by filters, don't log usage for it.
-            if not self.is_allowed_table(upstream_urn) or (
+            if not self.is_allowed_table(upstream_urn, allow_external=False) or (
                 require_out_table_schema
                 and not self._schema_resolver.has_urn(upstream_urn)
            ):
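
Aside (not part of the patch): a minimal sketch of the callback contract this change formalizes. Previously, `build()` monkey-patched `aggregator._is_temp_table` after construction; now the source passes an `is_temp_table` callback to the `SqlParsingAggregator` constructor, and `_name_from_urn` returns `None` for external platforms (e.g. s3) so the callback is never invoked with a name it can't interpret. The import paths, env/platform-instance values, and the standalone `known_urns` set below are illustrative assumptions, not code from the PR.

```python
# Illustrative sketch only; import paths may vary across DataHub versions.
from typing import Set

from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
from datahub.utilities.urns.dataset_urn import DatasetUrn

known_urns: Set[str] = set()  # filled during schema extraction, before queries are added

def is_temp_table(name: str) -> bool:
    # Mirrors the _is_temp_table method added in lineage_v2.py: a table name
    # that doesn't map to a URN we discovered is assumed to be a temp table.
    urn = DatasetUrn.create_from_ids(
        "redshift",
        name,
        env="PROD",  # assumption: default env
        platform_instance=None,  # assumption: no platform instance configured
    ).urn()
    return urn not in known_urns

# The callback is wired in at construction time, replacing the post-hoc
# `aggregator._is_temp_table = ...` assignment removed above.
aggregator = SqlParsingAggregator(
    platform="redshift",
    is_temp_table=is_temp_table,
)
```

The constructor-injected callback also lets `is_allowed_table()` distinguish lineage from usage: external upstreams remain eligible for lineage (`allow_external=True`, the default) but are skipped for usage counting via the `allow_external=False` call site in `add_preparsed_query`.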