feat(ingest/fivetran): avoid duplicate table lineage entries #11712

Merged · 8 commits · Oct 29, 2024

@@ -144,8 +144,8 @@ class FivetranSourceReport(StaleEntityRemovalSourceReport):
     def report_connectors_scanned(self, count: int = 1) -> None:
         self.connectors_scanned += count
 
-    def report_connectors_dropped(self, model: str) -> None:
-        self.filtered_connectors.append(model)
+    def report_connectors_dropped(self, connector: str) -> None:
+        self.filtered_connectors.append(connector)
 
 
 class PlatformDetail(ConfigModel):

@@ -76,7 +76,7 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):
 
         self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)
 
-    def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
+    def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
         input_dataset_urn_list: List[DatasetUrn] = []
         output_dataset_urn_list: List[DatasetUrn] = []
         fine_grained_lineage: List[FineGrainedLineage] = []
@@ -93,8 +93,11 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
                 connector.connector_type
             ]
         else:
-            logger.info(
-                f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
+            self.report.info(
+                title="Guessing source platform for lineage",
+                message="We encountered a connector type that we don't fully support yet. "
+                "We will attempt to guess the platform based on the connector type.",
+                context=f"{connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
             )
             source_details.platform = connector.connector_type

@@ -170,7 +173,19 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
         datajob.inlets.extend(input_dataset_urn_list)
         datajob.outlets.extend(output_dataset_urn_list)
         datajob.fine_grained_lineages.extend(fine_grained_lineage)
-        return None
+
+        return dict(
+            **{
+                f"source.{k}": str(v)
+                for k, v in source_details.dict().items()
+                if v is not None
+            },
+            **{
+                f"destination.{k}": str(v)
+                for k, v in destination_details.dict().items()
+                if v is not None
+            },
+        )
 
     def _generate_dataflow_from_connector(self, connector: Connector) -> DataFlow:
         return DataFlow(
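
For orientation, here is a minimal sketch of what the new return value looks like, with assumed PlatformDetail field values (not taken from the PR): the source and destination details are flattened into one string-valued property bag, with keys prefixed by "source." and "destination.".

# Hedged illustration only; field names and values are assumptions.
source_details = {"platform": "postgres", "database": "prod_db", "env": "PROD"}
destination_details = {"platform": "snowflake", "env": "PROD"}

lineage_properties = dict(
    **{f"source.{k}": str(v) for k, v in source_details.items() if v is not None},
    **{f"destination.{k}": str(v) for k, v in destination_details.items() if v is not None},
)
# {'source.platform': 'postgres', 'source.database': 'prod_db', 'source.env': 'PROD',
#  'destination.platform': 'snowflake', 'destination.env': 'PROD'}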

@@ -196,23 +211,23 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob:
             owners={owner_email} if owner_email else set(),
         )
 
-        job_property_bag: Dict[str, str] = {}
-        allowed_connection_keys = [
-            Constant.PAUSED,
-            Constant.SYNC_FREQUENCY,
-            Constant.DESTINATION_ID,
-        ]
-        for key in allowed_connection_keys:
-            if hasattr(connector, key) and getattr(connector, key) is not None:
-                job_property_bag[key] = repr(getattr(connector, key))
-        datajob.properties = job_property_bag
-
         # Map connector source and destination table with dataset entity
         # Also extend the fine grained lineage of column if include_column_lineage is True
-        self._extend_lineage(connector=connector, datajob=datajob)
-
+        lineage_properties = self._extend_lineage(connector=connector, datajob=datajob)
         # TODO: Add fine grained lineages of dataset after FineGrainedLineageDownstreamType.DATASET enabled
 
+        connector_properties: Dict[str, str] = {
+            "connector_id": connector.connector_id,
+            "connector_type": connector.connector_type,
+            "paused": str(connector.paused),
+            "sync_frequency": str(connector.sync_frequency),
+            "destination_id": connector.destination_id,
+        }
+        datajob.properties = {
+            **connector_properties,
+            **lineage_properties,
+        }
+
         return datajob
 
     def _generate_dpi_from_job(self, job: Job, datajob: DataJob) -> DataProcessInstance:
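
As a quick usage sketch (all values assumed, not from the PR), the merged property bag keeps connector-level keys and prefixed lineage keys side by side, so the two dicts cannot collide:

# Hedged illustration; values are assumptions.
connector_properties = {
    "connector_id": "abc123",
    "connector_type": "postgres",
    "paused": "False",
    "sync_frequency": "360",
    "destination_id": "dest_1",
}
lineage_properties = {"source.platform": "postgres", "destination.platform": "snowflake"}
properties = {**connector_properties, **lineage_properties}  # no key collisions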

@@ -259,20 +259,23 @@ def get_allowed_connectors_list(
         logger.info("Fetching connector list")
         connector_list = self._query(self.fivetran_log_query.get_connectors_query())
         for connector in connector_list:
+            connector_id = connector[Constant.CONNECTOR_ID]
             connector_name = connector[Constant.CONNECTOR_NAME]
             if not connector_patterns.allowed(connector_name):
-                report.report_connectors_dropped(connector_name)
+                report.report_connectors_dropped(
+                    f"{connector_name} (connector_id: {connector_id}, dropped due to filter pattern)"
+                )
                 continue
             if not destination_patterns.allowed(
                 destination_id := connector[Constant.DESTINATION_ID]
             ):
                 report.report_connectors_dropped(
-                    f"{connector_name} (destination_id: {destination_id})"
+                    f"{connector_name} (connector_id: {connector_id}, destination_id: {destination_id})"
                 )
                 continue
             connectors.append(
                 Connector(
-                    connector_id=connector[Constant.CONNECTOR_ID],
+                    connector_id=connector_id,
                     connector_name=connector_name,
                     connector_type=connector[Constant.CONNECTOR_TYPE_ID],
                     paused=connector[Constant.PAUSED],
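
The connector_patterns / destination_patterns checks above use DataHub's regex-based allow/deny filtering. A small usage sketch, assuming the AllowDenyPattern class from datahub.configuration.common that backs these config fields (the example patterns are invented):

from datahub.configuration.common import AllowDenyPattern

# Assumed example patterns, as they might appear in an ingestion recipe.
connector_patterns = AllowDenyPattern(allow=["postgres_.*"], deny=[".*_staging"])

connector_patterns.allowed("postgres_prod")     # True: matches allow, not deny
connector_patterns.allowed("postgres_staging")  # False: matches a deny pattern
connector_patterns.allowed("mysql_prod")        # False: matches no allow pattern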

@@ -1,8 +1,8 @@
 from typing import List
 
 # Safeguards to prevent fetching massive amounts of data.
-MAX_TABLE_LINEAGE_PER_CONNECTOR = 50
-MAX_COLUMN_LINEAGE_PER_CONNECTOR = 500
+MAX_TABLE_LINEAGE_PER_CONNECTOR = 120
+MAX_COLUMN_LINEAGE_PER_CONNECTOR = 1000
 MAX_JOBS_PER_CONNECTOR = 500

@@ -33,6 +33,7 @@ def get_connectors_query(self) -> str:
         FROM {self.db_clause}connector
         WHERE
             _fivetran_deleted = FALSE
+        QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY _fivetran_synced DESC) = 1
         """
 
     def get_users_query(self) -> str:
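
The new QUALIFY clause keeps only the most recently synced row per connector_id. For readers less used to window functions, a rough Python equivalent of that dedup (row dicts assumed):

def latest_per_connector(rows):
    # Mirrors ROW_NUMBER() OVER (PARTITION BY connector_id
    # ORDER BY _fivetran_synced DESC) = 1: keep the newest row per connector.
    best = {}
    for row in rows:
        seen = best.get(row["connector_id"])
        if seen is None or row["_fivetran_synced"] > seen["_fivetran_synced"]:
            best[row["connector_id"]] = row
    return list(best.values())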

@@ -86,21 +87,29 @@ def get_table_lineage_query(self, connector_ids: List[str]) -> str:
 
         return f"""\
         SELECT
-            stm.connector_id as connector_id,
-            stm.id as source_table_id,
-            stm.name as source_table_name,
-            ssm.name as source_schema_name,
-            dtm.id as destination_table_id,
-            dtm.name as destination_table_name,
-            dsm.name as destination_schema_name
-        FROM {self.db_clause}table_lineage as tl
-        JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
-        JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
-        JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
-        JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
-        WHERE stm.connector_id IN ({formatted_connector_ids})
-        QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY tl.created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
-        ORDER BY stm.connector_id, tl.created_at DESC
+            *
+        FROM (
+            SELECT
+                stm.connector_id as connector_id,
+                stm.id as source_table_id,
+                stm.name as source_table_name,
+                ssm.name as source_schema_name,
+                dtm.id as destination_table_id,
+                dtm.name as destination_table_name,
+                dsm.name as destination_schema_name,
+                tl.created_at as created_at,
+                ROW_NUMBER() OVER (PARTITION BY stm.connector_id, stm.id, dtm.id ORDER BY tl.created_at DESC) as table_combo_rn
+            FROM {self.db_clause}table_lineage as tl
+            JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
+            JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
+            JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
+            JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
+            WHERE stm.connector_id IN ({formatted_connector_ids})
+        )
+        -- Ensure that we only get back one entry per source and destination pair.
+        WHERE table_combo_rn = 1
+        QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
+        ORDER BY connector_id, created_at DESC
         """
 
     def get_column_lineage_query(self, connector_ids: List[str]) -> str:
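
The rewrite above dedups in two stages: the inner ROW_NUMBER keeps only the latest row per (connector, source table, destination table) combination, and the outer QUALIFY then caps each connector at its MAX_TABLE_LINEAGE_PER_CONNECTOR most recent surviving entries. A rough Python rendering of that logic (row dicts assumed):

def dedupe_table_lineage(rows, max_per_connector=120):
    # Stage 1: keep the newest row per (connector, source table, destination
    # table), mirroring the inner "table_combo_rn = 1" filter.
    latest = {}
    for row in rows:
        key = (row["connector_id"], row["source_table_id"], row["destination_table_id"])
        if key not in latest or row["created_at"] > latest[key]["created_at"]:
            latest[key] = row
    # Stage 2: cap each connector at its N most recent entries, mirroring the
    # outer QUALIFY ... <= MAX_TABLE_LINEAGE_PER_CONNECTOR.
    kept, per_connector = [], {}
    for row in sorted(latest.values(), key=lambda r: r["created_at"], reverse=True):
        n = per_connector.get(row["connector_id"], 0)
        if n < max_per_connector:
            kept.append(row)
            per_connector[row["connector_id"]] = n + 1
    return kept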

@@ -109,19 +118,31 @@ def get_column_lineage_query(self, connector_ids: List[str]) -> str:
 
         return f"""\
         SELECT
-            scm.table_id as source_table_id,
-            dcm.table_id as destination_table_id,
-            scm.name as source_column_name,
-            dcm.name as destination_column_name
-        FROM {self.db_clause}column_lineage as cl
-        JOIN {self.db_clause}source_column_metadata as scm
-            ON cl.source_column_id = scm.id
-        JOIN {self.db_clause}destination_column_metadata as dcm
-            ON cl.destination_column_id = dcm.id
-        -- Only joining source_table_metadata to get the connector_id.
-        JOIN {self.db_clause}source_table_metadata as stm
-            ON scm.table_id = stm.id
-        WHERE stm.connector_id IN ({formatted_connector_ids})
-        QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY cl.created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
-        ORDER BY stm.connector_id, cl.created_at DESC
+            source_table_id,
+            destination_table_id,
+            source_column_name,
+            destination_column_name
+        FROM (
+            SELECT
+                stm.connector_id as connector_id,
+                scm.table_id as source_table_id,
+                dcm.table_id as destination_table_id,
+                scm.name as source_column_name,
+                dcm.name as destination_column_name,
+                cl.created_at as created_at,
+                ROW_NUMBER() OVER (PARTITION BY stm.connector_id, cl.source_column_id, cl.destination_column_id ORDER BY cl.created_at DESC) as column_combo_rn
+            FROM {self.db_clause}column_lineage as cl
+            JOIN {self.db_clause}source_column_metadata as scm
+                ON cl.source_column_id = scm.id
+            JOIN {self.db_clause}destination_column_metadata as dcm
+                ON cl.destination_column_id = dcm.id
+            -- Only joining source_table_metadata to get the connector_id.
+            JOIN {self.db_clause}source_table_metadata as stm
+                ON scm.table_id = stm.id
+            WHERE stm.connector_id IN ({formatted_connector_ids})
+        )
+        -- Ensure that we only get back one entry per (connector, source column, destination column) pair.
+        WHERE column_combo_rn = 1
+        QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
+        ORDER BY connector_id, created_at DESC
         """