diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 5421a932dacce..c0a8d31bca4c0 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -907,6 +907,24 @@ Then define your class to return a list of custom properties, for example: add_properties_resolver_class: "." ``` +## Replace ExternalUrl +### Config Details +| Field | Required | Type | Default | Description | +|-----------------------------|----------|---------|---------------|---------------------------------------------| +| `input_pattern` | ✅ | string | | String or pattern to replace | +| `replacement` | ✅ | string | | Replacement string | + + +Matches a full or partial string (or regular expression) in the externalUrl of the dataset properties and replaces every match with the replacement string. + +```yaml +transformers: + - type: "replace_external_url" + config: + input_pattern: '\b\w*hub\b' + replacement: "sub" +``` + ## Simple Add Dataset domains ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/examples/mce_files/bootstrap_mce.json b/metadata-ingestion/examples/mce_files/bootstrap_mce.json index cefb4d3efb958..fbe6b9953cb4f 100644 --- a/metadata-ingestion/examples/mce_files/bootstrap_mce.json +++ b/metadata-ingestion/examples/mce_files/bootstrap_mce.json @@ -3612,5 +3612,86 @@ "contentType": "application/json" }, "systemMetadata": null + }, + { + "entityType": "post", + "entityUrn": "urn:li:post:f3a68539-f7e4-4c41-a4fd-9e57c085d8dd", + "changeType": "UPSERT", + "aspectName": "postInfo", + "aspect": { + "json": { + "type": "HOME_PAGE_ANNOUNCEMENT", + "content": { + "title": "Join DataHub Slack", + "type": "LINK", + "link": "https://datahubproject.io/slack?utm_source=quickstart&utm_medium=annoucement&utm_campaign=quickstart_annoucement", + "media": { + "type": "IMAGE", + "location": 
"https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/datahub-logo-color-mark.svg" + } + }, + "created": 1712547125049, + "lastModified": 1712547125049 + } + }, + "systemMetadata": { + "lastObserved": 1712548844816, + "runId": "datahub-2024_04_08-13_00_44", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "post", + "entityUrn": "urn:li:post:e9c7e4a5-b7b0-4390-bd94-9a0fe8acd6bf", + "changeType": "UPSERT", + "aspectName": "postInfo", + "aspect": { + "json": { + "type": "HOME_PAGE_ANNOUNCEMENT", + "content": { + "title": "View Docs", + "type": "LINK", + "link": "https://datahubproject.io/docs", + "media": { + "type": "IMAGE", + "location": "https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/datahub-logo-color-mark.svg" + } + }, + "created": 1712547164761, + "lastModified": 1712547164761 + } + }, + "systemMetadata": { + "lastObserved": 1712548844816, + "runId": "datahub-2024_04_08-13_00_44", + "lastRunId": "no-run-id-provided" + } + }, + { + "entityType": "post", + "entityUrn": "urn:li:post:326e42b3-2965-47d1-9660-50e513af7d6e", + "changeType": "UPSERT", + "aspectName": "postInfo", + "aspect": { + "json": { + "type": "HOME_PAGE_ANNOUNCEMENT", + "content": { + "title": "Upcoming Events", + "type": "LINK", + "link": "https://datahubproject.io/events", + "media": { + "type": "IMAGE", + "location": "https://datahubproject.io/img/acryl-logo-transparent-mark.svg" + } + }, + "created": 1712547268882, + "lastModified": 1712547268882 + } + }, + "systemMetadata": { + "lastObserved": 1712548844817, + "runId": "datahub-2024_04_08-13_00_44", + "lastRunId": "no-run-id-provided" + } } ] diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 51a086fff77e4..674450999ad73 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -687,6 +687,7 @@ "add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct", "simple_add_dataset_dataproduct = 
datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct", "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct", + "replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl" ], "datahub.ingestion.sink.plugins": [ "file = datahub.ingestion.sink.file:FileSink", diff --git a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py index 7a2dfa7ae0705..ec3d1715aaece 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py +++ b/metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py @@ -228,12 +228,19 @@ def get_resource_owners_work_unit( # If we want to overwrite or there are no existing tags, create a new GlobalTags object current_ownership = OwnershipClass(owners, lastModified=get_audit_stamp()) else: - current_owner_urns: Set[str] = set( - [owner.owner for owner in current_ownership.owners] - ) - owners_filtered: List[OwnerClass] = [ - owner for owner in owners if owner.owner not in current_owner_urns - ] + owners_filtered: List[OwnerClass] = [] + for owner in owners: + owner_exists = False + for current_owner in current_ownership.owners: + if ( + owner.owner == current_owner.owner + and owner.type == current_owner.type + ): + owner_exists = True + break + if not owner_exists: + owners_filtered.append(owner) + # If there are no new owners to add, we don't need to emit a work unit. 
import copy
import re
from typing import Any, Dict, Optional, cast

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
    DatasetPropertiesTransformer,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass


class ReplaceExternalUrlConfig(ConfigModel):
    """Configuration for the ``replace_external_url`` transformer."""

    # Regular expression (a plain substring also works) searched for in externalUrl.
    input_pattern: str
    # Text substituted for every match; passed to ``re`` as the replacement
    # template, so backslash escapes and group references are interpreted.
    replacement: str


class ReplaceExternalUrl(DatasetPropertiesTransformer):
    """Transformer that rewrites the ``externalUrl`` of dataset properties.

    Every match of ``config.input_pattern`` in the aspect's ``externalUrl`` is
    replaced by ``config.replacement``. Aspects that are missing, or that have
    no (or an empty) ``externalUrl``, are passed through unchanged.
    """

    ctx: PipelineContext
    config: ReplaceExternalUrlConfig

    def __init__(
        self,
        config: ReplaceExternalUrlConfig,
        ctx: PipelineContext,
        **resolver_args: Dict[str, Any],
    ):
        """Store the pipeline context/config and pre-compile the pattern.

        Raises:
            re.error: if ``config.input_pattern`` is not a valid regular
                expression.
        """
        super().__init__()
        self.ctx = ctx
        self.config = config
        self.resolver_args = resolver_args
        # Compile once here rather than per aspect: an invalid pattern then
        # fails when the transformer is built instead of midway through a run.
        self._pattern: "re.Pattern[str]" = re.compile(config.input_pattern)

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "ReplaceExternalUrl":
        """Build the transformer from a raw config dictionary."""
        config = ReplaceExternalUrlConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
        """Return a copy of ``aspect`` with the pattern replaced in externalUrl.

        The incoming aspect is never mutated; when there is nothing to rewrite
        the original object (possibly ``None``) is returned as-is.
        """
        in_dataset_properties_aspect: DatasetPropertiesClass = cast(
            DatasetPropertiesClass, aspect
        )

        # getattr with a default also covers ``aspect`` being None.
        external_url = getattr(in_dataset_properties_aspect, "externalUrl", None)
        if not external_url:
            return cast(Aspect, in_dataset_properties_aspect)

        out_dataset_properties_aspect: DatasetPropertiesClass = copy.deepcopy(
            in_dataset_properties_aspect
        )
        out_dataset_properties_aspect.externalUrl = self._pattern.sub(
            self.config.replacement, external_url
        )
        return cast(Aspect, out_dataset_properties_aspect)
"urn:li:container:DATABASE",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,container description,urn:li:domain:Engineering "urn:li:chart:(looker,baz1)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering -"urn:li:dashboard:(looker,baz)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering +"urn:li:dashboard:(looker,baz)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],BUSINESS_OWNER,new description,urn:li:domain:Engineering "urn:li:mlFeature:(test_feature_table_all_feature_dtypes,test_BOOL_LIST_feature)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,user_features)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering "urn:li:mlPrimaryKey:(test_feature_table_all_feature_dtypes,dummy_entity_1)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering @@ -13,3 +13,4 @@ resource,subresource,glossary_terms,tags,owners,ownership_type,description,domai "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_123)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new description,urn:li:domain:Engineering "urn:li:dataFlow:(airflow,dag_abc,PROD)",,[urn:li:glossaryTerm:CustomerAccount],[urn:li:tag:Legacy],[urn:li:corpuser:datahub|urn:li:corpuser:jdoe],TECHNICAL_OWNER,new 
def test_replace_external_url_word_replace(
    mock_datahub_graph,
):
    """A literal word used as ``input_pattern`` is swapped inside externalUrl."""
    ctx: PipelineContext = PipelineContext(run_id="test_replace_external_url")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    properties = models.DatasetPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
    )
    expected_url = "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"

    results = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrl,
        aspect=properties,
        config={"input_pattern": "datahub", "replacement": "starhub"},
        pipeline_context=ctx,
    )

    # Two outputs are produced; the transformed aspect is carried by the first.
    assert len(results) == 2
    first_record = results[0].record
    assert first_record
    assert first_record.aspect
    assert first_record.aspect.externalUrl == expected_url


def test_replace_external_regex_replace_1(
    mock_datahub_graph,
):
    """A greedy ``.*`` pattern replaces everything up to the last slash it can reach."""
    ctx: PipelineContext = PipelineContext(run_id="test_replace_external_url")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    properties = models.DatasetPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
    )
    expected_url = "https://github.com/starhub/test/foo.view.lkml"

    results = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrl,
        aspect=properties,
        config={"input_pattern": r"datahub/.*/", "replacement": "starhub/test/"},
        pipeline_context=ctx,
    )

    assert len(results) == 2
    first_record = results[0].record
    assert first_record
    assert first_record.aspect
    assert first_record.aspect.externalUrl == expected_url


def test_replace_external_regex_replace_2(
    mock_datahub_graph,
):
    """A word-boundary pattern replaces every word ending in ``hub``."""
    ctx: PipelineContext = PipelineContext(run_id="test_replace_external_url")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    properties = models.DatasetPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
    )
    # Both "github" and "datahub" match, so both are rewritten.
    expected_url = "https://test.com/test/looker-demo/blob/master/foo.view.lkml"

    results = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrl,
        aspect=properties,
        config={"input_pattern": r"\b\w*hub\b", "replacement": "test"},
        pipeline_context=ctx,
    )

    assert len(results) == 2
    first_record = results[0].record
    assert first_record
    assert first_record.aspect
    assert first_record.aspect.externalUrl == expected_url