feat(ingest/fivetran): support overriding destination db #11701

Merged · 2 commits · Oct 24, 2024
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -33,6 +33,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
 If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI:
 `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.
+
+- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting the database directly within `sources_to_platform_instance.<key>.database`.
 
 ### Breaking Changes
 
 - #11486 - Deprecated Criterion filters using `value`. Use `values` instead. This also deprecates the ability to use a comma-delimited string to represent multiple values via `value`.
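For recipe authors the migration is mechanical: move each connector's database from `sources_to_database` into the matching `sources_to_platform_instance` entry. A minimal before/after sketch (the connector id `calendar_elected` is borrowed from this PR's tests):

```python
# Deprecated form: database kept in a separate mapping, keyed by connector id.
old_config = {
    "sources_to_database": {"calendar_elected": "postgres_db"},
    "sources_to_platform_instance": {"calendar_elected": {"env": "DEV"}},
}

# New form: database lives alongside the other per-connector details.
new_config = {
    "sources_to_platform_instance": {
        "calendar_elected": {"database": "postgres_db", "env": "DEV"},
    },
}
```

The compat shim added to config.py (below) rewrites the old form into the new one at parse time, so existing recipes keep working with a deprecation warning.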
46 changes: 39 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
@@ -1,12 +1,17 @@
 import dataclasses
 import logging
+import warnings
 from typing import Dict, Optional
 
 import pydantic
 from pydantic import Field, root_validator
 from typing_extensions import Literal
 
-from datahub.configuration.common import AllowDenyPattern, ConfigModel
+from datahub.configuration.common import (
+    AllowDenyPattern,
+    ConfigModel,
+    ConfigurationWarning,
+)
 from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.ingestion.api.report import Report
@@ -144,6 +149,10 @@ def report_connectors_dropped(self, model: str) -> None:
 
 
 class PlatformDetail(ConfigModel):
+    platform: Optional[str] = pydantic.Field(
+        default=None,
+        description="Override the platform type detection.",
+    )
     platform_instance: Optional[str] = pydantic.Field(
         default=None,
         description="The instance of the platform that all assets produced by this recipe belong to",
@@ -152,6 +161,11 @@ class PlatformDetail(ConfigModel):
         default=DEFAULT_ENV,
         description="The environment that all assets produced by DataHub platform ingestion source belong to",
     )
+    database: Optional[str] = pydantic.Field(
+        default=None,
+        description="The database that all assets produced by this connector belong to. "
+        "For destinations, this defaults to the fivetran log config's database.",
+    )
 
 
 class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
@@ -172,24 +186,42 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
         default=True,
         description="Populates table->table column lineage.",
     )
-    sources_to_database: Dict[str, str] = pydantic.Field(
-        default={},
-        description="A mapping of the connector's all sources to its database. Use connector id as key.",
-    )
 
     # Configuration for stateful ingestion
     stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
         default=None, description="Fivetran Stateful Ingestion Config."
     )
 
     # Fivetran connector sources to platform instance mapping
     sources_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
         default={},
-        description="A mapping of the connector's all sources dataset to platform instance. Use connector id as key.",
+        description="A mapping from connector id to its platform/instance/env/database details.",
     )
     # Fivetran destination to platform instance mapping
     destination_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
         default={},
-        description="A mapping of destination dataset to platform instance. Use destination id as key.",
+        description="A mapping of destination id to its platform/instance/env details.",
     )
 
+    @pydantic.root_validator(pre=True)
+    def compat_sources_to_database(cls, values: Dict) -> Dict:
+        if "sources_to_database" in values:
+            warnings.warn(
+                "The sources_to_database field is deprecated, please use sources_to_platform_instance instead.",
+                ConfigurationWarning,
+                stacklevel=2,
+            )
+            mapping = values.pop("sources_to_database")
+
+            values.setdefault("sources_to_platform_instance", {})
+            for key, value in mapping.items():
+                values["sources_to_platform_instance"].setdefault(key, {})
+                values["sources_to_platform_instance"][key].setdefault(
+                    "database", value
+                )
+
+        return values
+
     history_sync_lookback_period: int = pydantic.Field(
         7,
         description="The number of days to look back when extracting connectors' sync history.",
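The merge semantics of `compat_sources_to_database` are worth spelling out: explicit per-connector settings win, and the deprecated mapping only fills in a missing `database`. Here is a self-contained re-statement of the validator's core loop (illustrative values; `parse_obj` on a full config would additionally require a valid `fivetran_log_config`):

```python
# Mirrors the body of compat_sources_to_database above, outside pydantic.
values = {
    "sources_to_database": {"calendar_elected": "my_db", "connector_2": "my_db_2"},
    "sources_to_platform_instance": {"calendar_elected": {"env": "DEV"}},
}

mapping = values.pop("sources_to_database")
values.setdefault("sources_to_platform_instance", {})
for key, value in mapping.items():
    values["sources_to_platform_instance"].setdefault(key, {})
    values["sources_to_platform_instance"][key].setdefault("database", value)

# Existing entries are extended, new ones are created:
assert values == {
    "sources_to_platform_instance": {
        "calendar_elected": {"env": "DEV", "database": "my_db"},
        "connector_2": {"database": "my_db_2"},
    }
}
```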
@@ -81,31 +81,33 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
         output_dataset_urn_list: List[DatasetUrn] = []
         fine_grained_lineage: List[FineGrainedLineage] = []
 
-        source_platform_detail: PlatformDetail = PlatformDetail()
-        destination_platform_detail: PlatformDetail = PlatformDetail()
+        # TODO: Once Fivetran exposes the database via the API, we shouldn't ask for it via config.
 
         # Get platform details for connector source
-        source_platform_detail = self.config.sources_to_platform_instance.get(
+        source_details = self.config.sources_to_platform_instance.get(
             connector.connector_id, PlatformDetail()
         )
+        if source_details.platform is None:
+            if connector.connector_type in KNOWN_DATA_PLATFORM_MAPPING:
+                source_details.platform = KNOWN_DATA_PLATFORM_MAPPING[
+                    connector.connector_type
+                ]
+            else:
+                logger.info(
+                    f"Fivetran connector source type: {connector.connector_type} is not supported for mapping to a DataHub dataset entity."
+                )
+                source_details.platform = connector.connector_type
 
         # Get platform details for destination
-        destination_platform_detail = self.config.destination_to_platform_instance.get(
+        destination_details = self.config.destination_to_platform_instance.get(
             connector.destination_id, PlatformDetail()
         )
 
-        # Get database for connector source
-        # TODO: Once Fivetran exposes this, we shouldn't ask for it via config.
-        source_database: Optional[str] = self.config.sources_to_database.get(
-            connector.connector_id
-        )
-
-        if connector.connector_type in KNOWN_DATA_PLATFORM_MAPPING:
-            source_platform = KNOWN_DATA_PLATFORM_MAPPING[connector.connector_type]
-        else:
-            source_platform = connector.connector_type
-            logger.info(
-                f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
-            )
+        if destination_details.platform is None:
+            destination_details.platform = (
+                self.config.fivetran_log_config.destination_platform
+            )
+        if destination_details.database is None:
+            destination_details.database = self.audit_log.fivetran_log_database
 
         if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
             self.report.warning(
@@ -117,22 +119,22 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
 
         for lineage in connector.lineage:
             input_dataset_urn = DatasetUrn.create_from_ids(
-                platform_id=source_platform,
+                platform_id=source_details.platform,
                 table_name=(
-                    f"{source_database.lower()}.{lineage.source_table}"
-                    if source_database
+                    f"{source_details.database.lower()}.{lineage.source_table}"
+                    if source_details.database
                     else lineage.source_table
                 ),
-                env=source_platform_detail.env,
-                platform_instance=source_platform_detail.platform_instance,
+                env=source_details.env,
+                platform_instance=source_details.platform_instance,
             )
             input_dataset_urn_list.append(input_dataset_urn)
 
             output_dataset_urn = DatasetUrn.create_from_ids(
-                platform_id=self.config.fivetran_log_config.destination_platform,
-                table_name=f"{self.audit_log.fivetran_log_database.lower()}.{lineage.destination_table}",
-                env=destination_platform_detail.env,
-                platform_instance=destination_platform_detail.platform_instance,
+                platform_id=destination_details.platform,
+                table_name=f"{destination_details.database.lower()}.{lineage.destination_table}",
+                env=destination_details.env,
+                platform_instance=destination_details.platform_instance,
             )
             output_dataset_urn_list.append(output_dataset_urn)
 
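The net effect of the refactor is that both sides of the lineage edge are built from a single resolved `PlatformDetail`. A hedged sketch of the URN construction, assuming `DatasetUrn.create_from_ids` behaves as used above (the platform, database, and table names here are invented):

```python
from typing import Optional

from datahub.utilities.urns.dataset_urn import DatasetUrn

# Source side: prefix the table with the configured database, if any.
source_database: Optional[str] = "postgres_db"  # from PlatformDetail.database
source_table = "public.employee"
table_name = (
    f"{source_database.lower()}.{source_table}" if source_database else source_table
)

input_urn = DatasetUrn.create_from_ids(
    platform_id="postgres",
    table_name=table_name,
    env="DEV",
    platform_instance=None,
)
print(input_urn)
# urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)
```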
@@ -1,9 +1,9 @@
 from typing import List
 
 # Safeguards to prevent fetching massive amounts of data.
-MAX_TABLE_LINEAGE_PER_CONNECTOR = 100
-MAX_COLUMN_LINEAGE_PER_CONNECTOR = 3000
-MAX_JOBS_PER_CONNECTOR = 1000
+MAX_TABLE_LINEAGE_PER_CONNECTOR = 50
+MAX_COLUMN_LINEAGE_PER_CONNECTOR = 500
+MAX_JOBS_PER_CONNECTOR = 500
 
 
 class FivetranLogQuery:
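These caps are enforced during lineage extraction, as shown earlier (`len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR` triggers a report warning). A sketch of the general pattern, with the constant inlined rather than imported and a hypothetical helper name:

```python
MAX_TABLE_LINEAGE_PER_CONNECTOR = 50  # mirrors the new, lower cap above


def truncate_lineage(lineage: list) -> list:
    """Hypothetical helper: warn and keep only the first N lineage entries."""
    if len(lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
        print(
            f"{len(lineage)} lineage entries exceed the cap; "
            f"keeping the first {MAX_TABLE_LINEAGE_PER_CONNECTOR}."
        )
        return lineage[:MAX_TABLE_LINEAGE_PER_CONNECTOR]
    return lineage
```

Lowering the defaults (100→50 tables, 3000→500 columns, 1000→500 jobs) trades completeness for bounded fetch sizes on very large connectors.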
41 changes: 35 additions & 6 deletions metadata-ingestion/tests/integration/fivetran/test_fivetran.py
@@ -13,6 +13,7 @@
 from datahub.ingestion.source.fivetran.config import (
     BigQueryDestinationConfig,
     FivetranSourceConfig,
+    PlatformDetail,
     SnowflakeDestinationConfig,
 )
 from datahub.ingestion.source.fivetran.fivetran import FivetranSource
@@ -180,11 +181,9 @@ def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path):
                     "interval_unconstitutional",
                 ]
             },
-            "sources_to_database": {
-                "calendar_elected": "postgres_db",
-            },
             "sources_to_platform_instance": {
                 "calendar_elected": {
+                    "database": "postgres_db",
                     "env": "DEV",
                 }
             },
@@ -271,12 +270,11 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_
                     "interval_unconstitutional",
                 ]
             },
-            "sources_to_database": {
-                "calendar_elected": "postgres_db",
-            },
             "sources_to_platform_instance": {
                 "calendar_elected": {
+                    "platform": "postgres",
                     "env": "DEV",
+                    "database": "postgres_db",
                 }
             },
         },
@@ -374,3 +372,34 @@ def test_rename_destination_config():
         match="destination_config is deprecated, please use snowflake_destination_config instead.",
     ):
         FivetranSourceConfig.parse_obj(config_dict)
+
+
+def test_compat_sources_to_database() -> None:
+    config_dict = {
+        # We just need a valid fivetran_log_config to test the compat transformation.
+        "fivetran_log_config": {
+            "destination_platform": "snowflake",
+            "snowflake_destination_config": {
+                "account_id": "testid",
+                "warehouse": "test_wh",
+                "username": "test",
+                "password": "test@123",
+                "database": "test_database",
+                "role": "testrole",
+                "log_schema": "test",
+            },
+        },
+        "sources_to_database": {"calendar_elected": "my_db", "connector_2": "my_db_2"},
+        "sources_to_platform_instance": {"calendar_elected": {"env": "DEV"}},
+    }
+
+    with pytest.warns(
+        ConfigurationWarning,
+        match=r"sources_to_database.*deprecated",
+    ):
+        config = FivetranSourceConfig.parse_obj(config_dict)
+
+    assert config.sources_to_platform_instance == {
+        "calendar_elected": PlatformDetail(env="DEV", database="my_db"),
+        "connector_2": PlatformDetail(database="my_db_2"),
+    }