From d601c6cbdb336c473855de934981d21c53f9f66f Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 17 Oct 2024 15:40:17 -0700 Subject: [PATCH] fix bugs --- .../ingestion/source/redshift/lineage.py | 2 +- .../ingestion/source/tableau/tableau.py | 2 +- .../datahub/sql_parsing/sqlglot_lineage.py | 24 +++++++++++++++---- .../tableau/test_tableau_ingest.py | 17 ++++++++----- 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index 0e4cb6f1599e5..fe491ccb31850 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -133,7 +133,7 @@ def parse_alter_table_rename(default_schema: str, query: str) -> Tuple[str, str, assert isinstance(parsed_query, sqlglot.exp.Alter) prev_name = parsed_query.this.name rename_clause = parsed_query.args["actions"][0] - assert isinstance(rename_clause, sqlglot.exp.RenameTable) + assert isinstance(rename_clause, sqlglot.exp.AlterRename) new_name = rename_clause.this.name schema = parsed_query.this.db or default_schema diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 9f011790990ec..2c17a5a322f05 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -2131,7 +2131,7 @@ def _create_lineage_from_unsupported_csql( fine_grained_lineages: List[FineGrainedLineage] = [] if self.config.extract_column_level_lineage: - logger.info("Extracting CLL from custom sql") + logger.debug("Extracting CLL from custom sql") fine_grained_lineages = make_fine_grained_lineage_class( parsed_result, csql_urn, out_columns ) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index e732253542a1d..6a7ff5be6d1ea 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -64,13 +64,29 @@ SQL_LINEAGE_TIMEOUT_SECONDS = 10 +# These rules are a subset of the rules in sqlglot.optimizer.optimizer.RULES. +# If there's a change in their rules, we probably need to re-evaluate our list as well. +assert len(sqlglot.optimizer.optimizer.RULES) == 14 + _OPTIMIZE_RULES = ( - sqlglot.optimizer.qualify.qualify, + sqlglot.optimizer.optimizer.qualify, # We need to enable this in order for annotate types to work. - sqlglot.optimizer.unnest_subqueries.unnest_subqueries, - sqlglot.optimizer.qualify_columns.quote_identifiers, - # TODO: If annotate types becomes more stable, we can add it here directly. + sqlglot.optimizer.optimizer.pushdown_projections, + # sqlglot.optimizer.optimizer.normalize, # causes perf issues + sqlglot.optimizer.optimizer.unnest_subqueries, + # sqlglot.optimizer.optimizer.pushdown_predicates, # causes perf issues + # sqlglot.optimizer.optimizer.optimize_joins, + # sqlglot.optimizer.optimizer.eliminate_subqueries, + # sqlglot.optimizer.optimizer.merge_subqueries, + # sqlglot.optimizer.optimizer.eliminate_joins, + # sqlglot.optimizer.optimizer.eliminate_ctes, + sqlglot.optimizer.optimizer.quote_identifiers, + # These three are run separately or not run at all. + # sqlglot.optimizer.optimizer.annotate_types, + # sqlglot.optimizer.canonicalize.canonicalize, + # sqlglot.optimizer.simplify.simplify, ) + _DEBUG_TYPE_ANNOTATIONS = False diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py index 5a5552a78c56f..3798df07000c8 100644 --- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py +++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py @@ -19,6 +19,7 @@ from datahub.configuration.source_common import DEFAULT_ENV from datahub.emitter.mce_builder import make_schema_field_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.run.pipeline import Pipeline, PipelineContext from datahub.ingestion.source.tableau.tableau import ( TableauConfig, @@ -37,7 +38,7 @@ FineGrainedLineageUpstreamType, UpstreamLineage, ) -from datahub.metadata.schema_classes import MetadataChangeProposalClass, UpstreamClass +from datahub.metadata.schema_classes import UpstreamClass from tests.test_helpers import mce_helpers, test_connection_helpers from tests.test_helpers.state_helpers import ( get_current_checkpoint_from_pipeline, @@ -939,11 +940,12 @@ def test_tableau_unsupported_csql(): database_override_map={"production database": "prod"} ) - def test_lineage_metadata( + def check_lineage_metadata( lineage, expected_entity_urn, expected_upstream_table, expected_cll ): - mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata) - assert mcp.aspect == UpstreamLineage( + mcp = cast(MetadataChangeProposalWrapper, list(lineage)[0].metadata) + + expected = UpstreamLineage( upstreams=[ UpstreamClass( dataset=expected_upstream_table, @@ -966,6 +968,9 @@ def test_lineage_metadata( ) assert mcp.entityUrn == expected_entity_urn + actual_aspect = mcp.aspect + assert actual_aspect == expected + csql_urn = "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)" expected_upstream_table = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.UserDetail,PROD)" expected_cll = { @@ -996,7 +1001,7 @@ def test_lineage_metadata( }, out_columns=[], ) - test_lineage_metadata( + check_lineage_metadata( lineage=lineage, expected_entity_urn=csql_urn, expected_upstream_table=expected_upstream_table, @@ -1014,7 +1019,7 @@ def test_lineage_metadata( }, out_columns=[], ) - test_lineage_metadata( + check_lineage_metadata( lineage=lineage, expected_entity_urn=csql_urn, expected_upstream_table=expected_upstream_table,