Merge branch 'datahub-project:master' into master
treff7es authored May 9, 2024
2 parents 2f04c58 + 96c605d commit 34d79c9
Showing 5 changed files with 27 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/how/updating-datahub.md
@@ -20,7 +20,7 @@ This file documents any backwards-incompatible changes in DataHub and assists people

### Breaking Changes

-- #10419 - `aws_session_token` and `aws_region` are now required configurations in the DynamoDB connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.
+- #10419 - `aws_region` is now a required configuration in the DynamoDB connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.

### Potential Downtime

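For reference, a minimal sketch of a post-upgrade recipe run from Python. The credentials are placeholders and the console sink is chosen only for illustration; the point is that `aws_region` must now be set explicitly and only that region is scanned:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe: credential values are placeholders.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "dynamodb",
            "config": {
                "aws_access_key_id": "<access-key-id>",
                "aws_secret_access_key": "<secret-access-key>",
                # Now required: only this region is scanned.
                "aws_region": "us-west-2",
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
```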
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md
@@ -1,6 +1,6 @@
### Prerequisites

-Notice of breaking change: in the latest version of the DynamoDB connector, both `aws_session_token` and `aws_region` are required configurations. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.
+Notice of breaking change: in the latest version of the DynamoDB connector, `aws_region` is now a required configuration. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.

In order to execute this source, you need to attach the `AmazonDynamoDBReadOnlyAccess` policy to a user in your AWS account. Then create an API access key and secret for the user.

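As an illustration of that setup step, a hedged boto3 sketch (the user name `datahub-ingestion` is hypothetical; the policy ARN is AWS's managed read-only DynamoDB policy):

```python
import boto3

iam = boto3.client("iam")

# Attach the managed read-only DynamoDB policy to the ingestion user
# ("datahub-ingestion" is a placeholder name).
iam.attach_user_policy(
    UserName="datahub-ingestion",
    PolicyArn="arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess",
)

# Create an API access key/secret pair for that user.
resp = iam.create_access_key(UserName="datahub-ingestion")
print(resp["AccessKey"]["AccessKeyId"])
```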
@@ -34,6 +34,7 @@
    KnownQueryLineageInfo,
    SqlParsingAggregator,
)
+from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)

@@ -226,13 +227,17 @@ def _populate_lineage_agg(
        try:
            logger.debug(f"Processing {lineage_type.name} lineage query: {query}")

-            for lineage_row in RedshiftDataDictionary.get_lineage_rows(
-                conn=connection, query=query
-            ):
-                processor(lineage_row)
+            timer = self.report.lineage_phases_timer.setdefault(
+                lineage_type.name, PerfTimer()
+            )
+            with timer:
+                for lineage_row in RedshiftDataDictionary.get_lineage_rows(
+                    conn=connection, query=query
+                ):
+                    processor(lineage_row)
        except Exception as e:
            self.report.warning(
-                f"extract-{lineage_type.name}",
+                f"lineage-v2-extract-{lineage_type.name}",
                f"Error was {e}, {traceback.format_exc()}",
            )
            self._lineage_v1.report_status(f"extract-{lineage_type.name}", False)
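The `setdefault` pattern above gives each lineage phase a single timer that accumulates elapsed time across repeated calls. A minimal sketch of the idea, using an illustrative `Stopwatch` stand-in rather than DataHub's actual `PerfTimer` implementation:

```python
import time
from typing import Dict


class Stopwatch:
    """Illustrative stand-in for PerfTimer: a context manager that
    accumulates elapsed wall-clock time across multiple invocations."""

    def __init__(self) -> None:
        self.elapsed = 0.0
        self._start = 0.0

    def __enter__(self) -> "Stopwatch":
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc) -> None:
        self.elapsed += time.perf_counter() - self._start


phase_timers: Dict[str, Stopwatch] = {}

for phase in ["tables", "views", "tables"]:  # "tables" runs twice
    timer = phase_timers.setdefault(phase, Stopwatch())
    with timer:
        time.sleep(0.01)  # stand-in for processing lineage rows

# Each phase reports its total time over all invocations.
print({name: round(t.elapsed, 3) for name, t in phase_timers.items()})
```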
@@ -8,6 +8,7 @@
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.sql_parsing.sql_parsing_aggregator import SqlAggregatorReport
from datahub.utilities.lossy_collections import LossyDict
+from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict


@@ -55,6 +56,7 @@ class RedshiftReport(

    # lineage/usage v2
    sql_aggregator: Optional[SqlAggregatorReport] = None
+    lineage_phases_timer: Dict[str, PerfTimer] = field(default_factory=dict)

    def report_dropped(self, key: str) -> None:
        self.filtered.append(key)
@@ -51,6 +51,7 @@
)
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.ordered_set import OrderedSet
+from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)
QueryId = str
@@ -156,6 +157,10 @@ class SqlAggregatorReport(Report):
        default_factory=LossyDict
    )

+    # SQL parsing (over all invocations).
+    num_sql_parsed: int = 0
+    sql_parsing_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)

    # Other lineage loading metrics.
    num_known_query_lineage: int = 0
    num_known_mapping_lineage: int = 0
@@ -749,12 +754,14 @@ def _run_sql_parser(
        timestamp: Optional[datetime] = None,
        user: Optional[CorpUserUrn] = None,
    ) -> SqlParsingResult:
-        parsed = sqlglot_lineage(
-            query,
-            schema_resolver=schema_resolver,
-            default_db=default_db,
-            default_schema=default_schema,
-        )
+        with self.report.sql_parsing_timer:
+            parsed = sqlglot_lineage(
+                query,
+                schema_resolver=schema_resolver,
+                default_db=default_db,
+                default_schema=default_schema,
+            )
+        self.report.num_sql_parsed += 1

        # Conditionally log the query.
        if self.query_log == QueryLogSetting.STORE_ALL or (
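Together, the counter and the timer make it possible to derive an average parse cost from the report. A hedged sketch of how a consumer might compute it, assuming `PerfTimer` exposes its accumulated total via an `elapsed_seconds()` method (an assumption here, not confirmed by this diff):

```python
def avg_parse_seconds(report: "SqlAggregatorReport") -> float:
    """Average sqlglot parse time per query, derived from the two new
    report fields. elapsed_seconds() is assumed to exist on PerfTimer."""
    if report.num_sql_parsed == 0:
        return 0.0
    return report.sql_parsing_timer.elapsed_seconds() / report.num_sql_parsed
```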
