Skip to content

Commit

Permalink
feat(sdk):platform-resource - complex queries (#11675)
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka authored Oct 19, 2024
1 parent 8f7f2c1 commit 3b1b762
Show file tree
Hide file tree
Showing 6 changed files with 617 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Dict, Iterable, List, Optional, Union
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, Union, cast

from avrogen.dict_wrapper import DictWrapper
from pydantic import BaseModel
Expand All @@ -14,7 +14,14 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import DatahubKey
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.urns import PlatformResourceUrn
from datahub.metadata.urns import DataPlatformUrn, PlatformResourceUrn, Urn
from datahub.utilities.openapi_utils import OpenAPIGraphClient
from datahub.utilities.search_utils import (
ElasticDocumentQuery,
ElasticsearchQueryBuilder,
LogicalOperator,
SearchField,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,71 +76,75 @@ def to_resource_info(self) -> models.PlatformResourceInfoClass:
)


class OpenAPIGraphClient:
class DataPlatformInstanceUrn:
"""
A simple implementation of a URN class for DataPlatformInstance.
Since this is not present in the URN registry, we need to implement it here.
"""

ENTITY_KEY_ASPECT_MAP = {
aspect_type.ASPECT_INFO.get("keyForEntity"): name
for name, aspect_type in models.ASPECT_NAME_MAP.items()
if aspect_type.ASPECT_INFO.get("keyForEntity")
}
@staticmethod
def create_from_id(platform_instance_urn: str) -> Urn:
if platform_instance_urn.startswith("urn:li:platformInstance:"):
string_urn = platform_instance_urn
else:
string_urn = f"urn:li:platformInstance:{platform_instance_urn}"
return Urn.from_string(string_urn)

def __init__(self, graph: DataHubGraph):
self.graph = graph
self.openapi_base = graph._gms_server.rstrip("/") + "/openapi/v3"

def scroll_urns_by_filter(
self,
entity_type: str,
extra_or_filters: List[Dict[str, str]],
extra_and_filters: List[Dict[str, str]] = [],
) -> Iterable[str]:
"""
Scroll through all urns that match the given filters
"""
class UrnSearchField(SearchField):
"""
A search field that supports URN values.
TODO: Move this to search_utils after we make this more generic.
"""

key_aspect = self.ENTITY_KEY_ASPECT_MAP.get(entity_type)
assert key_aspect, f"No key aspect found for entity type {entity_type}"
if extra_or_filters and extra_and_filters:
raise ValueError(
"Only one of extra_or_filters and extra_and_filters should be provided"
)
def __init__(self, field_name: str, urn_value_extractor: Callable[[str], Urn]):
self.urn_value_extractor = urn_value_extractor
super().__init__(field_name)

count = 1000
query = (
" OR ".join(
[
f"{filter['field']}:\"{filter['value']}\""
for filter in extra_or_filters
]
)
if extra_or_filters
else " AND ".join(
[
f"{filter['field']}:\"{filter['value']}\""
for filter in extra_and_filters
]
)
def get_search_value(self, value: str) -> str:
return str(self.urn_value_extractor(value))


class PlatformResourceSearchField(SearchField):
def __init__(self, field_name: str):
super().__init__(field_name)

@classmethod
def from_search_field(
cls, search_field: SearchField
) -> "PlatformResourceSearchField":
# pretends to be a class method, but just returns the input
return search_field # type: ignore


class PlatformResourceSearchFields:
PRIMARY_KEY = PlatformResourceSearchField("primaryKey")
RESOURCE_TYPE = PlatformResourceSearchField("resourceType")
SECONDARY_KEYS = PlatformResourceSearchField("secondaryKeys")
PLATFORM = PlatformResourceSearchField.from_search_field(
UrnSearchField(
field_name="platform.keyword",
urn_value_extractor=DataPlatformUrn.create_from_id,
)
scroll_id = None
while True:
response = self.graph._get_generic(
self.openapi_base + f"/entity/{entity_type.lower()}",
params={
"systemMetadata": "false",
"includeSoftDelete": "false",
"skipCache": "false",
"aspects": [key_aspect],
"scrollId": scroll_id,
"count": count,
"query": query,
},
)
entities = response.get("entities", [])
scroll_id = response.get("scrollId")
for entity in entities:
yield entity["urn"]
if not scroll_id:
break
)
PLATFORM_INSTANCE = PlatformResourceSearchField.from_search_field(
UrnSearchField(
field_name="platformInstance.keyword",
urn_value_extractor=DataPlatformInstanceUrn.create_from_id,
)
)


class ElasticPlatformResourceQuery(ElasticDocumentQuery[PlatformResourceSearchField]):
def __init__(self):
super().__init__()

@classmethod
def create_from(
cls: Type["ElasticPlatformResourceQuery"],
*args: Tuple[Union[str, PlatformResourceSearchField], str],
) -> "ElasticPlatformResourceQuery":
return cast(ElasticPlatformResourceQuery, super().create_from(*args))


class PlatformResource(BaseModel):
Expand All @@ -147,6 +158,12 @@ def remove(
cls,
key: PlatformResourceKey,
) -> "PlatformResource":
"""
Creates a PlatformResource object with the removed status set to True.
Removed PlatformResource objects are used to soft-delete resources from
the graph.
To hard-delete a resource, use the delete method.
"""
return cls(
id=key.id,
removed=True,
Expand Down Expand Up @@ -240,28 +257,38 @@ def from_datahub(

@staticmethod
def search_by_key(
graph_client: DataHubGraph, key: str, primary: bool = True
graph_client: DataHubGraph,
key: str,
primary: bool = True,
is_exact: bool = True,
) -> Iterable["PlatformResource"]:
extra_or_filters = []
extra_or_filters.append(
{
"field": "primaryKey",
"condition": "EQUAL",
"value": key,
}
"""
Searches for PlatformResource entities by primary or secondary key.
:param graph_client: DataHubGraph client
:param key: The key to search for
:param primary: Whether to search for primary only or expand the search
to secondary keys (default: True)
:param is_exact: Whether to search for an exact match (default: True)
:return: An iterable of PlatformResource objects
"""

elastic_platform_resource_group = (
ElasticPlatformResourceQuery.create_from()
.group(LogicalOperator.OR)
.add_field_match(
PlatformResourceSearchFields.PRIMARY_KEY, key, is_exact=is_exact
)
)
if not primary: # we expand the search to secondary keys
extra_or_filters.append(
{
"field": "secondaryKeys",
"condition": "EQUAL",
"value": key,
}
elastic_platform_resource_group.add_field_match(
PlatformResourceSearchFields.SECONDARY_KEYS, key, is_exact=is_exact
)
query = elastic_platform_resource_group.end()
openapi_client = OpenAPIGraphClient(graph_client)
for urn in openapi_client.scroll_urns_by_filter(
entity_type="platformResource",
extra_or_filters=extra_or_filters,
query=query,
):
platform_resource = PlatformResource.from_datahub(graph_client, urn)
if platform_resource:
Expand All @@ -273,18 +300,16 @@ def delete(self, graph_client: DataHubGraph, hard: bool = True) -> None:
@staticmethod
def search_by_filters(
graph_client: DataHubGraph,
and_filters: List[Dict[str, str]] = [],
or_filters: List[Dict[str, str]] = [],
query: Union[
ElasticPlatformResourceQuery,
ElasticDocumentQuery,
ElasticsearchQueryBuilder,
],
) -> Iterable["PlatformResource"]:
if and_filters and or_filters:
raise ValueError(
"Only one of and_filters and or_filters should be provided"
)
openapi_client = OpenAPIGraphClient(graph_client)
for urn in openapi_client.scroll_urns_by_filter(
entity_type="platformResource",
extra_or_filters=or_filters if or_filters else [],
extra_and_filters=and_filters if and_filters else [],
query=query,
):
platform_resource = PlatformResource.from_datahub(graph_client, urn)
if platform_resource:
Expand Down
69 changes: 69 additions & 0 deletions metadata-ingestion/src/datahub/utilities/openapi_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
from typing import Iterable, Union

import datahub.metadata.schema_classes as models
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.search_utils import (
ElasticDocumentQuery,
ElasticsearchQueryBuilder,
)

logger = logging.getLogger(__name__)


class OpenAPIGraphClient:
"""
An experimental client for the DataHubGraph that uses the OpenAPI endpoints
to query entities and aspects.
Does not support all features of the DataHubGraph.
API is subject to change.
DO NOT USE THIS UNLESS YOU KNOW WHAT YOU ARE DOING.
"""

ENTITY_KEY_ASPECT_MAP = {
aspect_type.ASPECT_INFO.get("keyForEntity"): name
for name, aspect_type in models.ASPECT_NAME_MAP.items()
if aspect_type.ASPECT_INFO.get("keyForEntity")
}

def __init__(self, graph: DataHubGraph):
self.graph = graph
self.openapi_base = graph._gms_server.rstrip("/") + "/openapi/v3"

def scroll_urns_by_filter(
self,
entity_type: str,
query: Union[ElasticDocumentQuery, ElasticsearchQueryBuilder],
) -> Iterable[str]:
"""
Scroll through all urns that match the given filters.
"""

key_aspect = self.ENTITY_KEY_ASPECT_MAP.get(entity_type)
assert key_aspect, f"No key aspect found for entity type {entity_type}"

count = 1000
string_query = query.build()
scroll_id = None
logger.debug(f"Scrolling with query: {string_query}")
while True:
response = self.graph._get_generic(
self.openapi_base + f"/entity/{entity_type.lower()}",
params={
"systemMetadata": "false",
"includeSoftDelete": "false",
"skipCache": "false",
"aspects": [key_aspect],
"scrollId": scroll_id,
"count": count,
"query": string_query,
},
)
entities = response.get("entities", [])
scroll_id = response.get("scrollId")
for entity in entities:
yield entity["urn"]
if not scroll_id:
break
Loading

0 comments on commit 3b1b762

Please sign in to comment.