Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Oct 17, 2024
1 parent 41ab402 commit d601c6c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def parse_alter_table_rename(default_schema: str, query: str) -> Tuple[str, str,
assert isinstance(parsed_query, sqlglot.exp.Alter)
prev_name = parsed_query.this.name
rename_clause = parsed_query.args["actions"][0]
assert isinstance(rename_clause, sqlglot.exp.RenameTable)
assert isinstance(rename_clause, sqlglot.exp.AlterRename)
new_name = rename_clause.this.name

schema = parsed_query.this.db or default_schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2131,7 +2131,7 @@ def _create_lineage_from_unsupported_csql(

fine_grained_lineages: List[FineGrainedLineage] = []
if self.config.extract_column_level_lineage:
logger.info("Extracting CLL from custom sql")
logger.debug("Extracting CLL from custom sql")
fine_grained_lineages = make_fine_grained_lineage_class(
parsed_result, csql_urn, out_columns
)
Expand Down
24 changes: 20 additions & 4 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,29 @@
SQL_LINEAGE_TIMEOUT_SECONDS = 10


# These rules are a subset of the rules in sqlglot.optimizer.optimizer.RULES.
# If there's a change in their rules, we probably need to re-evaluate our list as well.
assert len(sqlglot.optimizer.optimizer.RULES) == 14

_OPTIMIZE_RULES = (
sqlglot.optimizer.qualify.qualify,
sqlglot.optimizer.optimizer.qualify,
# We need to enable this in order for annotate types to work.
sqlglot.optimizer.unnest_subqueries.unnest_subqueries,
sqlglot.optimizer.qualify_columns.quote_identifiers,
# TODO: If annotate types becomes more stable, we can add it here directly.
sqlglot.optimizer.optimizer.pushdown_projections,
# sqlglot.optimizer.optimizer.normalize, # causes perf issues
sqlglot.optimizer.optimizer.unnest_subqueries,
# sqlglot.optimizer.optimizer.pushdown_predicates, # causes perf issues
# sqlglot.optimizer.optimizer.optimize_joins,
# sqlglot.optimizer.optimizer.eliminate_subqueries,
# sqlglot.optimizer.optimizer.merge_subqueries,
# sqlglot.optimizer.optimizer.eliminate_joins,
# sqlglot.optimizer.optimizer.eliminate_ctes,
sqlglot.optimizer.optimizer.quote_identifiers,
# These three are run separately or not run at all.
# sqlglot.optimizer.optimizer.annotate_types,
# sqlglot.optimizer.canonicalize.canonicalize,
# sqlglot.optimizer.simplify.simplify,
)

_DEBUG_TYPE_ANNOTATIONS = False


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from datahub.configuration.source_common import DEFAULT_ENV
from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.tableau.tableau import (
TableauConfig,
Expand All @@ -37,7 +38,7 @@
FineGrainedLineageUpstreamType,
UpstreamLineage,
)
from datahub.metadata.schema_classes import MetadataChangeProposalClass, UpstreamClass
from datahub.metadata.schema_classes import UpstreamClass
from tests.test_helpers import mce_helpers, test_connection_helpers
from tests.test_helpers.state_helpers import (
get_current_checkpoint_from_pipeline,
Expand Down Expand Up @@ -939,11 +940,12 @@ def test_tableau_unsupported_csql():
database_override_map={"production database": "prod"}
)

def test_lineage_metadata(
def check_lineage_metadata(
lineage, expected_entity_urn, expected_upstream_table, expected_cll
):
mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata)
assert mcp.aspect == UpstreamLineage(
mcp = cast(MetadataChangeProposalWrapper, list(lineage)[0].metadata)

expected = UpstreamLineage(
upstreams=[
UpstreamClass(
dataset=expected_upstream_table,
Expand All @@ -966,6 +968,9 @@ def test_lineage_metadata(
)
assert mcp.entityUrn == expected_entity_urn

actual_aspect = mcp.aspect
assert actual_aspect == expected

csql_urn = "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
expected_upstream_table = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.UserDetail,PROD)"
expected_cll = {
Expand Down Expand Up @@ -996,7 +1001,7 @@ def test_lineage_metadata(
},
out_columns=[],
)
test_lineage_metadata(
check_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
Expand All @@ -1014,7 +1019,7 @@ def test_lineage_metadata(
},
out_columns=[],
)
test_lineage_metadata(
check_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
Expand Down

0 comments on commit d601c6c

Please sign in to comment.