Commit

Pr review fixes

treff7es committed Oct 30, 2024
1 parent 6e3f358 commit 2b5339e

Showing 5 changed files with 89 additions and 85 deletions.
@@ -1,7 +1,7 @@
import os
from typing import Optional, Set

from pydantic import Field, root_validator, validator
from pydantic import Field, root_validator

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
@@ -96,11 +96,6 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):

urn_pattern: AllowDenyPattern = Field(default=AllowDenyPattern())

@validator("exclude_aspects", pre=True, each_item=True)
def lowercase_exclude_aspects(cls, v):
# We lowercase the exclude_aspects to ensure case insensitivity
return v.lower()

@root_validator(skip_on_failure=True)
def check_ingesting_data(cls, values):
if (
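The hunk above removes the config-time lowercasing of `exclude_aspects`; after this change, case-insensitive matching is handled where the values are consumed (note the `LOWER(mav.aspect)` filter in the database query further down). For reference, a minimal, hypothetical sketch of the pydantic v1 pattern the removed validator followed (model and field names are invented for illustration):

```python
from typing import Set

from pydantic import BaseModel, validator


class ExampleConfig(BaseModel):
    # Hypothetical stand-in for DataHubSourceConfig.exclude_aspects.
    exclude_aspects: Set[str] = set()

    @validator("exclude_aspects", pre=True, each_item=True)
    def lowercase_exclude_aspects(cls, v: str) -> str:
        # each_item=True runs the validator once per element of the incoming collection.
        return v.lower()


print(ExampleConfig(exclude_aspects={"DatasetProfile", "status"}).exclude_aspects)
# {'datasetprofile', 'status'}
```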
@@ -1,11 +1,10 @@
import contextlib
import json
import logging
from datetime import datetime
from typing import Any, Generic, Iterable, List, Optional, Tuple, TypeVar
from typing import Any, Dict, Generic, Iterable, List, Optional, Tuple, TypeVar

from sqlalchemy import create_engine
from sqlalchemy.engine import Row
from typing_extensions import Protocol

from datahub.emitter.aspect import ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -21,13 +20,7 @@
# Should work for at least mysql, mariadb, postgres
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


class VersionOrderable(Protocol):
createdon: Any # Should restrict to only orderable types
version: int


ROW = TypeVar("ROW", bound=VersionOrderable)
ROW = TypeVar("ROW", bound=Dict[str, Any])


class VersionOrderer(Generic[ROW]):
@@ -54,22 +47,22 @@ def _process_row(self, row: ROW) -> Iterable[ROW]:
return

yield from self._attempt_queue_flush(row)
if row.version == 0:
if row["version"] == 0:
self._add_to_queue(row)
else:
yield row

def _add_to_queue(self, row: ROW) -> None:
if self.queue is None:
self.queue = (row.createdon, [row])
self.queue = (row["createdon"], [row])
else:
self.queue[1].append(row)

def _attempt_queue_flush(self, row: ROW) -> Iterable[ROW]:
if self.queue is None:
return

if row.createdon > self.queue[0]:
if row["createdon"] > self.queue[0]:
yield from self._flush_queue()

def _flush_queue(self) -> Iterable[ROW]:
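With SQLAlchemy's `Row` gone, the orderer is now parameterized over plain `Dict[str, Any]` rows, so field access changes from `row.version` to `row["version"]`. A hedged usage sketch follows; the row values are invented, and the import path for `VersionOrderer` (the class defined in this diff) is an assumption:

```python
from datetime import datetime
from typing import Any, Dict

# Assumed module path; adjust to wherever the VersionOrderer from this diff lives.
from datahub.ingestion.source.datahub.datahub_database_reader import VersionOrderer

# Rows arrive ordered by createdon; the orderer holds back version-0 rows and releases
# them once a row with a later createdon shows up, so the latest version of an aspect
# is emitted after its historical versions that share the same createdon.
rows = [
    {"urn": "urn:li:corpuser:a", "aspect": "corpUserInfo", "version": 0,
     "createdon": datetime(2024, 10, 1, 12, 0, 0)},
    {"urn": "urn:li:corpuser:a", "aspect": "corpUserInfo", "version": 1,
     "createdon": datetime(2024, 10, 1, 12, 0, 0)},
    {"urn": "urn:li:corpuser:b", "aspect": "corpUserInfo", "version": 0,
     "createdon": datetime(2024, 10, 1, 12, 0, 5)},
]

orderer = VersionOrderer[Dict[str, Any]](enabled=True)
for row in orderer(rows):
    print(row["createdon"], row["urn"], row["version"])
```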
@@ -93,7 +86,7 @@ def __init__(
)

@property
def soft_deleted_urns(self) -> str:
def soft_deleted_urns_query(self) -> str:
return f"""
SELECT DISTINCT mav.urn
FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} as mav
@@ -105,8 +98,6 @@ def soft_deleted_urns(self) -> str:
) as sd ON sd.urn = mav.urn
WHERE sd.removed = true
ORDER BY mav.urn
LIMIT %(limit)s
OFFSET %(offset)s
"""

@property
@@ -118,100 +109,118 @@ def query(self) -> str:
# Relies on createdon order to reflect version order
# Ordering of entries with the same createdon is handled by VersionOrderer
return f"""
SELECT * from (
SELECT CONCAT(mav.createdon, "-", mav.urn, "-", mav.aspect, "-", mav.version) as row_id ,mav.urn, mav.aspect, mav.metadata, mav.systemmetadata, mav.createdon, mav.version, removed
SELECT *
FROM (
SELECT
mav.urn,
mav.aspect,
mav.metadata,
mav.systemmetadata,
mav.createdon,
mav.version,
removed
FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} as mav
LEFT JOIN (SELECT *, JSON_EXTRACT(metadata, '$.removed') as removed from {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} where aspect = "status" and version = 0) as sd on sd.urn = mav.urn
LEFT JOIN (
SELECT
*,
JSON_EXTRACT(metadata, '$.removed') as removed
FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)}
WHERE aspect = 'status'
AND version = 0
) as sd ON sd.urn = mav.urn
WHERE 1 = 1
{"" if self.config.include_all_versions else "AND mav.version = 0"}
{"" if not self.config.exclude_aspects else f"AND LOWER(mav.aspect) NOT IN %(exclude_aspects)s"}
AND mav.createdon >= %(since_createdon)s
ORDER BY createdon, urn, aspect, version
) as t
where row_id > %(last_id)s
{"" if self.config.include_all_versions else "AND mav.version = 0"}
{"" if not self.config.exclude_aspects else "AND LOWER(mav.aspect) NOT IN %(exclude_aspects)s"}
AND mav.createdon >= %(since_createdon)s
ORDER BY
createdon,
urn,
aspect,
version
) as t
WHERE 1=1
{"" if self.config.include_soft_deleted_entities else "AND (removed = false or removed is NULL)"}
ORDER BY createdon, urn, aspect, version
LIMIT %(limit)s
ORDER BY
createdon,
urn,
aspect,
version
limit 1000
"""

def get_aspects(
self, from_createdon: datetime, stop_time: datetime
) -> Iterable[Tuple[MetadataChangeProposalWrapper, datetime]]:
orderer = VersionOrderer[Row](enabled=self.config.include_all_versions)
orderer = VersionOrderer[Dict[str, Any]](
enabled=self.config.include_all_versions
)
rows = self._get_rows(from_createdon=from_createdon, stop_time=stop_time)
for row in orderer(rows):
mcp = self._parse_row(row)
if mcp:
yield mcp, row.createdon
yield mcp, row["createdon"]

def _get_rows(self, from_createdon: datetime, stop_time: datetime) -> Iterable[Row]:
def _get_rows(
self, from_createdon: datetime, stop_time: datetime
) -> Iterable[Dict[str, Any]]:
with self.engine.connect() as conn:
ts = from_createdon
last_id = ""
while ts.timestamp() <= stop_time.timestamp():
logger.debug(f"Polling database aspects from {ts}")
rows = conn.execute(
with contextlib.closing(conn.connection.cursor()) as cursor:
cursor.execute(
self.query,
exclude_aspects=list(self.config.exclude_aspects),
since_createdon=ts.strftime(DATETIME_FORMAT),
last_id=last_id,
limit=self.config.database_query_batch_size,
{
"exclude_aspects": list(self.config.exclude_aspects),
"since_createdon": from_createdon.strftime(DATETIME_FORMAT),
},
)
if not rows.rowcount:
return

for i, row in enumerate(rows):
yield row

if last_id == row.row_id:
return
else:
last_id = row.row_id
ts = row.createdon
columns = [desc[0] for desc in cursor.description]
while True:
rows = cursor.fetchmany(self.config.database_query_batch_size)
if not rows:
return
for row in rows:
yield dict(zip(columns, row))
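Result handling moves from the SQLAlchemy result proxy to the raw DB-API cursor: execute once, pull fixed-size batches with `fetchmany`, and rebuild each tuple into a dict keyed by the column names from `cursor.description`. A minimal, generic sketch of that pattern (function and parameter names are invented; any DB-API 2.0 connection behaves the same way):

```python
import contextlib
from typing import Any, Dict, Iterable


def fetch_as_dicts(
    connection: Any, query: str, params: Dict[str, Any], batch_size: int = 1000
) -> Iterable[Dict[str, Any]]:
    # connection: any DB-API 2.0 connection; params fill %(name)s placeholders in query.
    with contextlib.closing(connection.cursor()) as cursor:
        cursor.execute(query, params)
        # cursor.description is a sequence of 7-tuples; the first element is the column name.
        columns = [desc[0] for desc in cursor.description]
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                return
            for row in rows:
                yield dict(zip(columns, row))
```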

def get_soft_deleted_rows(self) -> Iterable[Row]:
def get_soft_deleted_rows(self) -> Iterable[Dict[str, Any]]:
"""
Fetches all soft-deleted entities from the database.
Yields:
Dictionaries containing URNs of soft-deleted entities
"""
with self.engine.connect() as conn:
offset = 0

while True:
logger.debug(f"Polling soft-deleted urns with offset {offset}")

rows = conn.execute(
self.soft_deleted_urns,
{"limit": self.config.database_query_batch_size, "offset": offset},
)

if not rows.rowcount:
break

for row in rows:
yield row

offset += rows.rowcount

def _parse_row(self, row: Row) -> Optional[MetadataChangeProposalWrapper]:
with contextlib.closing(conn.connection.cursor()) as cursor:
logger.debug("Polling soft-deleted urns from database")
cursor.execute(self.soft_deleted_urns_query)
columns = [desc[0] for desc in cursor.description]
while True:
rows = cursor.fetchmany(self.config.database_query_batch_size)
if not rows:
return
for row in rows:
yield dict(zip(columns, row))

def _parse_row(
self, row: Dict[str, Any]
) -> Optional[MetadataChangeProposalWrapper]:
try:
json_aspect = post_json_transform(json.loads(row.metadata))
json_metadata = post_json_transform(json.loads(row.systemmetadata or "{}"))
json_aspect = post_json_transform(json.loads(row["metadata"]))
json_metadata = post_json_transform(
json.loads(row["systemmetadata"] or "{}")
)
system_metadata = SystemMetadataClass.from_obj(json_metadata)
return MetadataChangeProposalWrapper(
entityUrn=row.urn,
aspect=ASPECT_MAP[row.aspect].from_obj(json_aspect),
entityUrn=row["urn"],
aspect=ASPECT_MAP[row["aspect"]].from_obj(json_aspect),
systemMetadata=system_metadata,
changeType=ChangeTypeClass.UPSERT,
)
except Exception as e:
logger.warning(
f"Failed to parse metadata for {row.urn}: {e}", exc_info=True
f'Failed to parse metadata for {row["urn"]}: {e}', exc_info=True
)
self.report.num_database_parse_errors += 1
self.report.database_parse_errors.setdefault(
str(e), LossyDict()
).setdefault(row.aspect, LossyList()).append(row.urn)
).setdefault(row["aspect"], LossyList()).append(row["urn"])
return None
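`_parse_row` now receives those dicts directly; it deserializes the `metadata` and `systemmetadata` JSON columns and looks the aspect class up in `ASPECT_MAP` to build a `MetadataChangeProposalWrapper`. A hedged sketch of the row shape it expects (all values are invented; in practice `createdon` is a datetime from the database, though `_parse_row` itself does not read it):

```python
example_row = {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)",
    "aspect": "status",
    "metadata": '{"removed": false}',
    "systemmetadata": None,
    "createdon": None,
    "version": 0,
}
# mcp = reader._parse_row(example_row)  # MetadataChangeProposalWrapper, or None on parse failure
```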
@@ -31,7 +31,6 @@ def __init__(
connection_config: KafkaConsumerConnectionConfig,
report: DataHubSourceReport,
ctx: PipelineContext,
soft_deleted_urns: List[str] = [],
):
self.config = config
self.connection_config = connection_config
@@ -97,7 +96,8 @@ def _poll_partition(
)
break

if mcl.aspectName and mcl.aspectName.lower() in self.config.exclude_aspects:
if mcl.aspectName and mcl.aspectName in self.config.exclude_aspects:
self.report.num_kafka_excluded_aspects += 1
continue

# TODO: Consider storing state in kafka instead, via consumer.commit()
@@ -89,7 +89,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"Cannot exclude soft deleted entities without a database connection"
)
soft_deleted_urns = [
row[0] for row in database_reader.get_soft_deleted_rows()
row["urn"] for row in database_reader.get_soft_deleted_rows()
]

yield from self._get_kafka_workunits(
@@ -135,7 +135,6 @@ def _get_kafka_workunits(
self.config.kafka_connection,
self.report,
self.ctx,
soft_deleted_urns,
) as reader:
mcls = reader.get_mcls(
from_offsets=from_offsets, stop_time=self.report.stop_time
@@ -20,6 +20,7 @@ class DataHubSourceReport(StatefulIngestionReport):

num_kafka_aspects_ingested: int = 0
num_kafka_parse_errors: int = 0
num_kafka_excluded_aspects: int = 0
kafka_parse_errors: LossyDict[str, int] = field(default_factory=LossyDict)

num_timeseries_deletions_dropped: int = 0