Skip to content

Commit

Permalink
fix(ingest): drop empty fields (datahub-project#11613)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Oct 15, 2024
1 parent be1b880 commit 1eec2c4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_fix_duplicate_schema_field_paths,
auto_fix_empty_field_paths,
auto_lowercase_urns,
auto_materialize_referenced_tags_terms,
auto_status_aspect,
Expand Down Expand Up @@ -444,6 +445,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
partial(
auto_fix_duplicate_schema_field_paths, platform=self._infer_platform()
),
partial(auto_fix_empty_field_paths, platform=self._infer_platform()),
browse_path_processor,
partial(auto_workunit_reporter, self.get_report()),
auto_patch_last_modified,
Expand Down
44 changes: 44 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,50 @@ def auto_fix_duplicate_schema_field_paths(
)


def auto_fix_empty_field_paths(
stream: Iterable[MetadataWorkUnit],
*,
platform: Optional[str] = None,
) -> Iterable[MetadataWorkUnit]:
"""Count schema metadata aspects with empty field paths and emit telemetry."""

total_schema_aspects = 0
schemas_with_empty_fields = 0
empty_field_paths = 0

for wu in stream:
schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
if schema_metadata:
total_schema_aspects += 1

updated_fields: List[SchemaFieldClass] = []
for field in schema_metadata.fields:
if field.fieldPath:
updated_fields.append(field)
else:
empty_field_paths += 1

if empty_field_paths > 0:
logger.info(
f"Fixing empty field paths in schema aspect for {wu.get_urn()} by dropping empty fields"
)
schema_metadata.fields = updated_fields
schemas_with_empty_fields += 1

yield wu

if schemas_with_empty_fields > 0:
properties = {
"platform": platform,
"total_schema_aspects": total_schema_aspects,
"schemas_with_empty_fields": schemas_with_empty_fields,
"empty_field_paths": empty_field_paths,
}
telemetry.telemetry_instance.ping(
"ingestion_empty_schema_field_paths", properties
)


def auto_empty_dataset_usage_statistics(
stream: Iterable[MetadataWorkUnit],
*,
Expand Down

0 comments on commit 1eec2c4

Please sign in to comment.