diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md
index 8911d282f86bb..db97f7aa81d7b 100644
--- a/docs/how/updating-datahub.md
+++ b/docs/how/updating-datahub.md
@@ -33,6 +33,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
   If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI: `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.
+- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting the database directly within `sources_to_platform_instance.<connector_id>.database`.
+
 ### Breaking Changes
 
 - #11486 - Deprecated Criterion filters using `value`. Use `values` instead. This also deprecates the ability to use comma delimited string to represent multiple values using `value`.
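In recipe terms, the deprecation amounts to the rewrite below — a minimal sketch using plain config dicts; `calendar_elected` is a placeholder connector id (the same one the integration tests further down use):

```python
# Deprecated shape: still parses, but now emits a ConfigurationWarning.
old_style = {
    "sources_to_database": {"calendar_elected": "postgres_db"},
    "sources_to_platform_instance": {"calendar_elected": {"env": "DEV"}},
}

# Equivalent new shape: the database moves into the per-connector detail.
new_style = {
    "sources_to_platform_instance": {
        "calendar_elected": {"env": "DEV", "database": "postgres_db"},
    },
}
```
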
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
index 394015500d1c5..1e15f6b395ca5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
@@ -1,12 +1,17 @@
 import dataclasses
 import logging
+import warnings
 from typing import Dict, Optional
 
 import pydantic
 from pydantic import Field, root_validator
 from typing_extensions import Literal
 
-from datahub.configuration.common import AllowDenyPattern, ConfigModel
+from datahub.configuration.common import (
+    AllowDenyPattern,
+    ConfigModel,
+    ConfigurationWarning,
+)
 from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.ingestion.api.report import Report
@@ -144,6 +149,10 @@ def report_connectors_dropped(self, model: str) -> None:
 
 
 class PlatformDetail(ConfigModel):
+    platform: Optional[str] = pydantic.Field(
+        default=None,
+        description="Override the platform type detection.",
+    )
     platform_instance: Optional[str] = pydantic.Field(
         default=None,
         description="The instance of the platform that all assets produced by this recipe belong to",
@@ -152,6 +161,11 @@ class PlatformDetail(ConfigModel):
         default=DEFAULT_ENV,
         description="The environment that all assets produced by DataHub platform ingestion source belong to",
     )
+    database: Optional[str] = pydantic.Field(
+        default=None,
+        description="The database that all assets produced by this connector belong to. "
+        "For destinations, this defaults to the fivetran log config's database.",
+    )
 
 
 class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
@@ -172,24 +186,42 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
         default=True,
         description="Populates table->table column lineage.",
     )
-    sources_to_database: Dict[str, str] = pydantic.Field(
-        default={},
-        description="A mapping of the connector's all sources to its database. Use connector id as key.",
-    )
+
     # Configuration for stateful ingestion
     stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
         default=None, description="Airbyte Stateful Ingestion Config."
     )
+
     # Fivetran connector all sources to platform instance mapping
     sources_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
         default={},
-        description="A mapping of the connector's all sources dataset to platform instance. Use connector id as key.",
+        description="A mapping from connector id to its platform/instance/env/database details.",
     )
     # Fivetran destination to platform instance mapping
     destination_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
         default={},
-        description="A mapping of destination dataset to platform instance. Use destination id as key.",
+        description="A mapping of destination id to its platform/instance/env details.",
     )
+
+    @pydantic.root_validator(pre=True)
+    def compat_sources_to_database(cls, values: Dict) -> Dict:
+        if "sources_to_database" in values:
+            warnings.warn(
+                "The sources_to_database field is deprecated, please use sources_to_platform_instance instead.",
+                ConfigurationWarning,
+                stacklevel=2,
+            )
+            mapping = values.pop("sources_to_database")
+
+            values.setdefault("sources_to_platform_instance", {})
+            for key, value in mapping.items():
+                values["sources_to_platform_instance"].setdefault(key, {})
+                values["sources_to_platform_instance"][key].setdefault(
+                    "database", value
+                )
+
+        return values
+
     history_sync_lookback_period: int = pydantic.Field(
         7,
         description="The number of days to look back when extracting connectors' sync history.",
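The extended `PlatformDetail` can be exercised on its own. A quick sketch of the new fields — all values here are illustrative, and `parse_obj` is the standard pydantic v1 entry point these `ConfigModel`s already use:

```python
from datahub.ingestion.source.fivetran.config import PlatformDetail

detail = PlatformDetail.parse_obj(
    {
        "platform": "postgres",  # new: overrides connector-type auto-detection
        "platform_instance": "my_pg_instance",  # illustrative instance name
        "env": "DEV",
        "database": "postgres_db",  # new: prefixed onto source table names
    }
)
assert detail.platform == "postgres"
assert detail.database == "postgres_db"
```
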
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
index c27ec57c2e99e..907bfa3a167aa 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
@@ -81,31 +81,33 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
         output_dataset_urn_list: List[DatasetUrn] = []
         fine_grained_lineage: List[FineGrainedLineage] = []
 
-        source_platform_detail: PlatformDetail = PlatformDetail()
-        destination_platform_detail: PlatformDetail = PlatformDetail()
+        # TODO: Once Fivetran exposes the database via the API, we shouldn't ask for it via config.
+
         # Get platform details for connector source
-        source_platform_detail = self.config.sources_to_platform_instance.get(
+        source_details = self.config.sources_to_platform_instance.get(
             connector.connector_id, PlatformDetail()
         )
+        if source_details.platform is None:
+            if connector.connector_type in KNOWN_DATA_PLATFORM_MAPPING:
+                source_details.platform = KNOWN_DATA_PLATFORM_MAPPING[
+                    connector.connector_type
+                ]
+            else:
+                logger.info(
+                    f"Fivetran connector source type {connector.connector_type} is not supported for mapping to a DataHub dataset entity."
+                )
+                source_details.platform = connector.connector_type
 
         # Get platform details for destination
-        destination_platform_detail = self.config.destination_to_platform_instance.get(
+        destination_details = self.config.destination_to_platform_instance.get(
             connector.destination_id, PlatformDetail()
         )
-
-        # Get database for connector source
-        # TODO: Once Fivetran exposes this, we shouldn't ask for it via config.
-        source_database: Optional[str] = self.config.sources_to_database.get(
-            connector.connector_id
-        )
-
-        if connector.connector_type in KNOWN_DATA_PLATFORM_MAPPING:
-            source_platform = KNOWN_DATA_PLATFORM_MAPPING[connector.connector_type]
-        else:
-            source_platform = connector.connector_type
-            logger.info(
-                f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
+        if destination_details.platform is None:
+            destination_details.platform = (
+                self.config.fivetran_log_config.destination_platform
             )
+        if destination_details.database is None:
+            destination_details.database = self.audit_log.fivetran_log_database
 
         if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
             self.report.warning(
@@ -117,22 +119,22 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
 
         for lineage in connector.lineage:
             input_dataset_urn = DatasetUrn.create_from_ids(
-                platform_id=source_platform,
+                platform_id=source_details.platform,
                 table_name=(
-                    f"{source_database.lower()}.{lineage.source_table}"
-                    if source_database
+                    f"{source_details.database.lower()}.{lineage.source_table}"
+                    if source_details.database
                     else lineage.source_table
                 ),
-                env=source_platform_detail.env,
-                platform_instance=source_platform_detail.platform_instance,
+                env=source_details.env,
+                platform_instance=source_details.platform_instance,
             )
             input_dataset_urn_list.append(input_dataset_urn)
 
             output_dataset_urn = DatasetUrn.create_from_ids(
-                platform_id=self.config.fivetran_log_config.destination_platform,
-                table_name=f"{self.audit_log.fivetran_log_database.lower()}.{lineage.destination_table}",
-                env=destination_platform_detail.env,
-                platform_instance=destination_platform_detail.platform_instance,
+                platform_id=destination_details.platform,
+                table_name=f"{destination_details.database.lower()}.{lineage.destination_table}",
+                env=destination_details.env,
+                platform_instance=destination_details.platform_instance,
             )
             output_dataset_urn_list.append(output_dataset_urn)
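For reference, here is roughly what the resolved details produce once they reach `DatasetUrn.create_from_ids` — a sketch with made-up platform/table names; the import path below is an assumption (it is the one this module family has historically used and may differ by version):

```python
from datahub.utilities.urns.dataset_urn import DatasetUrn  # path may vary by version

# Source side: when a database is configured, it is lowercased and prefixed
# onto the source table, mirroring the f-string in _extend_lineage above.
input_urn = DatasetUrn.create_from_ids(
    platform_id="postgres",
    table_name="postgres_db.public.employee",  # f"{database.lower()}.{source_table}"
    env="DEV",
    platform_instance=None,
)
# str(input_urn) ->
# "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)"
```
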
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
index 34dd252ec72b7..39c4d7712b4fc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
@@ -1,9 +1,9 @@
 from typing import List
 
 # Safeguards to prevent fetching massive amounts of data.
-MAX_TABLE_LINEAGE_PER_CONNECTOR = 100
-MAX_COLUMN_LINEAGE_PER_CONNECTOR = 3000
-MAX_JOBS_PER_CONNECTOR = 1000
+MAX_TABLE_LINEAGE_PER_CONNECTOR = 50
+MAX_COLUMN_LINEAGE_PER_CONNECTOR = 500
+MAX_JOBS_PER_CONNECTOR = 500
 
 
 class FivetranLogQuery:
diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py
index f49f499fe43b4..2e6c2b1370d16 100644
--- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py
+++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py
@@ -13,6 +13,7 @@
 from datahub.ingestion.source.fivetran.config import (
     BigQueryDestinationConfig,
     FivetranSourceConfig,
+    PlatformDetail,
     SnowflakeDestinationConfig,
 )
 from datahub.ingestion.source.fivetran.fivetran import FivetranSource
@@ -180,11 +181,9 @@ def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path):
                         "interval_unconstitutional",
                     ]
                 },
-                "sources_to_database": {
-                    "calendar_elected": "postgres_db",
-                },
                 "sources_to_platform_instance": {
                     "calendar_elected": {
+                        "database": "postgres_db",
                         "env": "DEV",
                     }
                 },
@@ -271,12 +270,11 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_
                         "interval_unconstitutional",
                     ]
                 },
-                "sources_to_database": {
-                    "calendar_elected": "postgres_db",
-                },
                 "sources_to_platform_instance": {
                     "calendar_elected": {
+                        "platform": "postgres",
                         "env": "DEV",
+                        "database": "postgres_db",
                     }
                 },
             },
@@ -374,3 +372,34 @@ def test_rename_destination_config():
         match="destination_config is deprecated, please use snowflake_destination_config instead.",
     ):
         FivetranSourceConfig.parse_obj(config_dict)
+
+
+def test_compat_sources_to_database() -> None:
+    config_dict = {
+        # We just need a valid fivetran_log_config to test the compat transformation.
+        "fivetran_log_config": {
+            "destination_platform": "snowflake",
+            "snowflake_destination_config": {
+                "account_id": "testid",
+                "warehouse": "test_wh",
+                "username": "test",
+                "password": "test@123",
+                "database": "test_database",
+                "role": "testrole",
+                "log_schema": "test",
+            },
+        },
+        "sources_to_database": {"calendar_elected": "my_db", "connector_2": "my_db_2"},
+        "sources_to_platform_instance": {"calendar_elected": {"env": "DEV"}},
+    }
+
+    with pytest.warns(
+        ConfigurationWarning,
+        match=r"sources_to_database.*deprecated",
+    ):
+        config = FivetranSourceConfig.parse_obj(config_dict)
+
+    assert config.sources_to_platform_instance == {
+        "calendar_elected": PlatformDetail(env="DEV", database="my_db"),
+        "connector_2": PlatformDetail(database="my_db_2"),
+    }
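One interaction the new test leaves implicit: because `compat_sources_to_database` only calls `setdefault`, an explicit per-connector `database` wins over the deprecated mapping. A hypothetical follow-up test sketching that guarantee, assuming the same imports as the test module above (not part of this change set):

```python
def test_sources_to_database_precedence() -> None:
    # Hypothetical check: setdefault() in compat_sources_to_database means an
    # explicit sources_to_platform_instance database beats the legacy value.
    config_dict = {
        "fivetran_log_config": {
            "destination_platform": "snowflake",
            "snowflake_destination_config": {
                "account_id": "testid",
                "warehouse": "test_wh",
                "username": "test",
                "password": "test@123",
                "database": "test_database",
                "role": "testrole",
                "log_schema": "test",
            },
        },
        "sources_to_database": {"calendar_elected": "legacy_db"},
        "sources_to_platform_instance": {
            "calendar_elected": {"database": "explicit_db"}
        },
    }

    with pytest.warns(ConfigurationWarning, match=r"sources_to_database.*deprecated"):
        config = FivetranSourceConfig.parse_obj(config_dict)

    assert (
        config.sources_to_platform_instance["calendar_elected"].database
        == "explicit_db"
    )
```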