Skip to content

Commit

Permalink
feat(glue): make ownership configurable in glue source (#4078)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabe-lyons authored Feb 7, 2022
1 parent 622d7bf commit ec062b6
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/source_docs/glue.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `ignore_unsupported_connectors` | | `True` | Whether to ignore unsupported connectors. If disabled, an error will be raised. |
| `emit_s3_lineage` | | `True` | Whether to emit S3-to-Glue lineage. |
| `glue_s3_lineage_direction` | | `upstream` | If `upstream`, S3 is upstream to Glue. If `downstream`, S3 is downstream to Glue. |
| `extract_owners` | | `True` | When enabled, extracts ownership from Glue directly and overwrites existing owners. When disabled, no ownership aspect is emitted for datasets. |

## Compatibility

Expand Down
20 changes: 13 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

class GlueSourceConfig(AwsSourceConfig):

extract_owners: Optional[bool] = True
extract_transforms: Optional[bool] = True
underlying_platform: Optional[str] = None
ignore_unsupported_connectors: Optional[bool] = True
Expand Down Expand Up @@ -89,6 +90,7 @@ class GlueSource(Source):

def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
super().__init__(ctx)
self.extract_owners = config.extract_owners
self.source_config = config
self.report = GlueSourceReport()
self.glue_client = config.glue_client
Expand Down Expand Up @@ -612,7 +614,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield dataset_wu

def _extract_record(self, table: Dict, table_name: str) -> MetadataChangeEvent:
def get_owner() -> OwnershipClass:
def get_owner() -> Optional[OwnershipClass]:
owner = table.get("Owner")
if owner:
owners = [
Expand All @@ -621,11 +623,10 @@ def get_owner() -> OwnershipClass:
type=OwnershipTypeClass.DATAOWNER,
)
]
else:
owners = []
return OwnershipClass(
owners=owners,
)
return OwnershipClass(
owners=owners,
)
return None

def get_dataset_properties() -> DatasetPropertiesClass:
return DatasetPropertiesClass(
Expand Down Expand Up @@ -680,7 +681,12 @@ def get_schema_metadata(glue_source: GlueSource) -> SchemaMetadata:
)

dataset_snapshot.aspects.append(Status(removed=False))
dataset_snapshot.aspects.append(get_owner())

if self.extract_owners:
optional_owner_aspect = get_owner()
if optional_owner_aspect is not None:
dataset_snapshot.aspects.append(optional_owner_aspect)

dataset_snapshot.aspects.append(get_dataset_properties())
dataset_snapshot.aspects.append(get_schema_metadata(self))

Expand Down

0 comments on commit ec062b6

Please sign in to comment.