diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py
index df123b127e040..2fdd0a41edf6c 100644
--- a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py
+++ b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/client/dagster_generator.py
@@ -1,4 +1,5 @@
 from dataclasses import dataclass
+from datetime import datetime, timezone
 from logging import Logger
 from typing import Any, Callable, Dict, List, NamedTuple, Optional, Sequence, Set
 from urllib.parse import urlsplit
@@ -12,6 +13,7 @@
     TableSchemaMetadataValue,
 )
 from dagster._core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus
+from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
 
 try:
     from dagster._core.snap import JobSnapshot  # type: ignore[attr-defined]
@@ -32,13 +34,22 @@
     make_data_platform_urn,
     make_dataplatform_instance_urn,
     make_tag_urn,
+    make_ts_millis,
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
 from datahub.metadata._schema_classes import (
+    AuditStampClass,
     BrowsePathEntryClass,
     BrowsePathsV2Class,
     GlobalTagsClass,
+    QueryLanguageClass,
+    QueryPropertiesClass,
+    QuerySourceClass,
+    QueryStatementClass,
+    QuerySubjectClass,
+    QuerySubjectsClass,
     TagAssociationClass,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
@@ -63,6 +74,7 @@
     SubTypesClass,
     UpstreamClass,
 )
+from datahub.metadata.urns import CorpUserUrn
 from datahub.specific.dataset import DatasetPatchBuilder
 from datahub.utilities.urns._urn_base import Urn
 from datahub.utilities.urns.data_flow_urn import DataFlowUrn
@@ -73,6 +85,8 @@
 
 DAGSTER_PLATFORM = "dagster"
 
+_DEFAULT_USER_URN = CorpUserUrn("_ingestion")
+
 
 class Constant:
     """
@@ -212,6 +226,16 @@ class DatahubDagsterSourceConfig(DatasetSourceConfigMixin):
         description="Whether to materialize asset dependencies in DataHub. It emits a datasetKey for each dependency. Default is False.",
     )
 
+    emit_queries: Optional[bool] = pydantic.Field(
+        default=False,
+        description="Whether to emit query aspects. Default is False.",
+    )
+
+    emit_assets: Optional[bool] = pydantic.Field(
+        default=True,
+        description="Whether to emit asset aspects. Default is True.",
+    )
+
     debug_mode: Optional[bool] = pydantic.Field(
         default=False,
         description="Whether to enable debug mode",
@@ -243,9 +267,10 @@ def job_url_generator(dagster_url: str, dagster_environment: DagsterEnvironment)
     return base_url
 
 
-class DagsterGenerator:
-    asset_group_name_cache: Dict[str, str] = {}
+DATAHUB_ASSET_GROUP_NAME_CACHE: Dict[str, str] = {}
+
 
+class DagsterGenerator:
     def __init__(
         self,
         logger: Logger,
@@ -298,17 +323,16 @@ def update_asset_group_name_cache(
         if asset_def:
             for key, group_name in asset_def.group_names_by_key.items():
                 asset_urn = self.dataset_urn_from_asset(key.path)
-                DagsterGenerator.asset_group_name_cache[
-                    asset_urn.urn()
-                ] = group_name
+                DATAHUB_ASSET_GROUP_NAME_CACHE[asset_urn.urn()] = group_name
                 if self.config.debug_mode:
                     self.logger.debug(
                         f"Asset group name cache updated: {asset_urn.urn()} -> {group_name}"
                     )
         if self.config.debug_mode:
             self.logger.debug(
-                f"Asset group name cache: {DagsterGenerator.asset_group_name_cache}"
+                f"Asset group name cache: {DATAHUB_ASSET_GROUP_NAME_CACHE}"
             )
+        self.logger.info(f"Asset group name cache: {DATAHUB_ASSET_GROUP_NAME_CACHE}")
 
     def path_metadata_resolver(self, value: PathMetadataValue) -> Optional[DatasetUrn]:
         """
@@ -347,6 +371,8 @@ def generate_dataflow(
         job_snapshot: JobSnapshot,
         env: str,
         platform_instance: Optional[str] = None,
+        remove_double_underscores: bool = True,
+        add_asset_group_tag: bool = True,
     ) -> DataFlow:
         """
         Generates a Dataflow object from a Dagster Job Snapshot
@@ -358,17 +384,34 @@
         if self.dagster_environment.is_cloud:
             id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{job_snapshot.name}"
         else:
-            id = f"{self.dagster_environment.module}/{job_snapshot.name}"
+            module_name = (
+                self.dagster_environment.module
+                if self.dagster_environment.module
+                else self.dagster_environment.branch
+            )
+            id = f"{module_name}/{job_snapshot.name}"
+
+        flow_name = job_snapshot.name
+        if remove_double_underscores and flow_name.split("__"):
+            flow_name = flow_name.split("__")[-1]
 
         dataflow = DataFlow(
             orchestrator=Constant.ORCHESTRATOR,
             id=id,
             env=env,
-            name=job_snapshot.name,
+            name=flow_name,
             platform_instance=platform_instance,
         )
+
         dataflow.description = job_snapshot.description
         dataflow.tags = set(job_snapshot.tags.keys())
+        if add_asset_group_tag:
+            asset_group = self.get_asset_group_from_op_name(
+                job_snapshot.name.split("__")
+            )
+            if asset_group:
+                dataflow.tags.add(f"asset_group:{asset_group}")
+
         if self.config.dagster_url:
             dataflow.url = f"{job_url_generator(dagster_url=self.config.dagster_url, dagster_environment=self.dagster_environment)}/jobs/{job_snapshot.name}"
         flow_property_bag: Dict[str, str] = {}
@@ -386,6 +429,8 @@ def generate_datajob(
         input_datasets: Dict[str, Set[DatasetUrn]],
         output_datasets: Dict[str, Set[DatasetUrn]],
         platform_instance: Optional[str] = None,
+        remove_double_underscores: bool = True,
+        add_asset_group_tag: bool = True,
     ) -> DataJob:
         """
         Generates a Datajob object from a Dagster op snapshot
@@ -403,8 +448,13 @@
             flow_id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{job_snapshot.name}"
             job_id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{op_def_snap.name}"
         else:
-            flow_id = f"{self.dagster_environment.module}/{job_snapshot.name}"
-            job_id = f"{self.dagster_environment.module}/{op_def_snap.name}"
+            module_name = (
+                self.dagster_environment.module
+                if self.dagster_environment.module
+                else self.dagster_environment.branch
+            )
+            flow_id = f"{module_name}/{job_snapshot.name}"
f"{module_name}/{job_snapshot.name}" + job_id = f"{module_name}/{op_def_snap.name}" dataflow_urn = DataFlowUrn.create_from_ids( orchestrator=Constant.ORCHESTRATOR, @@ -412,10 +462,15 @@ def generate_datajob( env=env, platform_instance=platform_instance, ) + + job_name = op_def_snap.name + if remove_double_underscores and job_name.split("__"): + job_name = job_name.split("__")[-1] + datajob = DataJob( id=job_id, flow_urn=dataflow_urn, - name=op_def_snap.name, + name=job_name, ) if self.config.dagster_url: @@ -424,6 +479,13 @@ def generate_datajob( datajob.description = op_def_snap.description datajob.tags = set(op_def_snap.tags.keys()) + if add_asset_group_tag: + asset_group = self.get_asset_group_from_op_name( + op_def_snap.name.split("__") + ) + if asset_group: + datajob.tags.add(f"asset_group:{asset_group}") + inlets: Set[DatasetUrn] = set() # Add upstream dependencies for this op for upstream_op_name in step_deps[op_def_snap.name]: @@ -667,9 +729,9 @@ def generate_asset_group_tag( if not target_urn: target_urn = asset_urn self.logger.info( - f"Getting {asset_urn.urn()} from Asset Cache: {DagsterGenerator.asset_group_name_cache}" + f"Getting {asset_urn.urn()} from Asset Cache: {DATAHUB_ASSET_GROUP_NAME_CACHE}" ) - group_name = DagsterGenerator.asset_group_name_cache.get(asset_urn.urn()) + group_name = DATAHUB_ASSET_GROUP_NAME_CACHE.get(asset_urn.urn()) if group_name: current_tags: Optional[GlobalTagsClass] = graph.get_aspect( entity_urn=target_urn.urn(), @@ -698,6 +760,20 @@ def generate_asset_group_tag( return None + def _gen_entity_browsepath_aspect( + self, + entity_urn: str, + paths: List[str], + ) -> MetadataWorkUnit: + entries = [BrowsePathEntryClass(id=path) for path in paths] + if self.config.platform_instance: + urn = make_dataplatform_instance_urn("asset", self.config.platform_instance) + entries = [BrowsePathEntryClass(id=urn, urn=urn)] + entries + return MetadataChangeProposalWrapper( + entityUrn=entity_urn, + aspect=BrowsePathsV2Class(entries), + ).as_workunit() + def emit_asset( self, graph: DataHubGraph, @@ -771,7 +847,8 @@ def emit_asset( origin=DatasetUrn.create_from_string(downstream).env, ), ) - self.logger.info(f"mcp: {mcp}") + if self.config.debug_mode: + self.logger.info(f"mcp: {mcp}") graph.emit_mcp(mcp) patch_builder = DatasetPatchBuilder(downstream) @@ -807,20 +884,38 @@ def emit_asset( for patch_event in patch_builder.build(): graph.emit_mcp(patch_event) - self.logger.info(f"asset_key: {asset_key}") + if self.config.debug_mode: + self.logger.info(f"asset_key: {asset_key}") self.generate_browse_path(asset_key=asset_key, urn=dataset_urn, graph=graph) return dataset_urn + def get_asset_group_from_op_name(self, asset_key: Sequence[str]) -> Optional[str]: + """ + Get asset group name from op name + """ + asset_urn = self.dataset_urn_from_asset(asset_key).urn() + asset_group_name = DATAHUB_ASSET_GROUP_NAME_CACHE.get(asset_urn) + if asset_group_name: + self.logger.info( + f"asset_key: {asset_key}, urn: {asset_urn}, asset_group_name: {asset_group_name}" + ) + return asset_group_name + else: + self.logger.info( + f"asset_key: {asset_key}, urn: {asset_urn} not in {DATAHUB_ASSET_GROUP_NAME_CACHE}, asset_group_name: None" + ) + + return None + def generate_browse_path( self, asset_key: Sequence[str], urn: Urn, graph: DataHubGraph ) -> None: """ Generate browse path from asset key """ - asset_group_name = DagsterGenerator.asset_group_name_cache.get( - self.dataset_urn_from_asset(asset_key).urn() - ) browsePaths: List[BrowsePathEntryClass] = [] + + asset_group_name = 
         if asset_group_name:
             browsePaths.append(BrowsePathEntryClass(asset_group_name))
@@ -834,3 +929,51 @@
             ),
         )
         graph.emit_mcp(mcp)
+
+    def gen_query_aspect(
+        self,
+        graph: DataHubGraph,
+        platform: str,
+        query_subject_urns: List[str],
+        query: str,
+        job_urn: Optional[str] = None,
+    ) -> None:
+        """
+        Generate query aspect for lineage
+        """
+        query_id = get_query_fingerprint(query, platform)
+
+        aspects = [
+            QueryPropertiesClass(
+                statement=QueryStatementClass(
+                    value=query,
+                    language=QueryLanguageClass.SQL,
+                ),
+                source=QuerySourceClass.SYSTEM,
+                origin=job_urn if job_urn else None,
+                created=AuditStampClass(
+                    make_ts_millis(datetime.now(tz=timezone.utc)),
+                    actor=_DEFAULT_USER_URN.urn(),
+                ),
+                lastModified=AuditStampClass(
+                    make_ts_millis(datetime.now(tz=timezone.utc)),
+                    actor=_DEFAULT_USER_URN.urn(),
+                ),
+            ),
+            QuerySubjectsClass(
+                subjects=[QuerySubjectClass(entity=urn) for urn in query_subject_urns]
+            ),
+            DataPlatformInstanceClass(
+                platform=make_data_platform_urn(platform),
+            ),
+            SubTypesClass(
+                typeNames=["Query"],
+            ),
+        ]
+
+        mcps = MetadataChangeProposalWrapper.construct_many(
+            entityUrn=f"urn:li:query:dagster_{query_id}",
+            aspects=aspects,
+        )
+        for mcp in mcps:
+            graph.emit_mcp(mcp)
diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py
index ebb2c82d952b1..f6b0629b7ca7b 100644
--- a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py
+++ b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py
@@ -46,8 +46,10 @@
     create_lineage_sql_parsed_result,
 )
 from datahub.utilities.urns.dataset_urn import DatasetUrn
+from datahub.utilities.urns.error import InvalidUrnError
 
 from datahub_dagster_plugin.client.dagster_generator import (
+    DATAHUB_ASSET_GROUP_NAME_CACHE,
     Constant,
     DagsterEnvironment,
     DagsterGenerator,
@@ -268,7 +270,6 @@ def get_dagster_environment(
             module = code_pointer.module
         else:
             context.log.error("Unable to get Module")
-            return None
 
     dagster_environment = DagsterEnvironment(
         is_cloud=os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", None) is not None,
@@ -326,100 +327,188 @@ def process_asset_materialization(
         dataset_inputs: Dict[str, Set[DatasetUrn]],
         dataset_outputs: Dict[str, Set[DatasetUrn]],
     ) -> None:
-        if (
-            log.dagster_event
-            and log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION
-        ):
-            assert log.step_key
+        if not self._is_valid_asset_materialization(log):
+            return
 
-            materialization = log.asset_materialization
-            if not materialization:
-                return
+        asset_materialization = log.asset_materialization
+        if asset_materialization is None:
+            return
 
-            properties = {
-                key: str(value) for (key, value) in materialization.metadata.items()
-            }
-            asset_key = materialization.asset_key.path
+        asset_key = asset_materialization.asset_key.path
+        asset_downstream_urn = self._get_asset_downstream_urn(
+            log, context, dagster_generator, list(asset_key)
+        )
 
-            asset_downstream_urn: Optional[DatasetUrn] = None
-            # If DataHub Urn is set then we prefer that as downstream urn
-            if materialization.metadata.get("datahub_urn") and isinstance(
-                materialization.metadata.get("datahub_urn"), TextMetadataValue
-            ):
-                try:
-                    asset_downstream_urn = DatasetUrn.from_string(
-                        str(materialization.metadata["datahub_urn"].text)
-                    )
-                    context.log.info(
f"asset_downstream_urn from metadata datahub_urn: {asset_downstream_urn}" - ) - except Exception as e: - context.log.error(f"Error in parsing datahub_urn: {e}") + if not asset_downstream_urn: + return + + properties = { + key: str(value) for (key, value) in asset_materialization.metadata.items() + } + upstreams, downstreams = self._process_lineage( + context=context, + dagster_generator=dagster_generator, + log=log, + asset_downstream_urn=asset_downstream_urn, + ) + + self._emit_or_connect_asset( + context, + dagster_generator, + log, + list(asset_key), + properties, + upstreams, + downstreams, + dataset_inputs, + dataset_outputs, + ) - if not asset_downstream_urn: - asset_downstream_urn = ( - dagster_generator.asset_keys_to_dataset_urn_converter(asset_key) + def _is_valid_asset_materialization(self, log: EventLogEntry) -> bool: + return ( + log.dagster_event is not None + and log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION + and log.step_key is not None + and log.asset_materialization is not None + ) + + def _get_asset_downstream_urn( + self, + log: EventLogEntry, + context: RunStatusSensorContext, + dagster_generator: DagsterGenerator, + asset_key: List[str], + ) -> Optional[DatasetUrn]: + materialization = log.asset_materialization + if materialization is None: + return None + + asset_downstream_urn: Optional[DatasetUrn] = None + + if materialization.metadata.get("datahub_urn") and isinstance( + materialization.metadata.get("datahub_urn"), TextMetadataValue + ): + try: + asset_downstream_urn = DatasetUrn.from_string( + str(materialization.metadata["datahub_urn"].text) ) context.log.info( - f"asset_downstream_urn from asset keys: {asset_downstream_urn}" + f"asset_downstream_urn from metadata datahub_urn: {asset_downstream_urn}" ) + except Exception as e: + context.log.error(f"Error in parsing datahub_urn: {e}") + + if not asset_downstream_urn: + asset_downstream_urn = ( + dagster_generator.asset_keys_to_dataset_urn_converter(asset_key) + ) + context.log.info( + f"asset_downstream_urn from asset keys: {asset_downstream_urn}" + ) - if asset_downstream_urn: - context.log.info(f"asset_downstream_urn: {asset_downstream_urn}") + return asset_downstream_urn - downstreams = {asset_downstream_urn.urn()} - context.log.info(f"downstreams: {downstreams}") - upstreams: Set[str] = set() - if self.config.enable_asset_query_metadata_parsing: - try: - if ( - materialization - and materialization.metadata - and materialization.metadata.get("Query") - and isinstance( - materialization.metadata.get("Query"), TextMetadataValue - ) - ): - query_metadata = materialization.metadata.get("Query") - assert query_metadata - lineage = self.parse_sql( - context=context, - sql_query=str(query_metadata.text), - env=asset_downstream_urn.env, - platform=asset_downstream_urn.platform.replace( - "urn:li:dataPlatform:", "" - ), + def _process_lineage( + self, + context: RunStatusSensorContext, + dagster_generator: DagsterGenerator, + log: EventLogEntry, + asset_downstream_urn: DatasetUrn, + ) -> Tuple[Set[str], Set[str]]: + downstreams = {asset_downstream_urn.urn()} + upstreams: Set[str] = set() + + if ( + log.asset_materialization + and self.config.enable_asset_query_metadata_parsing + ): + try: + query_metadata = log.asset_materialization.metadata.get("Query") + if isinstance(query_metadata, TextMetadataValue): + lineage = self.parse_sql( + context=context, + sql_query=str(query_metadata.text), + env=asset_downstream_urn.env, + platform=asset_downstream_urn.platform.replace( + 
"urn:li:dataPlatform:", "" + ), + ) + if lineage and lineage.downstreams: + if self.config.emit_queries: + dagster_generator.gen_query_aspect( + graph=self.graph, + platform=asset_downstream_urn.platform, + query_subject_urns=lineage.upstreams + + lineage.downstreams, + query=str(query_metadata.text), ) - # To make sure we don't process select queries check if downstream is present - if lineage and lineage.downstreams: - downstreams = downstreams.union( - set(lineage.downstreams) - ) - upstreams = upstreams.union(set(lineage.upstreams)) - context.log.info( - f"Upstreams: {upstreams} Downstreams: {downstreams}" - ) - else: - context.log.info( - f"Lineage not found for {query_metadata.text}" - ) - else: - context.log.info("Query not found in metadata") - except Exception as e: - context.log.info(f"Error in processing asset logs: {e}") - - # Emitting asset with upstreams and downstreams - dataset_urn = dagster_generator.emit_asset( - self.graph, - asset_key, - materialization.description, - properties, - downstreams=downstreams, - upstreams=upstreams, - materialize_dependencies=self.config.materialize_dependencies, + downstreams = downstreams.union(set(lineage.downstreams)) + upstreams = upstreams.union(set(lineage.upstreams)) + context.log.info( + f"Upstreams: {upstreams} Downstreams: {downstreams}" + ) + else: + context.log.info(f"Lineage not found for {query_metadata.text}") + else: + context.log.info("Query not found in metadata") + except Exception as e: + context.log.exception(f"Error in processing asset logs: {e}") + + return upstreams, downstreams + + def _emit_or_connect_asset( + self, + context: RunStatusSensorContext, + dagster_generator: DagsterGenerator, + log: EventLogEntry, + asset_key: List[str], + properties: Dict[str, str], + upstreams: Set[str], + downstreams: Set[str], + dataset_inputs: Dict[str, Set[DatasetUrn]], + dataset_outputs: Dict[str, Set[DatasetUrn]], + ) -> None: + if self.config.emit_assets: + context.log.info("Emitting asset metadata...") + dataset_urn = dagster_generator.emit_asset( + self.graph, + asset_key, + log.asset_materialization.description + if log.asset_materialization + else None, + properties, + downstreams=downstreams, + upstreams=upstreams, + materialize_dependencies=self.config.materialize_dependencies, + ) + if log.step_key: + dataset_outputs[log.step_key].add(dataset_urn) + else: + context.log.info( + "Not emitting assets but connecting materialized dataset to DataJobs" + ) + if log.step_key: + dataset_outputs[log.step_key] = dataset_outputs[log.step_key].union( + [DatasetUrn.from_string(d) for d in downstreams] ) - dataset_outputs[log.step_key].add(dataset_urn) + dataset_upstreams: List[DatasetUrn] = [] + + for u in upstreams: + try: + dataset_upstreams.append(DatasetUrn.from_string(u)) + except InvalidUrnError as e: + context.log.error( + f"Error in parsing upstream dataset urn: {e}", exc_info=True + ) + continue + + dataset_inputs[log.step_key] = dataset_inputs[log.step_key].union( + dataset_upstreams + ) + context.log.info( + f"Dataset Inputs: {dataset_inputs[log.step_key]} Dataset Outputs: {dataset_outputs[log.step_key]}" + ) def process_asset_observation( self, @@ -593,7 +682,9 @@ def _emit_asset_metadata( dagster_environment=dagster_environment, ) - context.log.info("Emitting asset metadata...") + context.log.info( + f"Updating asset group name cache... 
{DATAHUB_ASSET_GROUP_NAME_CACHE}" + ) dagster_generator.update_asset_group_name_cache(context) return SkipReason("Asset metadata processed") @@ -681,10 +772,21 @@ def _emit_metadata( platform_instance=self.config.platform_instance, ) + if ( + dataflow.name + and dataflow.name.startswith("__ASSET_JOB") + and dataflow.name.split("__") + ): + dagster_generator.generate_browse_path( + dataflow.name.split("__"), urn=dataflow.urn, graph=self.graph + ) + dataflow.name = dataflow.name.split("__")[-1] + dataflow.emit(self.graph) if self.config.debug_mode: for mcp in dataflow.generate_mcp(): - context.log.debug(f"Emitted MCP: {mcp}") + if self.config.debug_mode: + context.log.debug(f"Emitted MCP: {mcp}") # Emit dagster job run which get mapped with datahub data process instance entity dagster_generator.emit_job_run( @@ -721,6 +823,16 @@ def _emit_metadata( input_datasets=dataset_inputs, ) + if ( + datajob.name + and datajob.name.startswith("__ASSET_JOB") + and datajob.name.split("__") + ): + dagster_generator.generate_browse_path( + datajob.name.split("__"), urn=datajob.urn, graph=self.graph + ) + datajob.name = datajob.name.split("__")[-1] + datajob.emit(self.graph) if self.config.debug_mode: diff --git a/metadata-ingestion-modules/dagster-plugin/tests/unit/conftest.py b/metadata-ingestion-modules/dagster-plugin/tests/unit/conftest.py new file mode 100644 index 0000000000000..5b9442d048672 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/tests/unit/conftest.py @@ -0,0 +1,21 @@ +import pathlib +import site +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from _pytest.config import Parser + + +def pytest_addoption(parser: "Parser") -> None: + parser.addoption( + "--update-golden-files", + action="store_true", + default=False, + ) + + +# See https://coverage.readthedocs.io/en/latest/subprocess.html#configuring-python-for-sub-process-measurement +coverage_startup_code = "import coverage; coverage.process_startup()" +site_packages_dir = pathlib.Path(site.getsitepackages()[0]) +pth_file_path = site_packages_dir / "datahub_coverage_startup.pth" +pth_file_path.write_text(coverage_startup_code) diff --git a/metadata-ingestion-modules/dagster-plugin/tests/unit/golden/golden_test_emit_metadata_mcps.json b/metadata-ingestion-modules/dagster-plugin/tests/unit/golden/golden_test_emit_metadata_mcps.json new file mode 100644 index 0000000000000..afd67422d1564 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/tests/unit/golden/golden_test_emit_metadata_mcps.json @@ -0,0 +1,658 @@ +[ +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "etl", + "env": "PROD" + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:dagster" + } + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)", 
+ "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "etl", + "env": "PROD" + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:dagster" + } + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:0fa780ff92120ac3bf49d0bc070bbda2", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "job_snapshot_id": "28d03bf7b3138bea153a21f70f29e7c3ffa6ab0a", + "execution_plan_snapshot_id": "0f504b218cd28750ffa8d90a40e9647acba72021", + "has_repository_load_data": "False", + "tags": "{}", + "steps_succeeded": "2", + "steps_failed": "0", + "materializations": "0", + "expectations": "0", + "start_time": "1720681200.0", + "end_time": "1720681200.0" + }, + "name": "12345678123456781234567812345678", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1720681200000, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "output_result": "{'name': 'result', 'dagster_type_key': 'Any', 'description': None, 'is_required': True, 'metadata': {'datahub.outputs': JsonMetadataValue(data=['urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)'])}, 'is_dynamic': False}" + }, + "name": "extract", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:0fa780ff92120ac3bf49d0bc070bbda2", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataFlow:(dagster,prod/etl,PROD)", + "upstreamInstances": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:0fa780ff92120ac3bf49d0bc070bbda2", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1720681200000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:0fa780ff92120ac3bf49d0bc070bbda2", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1720681200000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "dagster" + } + } + 
} +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:dagster" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "output_result": "{'name': 'result', 'dagster_type_key': 'Any', 'description': None, 'is_required': True, 'metadata': {'datahub.outputs': JsonMetadataValue(data=['urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)'])}, 'is_dynamic': False}" + }, + "name": "extract", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:dagster" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "input.data": "{'name': 'data', 'dagster_type_key': 'Any', 'description': None, 'metadata': {'datahub.inputs': JsonMetadataValue(data=['urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)'])}}", + "output_result": "{'name': 'result', 'dagster_type_key': 'Any', 'description': None, 'is_required': True, 'metadata': {}, 'is_dynamic': False}" + }, + "name": "transform", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + 
"changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)" + ], + "outputDatasets": [], + "inputDatajobs": [], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "name": "tableA", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:8e047f916b20c89e1874518890d6ff7e", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "step_key": "extract", + "attempts": "1", + "start_time": "1720681200.0", + "end_time": "1720681200.0" + }, + "name": "12345678123456781234567812345678.prod/extract", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1720681200000, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:dagster" + } + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:8e047f916b20c89e1874518890d6ff7e", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)", + "upstreamInstances": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:8e047f916b20c89e1874518890d6ff7e", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1720681200000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:8e047f916b20c89e1874518890d6ff7e", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1720681200000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "dagster" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "input.data": "{'name': 'data', 'dagster_type_key': 'Any', 'description': None, 'metadata': {'datahub.inputs': JsonMetadataValue(data=['urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)'])}}", + "output_result": "{'name': 'result', 'dagster_type_key': 'Any', 'description': None, 'is_required': True, 'metadata': {}, 'is_dynamic': False}" + }, + "name": "transform", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": 
"urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)" + ], + "outputDatasets": [], + "inputDatajobs": [], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "name": "tableA", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:dagster" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "step_key": "transform", + "attempts": "1", + "start_time": "1720681200.0", + "end_time": "1720681200.0" + }, + "name": "12345678123456781234567812345678.prod/transform", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1720681200000, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake", + "name": "tableA", + "origin": "PROD" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)", + "upstreamInstances": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)" + ] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1720681200000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED" + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1720681200000, + "partitionSpec": { + 
"partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "dagster" + } + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dagster.py b/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dagster.py index ac46cfd86fbb9..c951b959f85d4 100644 --- a/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dagster.py +++ b/metadata-ingestion-modules/dagster-plugin/tests/unit/test_dagster.py @@ -1,6 +1,11 @@ +import json +import pathlib +import tempfile +import uuid +from typing import Dict, List, Mapping, Sequence, Set from unittest.mock import Mock, patch -import pytest +import dagster._core.utils from dagster import ( DagsterInstance, In, @@ -11,12 +16,16 @@ job, op, ) -from datahub.api.entities.dataprocess.dataprocess_instance import ( - DataProcessInstanceKey, - InstanceRunResult, +from dagster._core.definitions.job_definition import JobDefinition +from dagster._core.definitions.repository_definition import ( + RepositoryData, + RepositoryDefinition, ) -from datahub.configuration.source_common import DEFAULT_ENV +from dagster._core.definitions.resource_definition import ResourceDefinition +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.graph.client import DatahubClientConfig +from freezegun import freeze_time +from utils.utils import PytestConfig, check_golden_file from datahub_dagster_plugin.client.dagster_generator import DatahubDagsterSourceConfig from datahub_dagster_plugin.sensors.datahub_sensors import ( @@ -24,12 +33,49 @@ make_datahub_sensor, ) +FROZEN_TIME = "2024-07-11 07:00:00" -@patch("datahub.ingestion.graph.client.DataHubGraph", autospec=True) -@pytest.mark.skip(reason="disabling this test unti it will use proper golden files") +call_num = 0 + + +def make_new_run_id_mock() -> str: + global call_num + call_num += 1 + return f"test_run_id_{call_num}" + + +dagster._core.utils.make_new_run_id = make_new_run_id_mock + + +@patch("datahub_dagster_plugin.sensors.datahub_sensors.DataHubGraph", autospec=True) def test_datahub_sensor(mock_emit): instance = DagsterInstance.ephemeral() - context = build_sensor_context(instance=instance) + + class DummyRepositoryData(RepositoryData): + def __init__(self): + self.sensors = [] + + def get_all_jobs(self) -> Sequence["JobDefinition"]: + return [] + + def get_top_level_resources(self) -> Mapping[str, "ResourceDefinition"]: + """Return all top-level resources in the repository as a list, + such as those provided to the Definitions constructor. + + Returns: + List[ResourceDefinition]: All top-level resources in the repository. 
+            """
+            return {}
+
+        def get_env_vars_by_top_level_resource(self) -> Mapping[str, Set[str]]:
+            return {}
+
+    repository_definition = RepositoryDefinition(
+        name="testRepository", repository_data=DummyRepositoryData()
+    )
+    context = build_sensor_context(
+        instance=instance, repository_def=repository_definition
+    )
     mock_emit.return_value = Mock()
 
     config = DatahubDagsterSourceConfig(
@@ -44,9 +90,13 @@
     assert isinstance(skip_reason, SkipReason)
 
 
-@patch("datahub_dagster_plugin.sensors.datahub_sensors.DatahubClient", autospec=True)
-@pytest.mark.skip(reason="disabling this test unti it will use proper golden files")
-def test_emit_metadata(mock_emit):
+TEST_UUIDS = ["uuid_{}".format(i) for i in range(10000)]
+
+
+@patch.object(uuid, "uuid4", side_effect=TEST_UUIDS)
+@patch("datahub_dagster_plugin.sensors.datahub_sensors.DataHubGraph", autospec=True)
+@freeze_time(FROZEN_TIME)
+def test_emit_metadata(mock_emit: Mock, pytestconfig: PytestConfig) -> None:
     mock_emitter = Mock()
     mock_emit.return_value = mock_emitter
 
@@ -87,7 +137,8 @@
         transform(extract())
 
     instance = DagsterInstance.ephemeral()
-    result = etl.execute_in_process(instance=instance)
+    test_run_id = "12345678123456781234567812345678"
+    result = etl.execute_in_process(instance=instance, run_id=test_run_id)
 
     # retrieve the DagsterRun
     dagster_run = result.dagster_run
@@ -103,201 +154,25 @@
         dagster_event=dagster_event,
     )
 
-    DatahubSensors()._emit_metadata(run_status_sensor_context)
-
-    expected_dataflow_urn = (
-        f"urn:li:dataFlow:(dagster,{dagster_run.job_name},{DEFAULT_ENV})"
-    )
-    assert mock_emitter.method_calls[1][1][0].aspectName == "dataFlowInfo"
-    assert mock_emitter.method_calls[1][1][0].entityUrn == expected_dataflow_urn
-    assert mock_emitter.method_calls[2][1][0].aspectName == "ownership"
-    assert mock_emitter.method_calls[2][1][0].entityUrn == expected_dataflow_urn
-    assert mock_emitter.method_calls[3][1][0].aspectName == "globalTags"
-    assert mock_emitter.method_calls[3][1][0].entityUrn == expected_dataflow_urn
-
-    dpi_id = DataProcessInstanceKey(
-        cluster=DEFAULT_ENV,
-        orchestrator="dagster",
-        id=dagster_run.run_id,
-    ).guid()
-    assert (
-        mock_emitter.method_calls[7][1][0].aspectName == "dataProcessInstanceProperties"
-    )
-    assert (
-        mock_emitter.method_calls[7][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[8][1][0].aspectName
-        == "dataProcessInstanceRelationships"
-    )
-    assert (
-        mock_emitter.method_calls[8][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[9][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[9][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[10][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[10][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[10][1][0].aspect.result.type
-        == InstanceRunResult.SUCCESS
-    )
-    assert mock_emitter.method_calls[11][1][0].aspectName == "dataJobInfo"
-    assert (
-        mock_emitter.method_calls[11][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},extract)"
-    )
-    assert mock_emitter.method_calls[12][1][0].aspectName == "dataJobInputOutput"
-    assert (
-        mock_emitter.method_calls[12][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},extract)"
-    )
-    assert mock_emitter.method_calls[13][1][0].aspectName == "status"
-    assert (
-        mock_emitter.method_calls[13][1][0].entityUrn
-        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)"
-    )
-    assert mock_emitter.method_calls[14][1][0].aspectName == "ownership"
-    assert (
-        mock_emitter.method_calls[14][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},extract)"
-    )
-    assert mock_emitter.method_calls[15][1][0].aspectName == "globalTags"
-    assert (
-        mock_emitter.method_calls[15][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},extract)"
-    )
-    dpi_id = DataProcessInstanceKey(
-        cluster=DEFAULT_ENV,
-        orchestrator="dagster",
-        id=f"{dagster_run.run_id}.extract",
-    ).guid()
-    assert (
-        mock_emitter.method_calls[21][1][0].aspectName
-        == "dataProcessInstanceProperties"
-    )
-    assert (
-        mock_emitter.method_calls[21][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[22][1][0].aspectName
-        == "dataProcessInstanceRelationships"
-    )
-    assert (
-        mock_emitter.method_calls[22][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert mock_emitter.method_calls[23][1][0].aspectName == "dataProcessInstanceOutput"
-    assert (
-        mock_emitter.method_calls[23][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert mock_emitter.method_calls[24][1][0].aspectName == "status"
-    assert (
-        mock_emitter.method_calls[24][1][0].entityUrn
-        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)"
-    )
-    assert (
-        mock_emitter.method_calls[25][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[25][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[26][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[26][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[26][1][0].aspect.result.type
-        == InstanceRunResult.SUCCESS
-    )
-    assert mock_emitter.method_calls[27][1][0].aspectName == "dataJobInfo"
-    assert (
-        mock_emitter.method_calls[27][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},transform)"
-    )
-    assert mock_emitter.method_calls[28][1][0].aspectName == "dataJobInputOutput"
-    assert (
-        mock_emitter.method_calls[28][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},transform)"
-    )
-    assert mock_emitter.method_calls[29][1][0].aspectName == "status"
-    assert (
-        mock_emitter.method_calls[29][1][0].entityUrn
-        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)"
-    )
-    assert mock_emitter.method_calls[30][1][0].aspectName == "ownership"
-    assert (
-        mock_emitter.method_calls[30][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},transform)"
-    )
-    assert mock_emitter.method_calls[31][1][0].aspectName == "globalTags"
-    assert (
-        mock_emitter.method_calls[31][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},transform)"
-    )
-    dpi_id = DataProcessInstanceKey(
-        cluster=DEFAULT_ENV,
-        orchestrator="dagster",
-        id=f"{dagster_run.run_id}.transform",
-    ).guid()
-    assert (
-        mock_emitter.method_calls[37][1][0].aspectName
-        == "dataProcessInstanceProperties"
-    )
-    assert (
-        mock_emitter.method_calls[37][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[38][1][0].aspectName
-        == "dataProcessInstanceRelationships"
-    )
-    assert (
-        mock_emitter.method_calls[38][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
f"urn:li:dataProcessInstance:{dpi_id}" - ) - assert mock_emitter.method_calls[39][1][0].aspectName == "dataProcessInstanceInput" - assert ( - mock_emitter.method_calls[39][1][0].entityUrn - == f"urn:li:dataProcessInstance:{dpi_id}" - ) - assert mock_emitter.method_calls[40][1][0].aspectName == "status" - assert ( - mock_emitter.method_calls[40][1][0].entityUrn - == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)" - ) - assert ( - mock_emitter.method_calls[41][1][0].aspectName == "dataProcessInstanceRunEvent" - ) - assert ( - mock_emitter.method_calls[41][1][0].entityUrn - == f"urn:li:dataProcessInstance:{dpi_id}" - ) - assert ( - mock_emitter.method_calls[42][1][0].aspectName == "dataProcessInstanceRunEvent" - ) - assert ( - mock_emitter.method_calls[42][1][0].entityUrn - == f"urn:li:dataProcessInstance:{dpi_id}" - ) - assert ( - mock_emitter.method_calls[42][1][0].aspect.result.type - == InstanceRunResult.SUCCESS - ) + with tempfile.TemporaryDirectory() as tmp_path: + DatahubSensors()._emit_metadata(run_status_sensor_context) + mcpws: List[Dict] = [] + for mock_call in mock_emitter.method_calls: + if not mock_call.args: + continue + mcpw = mock_call.args[0] + if isinstance(mcpw, MetadataChangeProposalWrapper): + mcpws.append(mcpw.to_obj(simplified_structure=True)) + + with open(f"{tmp_path}/test_emit_metadata_mcps.json", "w") as f: + json_object = json.dumps(mcpws, indent=2) + f.write(json_object) + + check_golden_file( + pytestconfig=pytestconfig, + output_path=pathlib.Path(f"{tmp_path}/test_emit_metadata_mcps.json"), + golden_path=pathlib.Path( + "tests/unit/golden/golden_test_emit_metadata_mcps.json" + ), + ignore_paths=["root[*]['systemMetadata']['created']"], + ) diff --git a/metadata-ingestion-modules/dagster-plugin/tests/unit/utils/__init__.py b/metadata-ingestion-modules/dagster-plugin/tests/unit/utils/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion-modules/dagster-plugin/tests/unit/utils/utils.py b/metadata-ingestion-modules/dagster-plugin/tests/unit/utils/utils.py new file mode 100644 index 0000000000000..79e8b758bd0c0 --- /dev/null +++ b/metadata-ingestion-modules/dagster-plugin/tests/unit/utils/utils.py @@ -0,0 +1,30 @@ +import pathlib +from typing import Sequence + +from datahub.testing.compare_metadata_json import assert_metadata_files_equal + +try: + from pytest import Config as PytestConfig # type: ignore[attr-defined] +except ImportError: + # Support for pytest 6.x. 
+    from _pytest.config import Config as PytestConfig  # type: ignore
+
+__all__ = ["PytestConfig"]
+
+
+def check_golden_file(
+    pytestconfig: PytestConfig,
+    output_path: pathlib.Path,
+    golden_path: pathlib.Path,
+    ignore_paths: Sequence[str] = (),
+) -> None:
+    update_golden = pytestconfig.getoption("--update-golden-files")
+
+    assert_metadata_files_equal(
+        output_path=output_path,
+        golden_path=golden_path,
+        update_golden=update_golden,
+        copy_output=False,
+        ignore_paths=ignore_paths,
+        ignore_order=True,
+    )
diff --git a/metadata-ingestion/developing.md b/metadata-ingestion/developing.md
index 19a18c5275a3b..b713997d8286f 100644
--- a/metadata-ingestion/developing.md
+++ b/metadata-ingestion/developing.md
@@ -89,7 +89,16 @@ cd metadata-ingestion-modules/gx-plugin
 source venv/bin/activate
 datahub version  # should print "DataHub CLI version: unavailable (installed in develop mode)"
 ```
+
+### (Optional) Set up your Python environment for developing on Dagster Plugin
+
+From the repository root:
+
+```shell
+cd metadata-ingestion-modules/dagster-plugin
+../../gradlew :metadata-ingestion-modules:dagster-plugin:installDev
+source venv/bin/activate
+datahub version  # should print "DataHub CLI version: unavailable (installed in develop mode)"
+```
 
 ### Common setup issues
 
 Common issues (click to expand):
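The `--update-golden-files` pytest option registered in the Dagster plugin's new `conftest.py` above lets the golden file consumed by `check_golden_file` be regenerated instead of asserted against. A minimal usage sketch, assuming the plugin dev environment from the section above is set up and pytest is run from the plugin directory:

```shell
cd metadata-ingestion-modules/dagster-plugin
source venv/bin/activate
# Rewrites tests/unit/golden/golden_test_emit_metadata_mcps.json in place
pytest tests/unit/test_dagster.py --update-golden-files
```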