Skip to content

Commit

Permalink
ingest app as dashboard
Browse files Browse the repository at this point in the history
  • Loading branch information
sid-acryl committed Oct 15, 2024
1 parent be1b880 commit 7593a77
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class BIAssetSubTypes(StrEnum):
# PowerBI
POWERBI_TILE = "PowerBI Tile"
POWERBI_PAGE = "PowerBI Page"
POWERBI_APP = "App"

# Mode
MODE_REPORT = "Report"
Expand Down
70 changes: 70 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata._schema_classes import EdgeClass
from datahub.metadata.com.linkedin.pegasus2avro.common import ChangeAuditStamps
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
FineGrainedLineage,
Expand Down Expand Up @@ -1306,6 +1307,73 @@ def extract_independent_datasets(
)
)

def emit_app(
    self, workspace: powerbi_data_classes.Workspace
) -> Iterable[MetadataWorkUnit]:
    """Emit the workspace's PowerBI App as a DataHub dashboard entity.

    The App is modelled as a dashboard whose ``dashboards`` edges point at
    the dashboards and reports published through it. Nothing is emitted
    when the workspace has no App, or when the App references neither a
    dashboard nor a report.

    Args:
        workspace: Workspace whose ``app`` attribute (possibly ``None``)
            describes the App to emit.

    Yields:
        Three work units for the App URN, in order: ``DashboardInfoClass``,
        ``StatusClass(removed=False)`` and ``SubTypesClass`` carrying
        ``BIAssetSubTypes.POWERBI_APP``.
    """
    app = workspace.app
    if app is None:
        return

    def to_dashboard_urn(urn_part: str) -> str:
        # Single place that applies the configured platform and instance.
        return builder.make_dashboard_urn(
            platform=self.source_config.platform_name,
            platform_instance=self.source_config.platform_instance,
            name=urn_part,
        )

    edges: List[EdgeClass] = [
        EdgeClass(
            destinationUrn=to_dashboard_urn(
                powerbi_data_classes.Dashboard.get_urn_part_by_id(
                    app_dashboard.original_dashboard_id
                )
            )
        )
        for app_dashboard in app.dashboards
    ]
    # Reports are referenced through dashboard URNs as well; only the URN
    # name part ("reports.<id>") distinguishes them from dashboards.
    edges.extend(
        EdgeClass(
            destinationUrn=to_dashboard_urn(
                powerbi_data_classes.Report.get_urn_part_by_id(
                    app_report.original_report_id
                )
            )
        )
        for app_report in app.reports
    )

    if not edges:
        # An App without any dashboard/report edge is not emitted.
        return

    # Use the App's own URN part ("apps.<id>") so the App entity lives in
    # its own namespace rather than sharing the "dashboards.<id>" one.
    app_urn: str = to_dashboard_urn(app.get_urn_part())

    dashboard_info: DashboardInfoClass = DashboardInfoClass(
        title=app.name,
        # Fall back to the name when the App has no (or an empty) description.
        description=app.description or app.name,
        # NOTE(review): App.last_update is not propagated here; an empty
        # ChangeAuditStamps is emitted instead - confirm whether the
        # timestamp should be mapped.
        lastModified=ChangeAuditStamps(),
        dashboards=edges,
    )

    aspects = (
        dashboard_info,
        StatusClass(removed=False),
        SubTypesClass(typeNames=[BIAssetSubTypes.POWERBI_APP]),
    )
    for aspect in aspects:
        yield MetadataChangeProposalWrapper(
            entityUrn=app_urn,
            aspect=aspect,
        ).as_workunit()

