feat(ingestion/dagster): Dagster assetless ingestion (#11262)
Co-authored-by: shubhamjagtap639 <shubham.jagtap@gslab.com>
treff7es and shubhamjagtap639 authored Oct 24, 2024
1 parent 707a02c commit f676043
Showing 8 changed files with 1,161 additions and 313 deletions.
@@ -1,4 +1,5 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from logging import Logger
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Sequence, Set
from urllib.parse import urlsplit
@@ -12,6 +13,7 @@
TableSchemaMetadataValue,
)
from dagster._core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus
from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint

try:
from dagster._core.snap import JobSnapshot # type: ignore[attr-defined]
@@ -32,13 +34,22 @@
make_data_platform_urn,
make_dataplatform_instance_urn,
make_tag_urn,
make_ts_millis,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata._schema_classes import (
AuditStampClass,
BrowsePathEntryClass,
BrowsePathsV2Class,
GlobalTagsClass,
QueryLanguageClass,
QueryPropertiesClass,
QuerySourceClass,
QueryStatementClass,
QuerySubjectClass,
QuerySubjectsClass,
TagAssociationClass,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
@@ -63,6 +74,7 @@
SubTypesClass,
UpstreamClass,
)
from datahub.metadata.urns import CorpUserUrn
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.utilities.urns._urn_base import Urn
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
@@ -73,6 +85,8 @@

DAGSTER_PLATFORM = "dagster"

_DEFAULT_USER_URN = CorpUserUrn("_ingestion")


class Constant:
"""
@@ -212,6 +226,16 @@ class DatahubDagsterSourceConfig(DatasetSourceConfigMixin):
description="Whether to materialize asset dependency in DataHub. It emits a datasetKey for each dependencies. Default is False.",
)

emit_queries: Optional[bool] = pydantic.Field(
default=False,
description="Whether to emit queries aspects. Default is False.",
)

emit_assets: Optional[bool] = pydantic.Field(
default=True,
description="Whether to emit assets aspects. Default is True.",
)

debug_mode: Optional[bool] = pydantic.Field(
default=False,
description="Whether to enable debug mode",
@@ -243,9 +267,10 @@ def job_url_generator(dagster_url: str, dagster_environment: DagsterEnvironment)
return base_url


-class DagsterGenerator:
-    asset_group_name_cache: Dict[str, str] = {}
+DATAHUB_ASSET_GROUP_NAME_CACHE: Dict[str, str] = {}
+
+
+class DagsterGenerator:
def __init__(
self,
logger: Logger,
@@ -298,17 +323,16 @@ def update_asset_group_name_cache(
if asset_def:
for key, group_name in asset_def.group_names_by_key.items():
asset_urn = self.dataset_urn_from_asset(key.path)
-DagsterGenerator.asset_group_name_cache[
-    asset_urn.urn()
-] = group_name
+DATAHUB_ASSET_GROUP_NAME_CACHE[asset_urn.urn()] = group_name
if self.config.debug_mode:
self.logger.debug(
f"Asset group name cache updated: {asset_urn.urn()} -> {group_name}"
)
if self.config.debug_mode:
self.logger.debug(
f"Asset group name cache: {DagsterGenerator.asset_group_name_cache}"
f"Asset group name cache: {DATAHUB_ASSET_GROUP_NAME_CACHE}"
)
self.logger.info(f"Asset group name cache: {DATAHUB_ASSET_GROUP_NAME_CACHE}")

def path_metadata_resolver(self, value: PathMetadataValue) -> Optional[DatasetUrn]:
"""
@@ -347,6 +371,8 @@ def generate_dataflow(
job_snapshot: JobSnapshot,
env: str,
platform_instance: Optional[str] = None,
remove_double_underscores: bool = True,
add_asset_group_tag: bool = True,
) -> DataFlow:
"""
Generates a DataFlow object from a Dagster job snapshot
@@ -358,17 +384,34 @@
if self.dagster_environment.is_cloud:
id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{job_snapshot.name}"
else:
id = f"{self.dagster_environment.module}/{job_snapshot.name}"
module_name = (
self.dagster_environment.module
if self.dagster_environment.module
else self.dagster_environment.branch
)
id = f"{module_name}/{job_snapshot.name}"

flow_name = job_snapshot.name
if remove_double_underscores and flow_name.split("__"):
flow_name = flow_name.split("__")[-1]

dataflow = DataFlow(
orchestrator=Constant.ORCHESTRATOR,
id=id,
env=env,
-    name=job_snapshot.name,
+    name=flow_name,
platform_instance=platform_instance,
)

dataflow.description = job_snapshot.description
dataflow.tags = set(job_snapshot.tags.keys())
if add_asset_group_tag:
asset_group = self.get_asset_group_from_op_name(
job_snapshot.name.split("__")
)
if asset_group:
dataflow.tags.add(f"asset_group:{asset_group}")

if self.config.dagster_url:
dataflow.url = f"{job_url_generator(dagster_url=self.config.dagster_url, dagster_environment=self.dagster_environment)}/jobs/{job_snapshot.name}"
flow_property_bag: Dict[str, str] = {}
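
The remove_double_underscores handling keeps only the last double-underscore-separated segment of a generated name; a name without "__" passes through unchanged, since str.split always returns at least one element. A quick illustration (the job names are made up):

```python
# Dagster's asset-backed job names collapse to their last segment.
assert "__ASSET_JOB__orders".split("__")[-1] == "orders"
# Plain job names survive intact.
assert "daily_refresh".split("__")[-1] == "daily_refresh"
```
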
@@ -386,6 +429,8 @@ def generate_datajob(
input_datasets: Dict[str, Set[DatasetUrn]],
output_datasets: Dict[str, Set[DatasetUrn]],
platform_instance: Optional[str] = None,
remove_double_underscores: bool = True,
add_asset_group_tag: bool = True,
) -> DataJob:
"""
Generates a DataJob object from a Dagster op snapshot
@@ -403,19 +448,29 @@
flow_id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{job_snapshot.name}"
job_id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{op_def_snap.name}"
else:
flow_id = f"{self.dagster_environment.module}/{job_snapshot.name}"
job_id = f"{self.dagster_environment.module}/{op_def_snap.name}"
module_name = (
self.dagster_environment.module
if self.dagster_environment.module
else self.dagster_environment.branch
)
flow_id = f"{module_name}/{job_snapshot.name}"
job_id = f"{module_name}/{op_def_snap.name}"

dataflow_urn = DataFlowUrn.create_from_ids(
orchestrator=Constant.ORCHESTRATOR,
flow_id=flow_id,
env=env,
platform_instance=platform_instance,
)

job_name = op_def_snap.name
if remove_double_underscores and job_name.split("__"):
job_name = job_name.split("__")[-1]

datajob = DataJob(
id=job_id,
flow_urn=dataflow_urn,
-    name=op_def_snap.name,
+    name=job_name,
)

if self.config.dagster_url:
@@ -424,6 +479,13 @@
datajob.description = op_def_snap.description
datajob.tags = set(op_def_snap.tags.keys())

if add_asset_group_tag:
asset_group = self.get_asset_group_from_op_name(
op_def_snap.name.split("__")
)
if asset_group:
datajob.tags.add(f"asset_group:{asset_group}")

inlets: Set[DatasetUrn] = set()
# Add upstream dependencies for this op
for upstream_op_name in step_deps[op_def_snap.name]:
@@ -667,9 +729,9 @@ def generate_asset_group_tag(
if not target_urn:
target_urn = asset_urn
self.logger.info(
f"Getting {asset_urn.urn()} from Asset Cache: {DagsterGenerator.asset_group_name_cache}"
f"Getting {asset_urn.urn()} from Asset Cache: {DATAHUB_ASSET_GROUP_NAME_CACHE}"
)
-group_name = DagsterGenerator.asset_group_name_cache.get(asset_urn.urn())
+group_name = DATAHUB_ASSET_GROUP_NAME_CACHE.get(asset_urn.urn())
if group_name:
current_tags: Optional[GlobalTagsClass] = graph.get_aspect(
entity_urn=target_urn.urn(),
@@ -698,6 +760,20 @@ def generate_asset_group_tag(

return None

def _gen_entity_browsepath_aspect(
self,
entity_urn: str,
paths: List[str],
) -> MetadataWorkUnit:
entries = [BrowsePathEntryClass(id=path) for path in paths]
if self.config.platform_instance:
urn = make_dataplatform_instance_urn("asset", self.config.platform_instance)
entries = [BrowsePathEntryClass(id=urn, urn=urn)] + entries
return MetadataChangeProposalWrapper(
entityUrn=entity_urn,
aspect=BrowsePathsV2Class(entries),
).as_workunit()
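
A sketch of calling the new helper (the URN and path segments are illustrative; generator stands in for a DagsterGenerator instance). It returns one workunit carrying a BrowsePathsV2 aspect with an entry per segment, prefixed with the platform instance when one is configured:

```python
workunit = generator._gen_entity_browsepath_aspect(
    entity_urn="urn:li:dataset:(urn:li:dataPlatform:dagster,my_asset,PROD)",
    paths=["my_asset_group", "my_asset"],  # becomes two BrowsePathEntryClass entries
)
```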

def emit_asset(
self,
graph: DataHubGraph,
@@ -771,7 +847,8 @@ def emit_asset(
origin=DatasetUrn.create_from_string(downstream).env,
),
)
self.logger.info(f"mcp: {mcp}")
if self.config.debug_mode:
self.logger.info(f"mcp: {mcp}")
graph.emit_mcp(mcp)

patch_builder = DatasetPatchBuilder(downstream)
@@ -807,20 +884,38 @@ def emit_asset(
for patch_event in patch_builder.build():
graph.emit_mcp(patch_event)

self.logger.info(f"asset_key: {asset_key}")
if self.config.debug_mode:
self.logger.info(f"asset_key: {asset_key}")
self.generate_browse_path(asset_key=asset_key, urn=dataset_urn, graph=graph)
return dataset_urn

def get_asset_group_from_op_name(self, asset_key: Sequence[str]) -> Optional[str]:
"""
Get asset group name from op name
"""
asset_urn = self.dataset_urn_from_asset(asset_key).urn()
asset_group_name = DATAHUB_ASSET_GROUP_NAME_CACHE.get(asset_urn)
if asset_group_name:
self.logger.info(
f"asset_key: {asset_key}, urn: {asset_urn}, asset_group_name: {asset_group_name}"
)
return asset_group_name
else:
self.logger.info(
f"asset_key: {asset_key}, urn: {asset_urn} not in {DATAHUB_ASSET_GROUP_NAME_CACHE}, asset_group_name: None"
)

return None
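
Asset-backed op and job names embed the asset key path as double-underscore-separated segments, so callers split the name and resolve it through the cache; generate_dataflow and generate_datajob above use exactly this to attach asset_group:<name> tags. An illustrative call (generator is an assumed DagsterGenerator instance, and the op name is made up):

```python
# "my_asset_group__my_asset" -> ["my_asset_group", "my_asset"] -> dataset URN -> group name
group = generator.get_asset_group_from_op_name("my_asset_group__my_asset".split("__"))
```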

def generate_browse_path(
self, asset_key: Sequence[str], urn: Urn, graph: DataHubGraph
) -> None:
"""
Generate browse path from asset key
"""
-asset_group_name = DagsterGenerator.asset_group_name_cache.get(
-    self.dataset_urn_from_asset(asset_key).urn()
-)
+browsePaths: List[BrowsePathEntryClass] = []
+
+asset_group_name = self.get_asset_group_from_op_name(asset_key)
if asset_group_name:
browsePaths.append(BrowsePathEntryClass(asset_group_name))

@@ -834,3 +929,51 @@ def generate_browse_path(
),
)
graph.emit_mcp(mcp)

def gen_query_aspect(
self,
graph: DataHubGraph,
platform: str,
query_subject_urns: List[str],
query: str,
job_urn: Optional[str] = None,
) -> None:
"""
Generate query aspect for lineage
"""
query_id = get_query_fingerprint(query, platform)

aspects = [
QueryPropertiesClass(
statement=QueryStatementClass(
value=query,
language=QueryLanguageClass.SQL,
),
source=QuerySourceClass.SYSTEM,
origin=job_urn if job_urn else None,
created=AuditStampClass(
make_ts_millis(datetime.now(tz=timezone.utc)),
actor=_DEFAULT_USER_URN.urn(),
),
lastModified=AuditStampClass(
make_ts_millis(datetime.now(tz=timezone.utc)),
actor=_DEFAULT_USER_URN.urn(),
),
),
QuerySubjectsClass(
subjects=[QuerySubjectClass(entity=urn) for urn in query_subject_urns]
),
DataPlatformInstanceClass(
platform=make_data_platform_urn(platform),
),
SubTypesClass(
typeNames=["Query"],
),
]

mcps = MetadataChangeProposalWrapper.construct_many(
entityUrn=f"urn:li:query:dagster_{query_id}",
aspects=aspects,
)
for mcp in mcps:
graph.emit_mcp(mcp)
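
A usage sketch (the URNs and SQL are illustrative; graph is assumed to be a connected DataHubGraph and generator a DagsterGenerator). Because the query entity's URN is derived from get_query_fingerprint, re-emitting the same normalized statement updates the same urn:li:query:dagster_<fingerprint> entity rather than creating duplicates:

```python
generator.gen_query_aspect(
    graph=graph,
    platform="snowflake",
    query_subject_urns=[
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.raw_orders,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.orders,PROD)",
    ],
    query="CREATE TABLE db.schema.orders AS SELECT * FROM db.schema.raw_orders",
    job_urn=None,  # optionally a DataJob urn recorded as the query origin
)
```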