diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py index 54be2e5fac1e3..b107a62c905b4 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py @@ -13,9 +13,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.graph.client import DataHubGraph -from datahub.ingestion.transformer.dataset_transformer import ( - DatasetOwnershipTransformer, -) +from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer from datahub.metadata.schema_classes import ( BrowsePathsV2Class, MetadataChangeProposalClass, @@ -37,7 +35,7 @@ class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel): is_container: bool = False -class AddDatasetOwnership(DatasetOwnershipTransformer): +class AddDatasetOwnership(OwnershipTransformer): """Transformer that adds owners to datasets according to a callback function.""" ctx: PipelineContext diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py index 42dd54f4a584a..00b3a9ba59f92 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py @@ -27,6 +27,22 @@ def entity_types(self) -> List[str]: return ["dataset"] +class OwnershipTransformer( + DatasetTransformer, SingleAspectTransformer, metaclass=ABCMeta +): + def aspect_name(self) -> str: + return "ownership" + + def entity_types(self) -> List[str]: + return [ + "dataset", + "dataJob", + "dataFlow", + "chart", + "dashboard", + ] + + class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta): """Transformer that 
does transform sequentially on each tag.""" @@ -47,11 +63,6 @@ def entity_types(self) -> List[str]: return ["container"] -class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta): - def aspect_name(self) -> str: - return "ownership" - - class DatasetDomainTransformer(DatasetTransformer, metaclass=ABCMeta): def aspect_name(self) -> str: return "domains" diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py index 8ef61ab9679e6..f17546d6f7299 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py @@ -4,9 +4,7 @@ import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.transformer.dataset_transformer import ( - DatasetOwnershipTransformer, -) +from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer from datahub.metadata.schema_classes import ( OwnerClass, OwnershipClass, @@ -20,7 +18,7 @@ class PatternCleanUpOwnershipConfig(ConfigModel): pattern_for_cleanup: List[str] -class PatternCleanUpOwnership(DatasetOwnershipTransformer): +class PatternCleanUpOwnership(OwnershipTransformer): """Transformer that clean the ownership URN.""" ctx: PipelineContext diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py index f5d71a4340554..934e2a13d5631 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py @@ -3,9 +3,7 @@ from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import Aspect from 
datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.transformer.dataset_transformer import ( - DatasetOwnershipTransformer, -) +from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer from datahub.metadata.schema_classes import OwnershipClass @@ -13,7 +11,7 @@ class ClearDatasetOwnershipConfig(ConfigModel): pass -class SimpleRemoveDatasetOwnership(DatasetOwnershipTransformer): +class SimpleRemoveDatasetOwnership(OwnershipTransformer): """Transformer that clears all owners on each dataset.""" def __init__(self, config: ClearDatasetOwnershipConfig, ctx: PipelineContext): diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 4e9a38cb37ae6..389f7b70b3311 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -220,7 +220,7 @@ def make_dataset_with_properties() -> models.MetadataChangeEventClass: ) -def test_simple_dataset_ownership_transformation(mock_time): +def test_dataset_ownership_transformation(mock_time): no_owner_aspect = make_generic_dataset() with_owner_aspect = make_dataset_with_owner() @@ -254,7 +254,7 @@ def test_simple_dataset_ownership_transformation(mock_time): transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) - assert len(outputs) == len(inputs) + 1 + assert len(outputs) == len(inputs) + 2 # Check the first entry. first_ownership_aspect = builder.get_aspect_if_available( @@ -287,11 +287,21 @@ def test_simple_dataset_ownership_transformation(mock_time): ] ) + third_ownership_aspect = outputs[4].record.aspect + assert third_ownership_aspect + assert len(third_ownership_aspect.owners) == 2 + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None + for owner in third_ownership_aspect.owners + ] + ) + # Verify that the third entry is unchanged. 
assert inputs[2] == outputs[2].record # Verify that the last entry is EndOfStream - assert inputs[3] == outputs[4].record + assert inputs[-1] == outputs[-1].record def test_simple_dataset_ownership_with_type_transformation(mock_time): @@ -1003,6 +1013,7 @@ def test_pattern_dataset_ownership_transformation(mock_time): "rules": { ".*example1.*": [builder.make_user_urn("person1")], ".*example2.*": [builder.make_user_urn("person2")], + ".*dag_abc.*": [builder.make_user_urn("person2")], } }, "ownership_type": "DATAOWNER", @@ -1014,7 +1025,9 @@ def test_pattern_dataset_ownership_transformation(mock_time): transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) - assert len(outputs) == len(inputs) + 1 # additional MCP due to the no-owner MCE + assert ( + len(outputs) == len(inputs) + 2 + ) # additional MCP due to the no-owner MCE + datajob # Check the first entry. assert inputs[0] == outputs[0].record @@ -1042,6 +1055,16 @@ def test_pattern_dataset_ownership_transformation(mock_time): ] ) + third_ownership_aspect = outputs[4].record.aspect + assert third_ownership_aspect + assert len(third_ownership_aspect.owners) == 1 + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in third_ownership_aspect.owners + ] + ) + # Verify that the third entry is unchanged. 
assert inputs[2] == outputs[2].record @@ -1122,14 +1145,14 @@ def fake_get_aspect( pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore # No owner aspect for the first dataset - no_owner_aspect = models.MetadataChangeEventClass( + no_owner_aspect_dataset = models.MetadataChangeEventClass( proposedSnapshot=models.DatasetSnapshotClass( urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", aspects=[models.StatusClass(removed=False)], ), ) # Dataset with an existing owner - with_owner_aspect = models.MetadataChangeEventClass( + with_owner_aspect_dataset = models.MetadataChangeEventClass( proposedSnapshot=models.DatasetSnapshotClass( urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)", aspects=[ @@ -1148,8 +1171,7 @@ def fake_get_aspect( ), ) - # Not a dataset, should be ignored - not_a_dataset = models.MetadataChangeEventClass( + datajob = models.MetadataChangeEventClass( proposedSnapshot=models.DataJobSnapshotClass( urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", aspects=[ @@ -1163,9 +1185,9 @@ def fake_get_aspect( ) inputs = [ - no_owner_aspect, - with_owner_aspect, - not_a_dataset, + no_owner_aspect_dataset, + with_owner_aspect_dataset, + datajob, EndOfStream(), ] @@ -1176,6 +1198,7 @@ def fake_get_aspect( "rules": { ".*example1.*": [builder.make_user_urn("person1")], ".*example2.*": [builder.make_user_urn("person2")], + ".*dag_abc.*": [builder.make_user_urn("person3")], } }, "ownership_type": "DATAOWNER", @@ -1188,9 +1211,9 @@ def fake_get_aspect( transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) ) - assert len(outputs) == len(inputs) + 3 + assert len(outputs) == len(inputs) + 4 - # Check the first entry. 
+ # Check that DatasetSnapshotClass has not changed assert inputs[0] == outputs[0].record # Check the ownership for the first dataset (example1) @@ -1217,12 +1240,16 @@ def fake_get_aspect( ] ) + third_ownership_aspect = outputs[4].record.aspect + assert third_ownership_aspect + assert len(third_ownership_aspect.owners) == 1 # new for datajob + # Check container ownerships for i in range(2): - container_ownership_aspect = outputs[i + 4].record.aspect + container_ownership_aspect = outputs[i + 5].record.aspect assert container_ownership_aspect ownership = json.loads(container_ownership_aspect.value.decode("utf-8")) - assert len(ownership) == 2 + assert len(ownership) == 3 assert ownership[0]["value"]["owner"] == builder.make_user_urn("person1") assert ownership[1]["value"]["owner"] == builder.make_user_urn("person2")