From 2b5339e80abad5b607860a7e1084f3d51a796c72 Mon Sep 17 00:00:00 2001
From: treff7es
Date: Wed, 30 Oct 2024 16:05:12 +0100
Subject: [PATCH] Pr review fixes

---
 .../ingestion/source/datahub/config.py        |   7 +-
 .../source/datahub/datahub_database_reader.py | 159 +++++++++--
 .../source/datahub/datahub_kafka_reader.py    |   4 +-
 .../source/datahub/datahub_source.py          |   3 +-
 .../ingestion/source/datahub/report.py        |   1 +
 5 files changed, 89 insertions(+), 85 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
index 76630f636ae60..a3304334cb1eb 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
@@ -1,7 +1,7 @@
 import os
 from typing import Optional, Set
 
-from pydantic import Field, root_validator, validator
+from pydantic import Field, root_validator
 
 from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.kafka import KafkaConsumerConnectionConfig
@@ -96,11 +96,6 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):
 
     urn_pattern: AllowDenyPattern = Field(default=AllowDenyPattern())
 
-    @validator("exclude_aspects", pre=True, each_item=True)
-    def lowercase_exclude_aspects(cls, v):
-        # We lowercase the exclude_aspects to ensure case insensitivity
-        return v.lower()
-
     @root_validator(skip_on_failure=True)
     def check_ingesting_data(cls, values):
         if (
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py
index c6d011fc44397..1c5150b2cb850 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py
@@ -1,11 +1,10 @@
+import contextlib
 import json
 import logging
 from datetime import datetime
-from typing import Any, Generic, Iterable, List, Optional, Tuple, TypeVar
+from typing import Any, Dict, Generic, Iterable, List, Optional, Tuple, TypeVar
 
 from sqlalchemy import create_engine
-from sqlalchemy.engine import Row
-from typing_extensions import Protocol
 
 from datahub.emitter.aspect import ASPECT_MAP
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -21,13 +20,7 @@
 # Should work for at least mysql, mariadb, postgres
 DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
 
-
-class VersionOrderable(Protocol):
-    createdon: Any  # Should restrict to only orderable types
-    version: int
-
-
-ROW = TypeVar("ROW", bound=VersionOrderable)
+ROW = TypeVar("ROW", bound=Dict[str, Any])
 
 
 class VersionOrderer(Generic[ROW]):
@@ -54,14 +47,14 @@ def _process_row(self, row: ROW) -> Iterable[ROW]:
             return
 
         yield from self._attempt_queue_flush(row)
-        if row.version == 0:
+        if row["version"] == 0:
             self._add_to_queue(row)
         else:
             yield row
 
     def _add_to_queue(self, row: ROW) -> None:
         if self.queue is None:
-            self.queue = (row.createdon, [row])
+            self.queue = (row["createdon"], [row])
         else:
             self.queue[1].append(row)
 
@@ -69,7 +62,7 @@ def _attempt_queue_flush(self, row: ROW) -> Iterable[ROW]:
         if self.queue is None:
             return
 
-        if row.createdon > self.queue[0]:
+        if row["createdon"] > self.queue[0]:
             yield from self._flush_queue()
 
     def _flush_queue(self) -> Iterable[ROW]:
@@ -93,7 +86,7 @@ def __init__(
         )
 
     @property
-    def soft_deleted_urns(self) -> str:
+    def soft_deleted_urns_query(self) -> str:
         return f"""
             SELECT DISTINCT mav.urn
             FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} as mav
@@ -105,8 +98,6 @@ def soft_deleted_urns(self) -> str:
             ) as sd ON sd.urn = mav.urn
             WHERE sd.removed = true
             ORDER BY mav.urn
-            LIMIT %(limit)s
-            OFFSET %(offset)s
         """
 
     @property
@@ -118,58 +109,79 @@ def query(self) -> str:
         # May repeat rows for the same date
         # Relies on createdon order to reflect version order
         # Ordering of entries with the same createdon is handled by VersionOrderer
         return f"""
-        SELECT * from (
-            SELECT CONCAT(mav.createdon, "-", mav.urn, "-", mav.aspect, "-", mav.version) as row_id ,mav.urn, mav.aspect, mav.metadata, mav.systemmetadata, mav.createdon, mav.version, removed
+        SELECT *
+        FROM (
+            SELECT
+                mav.urn,
+                mav.aspect,
+                mav.metadata,
+                mav.systemmetadata,
+                mav.createdon,
+                mav.version,
+                removed
             FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} as mav
-            LEFT JOIN (SELECT *, JSON_EXTRACT(metadata, '$.removed') as removed from {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} where aspect = "status" and version = 0) as sd on sd.urn = mav.urn
+            LEFT JOIN (
+                SELECT
+                    *,
+                    JSON_EXTRACT(metadata, '$.removed') as removed
+                FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)}
+                WHERE aspect = 'status'
+                AND version = 0
+            ) as sd ON sd.urn = mav.urn
             WHERE 1 = 1
-            {"" if self.config.include_all_versions else "AND mav.version = 0"}
-            {"" if not self.config.exclude_aspects else f"AND LOWER(mav.aspect) NOT IN %(exclude_aspects)s"}
-            AND mav.createdon >= %(since_createdon)s
-            ORDER BY createdon, urn, aspect, version
-        ) as t
-        where row_id > %(last_id)s
+            {"" if self.config.include_all_versions else "AND mav.version = 0"}
+            {"" if not self.config.exclude_aspects else "AND LOWER(mav.aspect) NOT IN %(exclude_aspects)s"}
+            AND mav.createdon >= %(since_createdon)s
+            ORDER BY
+                createdon,
+                urn,
+                aspect,
+                version
+        ) as t
+        WHERE 1=1
         {"" if self.config.include_soft_deleted_entities else "AND (removed = false or removed is NULL)"}
-        ORDER BY createdon, urn, aspect, version
-        LIMIT %(limit)s
+        ORDER BY
+            createdon,
+            urn,
+            aspect,
+            version
+        limit 1000
         """
 
     def get_aspects(
         self, from_createdon: datetime, stop_time: datetime
     ) -> Iterable[Tuple[MetadataChangeProposalWrapper, datetime]]:
-        orderer = VersionOrderer[Row](enabled=self.config.include_all_versions)
+        orderer = VersionOrderer[Dict[str, Any]](
+            enabled=self.config.include_all_versions
+        )
         rows = self._get_rows(from_createdon=from_createdon, stop_time=stop_time)
         for row in orderer(rows):
             mcp = self._parse_row(row)
             if mcp:
-                yield mcp, row.createdon
+                yield mcp, row["createdon"]
 
-    def _get_rows(self, from_createdon: datetime, stop_time: datetime) -> Iterable[Row]:
+    def _get_rows(
+        self, from_createdon: datetime, stop_time: datetime
+    ) -> Iterable[Dict[str, Any]]:
         with self.engine.connect() as conn:
-            ts = from_createdon
-            last_id = ""
-            while ts.timestamp() <= stop_time.timestamp():
-                logger.debug(f"Polling database aspects from {ts}")
-                rows = conn.execute(
+            with contextlib.closing(conn.connection.cursor()) as cursor:
+                cursor.execute(
                     self.query,
-                    exclude_aspects=list(self.config.exclude_aspects),
-                    since_createdon=ts.strftime(DATETIME_FORMAT),
-                    last_id=last_id,
-                    limit=self.config.database_query_batch_size,
+                    {
+                        "exclude_aspects": list(self.config.exclude_aspects),
+                        "since_createdon": from_createdon.strftime(DATETIME_FORMAT),
+                    },
                 )
-                if not rows.rowcount:
-                    return
-
-                for i, row in enumerate(rows):
-                    yield row
-                    if last_id == row.row_id:
-                        return
-                    else:
-                        last_id = row.row_id
-                        ts = row.createdon
+                columns = [desc[0] for desc in cursor.description]
+                while True:
+                    rows = cursor.fetchmany(self.config.database_query_batch_size)
+                    if not rows:
+                        return
+                    for row in rows:
+                        yield dict(zip(columns, row))
 
-    def get_soft_deleted_rows(self) -> Iterable[Row]:
+    def get_soft_deleted_rows(self) -> Iterable[Dict[str, Any]]:
         """
         Fetches all soft-deleted entities from the database.
 
@@ -177,41 +189,38 @@ def get_soft_deleted_rows(self) -> Iterable[Row]:
             Row objects containing URNs of soft-deleted entities
         """
         with self.engine.connect() as conn:
-            offset = 0
-
-            while True:
-                logger.debug(f"Polling soft-deleted urns with offset {offset}")
-
-                rows = conn.execute(
-                    self.soft_deleted_urns,
-                    {"limit": self.config.database_query_batch_size, "offset": offset},
-                )
-
-                if not rows.rowcount:
-                    break
-
-                for row in rows:
-                    yield row
-
-                offset += rows.rowcount
-
-    def _parse_row(self, row: Row) -> Optional[MetadataChangeProposalWrapper]:
+            with contextlib.closing(conn.connection.cursor()) as cursor:
+                logger.debug("Polling soft-deleted urns from database")
+                cursor.execute(self.soft_deleted_urns_query)
+                columns = [desc[0] for desc in cursor.description]
+                while True:
+                    rows = cursor.fetchmany(self.config.database_query_batch_size)
+                    if not rows:
+                        return
+                    for row in rows:
+                        yield dict(zip(columns, row))
+
+    def _parse_row(
+        self, row: Dict[str, Any]
+    ) -> Optional[MetadataChangeProposalWrapper]:
         try:
-            json_aspect = post_json_transform(json.loads(row.metadata))
-            json_metadata = post_json_transform(json.loads(row.systemmetadata or "{}"))
+            json_aspect = post_json_transform(json.loads(row["metadata"]))
+            json_metadata = post_json_transform(
+                json.loads(row["systemmetadata"] or "{}")
+            )
             system_metadata = SystemMetadataClass.from_obj(json_metadata)
             return MetadataChangeProposalWrapper(
-                entityUrn=row.urn,
-                aspect=ASPECT_MAP[row.aspect].from_obj(json_aspect),
+                entityUrn=row["urn"],
+                aspect=ASPECT_MAP[row["aspect"]].from_obj(json_aspect),
                 systemMetadata=system_metadata,
                 changeType=ChangeTypeClass.UPSERT,
             )
         except Exception as e:
             logger.warning(
-                f"Failed to parse metadata for {row.urn}: {e}", exc_info=True
+                f'Failed to parse metadata for {row["urn"]}: {e}', exc_info=True
            )
             self.report.num_database_parse_errors += 1
             self.report.database_parse_errors.setdefault(
                 str(e), LossyDict()
-            ).setdefault(row.aspect, LossyList()).append(row.urn)
+            ).setdefault(row["aspect"], LossyList()).append(row["urn"])
             return None
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
index 9e80a7bb8162e..56a3d55abb184 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
@@ -31,7 +31,6 @@ def __init__(
         connection_config: KafkaConsumerConnectionConfig,
         report: DataHubSourceReport,
         ctx: PipelineContext,
-        soft_deleted_urns: List[str] = [],
     ):
         self.config = config
         self.connection_config = connection_config
@@ -97,7 +96,8 @@ def _poll_partition(
                 )
                 break
 
-            if mcl.aspectName and mcl.aspectName.lower() in self.config.exclude_aspects:
+            if mcl.aspectName and mcl.aspectName in self.config.exclude_aspects:
+                self.report.num_kafka_excluded_aspects += 1
                 continue
 
             # TODO: Consider storing state in kafka instead, via consumer.commit()
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py
index d8e32e5b076b6..de212ca9a6771 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py
@@ -89,7 +89,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                     "Cannot exclude soft deleted entities without a database connection"
                 )
             soft_deleted_urns = [
-                row[0] for row in database_reader.get_soft_deleted_rows()
+                row["urn"] for row in database_reader.get_soft_deleted_rows()
             ]
 
         yield from self._get_kafka_workunits(
@@ -135,7 +135,6 @@ def _get_kafka_workunits(
             self.config.kafka_connection,
             self.report,
             self.ctx,
-            soft_deleted_urns,
         ) as reader:
             mcls = reader.get_mcls(
                 from_offsets=from_offsets, stop_time=self.report.stop_time
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py
index 46212839870a0..721fc87989442 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py
@@ -20,6 +20,7 @@ class DataHubSourceReport(StatefulIngestionReport):
 
     num_kafka_aspects_ingested: int = 0
     num_kafka_parse_errors: int = 0
+    num_kafka_excluded_aspects: int = 0
     kafka_parse_errors: LossyDict[str, int] = field(default_factory=LossyDict)
 
     num_timeseries_deletions_dropped: int = 0
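Reviewer note: the reworked _get_rows and get_soft_deleted_rows in this patch drop the SQLAlchemy Row objects and offset-based paging, and instead stream results through the raw DBAPI cursor in batches, yielding plain dicts keyed by column name. The following is a minimal standalone sketch of that pattern, not code from the patch; the connection URL, table name, and batch size are illustrative assumptions only.

    import contextlib
    from typing import Any, Dict, Iterator

    from sqlalchemy import create_engine


    def stream_rows(query: str, batch_size: int = 1000) -> Iterator[Dict[str, Any]]:
        # Hypothetical connection URL; the real source builds its engine from config.
        engine = create_engine("mysql+pymysql://datahub:datahub@localhost:3306/datahub")
        with engine.connect() as conn:
            # Go through the underlying DBAPI cursor so rows can be fetched in
            # batches instead of being buffered all at once by a result proxy.
            with contextlib.closing(conn.connection.cursor()) as cursor:
                cursor.execute(query)
                columns = [desc[0] for desc in cursor.description]
                while True:
                    batch = cursor.fetchmany(batch_size)
                    if not batch:
                        return
                    for row in batch:
                        # Zip column names with values so callers can index by name,
                        # e.g. row["urn"], row["aspect"], row["createdon"].
                        yield dict(zip(columns, row))


    # Illustrative usage against a hypothetical table name:
    for row in stream_rows("SELECT urn, aspect, createdon FROM metadata_aspect_v2"):
        print(row["urn"], row["createdon"])

This mirrors why the patch can type rows as Dict[str, Any] everywhere (VersionOrderer, _parse_row, the soft-deleted urn collection) without depending on sqlalchemy.engine.Row.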