diff --git a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py
index a5a195e05a635..8bda5db9a379a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py
@@ -5,7 +5,7 @@
 from collections import defaultdict
 from dataclasses import dataclass, field
 from hashlib import md5
-from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Type
+from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Type, Union
 
 from elasticsearch import Elasticsearch
 from pydantic import validator
@@ -249,6 +249,10 @@ class ElasticsearchSourceConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
     password: Optional[str] = Field(
         default=None, description="The password credential."
     )
+    api_key: Optional[Union[Any, str]] = Field(
+        default=None,
+        description="API Key authentication. Accepts either a list with id and api_key (UTF-8 representation), or a base64 encoded string of id and api_key combined by ':'.",
+    )
 
     use_ssl: bool = Field(
         default=False, description="Whether to use SSL for the connection or not."
@@ -346,6 +350,7 @@ def __init__(self, config: ElasticsearchSourceConfig, ctx: PipelineContext):
         self.client = Elasticsearch(
             self.source_config.host,
             http_auth=self.source_config.http_auth,
+            api_key=self.source_config.api_key,
             use_ssl=self.source_config.use_ssl,
             verify_certs=self.source_config.verify_certs,
             ca_certs=self.source_config.ca_certs,
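
The new `api_key` value is passed through unchanged to the elasticsearch-py 7.x client, which accepts either an (id, api_key) pair or the base64 encoding of "id:api_key" (the "encoded" field of the create-API-key response). A minimal sketch of both forms, with placeholder host and credentials that are not part of this diff:

    from base64 import b64encode

    from elasticsearch import Elasticsearch

    # Pair form: the raw id and api_key values returned by the security API.
    client = Elasticsearch(
        "https://localhost:9200",
        api_key=("example-key-id", "example-key-secret"),
    )

    # Encoded form: base64("id:api_key"), equivalent to the pair above.
    encoded = b64encode(b"example-key-id:example-key-secret").decode()
    client = Elasticsearch("https://localhost:9200", api_key=encoded)

In a DataHub recipe this corresponds to setting the new `api_key` field alongside `host` in the elasticsearch source config.
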
diff --git a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
index ae5d83c2dfc94..c358bee6daae8 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
@@ -161,7 +161,7 @@ def _resolve_schema_info(self, urn: str) -> Optional[SchemaInfo]:
     def add_schema_metadata(
         self, urn: str, schema_metadata: SchemaMetadataClass
     ) -> None:
-        schema_info = self._convert_schema_aspect_to_info(schema_metadata)
+        schema_info = _convert_schema_aspect_to_info(schema_metadata)
         self._save_to_cache(urn, schema_info)
 
     def add_raw_schema_info(self, urn: str, schema_info: SchemaInfo) -> None:
@@ -178,7 +178,7 @@ def with_temp_tables(
     ) -> SchemaResolverInterface:
         extra_schemas = {
             urn: (
-                self._convert_schema_field_list_to_info(fields)
+                _convert_schema_field_list_to_info(fields)
                 if fields is not None
                 else None
             )
@@ -197,28 +197,7 @@ def _fetch_schema_info(self, graph: DataHubGraph, urn: str) -> Optional[SchemaIn
         if not aspect:
             return None
 
-        return self._convert_schema_aspect_to_info(aspect)
-
-    @classmethod
-    def _convert_schema_aspect_to_info(
-        cls, schema_metadata: SchemaMetadataClass
-    ) -> SchemaInfo:
-        return cls._convert_schema_field_list_to_info(schema_metadata.fields)
-
-    @classmethod
-    def _convert_schema_field_list_to_info(
-        cls, schema_fields: List[SchemaFieldClass]
-    ) -> SchemaInfo:
-        return {
-            get_simple_field_path_from_v2_field_path(col.fieldPath): (
-                # The actual types are more of a "nice to have".
-                col.nativeDataType
-                or "str"
-            )
-            for col in schema_fields
-            # TODO: We can't generate lineage to columns nested within structs yet.
-            if "." not in get_simple_field_path_from_v2_field_path(col.fieldPath)
-        }
+        return _convert_schema_aspect_to_info(aspect)
 
     @classmethod
     def convert_graphql_schema_metadata_to_info(
@@ -262,3 +241,36 @@ def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
         if urn in self._extra_schemas:
             return urn, self._extra_schemas[urn]
         return self._base_resolver.resolve_table(table)
+
+    def add_temp_tables(
+        self, temp_tables: Dict[str, Optional[List[SchemaFieldClass]]]
+    ) -> None:
+        self._extra_schemas.update(
+            {
+                urn: (
+                    _convert_schema_field_list_to_info(fields)
+                    if fields is not None
+                    else None
+                )
+                for urn, fields in temp_tables.items()
+            }
+        )
+
+
+def _convert_schema_field_list_to_info(
+    schema_fields: List[SchemaFieldClass],
+) -> SchemaInfo:
+    return {
+        get_simple_field_path_from_v2_field_path(col.fieldPath): (
+            # The actual types are more of a "nice to have".
+            col.nativeDataType
+            or "str"
+        )
+        for col in schema_fields
+        # TODO: We can't generate lineage to columns nested within structs yet.
+        if "." not in get_simple_field_path_from_v2_field_path(col.fieldPath)
+    }
+
+
+def _convert_schema_aspect_to_info(schema_metadata: SchemaMetadataClass) -> SchemaInfo:
+    return _convert_schema_field_list_to_info(schema_metadata.fields)
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index f5908753affde..52934f9f72a70 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -32,7 +32,11 @@
     QueryUrn,
     SchemaFieldUrn,
 )
-from datahub.sql_parsing.schema_resolver import SchemaResolver, SchemaResolverInterface
+from datahub.sql_parsing.schema_resolver import (
+    SchemaResolver,
+    SchemaResolverInterface,
+    _SchemaResolverWithExtras,
+)
 from datahub.sql_parsing.sql_parsing_common import QueryType, QueryTypeProps
 from datahub.sql_parsing.sqlglot_lineage import (
     ColumnLineageInfo,
@@ -367,6 +371,11 @@ def __init__(
                 graph=graph,
             )
         )
+        # Schema resolver for special case (_MISSING_SESSION_ID)
+        # This is particularly useful for temp table lineage when the session id is not available.
+        self._missing_session_schema_resolver = _SchemaResolverWithExtras(
+            base_resolver=self._schema_resolver, extra_schemas={}
+        )
 
         # Initialize internal data structures.
         # This leans pretty heavily on the our query fingerprinting capabilities.
@@ -864,6 +873,12 @@ def add_preparsed_query(
                     out_table
                 ] = query_fingerprint
 
+                # Also update schema resolver for missing session id
+                if parsed.session_id == _MISSING_SESSION_ID and parsed.inferred_schema:
+                    self._missing_session_schema_resolver.add_temp_tables(
+                        {out_table: parsed.inferred_schema}
+                    )
+
             else:
                 # Non-temp tables immediately generate lineage.
                 self._lineage_map.for_mutation(out_table, OrderedSet()).add(
@@ -897,7 +912,9 @@ def _make_schema_resolver_for_session(
         self, session_id: str
     ) -> SchemaResolverInterface:
         schema_resolver: SchemaResolverInterface = self._schema_resolver
-        if session_id in self._temp_lineage_map:
+        if session_id == _MISSING_SESSION_ID:
+            schema_resolver = self._missing_session_schema_resolver
+        elif session_id in self._temp_lineage_map:
             temp_table_schemas: Dict[str, Optional[List[models.SchemaFieldClass]]] = {}
             for temp_table_urn, query_id in self._temp_lineage_map[session_id].items():
                 temp_table_schemas[temp_table_urn] = self._inferred_temp_schemas.get(
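
For context, `_SchemaResolverWithExtras` is an overlay resolver: extra (here, temp-table) schemas are consulted before falling back to the base resolver, and the new `add_temp_tables` lets the aggregator grow that overlay as session-less (`_MISSING_SESSION_ID`) queries are parsed. A simplified, self-contained sketch of the pattern, with hypothetical names (`OverlayResolver`, `resolve`) standing in for the real classes in `datahub.sql_parsing.schema_resolver`:

    from typing import Dict, Optional

    # Simplified stand-in for SchemaInfo: column name -> native type.
    SchemaInfo = Dict[str, str]

    class OverlayResolver:
        def __init__(
            self,
            base: Dict[str, SchemaInfo],
            extras: Dict[str, Optional[SchemaInfo]],
        ) -> None:
            self._base = base
            self._extras = extras

        def add_temp_tables(
            self, temp_tables: Dict[str, Optional[SchemaInfo]]
        ) -> None:
            # Mirrors add_temp_tables() above: dict.update() means later
            # registrations for the same urn win.
            self._extras.update(temp_tables)

        def resolve(self, urn: str) -> Optional[SchemaInfo]:
            # Extras (temp tables) shadow the base resolver.
            if urn in self._extras:
                return self._extras[urn]
            return self._base.get(urn)

    base = {"urn:li:dataset:real_table": {"id": "BIGINT"}}
    resolver = OverlayResolver(base, extras={})
    resolver.add_temp_tables({"urn:li:dataset:#temp1": {"id": "BIGINT"}})
    assert resolver.resolve("urn:li:dataset:#temp1") == {"id": "BIGINT"}

This mirrors how add_preparsed_query registers inferred temp-table schemas as it goes, so a later query without a session id can still resolve those columns through _make_schema_resolver_for_session.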