Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Mar 22, 2024
2 parents 0214c6e + dd502ae commit e72908e
Show file tree
Hide file tree
Showing 12 changed files with 886 additions and 139 deletions.
20 changes: 19 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
| Dataset Aspect | Transformer |
|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `status` | - [Mark Dataset status](#mark-dataset-status) |
| `ownership` | - [Simple Add Dataset ownership](#simple-add-dataset-ownership)<br/> - [Pattern Add Dataset ownership](#pattern-add-dataset-ownership)<br/> - [Simple Remove Dataset Ownership](#simple-remove-dataset-ownership)<br/> - [Extract Ownership from Tags](#extract-ownership-from-tags) |
| `ownership` | - [Simple Add Dataset ownership](#simple-add-dataset-ownership)<br/> - [Pattern Add Dataset ownership](#pattern-add-dataset-ownership)<br/> - [Simple Remove Dataset Ownership](#simple-remove-dataset-ownership)<br/> - [Extract Ownership from Tags](#extract-ownership-from-tags)<br/> - [Clean suffix prefix from Ownership](#clean-suffix-prefix-from-ownership) |
| `globalTags` | - [Simple Add Dataset globalTags ](#simple-add-dataset-globaltags)<br/> - [Pattern Add Dataset globalTags](#pattern-add-dataset-globaltags)<br/> - [Add Dataset globalTags](#add-dataset-globaltags) |
| `browsePaths` | - [Set Dataset browsePath](#set-dataset-browsepath) |
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) |
Expand Down Expand Up @@ -38,6 +38,24 @@ transformers:
email_domain: "coolcompany.com"
```
## Clean suffix prefix from Ownership
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `pattern_for_cleanup` | ✅ | list[string] | | List of suffix/prefix to remove from the Owner URN(s) |


Matches each pattern against an Owner URN and removes the matching part from the Owner URN.

```yaml
transformers:
- type: "pattern_cleanup_ownership"
config:
pattern_for_cleanup:
- "ABCDEF"
- (?<=_)(\w+)
```

## Mark Dataset Status
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@
"qlik-sense = datahub.ingestion.source.qlik_sense.qlik_sense:QlikSenseSource",
],
"datahub.ingestion.transformer.plugins": [
"pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership",
"simple_remove_dataset_ownership = datahub.ingestion.transformer.remove_dataset_ownership:SimpleRemoveDatasetOwnership",
"mark_dataset_status = datahub.ingestion.transformer.mark_dataset_status:MarkDatasetStatus",
"set_dataset_browse_path = datahub.ingestion.transformer.add_dataset_browse_path:AddDatasetBrowsePathTransformer",
Expand Down
14 changes: 14 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
DatasetUsageStatisticsClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
OwnershipClass as Ownership,
StatusClass,
TimeWindowSizeClass,
)
Expand Down Expand Up @@ -79,6 +80,19 @@ def create_dataset_props_patch_builder(
return patch_builder


def create_dataset_owners_patch_builder(
    dataset_urn: str,
    ownership: Ownership,
) -> DatasetPatchBuilder:
    """Return a patch builder for ``dataset_urn`` loaded with its owners.

    Args:
        dataset_urn: URN of the dataset the patch is built for.
        ownership: Ownership aspect whose ``owners`` are added to the patch.

    Returns:
        A ``DatasetPatchBuilder`` on which ``add_owner`` has been called once
        for every entry of ``ownership.owners``, in listed order.
    """
    owners_patch = DatasetPatchBuilder(dataset_urn)
    for entry in ownership.owners:
        owners_patch.add_owner(entry)
    return owners_patch


def auto_status_aspect(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
Expand Down
20 changes: 18 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.api.source_helpers import (
create_dataset_owners_patch_builder,
create_dataset_props_patch_builder,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import (
make_s3_urn_for_lineage,
Expand Down Expand Up @@ -517,17 +521,29 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)
yield from self.gen_lineage_workunit(dataset_urn, source_dataset_urn)

if ownership:
patch_builder = create_dataset_owners_patch_builder(dataset_urn, ownership)
for patch_mcp in patch_builder.build():
yield MetadataWorkUnit(
id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp
)

if table_props:
patch_builder = create_dataset_props_patch_builder(dataset_urn, table_props)
for patch_mcp in patch_builder.build():
yield MetadataWorkUnit(
id=f"{dataset_urn}-{patch_mcp.aspectName}", mcp_raw=patch_mcp
)

yield from [
mcp.as_workunit()
for mcp in MetadataChangeProposalWrapper.construct_many(
entityUrn=dataset_urn,
aspects=[
table_props,
view_props,
sub_type,
schema_metadata,
domain,
ownership,
data_platform_instance,
lineage,
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import re
from typing import List, Optional, Set, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)

# Leading part of a corp-user owner URN; the transformer below strips it to get
# the user id it cleans, and prepends it again to rebuild the URN.
_USER_URN_PREFIX: str = "urn:li:corpuser:"


class PatternCleanUpOwnershipConfig(ConfigModel):
    """Settings accepted by the ``PatternCleanUpOwnership`` transformer."""

    # Regular expressions; each one is applied with ``re.sub`` and every match
    # is removed from the user id of an owner URN.
    pattern_for_cleanup: List[str]


class PatternCleanUpOwnership(DatasetOwnershipTransformer):
    """Transformer that cleans the owner URNs of an entity.

    The owners currently stored in the graph for the entity are fetched, every
    regular expression in ``config.pattern_for_cleanup`` is removed from the
    user id of each corp-user owner, and a fresh ownership aspect holding the
    cleaned owners is returned.
    """

    ctx: PipelineContext
    config: PatternCleanUpOwnershipConfig

    def __init__(self, config: PatternCleanUpOwnershipConfig, ctx: PipelineContext):
        """Store the parsed configuration and the pipeline context."""
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "PatternCleanUpOwnership":
        """Build the transformer from a raw (unparsed) config dictionary."""
        config = PatternCleanUpOwnershipConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def _get_current_owner_urns(self, entity_urn: str) -> Set[str]:
        """Return the owner URNs the graph currently holds for ``entity_urn``.

        An empty set is returned when no graph client is configured on the
        pipeline context or when the graph has no ownership for the entity.
        """
        if self.ctx.graph is None:
            return set()

        current_ownership = self.ctx.graph.get_ownership(entity_urn=entity_urn)
        if current_ownership is None:
            return set()

        return {owner.owner for owner in current_ownership.owners}

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect]
    ) -> Optional[builder.Aspect]:
        """Produce an ownership aspect with cleaned owner URNs.

        Args:
            entity_urn: URN of the entity whose owners are looked up in the
                graph.
            aspect_name: Not read by this transformer.
            aspect: Not read by this transformer; the owners come from the
                graph, not from the incoming aspect.

        Returns:
            An ``OwnershipClass`` whose owners are the cleaned URNs, each with
            the ``DATAOWNER`` ownership type and no ``lastModified`` stamp.
        """
        # get current owner URNs from the graph
        current_owner_urns = self._get_current_owner_urns(entity_urn)

        # clean all the owners based on the parameters received from config
        cleaned_owner_urns: List[str] = []
        # Sorted so the emitted owner order does not depend on set iteration
        # order, which varies between interpreter runs for strings.
        for owner_urn in sorted(current_owner_urns):
            if not owner_urn.startswith(_USER_URN_PREFIX):
                # Not a corp-user URN: there is no user id to clean. Keep the
                # owner as it is rather than failing on the missing prefix.
                cleaned_owner_urns.append(owner_urn)
                continue

            # Slice rather than split so an id that happens to contain the
            # prefix text again is not truncated.
            user_id: str = owner_urn[len(_USER_URN_PREFIX) :]
            for value in self.config.pattern_for_cleanup:
                user_id = re.sub(value, "", user_id)

            cleaned_owner_urns.append(_USER_URN_PREFIX + user_id)

        ownership_type, ownership_type_urn = builder.validate_ownership_type(
            OwnershipTypeClass.DATAOWNER
        )

        # generate the ownership aspect for the cleaned users
        out_ownership_aspect: OwnershipClass = OwnershipClass(
            owners=[
                OwnerClass(
                    owner=owner,
                    type=ownership_type,
                    typeUrn=ownership_type_urn,
                )
                for owner in cleaned_owner_urns
            ],
            lastModified=None,
        )
        return cast(Optional[builder.Aspect], out_ownership_aspect)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ services:
ACCEPT_EULA: "Y"
SA_PASSWORD: "test!Password"
ports:
- 51433:1433
- 21433:1433
volumes:
- ./setup:/setup
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ source:
config:
username: sa
password: test!Password
host_port: localhost:51433
host_port: localhost:21433

sink:
type: file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ source:
config:
username: sa
password: test!Password
host_port: localhost:51433
host_port: localhost:21433
database_pattern:
deny:
- NewData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ source:
username: sa
password: test!Password
database: DemoData
host_port: localhost:51433
host_port: localhost:21433
# use_odbc: True
# uri_args:
# driver: "ODBC Driver 17 for SQL Server"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ source:
username: sa
password: test!Password
database: DemoData
host_port: localhost:51433
host_port: localhost:21433
convert_urns_to_lowercase: true
# use_odbc: True
# uri_args:
Expand Down
Loading

0 comments on commit e72908e

Please sign in to comment.