Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/transform): extend ownership transformer to other entities #11700

Merged
merged 4 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer
from datahub.metadata.schema_classes import (
BrowsePathsV2Class,
MetadataChangeProposalClass,
Expand All @@ -37,7 +35,7 @@ class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel):
is_container: bool = False


class AddDatasetOwnership(DatasetOwnershipTransformer):
class AddDatasetOwnership(OwnershipTransformer):
"""Transformer that adds owners to datasets according to a callback function."""

ctx: PipelineContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ def entity_types(self) -> List[str]:
return ["dataset"]


class OwnershipTransformer(
    DatasetTransformer, SingleAspectTransformer, metaclass=ABCMeta
):
    """Abstract base for transformers that handle the ``ownership`` aspect.

    Declares dataset, dataJob, dataFlow, chart and dashboard as its entity
    types.
    """

    def aspect_name(self) -> str:
        """Return the name of the aspect this transformer handles."""
        return "ownership"

    def entity_types(self) -> List[str]:
        """Return the entity type names this transformer declares."""
        supported_entities = ["dataset", "dataJob", "dataFlow", "chart", "dashboard"]
        return supported_entities


class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
"""Transformer that does transform sequentially on each tag."""

Expand All @@ -47,11 +63,6 @@ def entity_types(self) -> List[str]:
return ["container"]


class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta):
    """Abstract dataset transformer bound to the ``ownership`` aspect."""

    def aspect_name(self) -> str:
        """Return the name of the aspect this transformer handles."""
        handled_aspect = "ownership"
        return handled_aspect


class DatasetDomainTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "domains"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
Expand All @@ -20,7 +18,7 @@ class PatternCleanUpOwnershipConfig(ConfigModel):
pattern_for_cleanup: List[str]


class PatternCleanUpOwnership(DatasetOwnershipTransformer):
class PatternCleanUpOwnership(OwnershipTransformer):
"""Transformer that clean the ownership URN."""

ctx: PipelineContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.ingestion.transformer.dataset_transformer import OwnershipTransformer
from datahub.metadata.schema_classes import OwnershipClass


# Configuration model for SimpleRemoveDatasetOwnership; declares no fields of its own.
class ClearDatasetOwnershipConfig(ConfigModel):
pass


class SimpleRemoveDatasetOwnership(DatasetOwnershipTransformer):
class SimpleRemoveDatasetOwnership(OwnershipTransformer):
"""Transformer that clears all owners on each dataset."""

def __init__(self, config: ClearDatasetOwnershipConfig, ctx: PipelineContext):
Expand Down
57 changes: 42 additions & 15 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def make_dataset_with_properties() -> models.MetadataChangeEventClass:
)


def test_simple_dataset_ownership_transformation(mock_time):
def test_dataset_ownership_transformation(mock_time):
no_owner_aspect = make_generic_dataset()

anshbansal marked this conversation as resolved.
Show resolved Hide resolved
with_owner_aspect = make_dataset_with_owner()
Expand Down Expand Up @@ -254,7 +254,7 @@ def test_simple_dataset_ownership_transformation(mock_time):
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)

assert len(outputs) == len(inputs) + 1
assert len(outputs) == len(inputs) + 2

# Check the first entry.
first_ownership_aspect = builder.get_aspect_if_available(
Expand Down Expand Up @@ -287,11 +287,21 @@ def test_simple_dataset_ownership_transformation(mock_time):
]
)

# Check the third ownership aspect (outputs[4]). The owner-type check must
# iterate this aspect's owners; it previously re-checked the second aspect's
# owners, leaving the third aspect's owner types unverified.
third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 2
assert all(
    owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
    for owner in third_ownership_aspect.owners
)

# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record

# Verify that the last entry is EndOfStream
assert inputs[3] == outputs[4].record
assert inputs[-1] == outputs[-1].record
anshbansal marked this conversation as resolved.
Show resolved Hide resolved


def test_simple_dataset_ownership_with_type_transformation(mock_time):
Expand Down Expand Up @@ -1003,6 +1013,7 @@ def test_pattern_dataset_ownership_transformation(mock_time):
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
".*dag_abc.*": [builder.make_user_urn("person2")],
}
},
"ownership_type": "DATAOWNER",
Expand All @@ -1014,7 +1025,9 @@ def test_pattern_dataset_ownership_transformation(mock_time):
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)

assert len(outputs) == len(inputs) + 1 # additional MCP due to the no-owner MCE
assert (
len(outputs) == len(inputs) + 2
) # additional MCP due to the no-owner MCE + datajob

# Check the first entry.
assert inputs[0] == outputs[0].record
Expand Down Expand Up @@ -1042,6 +1055,16 @@ def test_pattern_dataset_ownership_transformation(mock_time):
]
)

third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 1
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
for owner in third_ownership_aspect.owners
]
)

# Verify that the third entry is unchanged.
assert inputs[2] == outputs[2].record

Expand Down Expand Up @@ -1122,14 +1145,14 @@ def fake_get_aspect(
pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore

# No owner aspect for the first dataset
no_owner_aspect = models.MetadataChangeEventClass(
no_owner_aspect_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[models.StatusClass(removed=False)],
),
)
# Dataset with an existing owner
with_owner_aspect = models.MetadataChangeEventClass(
with_owner_aspect_dataset = models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
aspects=[
Expand All @@ -1148,8 +1171,7 @@ def fake_get_aspect(
),
)

# Not a dataset, should be ignored
not_a_dataset = models.MetadataChangeEventClass(
datajob = models.MetadataChangeEventClass(
proposedSnapshot=models.DataJobSnapshotClass(
urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)",
aspects=[
Expand All @@ -1163,9 +1185,9 @@ def fake_get_aspect(
)

inputs = [
no_owner_aspect,
with_owner_aspect,
not_a_dataset,
no_owner_aspect_dataset,
with_owner_aspect_dataset,
datajob,
EndOfStream(),
]

Expand All @@ -1176,6 +1198,7 @@ def fake_get_aspect(
"rules": {
".*example1.*": [builder.make_user_urn("person1")],
".*example2.*": [builder.make_user_urn("person2")],
".*dag_abc.*": [builder.make_user_urn("person3")],
}
},
"ownership_type": "DATAOWNER",
Expand All @@ -1188,9 +1211,9 @@ def fake_get_aspect(
transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs])
)

assert len(outputs) == len(inputs) + 3
assert len(outputs) == len(inputs) + 4

# Check the first entry.
# Check that DatasetSnapshotClass has not changed
assert inputs[0] == outputs[0].record

# Check the ownership for the first dataset (example1)
Expand All @@ -1217,12 +1240,16 @@ def fake_get_aspect(
]
)

third_ownership_aspect = outputs[4].record.aspect
assert third_ownership_aspect
assert len(third_ownership_aspect.owners) == 1 # new for datajob

# Check container ownerships
for i in range(2):
container_ownership_aspect = outputs[i + 4].record.aspect
container_ownership_aspect = outputs[i + 5].record.aspect
assert container_ownership_aspect
ownership = json.loads(container_ownership_aspect.value.decode("utf-8"))
assert len(ownership) == 2
assert len(ownership) == 3
assert ownership[0]["value"]["owner"] == builder.make_user_urn("person1")
assert ownership[1]["value"]["owner"] == builder.make_user_urn("person2")

Expand Down
Loading