Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Sep 30, 2024
2 parents 6fa1a2a + 0187fc6 commit d5814ce
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections import defaultdict
from dataclasses import dataclass, field
from hashlib import md5
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Type
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Type, Union

from elasticsearch import Elasticsearch
from pydantic import validator
Expand Down Expand Up @@ -249,6 +249,10 @@ class ElasticsearchSourceConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
password: Optional[str] = Field(
default=None, description="The password credential."
)
api_key: Optional[Union[Any, str]] = Field(
default=None,
description="API Key authentication. Accepts either a list with id and api_key (UTF-8 representation), or a base64 encoded string of id and api_key combined by ':'.",
)

use_ssl: bool = Field(
default=False, description="Whether to use SSL for the connection or not."
Expand Down Expand Up @@ -346,6 +350,7 @@ def __init__(self, config: ElasticsearchSourceConfig, ctx: PipelineContext):
self.client = Elasticsearch(
self.source_config.host,
http_auth=self.source_config.http_auth,
api_key=self.source_config.api_key,
use_ssl=self.source_config.use_ssl,
verify_certs=self.source_config.verify_certs,
ca_certs=self.source_config.ca_certs,
Expand Down
60 changes: 36 additions & 24 deletions metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def _resolve_schema_info(self, urn: str) -> Optional[SchemaInfo]:
def add_schema_metadata(
    self, urn: str, schema_metadata: SchemaMetadataClass
) -> None:
    """Convert a SchemaMetadata aspect to a SchemaInfo and cache it under *urn*.

    Defect fixed: the diff scrape left both the pre-commit call
    (``self._convert_schema_aspect_to_info``) and the post-commit call
    (module-level ``_convert_schema_aspect_to_info``) as two consecutive
    assignments; only the post-commit module-level call is kept.
    """
    schema_info = _convert_schema_aspect_to_info(schema_metadata)
    self._save_to_cache(urn, schema_info)

def add_raw_schema_info(self, urn: str, schema_info: SchemaInfo) -> None:
Expand All @@ -178,7 +178,7 @@ def with_temp_tables(
) -> SchemaResolverInterface:
extra_schemas = {
urn: (
self._convert_schema_field_list_to_info(fields)
_convert_schema_field_list_to_info(fields)
if fields is not None
else None
)
Expand All @@ -197,28 +197,7 @@ def _fetch_schema_info(self, graph: DataHubGraph, urn: str) -> Optional[SchemaIn
if not aspect:
return None

return self._convert_schema_aspect_to_info(aspect)

@classmethod
def _convert_schema_aspect_to_info(
cls, schema_metadata: SchemaMetadataClass
) -> SchemaInfo:
return cls._convert_schema_field_list_to_info(schema_metadata.fields)

@classmethod
def _convert_schema_field_list_to_info(
cls, schema_fields: List[SchemaFieldClass]
) -> SchemaInfo:
return {
get_simple_field_path_from_v2_field_path(col.fieldPath): (
# The actual types are more of a "nice to have".
col.nativeDataType
or "str"
)
for col in schema_fields
# TODO: We can't generate lineage to columns nested within structs yet.
if "." not in get_simple_field_path_from_v2_field_path(col.fieldPath)
}
return _convert_schema_aspect_to_info(aspect)

@classmethod
def convert_graphql_schema_metadata_to_info(
Expand Down Expand Up @@ -262,3 +241,36 @@ def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
if urn in self._extra_schemas:
return urn, self._extra_schemas[urn]
return self._base_resolver.resolve_table(table)

def add_temp_tables(
    self, temp_tables: Dict[str, Optional[List[SchemaFieldClass]]]
) -> None:
    """Register schemas for temp tables, overwriting existing entries per URN.

    A ``None`` field list means the table is known but its schema could not
    be inferred; that is recorded as an explicit ``None`` entry.
    """
    for urn, fields in temp_tables.items():
        if fields is None:
            # Known table, unknown schema.
            self._extra_schemas[urn] = None
        else:
            self._extra_schemas[urn] = _convert_schema_field_list_to_info(fields)


def _convert_schema_field_list_to_info(
    schema_fields: List[SchemaFieldClass],
) -> SchemaInfo:
    """Map each field's simplified path to its native type string.

    Fields whose simplified path contains a ``.`` are skipped.
    """
    info: SchemaInfo = {}
    for schema_field in schema_fields:
        simple_path = get_simple_field_path_from_v2_field_path(schema_field.fieldPath)
        # TODO: We can't generate lineage to columns nested within structs yet.
        if "." in simple_path:
            continue
        # The actual types are more of a "nice to have"; fall back to "str".
        info[simple_path] = schema_field.nativeDataType or "str"
    return info


def _convert_schema_aspect_to_info(schema_metadata: SchemaMetadataClass) -> SchemaInfo:
    """Flatten a SchemaMetadata aspect's field list into a SchemaInfo mapping."""
    field_list = schema_metadata.fields
    return _convert_schema_field_list_to_info(field_list)
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
QueryUrn,
SchemaFieldUrn,
)
from datahub.sql_parsing.schema_resolver import SchemaResolver, SchemaResolverInterface
from datahub.sql_parsing.schema_resolver import (
SchemaResolver,
SchemaResolverInterface,
_SchemaResolverWithExtras,
)
from datahub.sql_parsing.sql_parsing_common import QueryType, QueryTypeProps
from datahub.sql_parsing.sqlglot_lineage import (
ColumnLineageInfo,
Expand Down Expand Up @@ -367,6 +371,11 @@ def __init__(
graph=graph,
)
)
# Schema resolver for special case (_MISSING_SESSION_ID)
# This is particularly useful for via temp table lineage if session id is not available.
self._missing_session_schema_resolver = _SchemaResolverWithExtras(
base_resolver=self._schema_resolver, extra_schemas={}
)

# Initialize internal data structures.
# This leans pretty heavily on the our query fingerprinting capabilities.
Expand Down Expand Up @@ -864,6 +873,12 @@ def add_preparsed_query(
out_table
] = query_fingerprint

# Also update schema resolver for missing session id
if parsed.session_id == _MISSING_SESSION_ID and parsed.inferred_schema:
self._missing_session_schema_resolver.add_temp_tables(
{out_table: parsed.inferred_schema}
)

else:
# Non-temp tables immediately generate lineage.
self._lineage_map.for_mutation(out_table, OrderedSet()).add(
Expand Down Expand Up @@ -897,7 +912,9 @@ def _make_schema_resolver_for_session(
self, session_id: str
) -> SchemaResolverInterface:
schema_resolver: SchemaResolverInterface = self._schema_resolver
if session_id in self._temp_lineage_map:
if session_id == _MISSING_SESSION_ID:
schema_resolver = self._missing_session_schema_resolver
elif session_id in self._temp_lineage_map:
temp_table_schemas: Dict[str, Optional[List[models.SchemaFieldClass]]] = {}
for temp_table_urn, query_id in self._temp_lineage_map[session_id].items():
temp_table_schemas[temp_table_urn] = self._inferred_temp_schemas.get(
Expand Down

0 comments on commit d5814ce

Please sign in to comment.