From 94d424b7d4deed9fb746afe86247ceb5610620c7 Mon Sep 17 00:00:00 2001 From: Davi Arnaut Date: Thu, 9 May 2024 10:22:33 -0500 Subject: [PATCH 1/2] fix(docs): adjust new requirements for DynamoDB ingestion (#10470) --- docs/how/updating-datahub.md | 2 +- metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index f5b3600d98306..fb769650bce0d 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -20,7 +20,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ### Breaking Changes -- #10419 - `aws_session_token` and `aws_region` are now required configurations in the DynamoDB connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration. +- #10419 - `aws_region` is now a required configuration in the DynamoDB connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration. ### Potential Downtime diff --git a/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md b/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md index d6815774b7df0..0e85ec7a8cc61 100644 --- a/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md +++ b/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md @@ -1,6 +1,6 @@ ### Prerequisities -Notice of breaking change: in the latest version of the DynamoDB connector, both `aws_session_token` and `aws_region` are required configurations. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration. +Notice of breaking change: in the latest version of the DynamoDB connector, `aws_region` is now a required configuration. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration. In order to execute this source, you need to attach the `AmazonDynamoDBReadOnlyAccess` policy to a user in your AWS account. Then create an API access key and secret for the user. 
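For context on the change above, a minimal ingestion run that satisfies the new requirement might look like the sketch below. This is an illustrative example, not part of the patch: `Pipeline.create` is DataHub's programmatic way of running a recipe, the `aws_region` key is the field this change makes required, and the credential/sink field names shown (`platform_instance`, `aws_access_key_id`, `aws_secret_access_key`, `datahub-rest`) are assumptions based on the surrounding docs rather than anything confirmed by this patch.

```python
# Illustrative sketch (not part of this patch): running a DynamoDB ingestion
# recipe programmatically with the now-required `aws_region` set explicitly.
# Credential and sink field names below are assumptions from the docs context.
import os

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "dynamodb",
            "config": {
                "platform_instance": "my_aws_account",  # assumed field name
                "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID"),  # assumed field name
                "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),  # assumed field name
                # Required as of #10419: only this region is scanned, instead of
                # looping through all AWS regions.
                "aws_region": "us-west-2",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```

The same `aws_region` key applies to a YAML recipe; the point is only that the region must now be stated explicitly in the recipe rather than discovered by iterating over every region.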
From 96c605df711afd049cbac8ef07692f6d9ff7ac93 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 9 May 2024 09:18:26 -0700 Subject: [PATCH 2/2] feat(ingest/redshift): add timers for lineage v2 (#10460) --- .../ingestion/source/redshift/lineage_v2.py | 15 ++++++++++----- .../ingestion/source/redshift/report.py | 2 ++ .../sql_parsing/sql_parsing_aggregator.py | 19 +++++++++++++------ 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index 45fd1477df44e..797b309f528cc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py @@ -34,6 +34,7 @@ KnownQueryLineageInfo, SqlParsingAggregator, ) +from datahub.utilities.perf_timer import PerfTimer logger = logging.getLogger(__name__) @@ -226,13 +227,17 @@ def _populate_lineage_agg( try: logger.debug(f"Processing {lineage_type.name} lineage query: {query}") - for lineage_row in RedshiftDataDictionary.get_lineage_rows( - conn=connection, query=query - ): - processor(lineage_row) + timer = self.report.lineage_phases_timer.setdefault( + lineage_type.name, PerfTimer() + ) + with timer: + for lineage_row in RedshiftDataDictionary.get_lineage_rows( + conn=connection, query=query + ): + processor(lineage_row) except Exception as e: self.report.warning( - f"extract-{lineage_type.name}", + f"lineage-v2-extract-{lineage_type.name}", f"Error was {e}, {traceback.format_exc()}", ) self._lineage_v1.report_status(f"extract-{lineage_type.name}", False) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py index e2a035091d0ad..2e6cb8051c91e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py @@ -8,6 +8,7 @@ from datahub.ingestion.source_report.time_window import BaseTimeWindowReport from datahub.sql_parsing.sql_parsing_aggregator import SqlAggregatorReport from datahub.utilities.lossy_collections import LossyDict +from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.stats_collections import TopKDict @@ -55,6 +56,7 @@ class RedshiftReport( # lineage/usage v2 sql_aggregator: Optional[SqlAggregatorReport] = None + lineage_phases_timer: Dict[str, PerfTimer] = field(default_factory=dict) def report_dropped(self, key: str) -> None: self.filtered.append(key) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index f06ca650bab9e..530764e8320cd 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -51,6 +51,7 @@ ) from datahub.utilities.lossy_collections import LossyDict, LossyList from datahub.utilities.ordered_set import OrderedSet +from datahub.utilities.perf_timer import PerfTimer logger = logging.getLogger(__name__) QueryId = str @@ -156,6 +157,10 @@ class SqlAggregatorReport(Report): default_factory=LossyDict ) + # SQL parsing (over all invocations). + num_sql_parsed: int = 0 + sql_parsing_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer) + # Other lineage loading metrics. 
num_known_query_lineage: int = 0 num_known_mapping_lineage: int = 0 @@ -749,12 +754,14 @@ def _run_sql_parser( timestamp: Optional[datetime] = None, user: Optional[CorpUserUrn] = None, ) -> SqlParsingResult: - parsed = sqlglot_lineage( - query, - schema_resolver=schema_resolver, - default_db=default_db, - default_schema=default_schema, - ) + with self.report.sql_parsing_timer: + parsed = sqlglot_lineage( + query, + schema_resolver=schema_resolver, + default_db=default_db, + default_schema=default_schema, + ) + self.report.num_sql_parsed += 1 # Conditionally log the query. if self.query_log == QueryLogSetting.STORE_ALL or (
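The pattern in both sets of hunks above is the same: a `PerfTimer` lives on the report (one per lineage phase via `setdefault`, or a single shared `sql_parsing_timer` for SQL parsing), and each invocation re-enters it as a context manager so the elapsed time accumulates across calls, with `num_sql_parsed` counting invocations alongside. Below is a simplified, self-contained sketch of that pattern; it illustrates the idea only and is not the actual `datahub.utilities.perf_timer.PerfTimer` implementation.

```python
# Simplified sketch of the timing pattern introduced above. This is NOT the
# real datahub.utilities.perf_timer.PerfTimer; it only shows how a re-entrant,
# accumulating timer plus a per-phase dict can back report fields like
# `lineage_phases_timer`, `sql_parsing_timer`, and `num_sql_parsed`.
import time
from dataclasses import dataclass, field
from typing import Dict, List


class AccumulatingTimer:
    """Context manager that accumulates elapsed wall-clock time across uses."""

    def __init__(self) -> None:
        self._total = 0.0
        self._start = 0.0

    def __enter__(self) -> "AccumulatingTimer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc) -> None:
        self._total += time.perf_counter() - self._start

    def elapsed_seconds(self) -> float:
        return self._total


@dataclass
class LineageReport:
    # One timer per lineage phase, mirroring `lineage_phases_timer`.
    lineage_phases_timer: Dict[str, AccumulatingTimer] = field(default_factory=dict)
    # Shared timer and counter, mirroring `sql_parsing_timer` / `num_sql_parsed`.
    sql_parsing_timer: AccumulatingTimer = field(default_factory=AccumulatingTimer)
    num_sql_parsed: int = 0


def process_phase(report: LineageReport, phase: str, rows: List[int]) -> None:
    # Reuse (or lazily create) the timer for this phase, then time the whole loop.
    timer = report.lineage_phases_timer.setdefault(phase, AccumulatingTimer())
    with timer:
        for _row in rows:
            pass  # stand-in for the per-row processor


def parse_query(report: LineageReport, query: str) -> None:
    # Time each parse with the shared timer and count invocations.
    with report.sql_parsing_timer:
        _ = query.strip()  # stand-in for sqlglot_lineage(...)
    report.num_sql_parsed += 1


report = LineageReport()
process_phase(report, "QUERY_SCAN", rows=[1, 2, 3])
parse_query(report, "select 1")
print(report.lineage_phases_timer["QUERY_SCAN"].elapsed_seconds(), report.num_sql_parsed)
```

Keeping the timers on the report, rather than in local variables, is what lets repeated calls accumulate into per-phase totals that surface in the ingestion report alongside the parse count.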