Skip to content

Commit

Permalink
fix(ingest): ignore irrelevant urns from % change computation (datahu…
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Oct 11, 2024
1 parent 43c185d commit d0d09a0
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from datahub.utilities.dedup_list import deduplicate_list
from datahub.utilities.urns.urn import guess_entity_type

STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
"query",
}


def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod:
# mapping would be something like:
Expand Down Expand Up @@ -127,8 +132,11 @@ def get_percent_entities_changed(
:param old_checkpoint_state: the old checkpoint state to compute the relative change percent against.
:return: (1-|intersection(self, old_checkpoint_state)| / |old_checkpoint_state|) * 100.0
"""

old_urns_filtered = filter_ignored_entity_types(old_checkpoint_state.urns)

return compute_percent_entities_changed(
new_entities=self.urns, old_entities=old_checkpoint_state.urns
new_entities=self.urns, old_entities=old_urns_filtered
)

def urn_count(self) -> int:
Expand All @@ -153,3 +161,19 @@ def _get_entity_overlap_and_cardinalities(
new_set = set(new_entities)
old_set = set(old_entities)
return len(new_set.intersection(old_set)), len(old_set), len(new_set)


def filter_ignored_entity_types(urns: List[str]) -> List[str]:
# We previously stored ignored entity urns (e.g.dataProcessInstance) in state.
# For smoother transition from old checkpoint state, without requiring explicit
# setting of `fail_safe_threshold` due to removal of irrelevant urns from new state,
# here, we would ignore irrelevant urns from percentage entities changed computation
# This special handling can be removed after few months.
return [
urn
for urn in urns
if not any(
urn.startswith(f"urn:li:{entityType}")
for entityType in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES
)
]
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.entity_removal_state import (
STATEFUL_INGESTION_IGNORED_ENTITY_TYPES,
GenericCheckpointState,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfig,
StatefulIngestionConfigBase,
Expand All @@ -27,11 +30,6 @@

logger: logging.Logger = logging.getLogger(__name__)

STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
"query",
}


class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from datahub.ingestion.source.state.entity_removal_state import (
compute_percent_entities_changed,
filter_ignored_entity_types,
)

EntList = List[str]
Expand Down Expand Up @@ -46,3 +47,20 @@ def test_change_percent(
new_entities=new_entities, old_entities=old_entities
)
assert actual_percent_change == expected_percent_change


def test_filter_ignored_entity_types():

assert filter_ignored_entity_types(
[
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
"urn:li:query:query1",
]
) == [
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
]

0 comments on commit d0d09a0

Please sign in to comment.