diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 85ae17ddf6529..586b1c610dc75 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -37,6 +37,7 @@ from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, auto_fix_duplicate_schema_field_paths, + auto_fix_empty_field_paths, auto_lowercase_urns, auto_materialize_referenced_tags_terms, auto_status_aspect, @@ -444,6 +445,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: partial( auto_fix_duplicate_schema_field_paths, platform=self._infer_platform() ), + partial(auto_fix_empty_field_paths, platform=self._infer_platform()), browse_path_processor, partial(auto_workunit_reporter, self.get_report()), auto_patch_last_modified, diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 372aef707f232..748d8a8e52a79 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -394,6 +394,50 @@ def auto_fix_duplicate_schema_field_paths( ) +def auto_fix_empty_field_paths( + stream: Iterable[MetadataWorkUnit], + *, + platform: Optional[str] = None, +) -> Iterable[MetadataWorkUnit]: + """Count schema metadata aspects with empty field paths and emit telemetry.""" + + total_schema_aspects = 0 + schemas_with_empty_fields = 0 + empty_field_paths = 0 + + for wu in stream: + schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass) + if schema_metadata: + total_schema_aspects += 1 + + updated_fields: List[SchemaFieldClass] = [] + for field in schema_metadata.fields: + if field.fieldPath: + updated_fields.append(field) + else: + empty_field_paths += 1 + + if empty_field_paths > 0: + logger.info( + f"Fixing empty field paths in schema aspect for {wu.get_urn()} by dropping empty fields" + ) + schema_metadata.fields = updated_fields + schemas_with_empty_fields += 1 + + yield wu + + if schemas_with_empty_fields > 0: + properties = { + "platform": platform, + "total_schema_aspects": total_schema_aspects, + "schemas_with_empty_fields": schemas_with_empty_fields, + "empty_field_paths": empty_field_paths, + } + telemetry.telemetry_instance.ping( + "ingestion_empty_schema_field_paths", properties + ) + + def auto_empty_dataset_usage_statistics( stream: Iterable[MetadataWorkUnit], *,