diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 33ff722a0d0dd..66cf47c19de70 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -7,7 +7,7 @@ The below table shows transformer which can transform aspects of entity [Dataset | Dataset Aspect | Transformer | |---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `status` | - [Mark Dataset status](#mark-dataset-status) | -| `ownership` | - [Simple Add Dataset ownership](#simple-add-dataset-ownership)
- [Pattern Add Dataset ownership](#pattern-add-dataset-ownership)
- [Simple Remove Dataset Ownership](#simple-remove-dataset-ownership)
- [Extract Ownership from Tags](#extract-ownership-from-tags) | +| `ownership` | - [Simple Add Dataset ownership](#simple-add-dataset-ownership)
- [Pattern Add Dataset ownership](#pattern-add-dataset-ownership)
- [Simple Remove Dataset Ownership](#simple-remove-dataset-ownership)
- [Extract Ownership from Tags](#extract-ownership-from-tags)
- [Clean suffix prefix from Ownership](#clean-suffix-prefix-from-ownership) | | `globalTags` | - [Simple Add Dataset globalTags ](#simple-add-dataset-globaltags)
- [Pattern Add Dataset globalTags](#pattern-add-dataset-globaltags)
- [Add Dataset globalTags](#add-dataset-globaltags) | | `browsePaths` | - [Set Dataset browsePath](#set-dataset-browsepath) | | `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)
- [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) | @@ -38,6 +38,24 @@ transformers: email_domain: "coolcompany.com" ``` +## Clean suffix prefix from Ownership +### Config Details +| Field | Required | Type | Default | Description | +|-----------------------------|----------|---------|---------------|---------------------------------------------| +| `pattern_for_cleanup` | ✅ | list[string] | | List of suffix/prefix to remove from the Owner URN(s) | + + +Matches against a Onwer URN and remove the matching part from the Owner URN + +```yaml +transformers: + - type: "pattern_cleanup_ownership" + config: + pattern_for_cleanup: + - "ABCDEF" + - (?<=_)(\w+) +``` + ## Mark Dataset Status ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index b2b451c04f44c..e3e6b502e703b 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -649,6 +649,7 @@ "qlik-sense = datahub.ingestion.source.qlik_sense.qlik_sense:QlikSenseSource", ], "datahub.ingestion.transformer.plugins": [ + "pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership", "simple_remove_dataset_ownership = datahub.ingestion.transformer.remove_dataset_ownership:SimpleRemoveDatasetOwnership", "mark_dataset_status = datahub.ingestion.transformer.mark_dataset_status:MarkDatasetStatus", "set_dataset_browse_path = datahub.ingestion.transformer.add_dataset_browse_path:AddDatasetBrowsePathTransformer", diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index a528508e5944b..785dfeb0e055d 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -29,6 +29,7 @@ DatasetUsageStatisticsClass, MetadataChangeEventClass, MetadataChangeProposalClass, + OwnershipClass as Ownership, StatusClass, TimeWindowSizeClass, ) @@ -79,6 +80,19 @@ def create_dataset_props_patch_builder( return patch_builder +def create_dataset_owners_patch_builder( + dataset_urn: str, + ownership: Ownership, +) -> DatasetPatchBuilder: + """Creates a patch builder with a dataset's owners""" + patch_builder = DatasetPatchBuilder(dataset_urn) + + for owner in ownership.owners: + patch_builder.add_owner(owner) + + return patch_builder + + def auto_status_aspect( stream: Iterable[MetadataWorkUnit], ) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 143d8dd0e2949..9a326ec584d21 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -40,6 +40,10 @@ TestableSource, TestConnectionReport, ) +from datahub.ingestion.api.source_helpers import ( + create_dataset_owners_patch_builder, + create_dataset_props_patch_builder, +) from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws.s3_util import ( make_s3_urn_for_lineage, @@ -517,17 +521,29 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn) yield from self.gen_lineage_workunit(dataset_urn, source_dataset_urn) + if ownership: + patch_builder = create_dataset_owners_patch_builder(dataset_urn, ownership) + for patch_mcp in patch_builder.build(): + yield MetadataWorkUnit( + id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp + ) + + if table_props: + patch_builder = create_dataset_props_patch_builder(dataset_urn, table_props) + for patch_mcp in patch_builder.build(): + yield MetadataWorkUnit( + id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp + ) + yield from [ mcp.as_workunit() for mcp in MetadataChangeProposalWrapper.construct_many( entityUrn=dataset_urn, aspects=[ - table_props, view_props, sub_type, schema_metadata, domain, - ownership, data_platform_instance, lineage, ], diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py new file mode 100644 index 0000000000000..1e949affd1766 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_ownership.py @@ -0,0 +1,88 @@ +import re +from typing import List, Optional, Set, cast + +import datahub.emitter.mce_builder as builder +from datahub.configuration.common import ConfigModel +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.transformer.dataset_transformer import ( + DatasetOwnershipTransformer, +) +from datahub.metadata.schema_classes import ( + OwnerClass, + OwnershipClass, + OwnershipTypeClass, +) + +_USER_URN_PREFIX: str = "urn:li:corpuser:" + + +class PatternCleanUpOwnershipConfig(ConfigModel): + pattern_for_cleanup: List[str] + + +class PatternCleanUpOwnership(DatasetOwnershipTransformer): + """Transformer that clean the ownership URN.""" + + ctx: PipelineContext + config: PatternCleanUpOwnershipConfig + + def __init__(self, config: PatternCleanUpOwnershipConfig, ctx: PipelineContext): + super().__init__() + self.ctx = ctx + self.config = config + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "PatternCleanUpOwnership": + config = PatternCleanUpOwnershipConfig.parse_obj(config_dict) + return cls(config, ctx) + + def _get_current_owner_urns(self, entity_urn: str) -> Set[str]: + if self.ctx.graph is not None: + current_ownership = self.ctx.graph.get_ownership(entity_urn=entity_urn) + if current_ownership is not None: + current_owner_urns: Set[str] = set( + [owner.owner for owner in current_ownership.owners] + ) + return current_owner_urns + else: + return set() + else: + return set() + + def transform_aspect( + self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect] + ) -> Optional[builder.Aspect]: + # get current owner URNs from the graph + current_owner_urns = self._get_current_owner_urns(entity_urn) + + # clean all the owners based on the parameters received from config + cleaned_owner_urns: List[str] = [] + for owner_urn in current_owner_urns: + user_id: str = owner_urn.split(_USER_URN_PREFIX)[1] + for value in self.config.pattern_for_cleanup: + user_id = re.sub(value, "", user_id) + + cleaned_owner_urns.append(_USER_URN_PREFIX + user_id) + + ownership_type, ownership_type_urn = builder.validate_ownership_type( + OwnershipTypeClass.DATAOWNER + ) + owners = [ + OwnerClass( + owner=owner, + type=ownership_type, + typeUrn=ownership_type_urn, + ) + for owner in cleaned_owner_urns + ] + + out_ownership_aspect: OwnershipClass = OwnershipClass( + owners=[], + lastModified=None, + ) + + # generate the ownership aspect for the cleaned users + out_ownership_aspect.owners.extend(owners) + return cast(Optional[builder.Aspect], out_ownership_aspect) diff --git a/metadata-ingestion/tests/integration/sql_server/docker-compose.yml b/metadata-ingestion/tests/integration/sql_server/docker-compose.yml index 2ce0433140554..1046321e4f720 100644 --- a/metadata-ingestion/tests/integration/sql_server/docker-compose.yml +++ b/metadata-ingestion/tests/integration/sql_server/docker-compose.yml @@ -8,6 +8,6 @@ services: ACCEPT_EULA: "Y" SA_PASSWORD: "test!Password" ports: - - 51433:1433 + - 21433:1433 volumes: - ./setup:/setup diff --git a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_to_file.yml b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_to_file.yml index dc85ded768b61..fbc96b06ef213 100644 --- a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_to_file.yml +++ b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_to_file.yml @@ -5,7 +5,7 @@ source: config: username: sa password: test!Password - host_port: localhost:51433 + host_port: localhost:21433 sink: type: file diff --git a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_with_filter.yml b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_with_filter.yml index 19a99c62aa9f4..3749499074adf 100644 --- a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_with_filter.yml +++ b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_with_filter.yml @@ -5,7 +5,7 @@ source: config: username: sa password: test!Password - host_port: localhost:51433 + host_port: localhost:21433 database_pattern: deny: - NewData diff --git a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_to_file.yml b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_to_file.yml index c53e3cf6b8045..40bef3ff104a3 100644 --- a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_to_file.yml +++ b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_to_file.yml @@ -6,7 +6,7 @@ source: username: sa password: test!Password database: DemoData - host_port: localhost:51433 + host_port: localhost:21433 # use_odbc: True # uri_args: # driver: "ODBC Driver 17 for SQL Server" diff --git a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_with_lower_case_urn.yml b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_with_lower_case_urn.yml index 4e96d137670ba..ff1179034833f 100644 --- a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_with_lower_case_urn.yml +++ b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_with_lower_case_urn.yml @@ -6,7 +6,7 @@ source: username: sa password: test!Password database: DemoData - host_port: localhost:51433 + host_port: localhost:21433 convert_urns_to_lowercase: true # use_odbc: True # uri_args: diff --git a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json index 88aa0938942b3..1f0193fef6063 100644 --- a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json +++ b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json @@ -216,32 +216,80 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": { - "table_type": "HIVE_VIEW", - "Catalog": "hive_metastore", - "Database": "bronze_kambi", - "Table": "view1", - "Last Access": "UNKNOWN", - "Created By": "Spark 3.2.1", - "Owner": "root", - "table_id": "hive_metastore.bronze_kambi.view1", - "created_at": "2022-06-22 05:14:56" - }, - "externalUrl": "https://dummy.cloud.databricks.com/explore/data/hive_metastore/bronze_kambi/view1", - "name": "view1", - "qualifiedName": "hive_metastore.bronze_kambi.view1", - "created": { - "time": 1655874896000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "view1" }, - "lastModified": { - "time": 1655874896000 + { + "op": "add", + "path": "/created", + "value": { + "time": 1655874896000 + } }, - "tags": [] - } + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1655874896000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "hive_metastore.bronze_kambi.view1" + }, + { + "op": "add", + "path": "/customProperties/table_type", + "value": "HIVE_VIEW" + }, + { + "op": "add", + "path": "/customProperties/Catalog", + "value": "hive_metastore" + }, + { + "op": "add", + "path": "/customProperties/Database", + "value": "bronze_kambi" + }, + { + "op": "add", + "path": "/customProperties/Table", + "value": "view1" + }, + { + "op": "add", + "path": "/customProperties/Last Access", + "value": "UNKNOWN" + }, + { + "op": "add", + "path": "/customProperties/Created By", + "value": "Spark 3.2.1" + }, + { + "op": "add", + "path": "/customProperties/Owner", + "value": "root" + }, + { + "op": "add", + "path": "/customProperties/table_id", + "value": "hive_metastore.bronze_kambi.view1" + }, + { + "op": "add", + "path": "/customProperties/created_at", + "value": "2022-06-22 05:14:56" + } + ] }, "systemMetadata": { "lastObserved": 1638860400000, @@ -1240,37 +1288,105 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": { - "storage_location": "dbfs:/user/hive/warehouse/bronze_kambi.db/bet", - "data_source_format": "DELTA", - "table_type": "HIVE_MANAGED_TABLE", - "Catalog": "hive_metastore", - "Database": "bronze_kambi", - "Table": "bet", - "Last Access": "UNKNOWN", - "Created By": "Spark 3.2.1", - "Statistics": "1024 bytes, 3 rows", - "Owner": "root", - "Is_managed_location": "true", - "Table Properties": "[delta.autoOptimize.autoCompact=true,delta.autoOptimize.optimizeWrite=true,delta.minReaderVersion=1,delta.minWriterVersion=2]", - "table_id": "hive_metastore.bronze_kambi.bet", - "created_at": "2022-06-22 05:14:56" - }, - "externalUrl": "https://dummy.cloud.databricks.com/explore/data/hive_metastore/bronze_kambi/bet", - "name": "bet", - "qualifiedName": "hive_metastore.bronze_kambi.bet", - "created": { - "time": 1655874896000 + "json": [ + { + "op": "add", + "path": "/name", + "value": "bet" }, - "lastModified": { - "time": 1655874896000 + { + "op": "add", + "path": "/created", + "value": { + "time": 1655874896000 + } }, - "tags": [] - } + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1655874896000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "hive_metastore.bronze_kambi.bet" + }, + { + "op": "add", + "path": "/customProperties/storage_location", + "value": "dbfs:/user/hive/warehouse/bronze_kambi.db/bet" + }, + { + "op": "add", + "path": "/customProperties/data_source_format", + "value": "DELTA" + }, + { + "op": "add", + "path": "/customProperties/table_type", + "value": "HIVE_MANAGED_TABLE" + }, + { + "op": "add", + "path": "/customProperties/Catalog", + "value": "hive_metastore" + }, + { + "op": "add", + "path": "/customProperties/Database", + "value": "bronze_kambi" + }, + { + "op": "add", + "path": "/customProperties/Table", + "value": "bet" + }, + { + "op": "add", + "path": "/customProperties/Last Access", + "value": "UNKNOWN" + }, + { + "op": "add", + "path": "/customProperties/Created By", + "value": "Spark 3.2.1" + }, + { + "op": "add", + "path": "/customProperties/Statistics", + "value": "1024 bytes, 3 rows" + }, + { + "op": "add", + "path": "/customProperties/Owner", + "value": "root" + }, + { + "op": "add", + "path": "/customProperties/Is_managed_location", + "value": "true" + }, + { + "op": "add", + "path": "/customProperties/Table Properties", + "value": "[delta.autoOptimize.autoCompact=true,delta.autoOptimize.optimizeWrite=true,delta.minReaderVersion=1,delta.minWriterVersion=2]" + }, + { + "op": "add", + "path": "/customProperties/table_id", + "value": "hive_metastore.bronze_kambi.bet" + }, + { + "op": "add", + "path": "/customProperties/created_at", + "value": "2022-06-22 05:14:56" + } + ] }, "systemMetadata": { "lastObserved": 1638860400000, @@ -1654,41 +1770,117 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.quickstart_schema.quickstart_table_external,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": { - "storage_location": "s3://db-02eec1f70bfe4115445be9fdb1aac6ac-s3-root-bucket/metastore/2c983545-d403-4f87-9063-5b7e3b6d3736/tables/cff27aa1-1c6a-4d78-b713-562c660c2896", - "data_source_format": "DELTA", - "generation": "2", - "table_type": "EXTERNAL", - "created_by": "abc@acryl.io", - "delta.lastCommitTimestamp": "1666185711000", - "delta.lastUpdateVersion": "1", - "delta.minReaderVersion": "1", - "delta.minWriterVersion": "2", - "spark.sql.statistics.numRows": "10", - "spark.sql.statistics.totalSize": "512", - "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", - "owner": "account users", - "updated_by": "abc@acryl.io", - "updated_at": "2022-10-19 13:27:29.633000+00:00", - "created_at": "2022-10-19 13:21:38.688000+00:00" - }, - "externalUrl": "https://dummy.cloud.databricks.com/explore/data/quickstart_catalog/quickstart_schema/quickstart_table_external", - "name": "quickstart_table_external", - "qualifiedName": "quickstart_catalog.quickstart_schema.quickstart_table_external", - "created": { - "time": 1666185698688, - "actor": "urn:li:corpuser:abc@acryl.io" + "json": [ + { + "op": "add", + "path": "/name", + "value": "quickstart_table_external" }, - "lastModified": { - "time": 1666186049633, - "actor": "urn:li:corpuser:abc@acryl.io" + { + "op": "add", + "path": "/created", + "value": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + } }, - "tags": [] - } + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "quickstart_catalog.quickstart_schema.quickstart_table_external" + }, + { + "op": "add", + "path": "/customProperties/storage_location", + "value": "s3://db-02eec1f70bfe4115445be9fdb1aac6ac-s3-root-bucket/metastore/2c983545-d403-4f87-9063-5b7e3b6d3736/tables/cff27aa1-1c6a-4d78-b713-562c660c2896" + }, + { + "op": "add", + "path": "/customProperties/data_source_format", + "value": "DELTA" + }, + { + "op": "add", + "path": "/customProperties/generation", + "value": "2" + }, + { + "op": "add", + "path": "/customProperties/table_type", + "value": "EXTERNAL" + }, + { + "op": "add", + "path": "/customProperties/created_by", + "value": "abc@acryl.io" + }, + { + "op": "add", + "path": "/customProperties/delta.lastCommitTimestamp", + "value": "1666185711000" + }, + { + "op": "add", + "path": "/customProperties/delta.lastUpdateVersion", + "value": "1" + }, + { + "op": "add", + "path": "/customProperties/delta.minReaderVersion", + "value": "1" + }, + { + "op": "add", + "path": "/customProperties/delta.minWriterVersion", + "value": "2" + }, + { + "op": "add", + "path": "/customProperties/spark.sql.statistics.numRows", + "value": "10" + }, + { + "op": "add", + "path": "/customProperties/spark.sql.statistics.totalSize", + "value": "512" + }, + { + "op": "add", + "path": "/customProperties/table_id", + "value": "cff27aa1-1c6a-4d78-b713-562c660c2896" + }, + { + "op": "add", + "path": "/customProperties/owner", + "value": "account users" + }, + { + "op": "add", + "path": "/customProperties/updated_by", + "value": "abc@acryl.io" + }, + { + "op": "add", + "path": "/customProperties/updated_at", + "value": "2022-10-19 13:27:29.633000+00:00" + }, + { + "op": "add", + "path": "/customProperties/created_at", + "value": "2022-10-19 13:21:38.688000+00:00" + } + ] }, "systemMetadata": { "lastObserved": 1638860400000, @@ -1758,22 +1950,19 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.quickstart_schema.quickstart_table_external,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "ownership", "aspect": { - "json": { - "owners": [ - { + "json": [ + { + "op": "add", + "path": "/owners/urn:li:corpuser:account users/DATAOWNER", + "value": { "owner": "urn:li:corpuser:account users", "type": "DATAOWNER" } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" } - } + ] }, "systemMetadata": { "lastObserved": 1638860400000, @@ -1876,41 +2065,117 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.quickstart_schema.quickstart_table,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": { - "customProperties": { - "storage_location": "s3://db-02eec1f70bfe4115445be9fdb1aac6ac-s3-root-bucket/metastore/2c983545-d403-4f87-9063-5b7e3b6d3736/tables/cff27aa1-1c6a-4d78-b713-562c660c2896", - "data_source_format": "DELTA", - "generation": "2", - "table_type": "MANAGED", - "created_by": "abc@acryl.io", - "delta.lastCommitTimestamp": "1666185711000", - "delta.lastUpdateVersion": "1", - "delta.minReaderVersion": "1", - "delta.minWriterVersion": "2", - "spark.sql.statistics.numRows": "10", - "spark.sql.statistics.totalSize": "512", - "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", - "owner": "account users", - "updated_by": "abc@acryl.io", - "updated_at": "2022-10-19 13:27:29.633000+00:00", - "created_at": "2022-10-19 13:21:38.688000+00:00" - }, - "externalUrl": "https://dummy.cloud.databricks.com/explore/data/quickstart_catalog/quickstart_schema/quickstart_table", - "name": "quickstart_table", - "qualifiedName": "quickstart_catalog.quickstart_schema.quickstart_table", - "created": { - "time": 1666185698688, - "actor": "urn:li:corpuser:abc@acryl.io" + "json": [ + { + "op": "add", + "path": "/name", + "value": "quickstart_table" }, - "lastModified": { - "time": 1666186049633, - "actor": "urn:li:corpuser:abc@acryl.io" + { + "op": "add", + "path": "/created", + "value": { + "time": 1666185698688, + "actor": "urn:li:corpuser:abc@acryl.io" + } }, - "tags": [] - } + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1666186049633, + "actor": "urn:li:corpuser:abc@acryl.io" + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "quickstart_catalog.quickstart_schema.quickstart_table" + }, + { + "op": "add", + "path": "/customProperties/storage_location", + "value": "s3://db-02eec1f70bfe4115445be9fdb1aac6ac-s3-root-bucket/metastore/2c983545-d403-4f87-9063-5b7e3b6d3736/tables/cff27aa1-1c6a-4d78-b713-562c660c2896" + }, + { + "op": "add", + "path": "/customProperties/data_source_format", + "value": "DELTA" + }, + { + "op": "add", + "path": "/customProperties/generation", + "value": "2" + }, + { + "op": "add", + "path": "/customProperties/table_type", + "value": "MANAGED" + }, + { + "op": "add", + "path": "/customProperties/created_by", + "value": "abc@acryl.io" + }, + { + "op": "add", + "path": "/customProperties/delta.lastCommitTimestamp", + "value": "1666185711000" + }, + { + "op": "add", + "path": "/customProperties/delta.lastUpdateVersion", + "value": "1" + }, + { + "op": "add", + "path": "/customProperties/delta.minReaderVersion", + "value": "1" + }, + { + "op": "add", + "path": "/customProperties/delta.minWriterVersion", + "value": "2" + }, + { + "op": "add", + "path": "/customProperties/spark.sql.statistics.numRows", + "value": "10" + }, + { + "op": "add", + "path": "/customProperties/spark.sql.statistics.totalSize", + "value": "512" + }, + { + "op": "add", + "path": "/customProperties/table_id", + "value": "cff27aa1-1c6a-4d78-b713-562c660c2896" + }, + { + "op": "add", + "path": "/customProperties/owner", + "value": "account users" + }, + { + "op": "add", + "path": "/customProperties/updated_by", + "value": "abc@acryl.io" + }, + { + "op": "add", + "path": "/customProperties/updated_at", + "value": "2022-10-19 13:27:29.633000+00:00" + }, + { + "op": "add", + "path": "/customProperties/created_at", + "value": "2022-10-19 13:21:38.688000+00:00" + } + ] }, "systemMetadata": { "lastObserved": 1638860400000, @@ -1946,22 +2211,19 @@ { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.quickstart_schema.quickstart_table,PROD)", - "changeType": "UPSERT", + "changeType": "PATCH", "aspectName": "ownership", "aspect": { - "json": { - "owners": [ - { + "json": [ + { + "op": "add", + "path": "/owners/urn:li:corpuser:account users/DATAOWNER", + "value": { "owner": "urn:li:corpuser:account users", "type": "DATAOWNER" } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" } - } + ] }, "systemMetadata": { "lastObserved": 1638860400000, diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 2a6176906a0c3..6828c741dda2b 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -87,6 +87,9 @@ ) from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.urn import Urn +from src.datahub.ingestion.transformer.pattern_cleanup_ownership import ( + PatternCleanUpOwnership, +) def make_generic_dataset( @@ -2841,3 +2844,348 @@ def test_add_dataset_data_product_transformation(): assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ builder.make_dataset_urn("bigquery", "example1") ] + + +def _test_clean_owner_urns( + in_pipeline_context: Any, + in_owners: List[str], + config: List[Union[re.Pattern, str]], + cleaned_owner_urn: List[str], +) -> None: + # Return fake aspect to simulate server behaviour + def fake_ownership_class(entity_urn: str) -> models.OwnershipClass: + return models.OwnershipClass( + owners=[ + models.OwnerClass(owner=owner, type=models.OwnershipTypeClass.DATAOWNER) + for owner in in_owners + ] + ) + + in_pipeline_context.graph.get_ownership = fake_ownership_class # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=PatternCleanUpOwnership, + aspect=models.OwnershipClass( + owners=[ + models.OwnerClass(owner=owner, type=models.OwnershipTypeClass.DATAOWNER) + for owner in in_owners + ] + ), + config={"pattern_for_cleanup": config}, + pipeline_context=in_pipeline_context, + ) + + assert len(output) == 2 + ownership_aspect = output[0].record.aspect + assert isinstance(ownership_aspect, OwnershipClass) + assert len(ownership_aspect.owners) == len(in_owners) + + out_owners = [owner.owner for owner in ownership_aspect.owners] + assert set(out_owners) == set(cleaned_owner_urn) + + +def test_clean_owner_urn_transformation_remove_fixed_string(mock_datahub_graph): + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + user_emails = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + ] + + in_owner_urns: List[str] = [] + for user in user_emails: + in_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + + # remove 'ABCDEF:' + config: List[Union[re.Pattern, str]] = ["ABCDEF:"] + expected_user_emails: List[str] = [ + "email_id@example.com", + "123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + ] + expected_owner_urns: List[str] = [] + for user in expected_user_emails: + expected_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) + + +def test_clean_owner_urn_transformation_remove_multiple_values(mock_datahub_graph): + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + user_emails = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + ] + + in_owner_urns: List[str] = [] + for user in user_emails: + in_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + + # remove multiple values + config: List[Union[re.Pattern, str]] = ["ABCDEF:", "email"] + expected_user_emails: List[str] = [ + "_id@example.com", + "123_id@example.com", + "_id@example.co.in", + "_id@example.co.uk", + "_test:XYZ@example.com", + "_id:id1@example.com", + "_id:id2@example.com", + ] + expected_owner_urns: List[str] = [] + for user in expected_user_emails: + expected_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) + + +def test_clean_owner_urn_transformation_remove_values_using_regex(mock_datahub_graph): + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + user_emails = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + ] + + in_owner_urns: List[str] = [] + for user in user_emails: + in_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + + # remove words after `_` using RegEx i.e. `id`, `test` + config: List[Union[re.Pattern, str]] = [r"(?<=_)(\w+)"] + expected_user_emails: List[str] = [ + "ABCDEF:email_@example.com", + "ABCDEF:123email_@example.com", + "email_@example.co.in", + "email_@example.co.uk", + "email_:XYZ@example.com", + "email_:id1@example.com", + "email_:id2@example.com", + ] + expected_owner_urns: List[str] = [] + for user in expected_user_emails: + expected_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) + + +def test_clean_owner_urn_transformation_remove_digits(mock_datahub_graph): + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + user_emails = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + ] + + in_owner_urns: List[str] = [] + for user in user_emails: + in_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + + # remove digits + config: List[Union[re.Pattern, str]] = [r"\d+"] + expected_user_emails: List[str] = [ + "ABCDEF:email_id@example.com", + "ABCDEF:email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id@example.com", + "email_id:id@example.com", + ] + expected_owner_urns: List[str] = [] + for user in expected_user_emails: + expected_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) + + +def test_clean_owner_urn_transformation_remove_pattern(mock_datahub_graph): + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + user_emails = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + ] + + in_owner_urns: List[str] = [] + for user in user_emails: + in_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + + # remove `example.*` + config: List[Union[re.Pattern, str]] = [r"@example\.\S*"] + expected_user_emails: List[str] = [ + "ABCDEF:email_id", + "ABCDEF:123email_id", + "email_id", + "email_id", + "email_test:XYZ", + "email_id:id1", + "email_id:id2", + ] + expected_owner_urns: List[str] = [] + for user in expected_user_emails: + expected_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) + + +def test_clean_owner_urn_transformation_remove_word_in_capital_letters( + mock_datahub_graph, +): + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + user_emails = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + "email_test:XYabZ@example.com", + ] + + in_owner_urns: List[str] = [] + for user in user_emails: + in_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + + # if string between `:` and `@` is in CAPITAL then remove it + config: List[Union[re.Pattern, str]] = ["(?<=:)[A-Z]+(?=@)"] + expected_user_emails: List[str] = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + "email_test:XYabZ@example.com", + ] + expected_owner_urns: List[str] = [] + for user in expected_user_emails: + expected_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) + + +def test_clean_owner_urn_transformation_remove_pattern_with_alphanumeric_value( + mock_datahub_graph, +): + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + user_emails = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + ] + + in_owner_urns: List[str] = [] + for user in user_emails: + in_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + + # remove any pattern having `id` followed by any digits + config: List[Union[re.Pattern, str]] = [r"id\d+"] + expected_user_emails: List[str] = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:@example.com", + "email_id:@example.com", + ] + expected_owner_urns: List[str] = [] + for user in expected_user_emails: + expected_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + _test_clean_owner_urns(pipeline_context, in_owner_urns, config, expected_owner_urns) + + +def test_clean_owner_urn_transformation_should_not_remove_system_identifier( + mock_datahub_graph, +): + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + + user_emails = [ + "ABCDEF:email_id@example.com", + "ABCDEF:123email_id@example.com", + "email_id@example.co.in", + "email_id@example.co.uk", + "email_test:XYZ@example.com", + "email_id:id1@example.com", + "email_id:id2@example.com", + ] + + in_owner_urns: List[str] = [] + for user in user_emails: + in_owner_urns.append( + builder.make_owner_urn(user, owner_type=builder.OwnerType.USER) + ) + + # should not remove system identifier + config: List[Union[re.Pattern, str]] = ["urn:li:corpuser:"] + + _test_clean_owner_urns(pipeline_context, in_owner_urns, config, in_owner_urns)