From b33ad0a788b837783411f1539173d91071453fde Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Wed, 30 Oct 2024 17:41:45 +0100 Subject: [PATCH 01/21] feat(ingest/datahub): Add way to filter soft deleted entities (#11738) --- .../ingestion/source/datahub/config.py | 15 +- .../source/datahub/datahub_api_reader.py | 13 +- .../source/datahub/datahub_database_reader.py | 153 ++++++++++++------ .../source/datahub/datahub_kafka_reader.py | 5 + .../source/datahub/datahub_source.py | 42 +++-- .../ingestion/source/datahub/report.py | 2 + .../tests/unit/test_datahub_source.py | 52 +++--- 7 files changed, 190 insertions(+), 92 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py index 9705d63912b8d..a3304334cb1eb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py @@ -1,5 +1,5 @@ import os -from typing import Optional +from typing import Optional, Set from pydantic import Field, root_validator @@ -35,6 +35,19 @@ class DataHubSourceConfig(StatefulIngestionConfigBase): ), ) + include_soft_deleted_entities: bool = Field( + default=True, + description=( + "If enabled, include entities that have been soft deleted. " + "Otherwise, include all entities regardless of removal status. " + ), + ) + + exclude_aspects: Set[str] = Field( + default_factory=set, + description="Set of aspect names to exclude from ingestion", + ) + database_query_batch_size: int = Field( default=DEFAULT_DATABASE_BATCH_SIZE, description="Number of records to fetch from the database at a time", diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_api_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_api_reader.py index 6986aac0a7757..382a0d548e38d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_api_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_api_reader.py @@ -26,11 +26,17 @@ def __init__( self.report = report self.graph = graph - def get_aspects(self) -> Iterable[MetadataChangeProposalWrapper]: + def get_urns(self) -> Iterable[str]: urns = self.graph.get_urns_by_filter( - status=RemovedStatusFilter.ALL, + status=RemovedStatusFilter.ALL + if self.config.include_soft_deleted_entities + else RemovedStatusFilter.NOT_SOFT_DELETED, batch_size=self.config.database_query_batch_size, ) + return urns + + def get_aspects(self) -> Iterable[MetadataChangeProposalWrapper]: + urns = self.get_urns() tasks: List[futures.Future[Iterable[MetadataChangeProposalWrapper]]] = [] with futures.ThreadPoolExecutor( max_workers=self.config.max_workers @@ -43,6 +49,9 @@ def get_aspects(self) -> Iterable[MetadataChangeProposalWrapper]: def _get_aspects_for_urn(self, urn: str) -> Iterable[MetadataChangeProposalWrapper]: aspects: Dict[str, _Aspect] = self.graph.get_entity_semityped(urn) # type: ignore for aspect in aspects.values(): + if aspect.get_aspect_name().lower() in self.config.exclude_aspects: + continue + yield MetadataChangeProposalWrapper( entityUrn=urn, aspect=aspect, diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py index e4f1bb275487e..faa281097de4c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py @@ -1,11 +1,10 @@ +import contextlib import json import logging from datetime import datetime -from typing import Any, Generic, Iterable, List, Optional, Tuple, TypeVar +from typing import Any, Dict, Generic, Iterable, List, Optional, Tuple, TypeVar from sqlalchemy import create_engine -from sqlalchemy.engine import Row -from typing_extensions import Protocol from datahub.emitter.aspect import ASPECT_MAP from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -21,13 +20,7 @@ # Should work for at least mysql, mariadb, postgres DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f" - -class VersionOrderable(Protocol): - createdon: Any # Should restrict to only orderable types - version: int - - -ROW = TypeVar("ROW", bound=VersionOrderable) +ROW = TypeVar("ROW", bound=Dict[str, Any]) class VersionOrderer(Generic[ROW]): @@ -54,14 +47,14 @@ def _process_row(self, row: ROW) -> Iterable[ROW]: return yield from self._attempt_queue_flush(row) - if row.version == 0: + if row["version"] == 0: self._add_to_queue(row) else: yield row def _add_to_queue(self, row: ROW) -> None: if self.queue is None: - self.queue = (row.createdon, [row]) + self.queue = (row["createdon"], [row]) else: self.queue[1].append(row) @@ -69,7 +62,7 @@ def _attempt_queue_flush(self, row: ROW) -> Iterable[ROW]: if self.queue is None: return - if row.createdon > self.queue[0]: + if row["createdon"] > self.queue[0]: yield from self._flush_queue() def _flush_queue(self) -> Iterable[ROW]: @@ -92,6 +85,21 @@ def __init__( **connection_config.options, ) + @property + def soft_deleted_urns_query(self) -> str: + return f""" + SELECT DISTINCT mav.urn + FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} as mav + JOIN ( + SELECT *, + JSON_EXTRACT(metadata, '$.removed') as removed + FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} + WHERE aspect = "status" AND version = 0 + ) as sd ON sd.urn = mav.urn + WHERE sd.removed = true + ORDER BY mav.urn + """ + @property def query(self) -> str: # May repeat rows for the same date @@ -101,66 +109,117 @@ def query(self) -> str: # Relies on createdon order to reflect version order # Ordering of entries with the same createdon is handled by VersionOrderer return f""" - SELECT urn, aspect, metadata, systemmetadata, createdon, version - FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} - WHERE createdon >= %(since_createdon)s - {"" if self.config.include_all_versions else "AND version = 0"} - ORDER BY createdon, urn, aspect, version - LIMIT %(limit)s - OFFSET %(offset)s + SELECT * + FROM ( + SELECT + mav.urn, + mav.aspect, + mav.metadata, + mav.systemmetadata, + mav.createdon, + mav.version, + removed + FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} as mav + LEFT JOIN ( + SELECT + *, + JSON_EXTRACT(metadata, '$.removed') as removed + FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} + WHERE aspect = 'status' + AND version = 0 + ) as sd ON sd.urn = mav.urn + WHERE 1 = 1 + {"" if self.config.include_all_versions else "AND mav.version = 0"} + {"" if not self.config.exclude_aspects else "AND mav.aspect NOT IN %(exclude_aspects)s"} + AND mav.createdon >= %(since_createdon)s + ORDER BY + createdon, + urn, + aspect, + version + ) as t + WHERE 1=1 + {"" if self.config.include_soft_deleted_entities else "AND (removed = false or removed is NULL)"} + ORDER BY + createdon, + urn, + aspect, + version """ def get_aspects( self, from_createdon: datetime, stop_time: datetime ) -> Iterable[Tuple[MetadataChangeProposalWrapper, datetime]]: - orderer = VersionOrderer[Row](enabled=self.config.include_all_versions) + orderer = VersionOrderer[Dict[str, Any]]( + enabled=self.config.include_all_versions + ) rows = self._get_rows(from_createdon=from_createdon, stop_time=stop_time) for row in orderer(rows): mcp = self._parse_row(row) if mcp: - yield mcp, row.createdon + yield mcp, row["createdon"] - def _get_rows(self, from_createdon: datetime, stop_time: datetime) -> Iterable[Row]: + def _get_rows( + self, from_createdon: datetime, stop_time: datetime + ) -> Iterable[Dict[str, Any]]: with self.engine.connect() as conn: - ts = from_createdon - offset = 0 - while ts.timestamp() <= stop_time.timestamp(): - logger.debug(f"Polling database aspects from {ts}") - rows = conn.execute( + with contextlib.closing(conn.connection.cursor()) as cursor: + cursor.execute( self.query, - since_createdon=ts.strftime(DATETIME_FORMAT), - limit=self.config.database_query_batch_size, - offset=offset, + { + "exclude_aspects": list(self.config.exclude_aspects), + "since_createdon": from_createdon.strftime(DATETIME_FORMAT), + }, ) - if not rows.rowcount: - return - for i, row in enumerate(rows): - yield row + columns = [desc[0] for desc in cursor.description] + while True: + rows = cursor.fetchmany(self.config.database_query_batch_size) + if not rows: + return + for row in rows: + yield dict(zip(columns, row)) - if ts == row.createdon: - offset += i + 1 - else: - ts = row.createdon - offset = 0 + def get_soft_deleted_rows(self) -> Iterable[Dict[str, Any]]: + """ + Fetches all soft-deleted entities from the database. - def _parse_row(self, row: Row) -> Optional[MetadataChangeProposalWrapper]: + Yields: + Row objects containing URNs of soft-deleted entities + """ + with self.engine.connect() as conn: + with contextlib.closing(conn.connection.cursor()) as cursor: + logger.debug("Polling soft-deleted urns from database") + cursor.execute(self.soft_deleted_urns_query) + columns = [desc[0] for desc in cursor.description] + while True: + rows = cursor.fetchmany(self.config.database_query_batch_size) + if not rows: + return + for row in rows: + yield dict(zip(columns, row)) + + def _parse_row( + self, row: Dict[str, Any] + ) -> Optional[MetadataChangeProposalWrapper]: try: - json_aspect = post_json_transform(json.loads(row.metadata)) - json_metadata = post_json_transform(json.loads(row.systemmetadata or "{}")) + json_aspect = post_json_transform(json.loads(row["metadata"])) + json_metadata = post_json_transform( + json.loads(row["systemmetadata"] or "{}") + ) system_metadata = SystemMetadataClass.from_obj(json_metadata) return MetadataChangeProposalWrapper( - entityUrn=row.urn, - aspect=ASPECT_MAP[row.aspect].from_obj(json_aspect), + entityUrn=row["urn"], + aspect=ASPECT_MAP[row["aspect"]].from_obj(json_aspect), systemMetadata=system_metadata, changeType=ChangeTypeClass.UPSERT, ) except Exception as e: logger.warning( - f"Failed to parse metadata for {row.urn}: {e}", exc_info=True + f'Failed to parse metadata for {row["urn"]}: {e}', exc_info=True ) self.report.num_database_parse_errors += 1 self.report.database_parse_errors.setdefault( str(e), LossyDict() - ).setdefault(row.aspect, LossyList()).append(row.urn) + ).setdefault(row["aspect"], LossyList()).append(row["urn"]) return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py index d9e53e87c2cea..56a3d55abb184 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py @@ -36,6 +36,7 @@ def __init__( self.connection_config = connection_config self.report = report self.group_id = f"{KAFKA_GROUP_PREFIX}-{ctx.pipeline_name}" + self.ctx = ctx def __enter__(self) -> "DataHubKafkaReader": self.consumer = DeserializingConsumer( @@ -95,6 +96,10 @@ def _poll_partition( ) break + if mcl.aspectName and mcl.aspectName in self.config.exclude_aspects: + self.report.num_kafka_excluded_aspects += 1 + continue + # TODO: Consider storing state in kafka instead, via consumer.commit() yield mcl, PartitionOffset(partition=msg.partition(), offset=msg.offset()) diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py index 0204a864e2b9e..de212ca9a6771 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py @@ -62,13 +62,18 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.report.stop_time = datetime.now(tz=timezone.utc) logger.info(f"Ingesting DataHub metadata up until {self.report.stop_time}") state = self.stateful_ingestion_handler.get_last_run_state() + database_reader: Optional[DataHubDatabaseReader] = None if self.config.pull_from_datahub_api: yield from self._get_api_workunits() if self.config.database_connection is not None: + database_reader = DataHubDatabaseReader( + self.config, self.config.database_connection, self.report + ) + yield from self._get_database_workunits( - from_createdon=state.database_createdon_datetime + from_createdon=state.database_createdon_datetime, reader=database_reader ) self._commit_progress() else: @@ -77,7 +82,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) if self.config.kafka_connection is not None: - yield from self._get_kafka_workunits(from_offsets=state.kafka_offsets) + soft_deleted_urns = [] + if not self.config.include_soft_deleted_entities: + if database_reader is None: + raise ValueError( + "Cannot exclude soft deleted entities without a database connection" + ) + soft_deleted_urns = [ + row["urn"] for row in database_reader.get_soft_deleted_rows() + ] + + yield from self._get_kafka_workunits( + from_offsets=state.kafka_offsets, soft_deleted_urns=soft_deleted_urns + ) self._commit_progress() else: logger.info( @@ -85,15 +102,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) def _get_database_workunits( - self, from_createdon: datetime + self, from_createdon: datetime, reader: DataHubDatabaseReader ) -> Iterable[MetadataWorkUnit]: - if self.config.database_connection is None: - return - logger.info(f"Fetching database aspects starting from {from_createdon}") - reader = DataHubDatabaseReader( - self.config, self.config.database_connection, self.report - ) mcps = reader.get_aspects(from_createdon, self.report.stop_time) for i, (mcp, createdon) in enumerate(mcps): @@ -113,20 +124,29 @@ def _get_database_workunits( self._commit_progress(i) def _get_kafka_workunits( - self, from_offsets: Dict[int, int] + self, from_offsets: Dict[int, int], soft_deleted_urns: List[str] = [] ) -> Iterable[MetadataWorkUnit]: if self.config.kafka_connection is None: return logger.info("Fetching timeseries aspects from kafka") with DataHubKafkaReader( - self.config, self.config.kafka_connection, self.report, self.ctx + self.config, + self.config.kafka_connection, + self.report, + self.ctx, ) as reader: mcls = reader.get_mcls( from_offsets=from_offsets, stop_time=self.report.stop_time ) for i, (mcl, offset) in enumerate(mcls): mcp = MetadataChangeProposalWrapper.try_from_mcl(mcl) + if mcp.entityUrn in soft_deleted_urns: + self.report.num_timeseries_soft_deleted_aspects_dropped += 1 + logger.debug( + f"Dropping soft-deleted aspect of {mcp.aspectName} on {mcp.entityUrn}" + ) + continue if mcp.changeType == ChangeTypeClass.DELETE: self.report.num_timeseries_deletions_dropped += 1 logger.debug( diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py index 73e5a798a1553..721fc87989442 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/report.py @@ -20,6 +20,8 @@ class DataHubSourceReport(StatefulIngestionReport): num_kafka_aspects_ingested: int = 0 num_kafka_parse_errors: int = 0 + num_kafka_excluded_aspects: int = 0 kafka_parse_errors: LossyDict[str, int] = field(default_factory=LossyDict) num_timeseries_deletions_dropped: int = 0 + num_timeseries_soft_deleted_aspects_dropped: int = 0 diff --git a/metadata-ingestion/tests/unit/test_datahub_source.py b/metadata-ingestion/tests/unit/test_datahub_source.py index adc131362b326..67b2b85d9af6d 100644 --- a/metadata-ingestion/tests/unit/test_datahub_source.py +++ b/metadata-ingestion/tests/unit/test_datahub_source.py @@ -1,51 +1,41 @@ -from dataclasses import dataclass +from typing import Any, Dict import pytest -from datahub.ingestion.source.datahub.datahub_database_reader import ( - VersionOrderable, - VersionOrderer, -) - - -@dataclass -class MockRow(VersionOrderable): - createdon: int - version: int - urn: str +from datahub.ingestion.source.datahub.datahub_database_reader import VersionOrderer @pytest.fixture def rows(): return [ - MockRow(0, 0, "one"), - MockRow(0, 1, "one"), - MockRow(0, 0, "two"), - MockRow(0, 0, "three"), - MockRow(0, 1, "three"), - MockRow(0, 2, "three"), - MockRow(0, 1, "two"), - MockRow(0, 4, "three"), - MockRow(0, 5, "three"), - MockRow(1, 6, "three"), - MockRow(1, 0, "four"), - MockRow(2, 0, "five"), - MockRow(2, 1, "six"), - MockRow(2, 0, "six"), - MockRow(3, 0, "seven"), - MockRow(3, 0, "eight"), + {"createdon": 0, "version": 0, "urn": "one"}, + {"createdon": 0, "version": 1, "urn": "one"}, + {"createdon": 0, "version": 0, "urn": "two"}, + {"createdon": 0, "version": 0, "urn": "three"}, + {"createdon": 0, "version": 1, "urn": "three"}, + {"createdon": 0, "version": 2, "urn": "three"}, + {"createdon": 0, "version": 1, "urn": "two"}, + {"createdon": 0, "version": 4, "urn": "three"}, + {"createdon": 0, "version": 5, "urn": "three"}, + {"createdon": 1, "version": 6, "urn": "three"}, + {"createdon": 1, "version": 0, "urn": "four"}, + {"createdon": 2, "version": 0, "urn": "five"}, + {"createdon": 2, "version": 1, "urn": "six"}, + {"createdon": 2, "version": 0, "urn": "six"}, + {"createdon": 3, "version": 0, "urn": "seven"}, + {"createdon": 3, "version": 0, "urn": "eight"}, ] def test_version_orderer(rows): - orderer = VersionOrderer[MockRow](enabled=True) + orderer = VersionOrderer[Dict[str, Any]](enabled=True) ordered_rows = list(orderer(rows)) assert ordered_rows == sorted( - ordered_rows, key=lambda x: (x.createdon, x.version == 0) + ordered_rows, key=lambda x: (x["createdon"], x["version"] == 0) ) def test_version_orderer_disabled(rows): - orderer = VersionOrderer[MockRow](enabled=False) + orderer = VersionOrderer[Dict[str, Any]](enabled=False) ordered_rows = list(orderer(rows)) assert ordered_rows == rows From 93f76def1f9e693e735fff2c88133cae080cdd09 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 30 Oct 2024 10:38:33 -0700 Subject: [PATCH 02/21] fix(ingest): pin teradata dep (#11760) --- metadata-ingestion/setup.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index aa6dcaeeff039..db273c6e3ce46 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -484,7 +484,11 @@ "teradata": sql_common | usage_common | sqlglot_lib - | {"teradatasqlalchemy>=17.20.0.0"}, + | { + # On 2024-10-30, teradatasqlalchemy 20.0.0.2 was released. This version seemed to cause issues + # in our CI, so we're pinning the version for now. + "teradatasqlalchemy>=17.20.0.0,<=20.0.0.2", + }, "trino": sql_common | trino, "starburst-trino-usage": sql_common | usage_common | trino, "nifi": {"requests", "packaging", "requests-gssapi"}, From 799c4520567f87a4c938d86ed1bd6d1cb35f2c00 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 30 Oct 2024 11:36:38 -0700 Subject: [PATCH 03/21] fix(ingest): reduce asyncio in check_upgrade (#11734) --- .../src/datahub/cli/ingest_cli.py | 55 +++++---------- .../src/datahub/upgrade/upgrade.py | 70 +++++++++++++------ 2 files changed, 67 insertions(+), 58 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 8654e6c282ba8..51f095751f7dd 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -1,4 +1,3 @@ -import asyncio import csv import json import logging @@ -24,6 +23,7 @@ from datahub.ingestion.run.pipeline import Pipeline from datahub.telemetry import telemetry from datahub.upgrade import upgrade +from datahub.utilities.perf_timer import PerfTimer logger = logging.getLogger(__name__) @@ -126,7 +126,7 @@ def run( ) -> None: """Ingest metadata into DataHub.""" - async def run_pipeline_to_completion(pipeline: Pipeline) -> int: + def run_pipeline_to_completion(pipeline: Pipeline) -> int: logger.info("Starting metadata ingestion") with click_spinner.spinner(disable=no_spinner or no_progress): try: @@ -166,44 +166,25 @@ async def run_pipeline_to_completion(pipeline: Pipeline) -> int: # The default is "datahub" reporting. The extra flag will disable it. report_to = None - async def run_ingestion_and_check_upgrade() -> int: - # TRICKY: We want to make sure that the Pipeline.create() call happens on the - # same thread as the rest of the ingestion. As such, we must initialize the - # pipeline inside the async function so that it happens on the same event - # loop, and hence the same thread. - - # logger.debug(f"Using config: {pipeline_config}") - pipeline = Pipeline.create( - pipeline_config, - dry_run=dry_run, - preview_mode=preview, - preview_workunits=preview_workunits, - report_to=report_to, - no_progress=no_progress, - raw_config=raw_pipeline_config, - ) + # logger.debug(f"Using config: {pipeline_config}") + pipeline = Pipeline.create( + pipeline_config, + dry_run=dry_run, + preview_mode=preview, + preview_workunits=preview_workunits, + report_to=report_to, + no_progress=no_progress, + raw_config=raw_pipeline_config, + ) + with PerfTimer() as timer: + ret = run_pipeline_to_completion(pipeline) - version_stats_future = asyncio.ensure_future( - upgrade.retrieve_version_stats(pipeline.ctx.graph) + # The main ingestion has completed. If it was successful, potentially show an upgrade nudge message. + if ret == 0: + upgrade.check_upgrade_post( + main_method_runtime=timer.elapsed_seconds(), graph=pipeline.ctx.graph ) - ingestion_future = asyncio.ensure_future(run_pipeline_to_completion(pipeline)) - ret = await ingestion_future - - # The main ingestion has completed. If it was successful, potentially show an upgrade nudge message. - if ret == 0: - try: - # we check the other futures quickly on success - version_stats = await asyncio.wait_for(version_stats_future, 0.5) - upgrade.maybe_print_upgrade_message(version_stats=version_stats) - except Exception as e: - logger.debug( - f"timed out with {e} waiting for version stats to be computed... skipping ahead." - ) - - return ret - loop = asyncio.get_event_loop() - ret = loop.run_until_complete(run_ingestion_and_check_upgrade()) if ret: sys.exit(ret) # don't raise SystemExit if there's no error diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py index dd2829ba0d236..fb14514588e5f 100644 --- a/metadata-ingestion/src/datahub/upgrade/upgrade.py +++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py @@ -13,6 +13,7 @@ from datahub import __version__ from datahub.cli.config_utils import load_client_config from datahub.ingestion.graph.client import DataHubGraph +from datahub.utilities.perf_timer import PerfTimer log = logging.getLogger(__name__) @@ -113,7 +114,7 @@ async def get_server_config(gms_url: str, token: Optional[str]) -> dict: async with aiohttp.ClientSession() as session: config_endpoint = f"{gms_url}/config" - async with session.get(config_endpoint) as dh_response: + async with session.get(config_endpoint, headers=headers) as dh_response: dh_response_json = await dh_response.json() return dh_response_json @@ -167,7 +168,28 @@ async def get_server_version_stats( return (server_type, server_version, current_server_release_date) -async def retrieve_version_stats( +def retrieve_version_stats( + timeout: float, graph: Optional[DataHubGraph] = None +) -> Optional[DataHubVersionStats]: + version_stats: Optional[DataHubVersionStats] = None + + async def _get_version_with_timeout() -> None: + # TODO: Once we're on Python 3.11+, replace with asyncio.timeout. + stats_future = _retrieve_version_stats(graph) + + try: + nonlocal version_stats + version_stats = await asyncio.wait_for(stats_future, timeout=timeout) + except asyncio.TimeoutError: + log.debug("Timed out while fetching version stats") + + loop = asyncio.get_event_loop() + loop.run_until_complete(_get_version_with_timeout()) + + return version_stats + + +async def _retrieve_version_stats( server: Optional[DataHubGraph] = None, ) -> Optional[DataHubVersionStats]: try: @@ -263,7 +285,7 @@ def is_client_server_compatible(client: VersionStats, server: VersionStats) -> i return server.version.micro - client.version.micro -def maybe_print_upgrade_message( # noqa: C901 +def _maybe_print_upgrade_message( # noqa: C901 version_stats: Optional[DataHubVersionStats], ) -> None: # noqa: C901 days_before_cli_stale = 7 @@ -378,28 +400,34 @@ def maybe_print_upgrade_message( # noqa: C901 pass +def clip(val: float, min_val: float, max_val: float) -> float: + return max(min_val, min(val, max_val)) + + +def check_upgrade_post( + main_method_runtime: float, + graph: Optional[DataHubGraph] = None, +) -> None: + # Guarantees: this method will not throw, and will not block for more than 3 seconds. + + version_stats_timeout = clip(main_method_runtime / 10, 0.7, 3.0) + try: + version_stats = retrieve_version_stats( + timeout=version_stats_timeout, graph=graph + ) + _maybe_print_upgrade_message(version_stats=version_stats) + except Exception as e: + log.debug(f"Failed to check for upgrades due to {e}") + + def check_upgrade(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) def async_wrapper(*args: Any, **kwargs: Any) -> Any: - async def run_inner_func(): - return func(*args, **kwargs) - - async def run_func_check_upgrade(): - version_stats_future = asyncio.ensure_future(retrieve_version_stats()) - main_func_future = asyncio.ensure_future(run_inner_func()) - ret = await main_func_future - - # the main future has returned - # we check the other futures quickly - try: - version_stats = await asyncio.wait_for(version_stats_future, 0.5) - maybe_print_upgrade_message(version_stats=version_stats) - except Exception: - log.debug("timed out waiting for version stats to be computed") + with PerfTimer() as timer: + ret = func(*args, **kwargs) - return ret + check_upgrade_post(main_method_runtime=timer.elapsed_seconds()) - loop = asyncio.get_event_loop() - loop.run_until_complete(run_func_check_upgrade()) + return ret return async_wrapper From 143fc011fa41734f0aefb17f449a78461db68205 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 30 Oct 2024 11:40:45 -0700 Subject: [PATCH 04/21] feat(ingest/powerbi): add timeouts for m-query parsing (#11753) --- metadata-ingestion/setup.py | 13 ++++- .../source/bigquery_v2/bigquery_schema.py | 4 +- .../ingestion/source/powerbi/config.py | 19 ++++--- .../source/powerbi/m_query/parser.py | 24 ++++++-- .../source/powerbi/m_query/resolver.py | 5 +- .../ingestion/source/powerbi/powerbi.py | 56 ++++++++++++------- .../powerbi/rest_api_wrapper/data_classes.py | 3 + .../powerbi/rest_api_wrapper/data_resolver.py | 6 +- .../powerbi/rest_api_wrapper/powerbi_api.py | 34 +++++++---- .../datahub/utilities/threading_timeout.py | 42 ++++++++++++++ .../powerbi/golden_test_container.json | 10 +++- ..._config_and_modified_since_admin_only.json | 5 +- .../powerbi/golden_test_personal_ingest.json | 5 +- .../unit/utilities/test_threading_timeout.py | 31 ++++++++++ 14 files changed, 201 insertions(+), 56 deletions(-) create mode 100644 metadata-ingestion/src/datahub/utilities/threading_timeout.py create mode 100644 metadata-ingestion/tests/unit/utilities/test_threading_timeout.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index db273c6e3ce46..606c2b89303b7 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -276,6 +276,10 @@ *path_spec_common, } +threading_timeout_common = { + "stopit==1.1.2", +} + abs_base = { "azure-core==1.29.4", "azure-identity>=1.17.1", @@ -492,7 +496,14 @@ "trino": sql_common | trino, "starburst-trino-usage": sql_common | usage_common | trino, "nifi": {"requests", "packaging", "requests-gssapi"}, - "powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib, + "powerbi": ( + ( + microsoft_common + | {"lark[regex]==1.1.4", "sqlparse", "more-itertools"} + | sqlglot_lib + | threading_timeout_common + ) + ), "powerbi-report-server": powerbi_report_server, "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"}, "unity-catalog": databricks | sql_common | sqllineage_lib, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 4f18c22c108a6..58317b108bef4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -175,7 +175,7 @@ def __init__( def get_query_result(self, query: str) -> RowIterator: def _should_retry(exc: BaseException) -> bool: - logger.debug(f"Exception occured for job query. Reason: {exc}") + logger.debug(f"Exception occurred for job query. Reason: {exc}") # Jobs sometimes fail with transient errors. # This is not currently handled by the python-bigquery client. # https://github.com/googleapis/python-bigquery/issues/23 @@ -197,7 +197,7 @@ def _should_retry(exc: BaseException) -> bool: def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]: def _should_retry(exc: BaseException) -> bool: logger.debug( - f"Exception occured for project.list api. Reason: {exc}. Retrying api request..." + f"Exception occurred for project.list api. Reason: {exc}. Retrying api request..." ) self.report.num_list_projects_retry_request += 1 return True diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index 8a3f8ed6131a2..0aec9a589cf27 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -19,6 +19,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionConfigBase, ) +from datahub.utilities.lossy_collections import LossyList logger = logging.getLogger(__name__) @@ -176,11 +177,18 @@ class SupportedDataPlatform(Enum): @dataclass class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport): + all_workspace_count: int = 0 + filtered_workspace_names: LossyList[str] = dataclass_field( + default_factory=LossyList + ) + filtered_workspace_types: LossyList[str] = dataclass_field( + default_factory=LossyList + ) + dashboards_scanned: int = 0 charts_scanned: int = 0 filtered_dashboards: List[str] = dataclass_field(default_factory=list) filtered_charts: List[str] = dataclass_field(default_factory=list) - number_of_workspaces: int = 0 def report_dashboards_scanned(self, count: int = 1) -> None: self.dashboards_scanned += count @@ -194,9 +202,6 @@ def report_dashboards_dropped(self, model: str) -> None: def report_charts_dropped(self, view: str) -> None: self.filtered_charts.append(view) - def report_number_of_workspaces(self, number_of_workspaces: int) -> None: - self.number_of_workspaces = number_of_workspaces - def default_for_dataset_type_mapping() -> Dict[str, str]: dict_: dict = {} @@ -331,7 +336,7 @@ class PowerBiDashboardSourceConfig( ) workspace_id_as_urn_part: bool = pydantic.Field( default=False, - description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names." + description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names. " "To maintain backward compatibility, this is set to False which uses workspace name", ) # Enable/Disable extracting ownership information of Dashboard @@ -371,8 +376,8 @@ class PowerBiDashboardSourceConfig( # any existing tags defined to those entities extract_endorsements_to_tags: bool = pydantic.Field( default=False, - description="Whether to extract endorsements to tags, note that this may overwrite existing tags. Admin API " - "access is required is this setting is enabled", + description="Whether to extract endorsements to tags, note that this may overwrite existing tags. " + "Admin API access is required if this setting is enabled.", ) filter_dataset_endorsements: AllowDenyPattern = pydantic.Field( default=AllowDenyPattern.allow_all(), diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py index 3edaaed2ff814..086ce2c263b0c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py @@ -1,6 +1,7 @@ import functools import importlib.resources as pkg_resource import logging +import os from typing import Dict, List import lark @@ -19,9 +20,12 @@ TRACE_POWERBI_MQUERY_PARSER, ) from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table +from datahub.utilities.threading_timeout import TimeoutException, threading_timeout logger = logging.getLogger(__name__) +_M_QUERY_PARSE_TIMEOUT = int(os.getenv("DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT", 60)) + @functools.lru_cache(maxsize=1) def get_lark_parser() -> Lark: @@ -41,7 +45,8 @@ def _parse_expression(expression: str) -> Tree: expression = expression.replace("\u00a0", " ") logger.debug(f"Parsing expression = {expression}") - parse_tree: Tree = lark_parser.parse(expression) + with threading_timeout(_M_QUERY_PARSE_TIMEOUT): + parse_tree: Tree = lark_parser.parse(expression) if TRACE_POWERBI_MQUERY_PARSER: logger.debug(parse_tree.pretty()) @@ -83,17 +88,26 @@ def get_upstream_tables( context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}", ) return [] + except KeyboardInterrupt: + raise + except TimeoutException: + reporter.warning( + title="M-Query Parsing Timeout", + message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.", + context=f"table-full-name={table.full_name}, expression={table.expression}", + ) + return [] except ( BaseException ) as e: # TODO: Debug why BaseException is needed here and below. if isinstance(e, lark.exceptions.UnexpectedCharacters): - title = "Unexpected Character Found" + error_type = "Unexpected Character Error" else: - title = "Unknown Parsing Error" + error_type = "Unknown Parsing Error" reporter.warning( - title=title, - message="Unknown parsing error", + title="Unable to extract lineage from M-Query expression", + message=f"Got an '{error_type}' while parsing the expression. Lineage will be missing for this table.", context=f"table-full-name={table.full_name}, expression={table.expression}", exc=e, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index 20fb0b5facbbc..a5fb6fd2673ac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -473,8 +473,9 @@ def internal( ) if v_statement is None: self.reporter.report_warning( - f"{self.table.full_name}-variable-statement", - f"output variable ({current_identifier}) statement not found in table expression", + title="Unable to extract lineage from M-Query expression", + message="Lineage will be incomplete.", + context=f"table-full-name={self.table.full_name}: output-variable={current_identifier} not found in table expression", ) return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index 72336afbaacd0..b2afdc3e40931 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -7,6 +7,8 @@ from datetime import datetime from typing import Iterable, List, Optional, Tuple, Union +import more_itertools + import datahub.emitter.mce_builder as builder import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -795,6 +797,11 @@ def generate_container_for_workspace( container_key=self.workspace_key, name=workspace.name, sub_types=[workspace.type], + extra_properties={ + "workspace_id": workspace.id, + "workspace_name": workspace.name, + "workspace_type": workspace.type, + }, ) return container_work_units @@ -1256,20 +1263,33 @@ def create(cls, config_dict, ctx): def get_allowed_workspaces(self) -> List[powerbi_data_classes.Workspace]: all_workspaces = self.powerbi_client.get_workspaces() + logger.info(f"Number of workspaces = {len(all_workspaces)}") + self.reporter.all_workspace_count = len(all_workspaces) + logger.debug( + f"All workspaces: {[workspace.format_name_for_logger() for workspace in all_workspaces]}" + ) - allowed_wrk = [ - workspace - for workspace in all_workspaces - if self.source_config.workspace_id_pattern.allowed(workspace.id) - and workspace.type in self.source_config.workspace_type_filter - ] + allowed_workspaces = [] + for workspace in all_workspaces: + if not self.source_config.workspace_id_pattern.allowed(workspace.id): + self.reporter.filtered_workspace_names.append( + f"{workspace.id} - {workspace.name}" + ) + continue + elif workspace.type not in self.source_config.workspace_type_filter: + self.reporter.filtered_workspace_types.append( + f"{workspace.id} - {workspace.name} (type = {workspace.type})" + ) + continue + else: + allowed_workspaces.append(workspace) - logger.info(f"Number of workspaces = {len(all_workspaces)}") - self.reporter.report_number_of_workspaces(len(all_workspaces)) - logger.info(f"Number of allowed workspaces = {len(allowed_wrk)}") - logger.debug(f"Workspaces = {all_workspaces}") + logger.info(f"Number of allowed workspaces = {len(allowed_workspaces)}") + logger.debug( + f"Allowed workspaces: {[workspace.format_name_for_logger() for workspace in allowed_workspaces]}" + ) - return allowed_wrk + return allowed_workspaces def validate_dataset_type_mapping(self): powerbi_data_platforms: List[str] = [ @@ -1480,16 +1500,10 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: # Fetch PowerBi workspace for given workspace identifier allowed_workspaces = self.get_allowed_workspaces() - workspaces_len = len(allowed_workspaces) - - batch_size = ( - self.source_config.scan_batch_size - ) # 100 is the maximum allowed for powerbi scan - num_batches = (workspaces_len + batch_size - 1) // batch_size - batches = [ - allowed_workspaces[i * batch_size : (i + 1) * batch_size] - for i in range(num_batches) - ] + + batches = more_itertools.chunked( + allowed_workspaces, self.source_config.scan_batch_size + ) for batch_workspaces in batches: for workspace in self.powerbi_client.fill_workspaces( batch_workspaces, self.reporter diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py index 9407ef7a51b58..fc5cd76458a51 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py @@ -96,6 +96,9 @@ def get_workspace_key( instance=platform_instance, ) + def format_name_for_logger(self) -> str: + return f"{self.name} ({self.id})" + @dataclass class DataSource: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index f8fff2391d10b..7a47c40976bec 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -811,7 +811,7 @@ def _is_scan_result_ready( res.raise_for_status() if res.json()[Constant.STATUS].upper() == Constant.SUCCEEDED: - logger.info(f"Scan result is available for scan id({scan_id})") + logger.debug(f"Scan result is available for scan id({scan_id})") return True if retry == max_retry: @@ -898,8 +898,8 @@ def get_users(self, workspace_id: str, entity: str, entity_id: str) -> List[User return users def get_scan_result(self, scan_id: str) -> Optional[dict]: - logger.info("Fetching scan result") - logger.info(f"{Constant.SCAN_ID}={scan_id}") + logger.debug("Fetching scan result") + logger.debug(f"{Constant.SCAN_ID}={scan_id}") scan_result_get_endpoint = AdminAPIResolver.API_ENDPOINTS[ Constant.SCAN_RESULT_GET ] diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py index b67f257d9eb5b..e137f175c15ad 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py @@ -303,7 +303,7 @@ def _get_scan_result(self, workspace_ids: List[str]) -> Any: ) return None - logger.info("Waiting for scan to complete") + logger.debug("Waiting for scan to complete") if ( self.__admin_api_resolver.wait_for_scan_to_complete( scan_id=scan_id, timeout=self.__config.scan_timeout @@ -355,22 +355,32 @@ def _get_workspace_datasets(self, workspace: Workspace) -> dict: logger.debug("Processing scan result for datasets") for dataset_dict in datasets: - dataset_instance: PowerBIDataset = self._get_resolver().get_dataset( - workspace=workspace, - dataset_id=dataset_dict[Constant.ID], - ) + dataset_id = dataset_dict[Constant.ID] + try: + dataset_instance = self._get_resolver().get_dataset( + workspace=workspace, + dataset_id=dataset_id, + ) + if dataset_instance is None: + continue + except Exception as e: + self.reporter.warning( + title="Unable to fetch dataset details", + message="Skipping this dataset due to the error. Metadata will be incomplete.", + context=f"workspace={workspace.name}, dataset-id={dataset_id}", + exc=e, + ) + continue # fetch + set dataset parameters try: dataset_parameters = self._get_resolver().get_dataset_parameters( workspace_id=workspace.id, - dataset_id=dataset_dict[Constant.ID], + dataset_id=dataset_id, ) dataset_instance.parameters = dataset_parameters except Exception as e: - logger.info( - f"Unable to fetch dataset parameters for {dataset_dict[Constant.ID]}: {e}" - ) + logger.info(f"Unable to fetch dataset parameters for {dataset_id}: {e}") if self.__config.extract_endorsements_to_tags: dataset_instance.tags = self._parse_endorsement( @@ -564,8 +574,7 @@ def _fill_metadata_from_scan_result( ) else: logger.info( - "Skipping endorsements tag as extract_endorsements_to_tags is set to " - "false " + "Skipping endorsements tag as extract_endorsements_to_tags is not enabled" ) self._populate_app_details( @@ -641,6 +650,9 @@ def fill_dashboard_tags() -> None: def fill_workspaces( self, workspaces: List[Workspace], reporter: PowerBiDashboardSourceReport ) -> Iterable[Workspace]: + logger.info( + f"Fetching initial metadata for workspaces: {[workspace.format_name_for_logger() for workspace in workspaces]}" + ) workspaces = self._fill_metadata_from_scan_result(workspaces=workspaces) # First try to fill the admin detail as some regular metadata contains lineage to admin metadata diff --git a/metadata-ingestion/src/datahub/utilities/threading_timeout.py b/metadata-ingestion/src/datahub/utilities/threading_timeout.py new file mode 100644 index 0000000000000..e2caf57ad2116 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/threading_timeout.py @@ -0,0 +1,42 @@ +import contextlib +import functools +import platform +from typing import ContextManager + +from stopit import ThreadingTimeout as _ThreadingTimeout, TimeoutException + +__all__ = ["threading_timeout", "TimeoutException"] + + +@functools.lru_cache(maxsize=1) +def _is_cpython() -> bool: + """Check if we're running on CPython.""" + return platform.python_implementation() == "CPython" + + +def threading_timeout(timeout: float) -> ContextManager[None]: + """A timeout context manager that uses stopit's ThreadingTimeout underneath. + + This is only supported on CPython. + That's because stopit.ThreadingTimeout uses a CPython-internal method to raise + an exception (the timeout error) in another thread. See stopit.threadstop.async_raise. + + Reference: https://github.com/glenfant/stopit + + Args: + timeout: The timeout in seconds. If <= 0, no timeout is applied. + + Raises: + RuntimeError: If the timeout is not supported on the current Python implementation. + TimeoutException: If the timeout is exceeded. + """ + + if timeout <= 0: + return contextlib.nullcontext() + + if not _is_cpython(): + raise RuntimeError( + f"Timeout is only supported on CPython, not {platform.python_implementation()}" + ) + + return _ThreadingTimeout(timeout, swallow_exc=False) diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_container.json b/metadata-ingestion/tests/integration/powerbi/golden_test_container.json index e8be3aa9c0ac7..1039240942a5e 100644 --- a/metadata-ingestion/tests/integration/powerbi/golden_test_container.json +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_container.json @@ -8,7 +8,10 @@ "json": { "customProperties": { "platform": "powerbi", - "workspace": "demo-workspace" + "workspace": "demo-workspace", + "workspace_id": "64ED5CAD-7C10-4684-8180-826122881108", + "workspace_name": "demo-workspace", + "workspace_type": "Workspace" }, "name": "demo-workspace" } @@ -3957,7 +3960,10 @@ "json": { "customProperties": { "platform": "powerbi", - "workspace": "second-demo-workspace" + "workspace": "second-demo-workspace", + "workspace_id": "64ED5CAD-7C22-4684-8180-826122881108", + "workspace_name": "second-demo-workspace", + "workspace_type": "Workspace" }, "name": "second-demo-workspace" } diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_most_config_and_modified_since_admin_only.json b/metadata-ingestion/tests/integration/powerbi/golden_test_most_config_and_modified_since_admin_only.json index e134d795af9ef..0d3a0c0cc6f97 100644 --- a/metadata-ingestion/tests/integration/powerbi/golden_test_most_config_and_modified_since_admin_only.json +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_most_config_and_modified_since_admin_only.json @@ -8,7 +8,10 @@ "json": { "customProperties": { "platform": "powerbi", - "workspace": "64ED5CAD-7C10-4684-8180-826122881108" + "workspace": "64ED5CAD-7C10-4684-8180-826122881108", + "workspace_id": "64ED5CAD-7C10-4684-8180-826122881108", + "workspace_name": "demo-workspace", + "workspace_type": "Workspace" }, "name": "demo-workspace" } diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_personal_ingest.json b/metadata-ingestion/tests/integration/powerbi/golden_test_personal_ingest.json index f8c0fdc17c880..c605f939235cd 100644 --- a/metadata-ingestion/tests/integration/powerbi/golden_test_personal_ingest.json +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_personal_ingest.json @@ -8,7 +8,10 @@ "json": { "customProperties": { "platform": "powerbi", - "workspace": "Jane Smith Workspace" + "workspace": "Jane Smith Workspace", + "workspace_id": "90E9E256-3D6D-4D38-86C8-6CCCBD8C170C", + "workspace_name": "Jane Smith Workspace", + "workspace_type": "PersonalGroup" }, "name": "Jane Smith Workspace" } diff --git a/metadata-ingestion/tests/unit/utilities/test_threading_timeout.py b/metadata-ingestion/tests/unit/utilities/test_threading_timeout.py new file mode 100644 index 0000000000000..c52d18bdd55c2 --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/test_threading_timeout.py @@ -0,0 +1,31 @@ +import time + +import pytest + +from datahub.utilities.threading_timeout import TimeoutException, threading_timeout + + +def test_timeout_no_timeout(): + # Should complete without raising an exception + with threading_timeout(1.0): + time.sleep(0.1) + + +def test_timeout_raises(): + # Should raise TimeoutException + with pytest.raises(TimeoutException): + with threading_timeout(0.1): + time.sleep(0.5) + + +def test_timeout_early_exit(): + # Test that context manager handles other exceptions properly + with pytest.raises(ValueError): + with threading_timeout(1.0): + raise ValueError("Early exit") + + +def test_timeout_zero(): + # Should not raise an exception + with threading_timeout(0.0): + pass From cf3b08fecb3f9991a4f06c4828c3224ccbbb40ed Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Wed, 30 Oct 2024 13:41:09 -0500 Subject: [PATCH 05/21] fix(structuredProperties): fixes underscore behavior in structured property names (#11746) Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com> --- .../models/StructuredPropertyUtils.java | 41 ++++--- .../request/AggregationQueryBuilderTest.java | 112 ++++++++++++++++++ .../request/SearchRequestHandlerTest.java | 55 +++++++++ .../metadata/search/utils/ESUtilsTest.java | 84 +++++++++++++ .../test_structured_properties.py | 22 ++-- 5 files changed, 282 insertions(+), 32 deletions(-) diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/StructuredPropertyUtils.java b/entity-registry/src/main/java/com/linkedin/metadata/models/StructuredPropertyUtils.java index 885be5e00b945..41ef9c25a0f3e 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/StructuredPropertyUtils.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/StructuredPropertyUtils.java @@ -6,7 +6,6 @@ import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_MAPPING_FIELD; import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_MAPPING_FIELD_PREFIX; import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_MAPPING_VERSIONED_FIELD; -import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_MAPPING_VERSIONED_FIELD_PREFIX; import com.google.common.collect.ImmutableSet; import com.linkedin.common.Status; @@ -120,12 +119,13 @@ public static Optional toStructuredPropertyFacetName( lookupDefinitionFromFilterOrFacetName( @Nonnull String fieldOrFacetName, @Nullable AspectRetriever aspectRetriever) { if (fieldOrFacetName.startsWith(STRUCTURED_PROPERTY_MAPPING_FIELD + ".")) { - String fqn = - fieldOrFacetName - .substring(STRUCTURED_PROPERTY_MAPPING_FIELD.length() + 1) - .replace(".keyword", "") - .replace(".delimited", ""); + // Coming in from the UI this is structuredProperties. + any particular specifier for + // subfield (.keyword etc) + String fqn = fieldOrFacetToFQN(fieldOrFacetName); + + // FQN Maps directly to URN with urn:li:structuredProperties:FQN Urn urn = toURNFromFQN(fqn); + Map> result = Objects.requireNonNull(aspectRetriever) .getLatestAspectObjects( @@ -223,9 +223,8 @@ public static void validateStructuredPropertyFQN( * @param fqn structured property's fqn * @return the expected structured property urn */ - private static Urn toURNFromFQN(@Nonnull String fqn) { - return UrnUtils.getUrn( - String.join(":", "urn:li", STRUCTURED_PROPERTY_ENTITY_NAME, fqn.replace('_', '.'))); + public static Urn toURNFromFQN(@Nonnull String fqn) { + return UrnUtils.getUrn(String.join(":", "urn:li", STRUCTURED_PROPERTY_ENTITY_NAME, fqn)); } public static void validateFilter( @@ -235,12 +234,13 @@ public static void validateFilter( return; } - Set fieldNames = new HashSet<>(); + Set fqns = new HashSet<>(); if (filter.getCriteria() != null) { for (Criterion c : filter.getCriteria()) { if (c.getField().startsWith(STRUCTURED_PROPERTY_MAPPING_FIELD_PREFIX)) { - fieldNames.add(stripStructuredPropertyPrefix(c.getField())); + String fqn = fieldOrFacetToFQN(c.getField()); + fqns.add(fqn); } } } @@ -249,24 +249,23 @@ public static void validateFilter( for (ConjunctiveCriterion cc : filter.getOr()) { for (Criterion c : cc.getAnd()) { if (c.getField().startsWith(STRUCTURED_PROPERTY_MAPPING_FIELD_PREFIX)) { - fieldNames.add(stripStructuredPropertyPrefix(c.getField())); + String fqn = fieldOrFacetToFQN(c.getField()); + fqns.add(fqn); } } } } - if (!fieldNames.isEmpty()) { - validateStructuredPropertyFQN(fieldNames, Objects.requireNonNull(aspectRetriever)); + if (!fqns.isEmpty()) { + validateStructuredPropertyFQN(fqns, Objects.requireNonNull(aspectRetriever)); } } - private static String stripStructuredPropertyPrefix(String s) { - if (s.startsWith(STRUCTURED_PROPERTY_MAPPING_VERSIONED_FIELD_PREFIX)) { - return s.substring(STRUCTURED_PROPERTY_MAPPING_VERSIONED_FIELD.length() + 1).split("[.]")[0]; - } else if (s.startsWith(STRUCTURED_PROPERTY_MAPPING_FIELD_PREFIX)) { - return s.substring(STRUCTURED_PROPERTY_MAPPING_FIELD.length() + 1).split("[.]")[0]; - } - return s; + private static String fieldOrFacetToFQN(String fieldOrFacet) { + return fieldOrFacet + .substring(STRUCTURED_PROPERTY_MAPPING_FIELD.length() + 1) + .replace(".keyword", "") + .replace(".delimited", ""); } public static Date toDate(PrimitivePropertyValue value) throws DateTimeParseException { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AggregationQueryBuilderTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AggregationQueryBuilderTest.java index 1381e9560b7e5..cef463802a6b1 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AggregationQueryBuilderTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AggregationQueryBuilderTest.java @@ -44,6 +44,8 @@ public class AggregationQueryBuilderTest { public void setup() throws RemoteInvocationException, URISyntaxException { Urn helloUrn = Urn.createFromString("urn:li:structuredProperty:hello"); Urn abFghTenUrn = Urn.createFromString("urn:li:structuredProperty:ab.fgh.ten"); + Urn underscoresAndDotsUrn = + Urn.createFromString("urn:li:structuredProperty:under.scores.and.dots_make_a_mess"); // legacy aspectRetriever = mock(AspectRetriever.class); @@ -75,6 +77,21 @@ public void setup() throws RemoteInvocationException, URISyntaxException { STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, new Aspect(structPropAbFghTenDefinition.data())))); + StructuredPropertyDefinition structPropUnderscoresAndDotsDefinition = + new StructuredPropertyDefinition(); + structPropUnderscoresAndDotsDefinition.setVersion(null, SetMode.REMOVE_IF_NULL); + structPropUnderscoresAndDotsDefinition.setValueType( + Urn.createFromString(DATA_TYPE_URN_PREFIX + "string")); + structPropUnderscoresAndDotsDefinition.setQualifiedName("under.scores.and.dots_make_a_mess"); + structPropUnderscoresAndDotsDefinition.setDisplayName("under.scores.and.dots_make_a_mess"); + when(aspectRetriever.getLatestAspectObjects(eq(Set.of(underscoresAndDotsUrn)), anySet())) + .thenReturn( + Map.of( + underscoresAndDotsUrn, + Map.of( + STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, + new Aspect(structPropUnderscoresAndDotsDefinition.data())))); + // V1 aspectRetrieverV1 = mock(AspectRetriever.class); when(aspectRetrieverV1.getEntityRegistry()) @@ -105,6 +122,21 @@ public void setup() throws RemoteInvocationException, URISyntaxException { Map.of( STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, new Aspect(structPropAbFghTenDefinitionV1.data())))); + + StructuredPropertyDefinition structPropUnderscoresAndDotsDefinitionV1 = + new StructuredPropertyDefinition(); + structPropUnderscoresAndDotsDefinitionV1.setVersion("00000000000001"); + structPropUnderscoresAndDotsDefinitionV1.setValueType( + Urn.createFromString(DATA_TYPE_URN_PREFIX + "string")); + structPropUnderscoresAndDotsDefinitionV1.setQualifiedName("under.scores.and.dots_make_a_mess"); + structPropUnderscoresAndDotsDefinitionV1.setDisplayName("under.scores.and.dots_make_a_mess"); + when(aspectRetrieverV1.getLatestAspectObjects(eq(Set.of(underscoresAndDotsUrn)), anySet())) + .thenReturn( + Map.of( + underscoresAndDotsUrn, + Map.of( + STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, + new Aspect(structPropUnderscoresAndDotsDefinitionV1.data())))); } @Test @@ -269,6 +301,46 @@ public void testAggregateOverStructuredProperty() { Set.of("structuredProperties.ab_fgh_ten.keyword", "structuredProperties.hello.keyword")); } + @Test + public void testAggregateOverStructuredPropertyNamespaced() { + SearchConfiguration config = new SearchConfiguration(); + config.setMaxTermBucketSize(25); + + AggregationQueryBuilder builder = + new AggregationQueryBuilder( + config, ImmutableMap.of(mock(EntitySpec.class), ImmutableList.of())); + + List aggs = + builder.getAggregations( + TestOperationContexts.systemContextNoSearchAuthorization(aspectRetriever), + List.of("structuredProperties.under.scores.and.dots_make_a_mess")); + Assert.assertEquals(aggs.size(), 1); + AggregationBuilder aggBuilder = aggs.get(0); + Assert.assertTrue(aggBuilder instanceof TermsAggregationBuilder); + TermsAggregationBuilder agg = (TermsAggregationBuilder) aggBuilder; + // Check that field name is sanitized to correct field name + Assert.assertEquals( + agg.field(), + "structuredProperties.under_scores_and_dots_make_a_mess.keyword", + "Terms aggregate must be on a keyword or subfield keyword"); + + // Two structured properties + aggs = + builder.getAggregations( + TestOperationContexts.systemContextNoSearchAuthorization(aspectRetriever), + List.of( + "structuredProperties.under.scores.and.dots_make_a_mess", + "structuredProperties.hello")); + Assert.assertEquals(aggs.size(), 2); + Assert.assertEquals( + aggs.stream() + .map(aggr -> ((TermsAggregationBuilder) aggr).field()) + .collect(Collectors.toSet()), + Set.of( + "structuredProperties.under_scores_and_dots_make_a_mess.keyword", + "structuredProperties.hello.keyword")); + } + @Test public void testAggregateOverStructuredPropertyV1() { SearchConfiguration config = new SearchConfiguration(); @@ -309,6 +381,46 @@ public void testAggregateOverStructuredPropertyV1() { "structuredProperties._versioned.hello.00000000000001.string.keyword")); } + @Test + public void testAggregateOverStructuredPropertyNamespacedV1() { + SearchConfiguration config = new SearchConfiguration(); + config.setMaxTermBucketSize(25); + + AggregationQueryBuilder builder = + new AggregationQueryBuilder( + config, ImmutableMap.of(mock(EntitySpec.class), ImmutableList.of())); + + List aggs = + builder.getAggregations( + TestOperationContexts.systemContextNoSearchAuthorization(aspectRetrieverV1), + List.of("structuredProperties.under.scores.and.dots_make_a_mess")); + Assert.assertEquals(aggs.size(), 1); + AggregationBuilder aggBuilder = aggs.get(0); + Assert.assertTrue(aggBuilder instanceof TermsAggregationBuilder); + TermsAggregationBuilder agg = (TermsAggregationBuilder) aggBuilder; + // Check that field name is sanitized to correct field name + Assert.assertEquals( + agg.field(), + "structuredProperties._versioned.under_scores_and_dots_make_a_mess.00000000000001.string.keyword", + "Terms aggregation must be on a keyword field or subfield."); + + // Two structured properties + aggs = + builder.getAggregations( + TestOperationContexts.systemContextNoSearchAuthorization(aspectRetrieverV1), + List.of( + "structuredProperties.under.scores.and.dots_make_a_mess", + "structuredProperties._versioned.hello.00000000000001.string")); + Assert.assertEquals(aggs.size(), 2); + Assert.assertEquals( + aggs.stream() + .map(aggr -> ((TermsAggregationBuilder) aggr).field()) + .collect(Collectors.toSet()), + Set.of( + "structuredProperties._versioned.under_scores_and_dots_make_a_mess.00000000000001.string.keyword", + "structuredProperties._versioned.hello.00000000000001.string.keyword")); + } + @Test public void testAggregateOverFieldsAndStructProp() { SearchableAnnotation annotation1 = diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/SearchRequestHandlerTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/SearchRequestHandlerTest.java index a3ef62760d797..718a00d067ce5 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/SearchRequestHandlerTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/SearchRequestHandlerTest.java @@ -1,24 +1,34 @@ package com.linkedin.metadata.search.query.request; import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.SEARCHABLE_ENTITY_TYPES; +import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion; import static com.linkedin.metadata.utils.CriterionUtils.buildExistsCriterion; import static com.linkedin.metadata.utils.CriterionUtils.buildIsNullCriterion; import static com.linkedin.metadata.utils.SearchUtil.*; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.*; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.DataMap; import com.linkedin.data.template.StringArray; import com.linkedin.datahub.graphql.generated.EntityType; import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper; +import com.linkedin.entity.Aspect; import com.linkedin.metadata.TestEntitySpecBuilder; +import com.linkedin.metadata.aspect.AspectRetriever; +import com.linkedin.metadata.aspect.GraphRetriever; import com.linkedin.metadata.config.search.ExactMatchConfiguration; import com.linkedin.metadata.config.search.PartialConfiguration; import com.linkedin.metadata.config.search.SearchConfiguration; import com.linkedin.metadata.config.search.WordGramConfiguration; +import com.linkedin.metadata.entity.SearchRetriever; import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.StructuredPropertyUtils; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.ConjunctiveCriterion; import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; @@ -28,6 +38,8 @@ import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain; import com.linkedin.metadata.search.elasticsearch.query.request.SearchRequestHandler; import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.metadata.context.RetrieverContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; import io.datahubproject.test.search.config.SearchCommonTestConfiguration; import java.util.ArrayList; import java.util.Collection; @@ -633,6 +645,49 @@ public void testBrowsePathQueryFilter() { assertEquals(((ExistsQueryBuilder) mustHaveV1.must().get(0)).fieldName(), "browsePaths"); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testInvalidStructuredProperty() { + AspectRetriever aspectRetriever = mock(AspectRetriever.class); + Map> aspectResponse = new HashMap<>(); + DataMap statusData = new DataMap(); + statusData.put("removed", true); + Aspect status = new Aspect(statusData); + Urn structPropUrn = StructuredPropertyUtils.toURNFromFQN("under.scores.and.dots.make_a_mess"); + aspectResponse.put(structPropUrn, ImmutableMap.of(STATUS_ASPECT_NAME, status)); + when(aspectRetriever.getLatestAspectObjects( + Collections.singleton(structPropUrn), ImmutableSet.of(STATUS_ASPECT_NAME))) + .thenReturn(aspectResponse); + OperationContext mockRetrieverContext = + TestOperationContexts.systemContextNoSearchAuthorization( + RetrieverContext.builder() + .aspectRetriever(aspectRetriever) + .graphRetriever(mock(GraphRetriever.class)) + .searchRetriever(mock(SearchRetriever.class)) + .build()); + + Criterion structuredPropCriterion = + buildExistsCriterion("structuredProperties.under.scores.and.dots.make_a_mess"); + + CriterionArray criterionArray = new CriterionArray(); + criterionArray.add(structuredPropCriterion); + + ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion(); + conjunctiveCriterion.setAnd(criterionArray); + + ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray(); + conjunctiveCriterionArray.add(conjunctiveCriterion); + + Filter filter = new Filter(); + filter.setOr(conjunctiveCriterionArray); + + BoolQueryBuilder test = + SearchRequestHandler.getFilterQuery( + mockRetrieverContext.withSearchFlags(flags -> flags.setFulltext(false)), + filter, + new HashMap<>(), + QueryFilterRewriteChain.EMPTY); + } + @Test public void testQueryByDefault() { final Set COMMON = diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java index 6665faacae337..892f7088e7f61 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java @@ -43,6 +43,8 @@ public class ESUtilsTest { @BeforeClass public static void setup() throws RemoteInvocationException, URISyntaxException { Urn abFghTenUrn = Urn.createFromString("urn:li:structuredProperty:ab.fgh.ten"); + Urn underscoresAndDotsUrn = + Urn.createFromString("urn:li:structuredProperty:under.scores.and.dots_make_a_mess"); // legacy aspectRetriever = mock(AspectRetriever.class); @@ -62,6 +64,20 @@ public static void setup() throws RemoteInvocationException, URISyntaxException STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, new Aspect(structPropAbFghTenDefinition.data())))); + StructuredPropertyDefinition structPropUnderscoresAndDotsDefinition = + new StructuredPropertyDefinition(); + structPropUnderscoresAndDotsDefinition.setVersion(null, SetMode.REMOVE_IF_NULL); + structPropUnderscoresAndDotsDefinition.setValueType( + Urn.createFromString(DATA_TYPE_URN_PREFIX + "string")); + structPropUnderscoresAndDotsDefinition.setQualifiedName("under.scores.and.dots_make_a_mess"); + when(aspectRetriever.getLatestAspectObjects(eq(Set.of(underscoresAndDotsUrn)), anySet())) + .thenReturn( + Map.of( + underscoresAndDotsUrn, + Map.of( + STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, + new Aspect(structPropUnderscoresAndDotsDefinition.data())))); + // V1 aspectRetrieverV1 = mock(AspectRetriever.class); when(aspectRetrieverV1.getEntityRegistry()) @@ -80,6 +96,20 @@ public static void setup() throws RemoteInvocationException, URISyntaxException Map.of( STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, new Aspect(structPropAbFghTenDefinitionV1.data())))); + + StructuredPropertyDefinition structPropUnderscoresAndDotsDefinitionV1 = + new StructuredPropertyDefinition(); + structPropUnderscoresAndDotsDefinitionV1.setVersion("00000000000001"); + structPropUnderscoresAndDotsDefinitionV1.setValueType( + Urn.createFromString(DATA_TYPE_URN_PREFIX + "string")); + structPropUnderscoresAndDotsDefinitionV1.setQualifiedName("under.scores.and.dots_make_a_mess"); + when(aspectRetrieverV1.getLatestAspectObjects(eq(Set.of(underscoresAndDotsUrn)), anySet())) + .thenReturn( + Map.of( + underscoresAndDotsUrn, + Map.of( + STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, + new Aspect(structPropUnderscoresAndDotsDefinitionV1.data())))); } @Test @@ -703,6 +733,31 @@ public void testGetQueryBuilderFromStructPropEqualsValue() { Assert.assertEquals(result.toString(), expected); } + @Test + public void testGetQueryBuilderFromNamespacedStructPropEqualsValue() { + + final Criterion singleValueCriterion = + buildCriterion( + "structuredProperties.under.scores.and.dots_make_a_mess", Condition.EQUAL, "value1"); + + OperationContext opContext = mock(OperationContext.class); + when(opContext.getAspectRetriever()).thenReturn(aspectRetriever); + QueryBuilder result = + ESUtils.getQueryBuilderFromCriterion( + singleValueCriterion, false, new HashMap<>(), opContext, QueryFilterRewriteChain.EMPTY); + String expected = + "{\n" + + " \"terms\" : {\n" + + " \"structuredProperties.under_scores_and_dots_make_a_mess.keyword\" : [\n" + + " \"value1\"\n" + + " ],\n" + + " \"boost\" : 1.0,\n" + + " \"_name\" : \"structuredProperties.under.scores.and.dots_make_a_mess\"\n" + + " }\n" + + "}"; + Assert.assertEquals(result.toString(), expected); + } + @Test public void testGetQueryBuilderFromStructPropEqualsValueV1() { @@ -731,6 +786,35 @@ public void testGetQueryBuilderFromStructPropEqualsValueV1() { Assert.assertEquals(result.toString(), expected); } + @Test + public void testGetQueryBuilderFromNamespacedStructPropEqualsValueV1() { + + final Criterion singleValueCriterion = + buildCriterion( + "structuredProperties.under.scores.and.dots_make_a_mess", Condition.EQUAL, "value1"); + + OperationContext opContextV1 = mock(OperationContext.class); + when(opContextV1.getAspectRetriever()).thenReturn(aspectRetrieverV1); + QueryBuilder result = + ESUtils.getQueryBuilderFromCriterion( + singleValueCriterion, + false, + new HashMap<>(), + opContextV1, + QueryFilterRewriteChain.EMPTY); + String expected = + "{\n" + + " \"terms\" : {\n" + + " \"structuredProperties._versioned.under_scores_and_dots_make_a_mess.00000000000001.string.keyword\" : [\n" + + " \"value1\"\n" + + " ],\n" + + " \"boost\" : 1.0,\n" + + " \"_name\" : \"structuredProperties.under.scores.and.dots_make_a_mess\"\n" + + " }\n" + + "}"; + Assert.assertEquals(result.toString(), expected); + } + @Test public void testGetQueryBuilderFromStructPropExists() { final Criterion singleValueCriterion = buildExistsCriterion("structuredProperties.ab.fgh.ten"); diff --git a/smoke-test/tests/structured_properties/test_structured_properties.py b/smoke-test/tests/structured_properties/test_structured_properties.py index 8b6fead789b3f..f0509aad77625 100644 --- a/smoke-test/tests/structured_properties/test_structured_properties.py +++ b/smoke-test/tests/structured_properties/test_structured_properties.py @@ -165,13 +165,13 @@ def get_property_from_entity( return None -def to_es_name(property_name=None, namespace=default_namespace, qualified_name=None): +def to_es_filter_name( + property_name=None, namespace=default_namespace, qualified_name=None +): if property_name: - namespace_field = namespace.replace(".", "_") - return f"structuredProperties.{namespace_field}_{property_name}" + return f"structuredProperties.{namespace}.{property_name}" else: - escaped_qualified_name = qualified_name.replace(".", "_") - return f"structuredProperties.{escaped_qualified_name}" + return f"structuredProperties.{qualified_name}" # @tenacity.retry( @@ -446,7 +446,7 @@ def test_structured_property_search( graph_client.get_urns_by_filter( extraFilters=[ { - "field": to_es_name(dataset_property_name), + "field": to_es_filter_name(dataset_property_name), "negated": "false", "condition": "EXISTS", } @@ -475,7 +475,7 @@ def test_structured_property_search( entity_types=["tag"], extraFilters=[ { - "field": to_es_name( + "field": to_es_filter_name( field_property_name, namespace="io.datahubproject.test" ), "negated": "false", @@ -492,7 +492,7 @@ def test_structured_property_search( entity_types=["dataset", "tag"], extraFilters=[ { - "field": to_es_name(dataset_property_name), + "field": to_es_filter_name(dataset_property_name), "negated": "false", "condition": "EXISTS", } @@ -690,7 +690,7 @@ def test_dataset_structured_property_soft_delete_search_filter_validation( graph_client.get_urns_by_filter( extraFilters=[ { - "field": to_es_name(dataset_property_name), + "field": to_es_filter_name(property_name=dataset_property_name), "negated": "false", "condition": "EXISTS", } @@ -710,7 +710,7 @@ def test_dataset_structured_property_soft_delete_search_filter_validation( graph_client.get_urns_by_filter( extraFilters=[ { - "field": to_es_name(dataset_property_name), + "field": to_es_filter_name(property_name=dataset_property_name), "negated": "false", "condition": "EXISTS", } @@ -779,7 +779,7 @@ def validate_search(qualified_name, expected): graph_client.get_urns_by_filter( extraFilters=[ { - "field": to_es_name(qualified_name=qualified_name), + "field": to_es_filter_name(qualified_name=qualified_name), "negated": "false", "condition": "EXISTS", } From 5c0377a761a2f251d3a48b909b92b984a5d2fbd6 Mon Sep 17 00:00:00 2001 From: John Joyce Date: Wed, 30 Oct 2024 14:53:38 -0700 Subject: [PATCH 06/21] feat(ui): Support markdown for incident descriptions (#11759) Co-authored-by: John Joyce --- .../Incident/components/AddIncidentModal.tsx | 21 +++++++++++++++++-- .../Incident/components/IncidentListItem.tsx | 3 ++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/datahub-web-react/src/app/entity/shared/tabs/Incident/components/AddIncidentModal.tsx b/datahub-web-react/src/app/entity/shared/tabs/Incident/components/AddIncidentModal.tsx index fda8c9cda2d0d..663575f062093 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/Incident/components/AddIncidentModal.tsx +++ b/datahub-web-react/src/app/entity/shared/tabs/Incident/components/AddIncidentModal.tsx @@ -1,7 +1,7 @@ import React, { useState } from 'react'; import { message, Modal, Button, Form, Input, Typography, Select } from 'antd'; import { useApolloClient } from '@apollo/client'; -import TextArea from 'antd/lib/input/TextArea'; +import styled from 'styled-components'; import analytics, { EventType, EntityActionType } from '../../../../../analytics'; import { useEntityData } from '../../../EntityContext'; import { EntityType, IncidentSourceType, IncidentState, IncidentType } from '../../../../../../types.generated'; @@ -9,6 +9,12 @@ import { INCIDENT_DISPLAY_TYPES, PAGE_SIZE, addActiveIncidentToCache } from '../ import { useRaiseIncidentMutation } from '../../../../../../graphql/mutations.generated'; import handleGraphQLError from '../../../../../shared/handleGraphQLError'; import { useUserContext } from '../../../../../context/useUserContext'; +import { Editor } from '../../Documentation/components/editor/Editor'; +import { ANTD_GRAY } from '../../../constants'; + +const StyledEditor = styled(Editor)` + border: 1px solid ${ANTD_GRAY[4.5]}; +`; type AddIncidentProps = { open: boolean; @@ -112,6 +118,7 @@ export const AddIncidentModal = ({ open, onClose, refetch }: AddIncidentProps) = open={open} destroyOnClose onCancel={handleClose} + width={600} footer={[