feat(ingest/redshift): add timers for lineage v2 (datahub-project#10460)
hsheth2 authored May 9, 2024
1 parent 94d424b commit 96c605d
Showing 3 changed files with 25 additions and 11 deletions.
@@ -34,6 +34,7 @@
     KnownQueryLineageInfo,
     SqlParsingAggregator,
 )
+from datahub.utilities.perf_timer import PerfTimer

 logger = logging.getLogger(__name__)

@@ -226,13 +227,17 @@ def _populate_lineage_agg(
         try:
             logger.debug(f"Processing {lineage_type.name} lineage query: {query}")

-            for lineage_row in RedshiftDataDictionary.get_lineage_rows(
-                conn=connection, query=query
-            ):
-                processor(lineage_row)
+            timer = self.report.lineage_phases_timer.setdefault(
+                lineage_type.name, PerfTimer()
+            )
+            with timer:
+                for lineage_row in RedshiftDataDictionary.get_lineage_rows(
+                    conn=connection, query=query
+                ):
+                    processor(lineage_row)
         except Exception as e:
             self.report.warning(
-                f"extract-{lineage_type.name}",
+                f"lineage-v2-extract-{lineage_type.name}",
                 f"Error was {e}, {traceback.format_exc()}",
             )
             self._lineage_v1.report_status(f"extract-{lineage_type.name}", False)
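Note on the pattern above: the timer is fetched with dict.setdefault, so repeated invocations for the same lineage_type reuse a single PerfTimer, and each `with timer:` block adds to that phase's running total. A minimal sketch of an accumulating context-manager timer with the same shape (an illustration only, not the actual datahub.utilities.perf_timer implementation):

import time

class AccumulatingTimer:
    """Sums wall-clock time across repeated `with` blocks."""

    def __init__(self) -> None:
        self._total = 0.0
        self._start = 0.0

    def __enter__(self) -> "AccumulatingTimer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Accumulate even if the body raised, so partial work is still counted.
        self._total += time.perf_counter() - self._start

    def elapsed_seconds(self) -> float:
        return self._total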
@@ -8,6 +8,7 @@
 from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
 from datahub.sql_parsing.sql_parsing_aggregator import SqlAggregatorReport
 from datahub.utilities.lossy_collections import LossyDict
+from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.stats_collections import TopKDict


@@ -55,6 +56,7 @@ class RedshiftReport(

     # lineage/usage v2
     sql_aggregator: Optional[SqlAggregatorReport] = None
+    lineage_phases_timer: Dict[str, PerfTimer] = field(default_factory=dict)

     def report_dropped(self, key: str) -> None:
         self.filtered.append(key)
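With this field on the report, each lineage phase's wall-clock time becomes visible in the ingestion report output. A hypothetical read-out, where `report` is an assumed populated RedshiftReport and PerfTimer is assumed to expose an elapsed_seconds() accessor:

# Hypothetical consumer of a populated RedshiftReport instance.
for phase, timer in report.lineage_phases_timer.items():
    print(f"{phase}: {timer.elapsed_seconds():.2f}s")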
@@ -51,6 +51,7 @@
 )
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.ordered_set import OrderedSet
+from datahub.utilities.perf_timer import PerfTimer

 logger = logging.getLogger(__name__)
 QueryId = str
@@ -156,6 +157,10 @@ class SqlAggregatorReport(Report):
         default_factory=LossyDict
     )

+    # SQL parsing (over all invocations).
+    num_sql_parsed: int = 0
+    sql_parsing_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
+
     # Other lineage loading metrics.
     num_known_query_lineage: int = 0
     num_known_mapping_lineage: int = 0
@@ -749,12 +754,14 @@ def _run_sql_parser(
         timestamp: Optional[datetime] = None,
         user: Optional[CorpUserUrn] = None,
     ) -> SqlParsingResult:
-        parsed = sqlglot_lineage(
-            query,
-            schema_resolver=schema_resolver,
-            default_db=default_db,
-            default_schema=default_schema,
-        )
+        with self.report.sql_parsing_timer:
+            parsed = sqlglot_lineage(
+                query,
+                schema_resolver=schema_resolver,
+                default_db=default_db,
+                default_schema=default_schema,
+            )
+        self.report.num_sql_parsed += 1

         # Conditionally log the query.
         if self.query_log == QueryLogSetting.STORE_ALL or (
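Because sql_parsing_timer accumulates across every _run_sql_parser call while num_sql_parsed counts those calls, a consumer of the report can derive an average per-query parse latency. A sketch under the same elapsed_seconds() assumption as above, with `report` again a hypothetical SqlAggregatorReport instance:

# Average sqlglot parse time per query, guarding against division by zero.
if report.num_sql_parsed > 0:
    avg = report.sql_parsing_timer.elapsed_seconds() / report.num_sql_parsed
    print(f"parsed {report.num_sql_parsed} queries, avg {avg:.3f}s per query")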
