diff --git a/metadata-ingestion/src/datahub/api/entities/platformresource/platform_resource.py b/metadata-ingestion/src/datahub/api/entities/platformresource/platform_resource.py index 1556a67a9e555..349b0ff11d84f 100644 --- a/metadata-ingestion/src/datahub/api/entities/platformresource/platform_resource.py +++ b/metadata-ingestion/src/datahub/api/entities/platformresource/platform_resource.py @@ -85,6 +85,7 @@ def scroll_urns_by_filter( self, entity_type: str, extra_or_filters: List[Dict[str, str]], + extra_and_filters: List[Dict[str, str]] = [], ) -> Iterable[str]: """ Scroll through all urns that match the given filters @@ -92,10 +93,26 @@ def scroll_urns_by_filter( key_aspect = self.ENTITY_KEY_ASPECT_MAP.get(entity_type) assert key_aspect, f"No key aspect found for entity type {entity_type}" + if extra_or_filters and extra_and_filters: + raise ValueError( + "Only one of extra_or_filters and extra_and_filters should be provided" + ) count = 1000 - query = " OR ".join( - [f"{filter['field']}:{filter['value']}" for filter in extra_or_filters] + query = ( + " OR ".join( + [ + f"{filter['field']}:\"{filter['value']}\"" + for filter in extra_or_filters + ] + ) + if extra_or_filters + else " AND ".join( + [ + f"{filter['field']}:\"{filter['value']}\"" + for filter in extra_and_filters + ] + ) ) scroll_id = None while True: @@ -252,3 +269,23 @@ def search_by_key( def delete(self, graph_client: DataHubGraph, hard: bool = True) -> None: graph_client.delete_entity(str(PlatformResourceUrn(self.id)), hard=hard) + + @staticmethod + def search_by_filters( + graph_client: DataHubGraph, + and_filters: List[Dict[str, str]] = [], + or_filters: List[Dict[str, str]] = [], + ) -> Iterable["PlatformResource"]: + if and_filters and or_filters: + raise ValueError( + "Only one of and_filters and or_filters should be provided" + ) + openapi_client = OpenAPIGraphClient(graph_client) + for urn in openapi_client.scroll_urns_by_filter( + entity_type="platformResource", + extra_or_filters=or_filters if or_filters else [], + extra_and_filters=and_filters if and_filters else [], + ): + platform_resource = PlatformResource.from_datahub(graph_client, urn) + if platform_resource: + yield platform_resource diff --git a/smoke-test/tests/platform_resources/test_platform_resource.py b/smoke-test/tests/platform_resources/test_platform_resource.py index 7c53f72d843c9..7ebfd4d6ea15b 100644 --- a/smoke-test/tests/platform_resources/test_platform_resource.py +++ b/smoke-test/tests/platform_resources/test_platform_resource.py @@ -112,3 +112,77 @@ def test_platform_resource_non_existent(graph_client, test_id): graph_client=graph_client, ) assert platform_resource is None + + +def test_platform_resource_urn_secondary_key(graph_client, test_id): + key = PlatformResourceKey( + platform=f"test_platform_{test_id}", + resource_type=f"test_resource_type_{test_id}", + primary_key=f"test_primary_key_{test_id}", + ) + dataset_urn = ( + f"urn:li:dataset:(urn:li:dataPlatform:test,test_secondary_key_{test_id},PROD)" + ) + platform_resource = PlatformResource.create( + key=key, + value={"test_key": f"test_value_{test_id}"}, + secondary_keys=[dataset_urn], + ) + platform_resource.to_datahub(graph_client) + wait_for_writes_to_sync() + + read_platform_resources = [ + r + for r in PlatformResource.search_by_key( + graph_client, dataset_urn, primary=False + ) + ] + assert len(read_platform_resources) == 1 + assert read_platform_resources[0] == platform_resource + + +def test_platform_resource_listing_by_resource_type(graph_client, test_id): + # Generate two resources with the same resource type + key1 = PlatformResourceKey( + platform=f"test_platform_{test_id}", + resource_type=f"test_resource_type_{test_id}", + primary_key=f"test_primary_key_1_{test_id}", + ) + platform_resource1 = PlatformResource.create( + key=key1, + value={"test_key": f"test_value_1_{test_id}"}, + ) + platform_resource1.to_datahub(graph_client) + + key2 = PlatformResourceKey( + platform=f"test_platform_{test_id}", + resource_type=f"test_resource_type_{test_id}", + primary_key=f"test_primary_key_2_{test_id}", + ) + platform_resource2 = PlatformResource.create( + key=key2, + value={"test_key": f"test_value_2_{test_id}"}, + ) + platform_resource2.to_datahub(graph_client) + + wait_for_writes_to_sync() + + search_results = [ + r + for r in PlatformResource.search_by_filters( + graph_client, + and_filters=[ + { + "field": "resourceType", + "condition": "EQUAL", + "value": key1.resource_type, + } + ], + ) + ] + assert len(search_results) == 2 + + read_platform_resource_1 = next(r for r in search_results if r.id == key1.id) + read_platform_resource_2 = next(r for r in search_results if r.id == key2.id) + assert read_platform_resource_1 == platform_resource1 + assert read_platform_resource_2 == platform_resource2