Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Apr 15, 2024
2 parents 8f6d7b4 + 20e2cc7 commit 1afc480
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 10 deletions.
18 changes: 18 additions & 0 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,24 @@ Then define your class to return a list of custom properties, for example:
add_properties_resolver_class: "<your_module>.<your_class>"
```

## Replace ExternalUrl
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `input_pattern` | ✅ | string | | String or pattern to replace |
| `replacement` | ✅ | string | | Replacement string |


Matches the full or partial string in the `externalUrl` of the dataset properties against `input_pattern` (interpreted as a regular expression) and replaces every match with the `replacement` string.

```yaml
transformers:
  - type: "replace_external_url"
    config:
      input_pattern: '\b\w*hub\b'
      replacement: "sub"
```

## Simple Add Dataset domains
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
81 changes: 81 additions & 0 deletions metadata-ingestion/examples/mce_files/bootstrap_mce.json
Original file line number Diff line number Diff line change
Expand Up @@ -3612,5 +3612,86 @@
"contentType": "application/json"
},
"systemMetadata": null
},
{
"entityType": "post",
"entityUrn": "urn:li:post:f3a68539-f7e4-4c41-a4fd-9e57c085d8dd",
"changeType": "UPSERT",
"aspectName": "postInfo",
"aspect": {
"json": {
"type": "HOME_PAGE_ANNOUNCEMENT",
"content": {
"title": "Join DataHub Slack",
"type": "LINK",
"link": "https://datahubproject.io/slack?utm_source=quickstart&utm_medium=annoucement&utm_campaign=quickstart_annoucement",
"media": {
"type": "IMAGE",
"location": "https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/datahub-logo-color-mark.svg"
}
},
"created": 1712547125049,
"lastModified": 1712547125049
}
},
"systemMetadata": {
"lastObserved": 1712548844816,
"runId": "datahub-2024_04_08-13_00_44",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "post",
"entityUrn": "urn:li:post:e9c7e4a5-b7b0-4390-bd94-9a0fe8acd6bf",
"changeType": "UPSERT",
"aspectName": "postInfo",
"aspect": {
"json": {
"type": "HOME_PAGE_ANNOUNCEMENT",
"content": {
"title": "View Docs",
"type": "LINK",
"link": "https://datahubproject.io/docs",
"media": {
"type": "IMAGE",
"location": "https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/datahub-logo-color-mark.svg"
}
},
"created": 1712547164761,
"lastModified": 1712547164761
}
},
"systemMetadata": {
"lastObserved": 1712548844816,
"runId": "datahub-2024_04_08-13_00_44",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "post",
"entityUrn": "urn:li:post:326e42b3-2965-47d1-9660-50e513af7d6e",
"changeType": "UPSERT",
"aspectName": "postInfo",
"aspect": {
"json": {
"type": "HOME_PAGE_ANNOUNCEMENT",
"content": {
"title": "Upcoming Events",
"type": "LINK",
"link": "https://datahubproject.io/events",
"media": {
"type": "IMAGE",
"location": "https://datahubproject.io/img/acryl-logo-transparent-mark.svg"
}
},
"created": 1712547268882,
"lastModified": 1712547268882
}
},
"systemMetadata": {
"lastObserved": 1712548844817,
"runId": "datahub-2024_04_08-13_00_44",
"lastRunId": "no-run-id-provided"
}
}
]
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@
"add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct",
"simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl"
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
Expand Down
19 changes: 13 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,19 @@ def get_resource_owners_work_unit(
# If we want to overwrite or there are no existing owners, create a new Ownership object
current_ownership = OwnershipClass(owners, lastModified=get_audit_stamp())
else:
current_owner_urns: Set[str] = set(
[owner.owner for owner in current_ownership.owners]
)
owners_filtered: List[OwnerClass] = [
owner for owner in owners if owner.owner not in current_owner_urns
]
owners_filtered: List[OwnerClass] = []
for owner in owners:
owner_exists = False
for current_owner in current_ownership.owners:
if (
owner.owner == current_owner.owner
and owner.type == current_owner.type
):
owner_exists = True
break
if not owner_exists:
owners_filtered.append(owner)

# If there are no new owners to add, we don't need to emit a work unit.
if len(owners_filtered) <= 0:
return None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import copy
import re
from typing import Any, Dict, Optional, cast

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetPropertiesTransformer,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass


class ReplaceExternalUrlConfig(ConfigModel):
    # Configuration for the ReplaceExternalUrl transformer.

    # Pattern to search for in the dataset's externalUrl. The transformer
    # hands this string to `re.compile`, so it is interpreted as a regular
    # expression (a plain literal works as long as it has no regex
    # metacharacters).
    input_pattern: str
    # Text substituted for every match of `input_pattern` via `re.sub`.
    replacement: str


class ReplaceExternalUrl(DatasetPropertiesTransformer):
    """Transformer that rewrites the ``externalUrl`` of dataset properties.

    Every match of ``config.input_pattern`` (a regular expression) found in
    the aspect's ``externalUrl`` is replaced with ``config.replacement``.
    Aspects that are missing, or that carry no ``externalUrl``, are passed
    through untouched.
    """

    ctx: PipelineContext
    config: ReplaceExternalUrlConfig

    def __init__(
        self,
        config: ReplaceExternalUrlConfig,
        ctx: PipelineContext,
        **resolver_args: Dict[str, Any],
    ):
        """Store the configuration and pre-compile the search pattern.

        Args:
            config: Pattern/replacement pair to apply to each externalUrl.
            ctx: Pipeline context the transformer runs in.
            **resolver_args: Extra keyword arguments, kept on the instance.

        Raises:
            re.error: If ``config.input_pattern`` is not a valid regular
                expression.
        """
        super().__init__()
        self.ctx = ctx
        self.config = config
        self.resolver_args = resolver_args
        # Compile once here instead of on every transform_aspect call; this
        # also surfaces an invalid pattern when the transformer is created
        # rather than on the first record that has an externalUrl.
        self._url_pattern = re.compile(config.input_pattern)

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "ReplaceExternalUrl":
        """Build the transformer from a raw (recipe) configuration dict."""
        config = ReplaceExternalUrlConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
        """Return the aspect with its ``externalUrl`` rewritten.

        Args:
            entity_urn: URN of the entity the aspect belongs to (unused).
            aspect_name: Name of the aspect being transformed (unused).
            aspect: The incoming dataset properties aspect, or ``None``.

        Returns:
            The original aspect when there is no ``externalUrl`` to rewrite,
            otherwise a deep copy whose ``externalUrl`` has every pattern
            match replaced. The input aspect is never mutated.
        """
        in_dataset_properties_aspect: DatasetPropertiesClass = cast(
            DatasetPropertiesClass, aspect
        )

        # `hasattr` also covers aspect=None; an empty/None URL has nothing
        # to substitute, so hand the input back unchanged.
        if (
            not hasattr(in_dataset_properties_aspect, "externalUrl")
            or not in_dataset_properties_aspect.externalUrl
        ):
            return cast(Aspect, in_dataset_properties_aspect)

        # Work on a copy so the caller's aspect object stays as it was.
        out_dataset_properties_aspect: DatasetPropertiesClass = copy.deepcopy(
            in_dataset_properties_aspect
        )
        out_dataset_properties_aspect.externalUrl = self._url_pattern.sub(
            self.config.replacement, in_dataset_properties_aspect.externalUrl
        )

        return cast(Aspect, out_dataset_properties_aspect)
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,36 @@
"runId": "test-csv-enricher"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,baz)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "BUSINESS_OWNER"
},
{
"owner": "urn:li:corpuser:jdoe",
"type": "BUSINESS_OWNER"
}
],
"ownerTypes": {},
"lastModified": {
"time": 1643871600000,
"actor": "urn:li:corpuser:ingestion"
}
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "test-csv-enricher",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(looker,baz)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ resource,subresource,glossary_terms,tags,owners,ownership_type,description,domai
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",field_bar,,[urn:li:tag:Legacy],,,field_bar?
"urn:li:container:DATABASE",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,container description,urn:li:domain:Engineering
"urn:li:chart:(looker,baz1)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
"urn:li:dashboard:(looker,baz)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
"urn:li:dashboard:(looker,baz)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],BUSINESS_OWNER,new description,urn:li:domain:Engineering
"urn:li:mlFeature:(test_feature_table_all_feature_dtypes,test_BOOL_LIST_feature)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
"urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,user_features)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
"urn:li:mlPrimaryKey:(test_feature_table_all_feature_dtypes,dummy_entity_1)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
Expand All @@ -13,3 +13,4 @@ resource,subresource,glossary_terms,tags,owners,ownership_type,description,domai
"urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_123)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
"urn:li:dataFlow:(airflow,dag_abc,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
"urn:li:notebook:(querybook,1234)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
"urn:li:dashboard:(looker,baz)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering
88 changes: 85 additions & 3 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,13 @@
ExtractOwnersFromTagsTransformer,
)
from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus
from datahub.ingestion.transformer.pattern_cleanup_ownership import (
PatternCleanUpOwnership,
)
from datahub.ingestion.transformer.remove_dataset_ownership import (
SimpleRemoveDatasetOwnership,
)
from datahub.ingestion.transformer.replace_external_url import ReplaceExternalUrl
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DatasetPropertiesClass,
Expand All @@ -87,9 +91,6 @@
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn
from src.datahub.ingestion.transformer.pattern_cleanup_ownership import (
PatternCleanUpOwnership,
)


def make_generic_dataset(
Expand Down Expand Up @@ -3209,3 +3210,84 @@ def test_clean_owner_urn_transformation_should_not_remove_system_identifier(
config: List[Union[re.Pattern, str]] = ["urn:li:corpuser:"]

_test_clean_owner_urns(pipeline_context, in_owner_urns, config, in_owner_urns)


def test_replace_external_url_word_replace(
    mock_datahub_graph,
):
    """A plain-word pattern is replaced wherever it occurs in externalUrl."""
    ctx: PipelineContext = PipelineContext(run_id="test_replace_external_url")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    source_aspect = models.DatasetPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
    )
    transformer_config = {"input_pattern": "datahub", "replacement": "starhub"}
    expected_url = "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"

    results = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrl,
        aspect=source_aspect,
        config=transformer_config,
        pipeline_context=ctx,
    )

    assert len(results) == 2
    first = results[0]
    assert first.record
    assert first.record.aspect
    assert first.record.aspect.externalUrl == expected_url


def test_replace_external_regex_replace_1(
    mock_datahub_graph,
):
    """A greedy regex spanning several path segments is replaced as one match."""
    ctx: PipelineContext = PipelineContext(run_id="test_replace_external_url")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    source_aspect = models.DatasetPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
    )
    transformer_config = {
        "input_pattern": r"datahub/.*/",
        "replacement": "starhub/test/",
    }
    expected_url = "https://github.com/starhub/test/foo.view.lkml"

    results = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrl,
        aspect=source_aspect,
        config=transformer_config,
        pipeline_context=ctx,
    )

    assert len(results) == 2
    first = results[0]
    assert first.record
    assert first.record.aspect
    assert first.record.aspect.externalUrl == expected_url


def test_replace_external_regex_replace_2(
    mock_datahub_graph,
):
    """A word-boundary regex replaces every whole word ending in 'hub'."""
    ctx: PipelineContext = PipelineContext(run_id="test_replace_external_url")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    source_aspect = models.DatasetPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
    )
    transformer_config = {"input_pattern": r"\b\w*hub\b", "replacement": "test"}
    # Both "github" and "datahub" match the pattern, so both become "test".
    expected_url = "https://test.com/test/looker-demo/blob/master/foo.view.lkml"

    results = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrl,
        aspect=source_aspect,
        config=transformer_config,
        pipeline_context=ctx,
    )

    assert len(results) == 2
    first = results[0]
    assert first.record
    assert first.record.aspect
    assert first.record.aspect.externalUrl == expected_url

0 comments on commit 1afc480

Please sign in to comment.