def get_workspace_workunit(
self, workspace: powerbi_data_classes.Workspace
) -> Iterable[MetadataWorkUnit]:
Expand All @@ -1318,6 +1386,8 @@ def get_workspace_workunit(
# Return workunit to a Datahub Ingestion framework
yield workunit

yield from self.emit_app(workspace=workspace)

for dashboard in workspace.dashboards:
try:
# Fetch PowerBi users for dashboards
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,31 @@ class DatasetKey(ContainerKey):
dataset: str


@dataclass
class AppDashboard:
    """Reference to a dashboard that is published through a PowerBI App."""

    # Identifier of this App-level dashboard entry.
    id: str
    # Identifier of the underlying dashboard; this is the id the ingestion
    # source turns into a dashboard URN when linking the App to its content.
    original_dashboard_id: str


@dataclass
class AppReport:
    """Reference to a report that is published through a PowerBI App."""

    # Identifier of this App-level report entry.
    id: str
    # Identifier of the underlying report; this is the id the ingestion
    # source turns into a URN when linking the App to its content.
    original_report_id: str


@dataclass
class App:
    """A PowerBI App together with the dashboards and reports it publishes."""

    id: str
    name: str
    # Optional free-text description; may be absent in the server response.
    description: Optional[str]
    # Raw last-update value as received; kept as an unparsed string.
    last_update: Optional[str]
    dashboards: List["AppDashboard"]
    reports: List["AppReport"]

    def get_urn_part(self) -> str:
        """Return the URN name part identifying this App (``apps.<id>``)."""
        return App.get_urn_part_by_id(self.id)

    @staticmethod
    def get_urn_part_by_id(id_: str) -> str:
        """Return the URN name part for an App id without needing an instance.

        Mirrors the ``get_urn_part_by_id`` helpers on ``Report`` and
        ``Dashboard`` so all three asset types expose the same API.
        """
        return f"apps.{id_}"


@dataclass
class Workspace:
id: str
Expand All @@ -49,6 +74,7 @@ class Workspace:
dashboard_endorsements: Dict[str, List[str]]
scan_result: dict
independent_datasets: List["PowerBIDataset"]
app: Optional["App"]

def get_urn_part(self, workspace_id_as_urn_part: Optional[bool] = False) -> str:
# shouldn't use workspace name, as they can be the same?
Expand Down Expand Up @@ -229,9 +255,14 @@ class Report:
pages: List["Page"]
users: List["User"]
tags: List[str]
app_reference: Optional["App"]

def get_urn_part(self) -> str:
    """Return the URN name part for this report (``reports.<id>``).

    Delegates to ``Report.get_urn_part_by_id`` so the format is defined in
    exactly one place; the previous inline ``return`` made the delegating
    line unreachable.
    """
    return Report.get_urn_part_by_id(self.id)

@staticmethod
def get_urn_part_by_id(id_: str) -> str:
return f"reports.{id_}"


@dataclass
Expand Down Expand Up @@ -267,9 +298,14 @@ class Dashboard:
users: List["User"]
tags: List[str]
webUrl: Optional[str]
app_reference: Optional["App"]

def get_urn_part(self) -> str:
    """Return the URN name part for this dashboard (``dashboards.<id>``).

    Delegates to ``Dashboard.get_urn_part_by_id`` so the format is defined
    in exactly one place; the previous inline ``return`` made the
    delegating line unreachable.
    """
    return Dashboard.get_urn_part_by_id(self.id)

@staticmethod
def get_urn_part_by_id(id_: str) -> str:
return f"dashboards.{id_}"

def __members(self):
    """Return the tuple of attributes that identify this dashboard.

    Only the id takes part; presumably consumed by the class's
    equality/hash methods - verify against the rest of the class.
    """
    identity = (self.id,)
    return identity
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from functools import lru_cache
from time import sleep
from typing import Any, Dict, Iterator, List, Optional, Union

Expand All @@ -13,6 +14,7 @@
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.ingestion.source.powerbi.config import Constant
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import (
App,
Column,
Dashboard,
Measure,
Expand Down Expand Up @@ -97,6 +99,8 @@ def __init__(
),
)

self.get_app = lru_cache(maxsize=128)(self.__get_app)

@abstractmethod
def get_groups_endpoint(self) -> str:
    """Return the groups endpoint for this resolver.

    Abstract hook: each concrete resolver supplies its own endpoint.
    """
    ...
Expand Down Expand Up @@ -143,6 +147,13 @@ def get_dataset_parameters(
def get_users(self, workspace_id: str, entity: str, entity_id: str) -> List[User]:
pass

@abstractmethod
def _get_app(
    self,
    app_id: str,
) -> Optional[Dict]:
    """Fetch the raw server payload describing the App with id ``app_id``.

    Abstract hook implemented by the concrete resolvers. Implementations
    return the App as a dict, or ``None`` when no App could be resolved.
    """
    ...

def _get_authority_url(self):
    """Return the authority URL: ``DataResolverBase.AUTHORITY`` followed by the tenant id."""
    authority_url = f"{DataResolverBase.AUTHORITY}{self.__tenant_id}"
    return authority_url

Expand Down Expand Up @@ -221,6 +232,7 @@ def get_dashboards(self, workspace: Workspace) -> List[Dashboard]:
tiles=[],
users=[],
tags=[],
app_reference=None, # It is getting set later from scan_result
)
for instance in dashboards_dict
if (
Expand Down Expand Up @@ -282,6 +294,7 @@ def fetch_reports():
users=[], # It will be fetched using Admin Fetcher based on condition
tags=[], # It will be fetched using Admin Fetcher based on condition
dataset=workspace.datasets.get(raw_instance.get(Constant.DATASET_ID)),
app_reference=None, # It is getting set later from scan-result
)
for raw_instance in fetch_reports()
if Constant.APP_ID
Expand Down Expand Up @@ -414,6 +427,37 @@ def itr_pages(

page_number += 1

def __get_app(
    self,
    app_id: str,
) -> Optional[App]:
    """Fetch an App by id and convert the raw payload into an ``App``.

    Args:
        app_id: Identifier of the App to look up.

    Returns:
        The populated ``App`` (with empty ``dashboards``/``reports``
        lists), or ``None`` when the resolver's ``_get_app`` returns
        ``None``.

    Raises:
        ValueError: If the payload lacks the ``Constant.ID`` or
            ``Constant.NAME`` field. An explicit raise is used instead of
            ``assert`` so the validation is not stripped under
            ``python -O``.
    """
    raw_app: Optional[Dict] = self._get_app(
        app_id=app_id,
    )

    if raw_app is None:
        return None

    for required_field in (Constant.ID, Constant.NAME):
        if required_field not in raw_app:
            raise ValueError(
                f"{required_field} is required field not present in server response"
            )

    return App(
        id=raw_app[Constant.ID],
        name=raw_app[Constant.NAME],
        description=raw_app.get(Constant.DESCRIPTION),
        last_update=raw_app.get(Constant.LAST_UPDATE),
        # Dashboards and reports of an App are available in the scan-result
        # response and are collected from there.
        dashboards=[],
        # There is an App section in the documentation
        # https://learn.microsoft.com/en-us/rest/api/power-bi/dashboards/get-dashboards-in-group#code-try-0
        # but the report API mentioned in that section does not return the
        # reports, so these details are collected from the scan-result too.
        reports=[],
    )


class RegularAPIResolver(DataResolverBase):
# Regular access endpoints
Expand Down Expand Up @@ -683,6 +727,27 @@ def profile_dataset(

table.column_count = column_count

def _get_app(
    self,
    app_id: str,
) -> Optional[Dict]:
    """Fetch a single App through the regular (non-admin) API.

    Args:
        app_id: Identifier of the App to request.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an
        unsuccessful HTTP status.
    """
    endpoint_template = self.API_ENDPOINTS[Constant.GET_WORKSPACE_APP]
    app_endpoint = endpoint_template.format(
        MY_ORG_URL=DataResolverBase.MY_ORG_URL,
        APP_ID=app_id,
    )

    # Call the PowerBI service.
    logger.debug(f"Request to app URL={app_endpoint}")
    auth_headers = self.get_authorization_header()
    response = self._request_session.get(url=app_endpoint, headers=auth_headers)

    # Surface HTTP errors before attempting to decode the body.
    response.raise_for_status()
    payload = response.json()
    return payload


class AdminAPIResolver(DataResolverBase):
# Admin access endpoints
Expand Down Expand Up @@ -996,3 +1061,22 @@ def profile_dataset(
) -> None:
logger.debug("Profile dataset is unsupported in Admin API")
return None

def _get_app(
    self,
    app_id: str,
) -> Optional[Dict]:
    """Look up a single App through the admin API.

    Pages through the endpoint's results and returns the first entry
    whose ``Constant.ID`` equals ``app_id``.

    Args:
        app_id: Identifier of the App to find.

    Returns:
        The matching raw App dict, or ``None`` when no page contains it.
    """
    endpoint_template = self.API_ENDPOINTS[Constant.GET_WORKSPACE_APP]
    app_endpoint = endpoint_template.format(
        POWERBI_ADMIN_BASE_URL=DataResolverBase.ADMIN_BASE_URL,
        APP_ID=app_id,
    )

    # Call the PowerBI service.
    logger.debug(f"Request to app URL={app_endpoint}")

    # Lazy scan: pages are fetched only until the first match is found.
    matching_apps = (
        candidate
        for page in self.itr_pages(endpoint=app_endpoint)
        for candidate in page
        if Constant.ID in candidate and app_id == candidate[Constant.ID]
    )
    return next(matching_apps, None)
Loading

0 comments on commit 7593a77

Please sign in to comment.