Skip to content

Commit

Permalink
feat(ingest/transform): extend ownership transformer to other entities (
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Oct 29, 2024
1 parent bea253a commit 02f0a3d
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer
from datahub.metadata.schema_classes import (
BrowsePathsV2Class,
MetadataChangeProposalClass,
Expand All @@ -37,7 +35,7 @@ class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel):
is_container: bool = False


class AddDatasetOwnership(DatasetOwnershipTransformer):
class AddDatasetOwnership(OwnershipTransformer):
"""Transformer that adds owners to datasets according to a callback function."""

ctx: PipelineContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ def entity_types(self) -> List[str]:
return ["dataset"]


class OwnershipTransformer(
DatasetTransformer, SingleAspectTransformer, metaclass=ABCMeta
):
def aspect_name(self) -> str:
return "ownership"

def entity_types(self) -> List[str]:
return [
"dataset",
"dataJob",
"dataFlow",
"chart",
"dashboard",
]


class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
"""Transformer that does transform sequentially on each tag."""

Expand All @@ -47,11 +63,6 @@ def entity_types(self) -> List[str]:
return ["container"]


class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "ownership"


class DatasetDomainTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "domains"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
Expand All @@ -20,7 +18,7 @@ class PatternCleanUpOwnershipConfig(ConfigModel):
pattern_for_cleanup: List[str]


class PatternCleanUpOwnership(DatasetOwnershipTransformer):
class PatternCleanUpOwnership(OwnershipTransformer):
"""Transformer that clean the ownership URN."""

ctx: PipelineContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer
from datahub.metadata.schema_classes import OwnershipClass


class ClearDatasetOwnershipConfig(ConfigModel):
pass


class SimpleRemoveDatasetOwnership(DatasetOwnershipTransformer):
class SimpleRemoveDatasetOwnership(OwnershipTransformer):
"""Transformer that clears all owners on each dataset."""

def __init__(self, config: ClearDatasetOwnershipConfig, ctx: PipelineContext):
Expand Down
57 changes: 42 additions & 15 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def make_dataset_with_properties() -> models.MetadataChangeEventClass:
)


def test_simple_dataset_ownership_transformation(mock_time):
def test_dataset_ownership_transformation(mock_time):
no_owner_aspect = make_generic_dataset()

with_owner_aspect = make_dataset_with_owner()
Expand Down Expand Up @@ -254,7 +254,7 @@ def test_simple_dataset_ownership_transformation(mock_time):
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)

assert len(outputs) == len(inputs) + 1
assert len(outputs) == len(inputs) + 2

# Check the first entry.
first_ownership_aspect = builder.get_aspect_if_available(
Expand Down Expand Up @@ -287,11 +287,21 @@ def test_simple_dataset_ownership_transformation(mock_time):
]
)

third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 2
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in second_ownership_aspect.owners
]
)

# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record

# Verify that the last entry is EndOfStream
assert inputs[3] == outputs[4].record
assert inputs[-1] == outputs[-1].record


def test_simple_dataset_ownership_with_type_transformation(mock_time):
Expand Down Expand Up @@ -1003,6 +1013,7 @@ def test_pattern_dataset_ownership_transformation(mock_time):
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
".*dag_abc.*": [builder.make_user_urn("person2")],
}
},
"ownership_type": "DATAOWNER",
Expand All @@ -1014,7 +1025,9 @@ def test_pattern_dataset_ownership_transformation(mock_time):
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)

assert len(outputs) == len(inputs) + 1 # additional MCP due to the no-owner MCE
assert (
len(outputs) == len(inputs) + 2
) # additional MCP due to the no-owner MCE + datajob

# Check the first entry.
assert inputs[0] == outputs[0].record
Expand Down Expand Up @@ -1042,6 +1055,16 @@ def test_pattern_dataset_ownership_transformation(mock_time):
]
)

third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 1
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in third_ownership_aspect.owners
]
)

# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record

Expand Down Expand Up @@ -1122,14 +1145,14 @@ def fake_get_aspect(
pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore

# No owner aspect for the first dataset
no_owner_aspect = models.MetadataChangeEventClass(
no_owner_aspect_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[models.StatusClass(removed=False)],
),
)
# Dataset with an existing owner
with_owner_aspect = models.MetadataChangeEventClass(
with_owner_aspect_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
Expand All @@ -1148,8 +1171,7 @@ def fake_get_aspect(
),
)

# Not a dataset, should be ignored
not_a_dataset = models.MetadataChangeEventClass(
datajob = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
Expand All @@ -1163,9 +1185,9 @@ def fake_get_aspect(
)

inputs = [
no_owner_aspect,
with_owner_aspect,
not_a_dataset,
no_owner_aspect_dataset,
with_owner_aspect_dataset,
datajob,
EndOfStream(),
]

Expand All @@ -1176,6 +1198,7 @@ def fake_get_aspect(
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
".*dag_abc.*": [builder.make_user_urn("person3")],
}
},
"ownership_type": "DATAOWNER",
Expand All @@ -1188,9 +1211,9 @@ def fake_get_aspect(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)

assert len(outputs) == len(inputs) + 3
assert len(outputs) == len(inputs) + 4

# Check the first entry.
# Check that DatasetSnapshotClass has not changed
assert inputs[0] == outputs[0].record

# Check the ownership for the first dataset (example1)
Expand All @@ -1217,12 +1240,16 @@ def fake_get_aspect(
]
)

third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 1 # new for datajob

# Check container ownerships
for i in range(2):
container_ownership_aspect = outputs[i + 4].record.aspect
container_ownership_aspect = outputs[i + 5].record.aspect
assert container_ownership_aspect
ownership = json.loads(container_ownership_aspect.value.decode("utf-8"))
assert len(ownership) == 2
assert len(ownership) == 3
assert ownership[0]["value"]["owner"] == builder.make_user_urn("person1")
assert ownership[1]["value"]["owner"] == builder.make_user_urn("person2")

Expand Down

0 comments on commit 02f0a3d

Please sign in to comment.